diff --git a/httpclient5-testing/src/test/java/org/apache/hc/client5/testing/async/TestConnectionClosureRace.java b/httpclient5-testing/src/test/java/org/apache/hc/client5/testing/async/TestConnectionClosureRace.java new file mode 100644 index 000000000..e579febf7 --- /dev/null +++ b/httpclient5-testing/src/test/java/org/apache/hc/client5/testing/async/TestConnectionClosureRace.java @@ -0,0 +1,448 @@ +/* + * ==================================================================== + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * ==================================================================== + * + * This software consists of voluntary contributions made by many + * individuals on behalf of the Apache Software Foundation. For more + * information on the Apache Software Foundation, please see + * . + * + */ + +package org.apache.hc.client5.testing.async; + +import org.apache.hc.client5.http.async.methods.SimpleHttpRequest; +import org.apache.hc.client5.http.async.methods.SimpleHttpResponse; +import org.apache.hc.client5.http.async.methods.SimpleRequestBuilder; +import org.apache.hc.client5.http.config.ConnectionConfig; +import org.apache.hc.client5.http.config.TlsConfig; +import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient; +import org.apache.hc.client5.http.impl.async.HttpAsyncClients; +import org.apache.hc.client5.http.impl.nio.PoolingAsyncClientConnectionManager; +import org.apache.hc.client5.http.impl.nio.PoolingAsyncClientConnectionManagerBuilder; +import org.apache.hc.client5.http.ssl.DefaultClientTlsStrategy; +import org.apache.hc.client5.testing.SSLTestContexts; +import org.apache.hc.core5.http.ConnectionClosedException; +import org.apache.hc.core5.http.HttpHost; +import org.apache.hc.core5.http.RequestNotExecutedException; +import org.apache.hc.core5.http.message.BasicHeader; +import org.apache.hc.core5.http.ssl.TLS; +import org.apache.hc.core5.http2.H2Error; +import org.apache.hc.core5.http2.H2PseudoResponseHeaders; +import org.apache.hc.core5.http2.HttpVersionPolicy; +import org.apache.hc.core5.http2.config.H2Param; +import org.apache.hc.core5.http2.config.H2Setting; +import org.apache.hc.core5.http2.frame.DefaultFrameFactory; +import org.apache.hc.core5.http2.frame.FrameType; +import org.apache.hc.core5.http2.hpack.HPackEncoder; +import org.apache.hc.core5.http2.impl.io.FrameInputBuffer; +import org.apache.hc.core5.http2.impl.io.FrameOutputBuffer; +import org.apache.hc.core5.reactor.IOReactorConfig; +import org.apache.hc.core5.util.ByteArrayBuffer; +import org.apache.hc.core5.util.TimeValue; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.MethodOrderer.OrderAnnotation; +import org.junit.jupiter.api.Nested; +import org.junit.jupiter.api.Order; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestMethodOrder; +import org.junit.jupiter.api.Timeout; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; + +import javax.net.ServerSocketFactory; +import javax.net.ssl.SSLContext; +import javax.net.ssl.SSLServerSocketFactory; +import javax.net.ssl.SSLSocket; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.net.ServerSocket; +import java.net.Socket; +import java.net.SocketException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicInteger; + +import static java.nio.charset.StandardCharsets.UTF_8; +import static org.apache.hc.core5.http2.HttpVersionPolicy.FORCE_HTTP_1; +import static org.apache.hc.core5.http2.HttpVersionPolicy.FORCE_HTTP_2; +import static org.apache.hc.core5.util.TimeValue.MAX_VALUE; +import static org.apache.hc.core5.util.TimeValue.NEG_ONE_MILLISECOND; +import static org.apache.hc.core5.util.TimeValue.ZERO_MILLISECONDS; +import static org.junit.jupiter.api.Assertions.assertEquals; + +/** + * This test exercises a race condition between client connection reuse and server-initiated connection closure. The + * test matrix consists of two protocols (HTTP/1.1 and HTTP/2), two connection layers (TCP and TLS 1.2), and two, + * settings for {@link ConnectionConfig#getValidateAfterInactivity}: -1 ms (never validate) and 0 ms (always validate). + *

+ * The tests work by sending simple ping requests to a test server over a single connection. Requests are batched in + * various ways and sent at various rates. The test server closes the connection immediately upon sending a response. + * HTTP/1.1 connections are closed silently (without any {@code Connection: close} header), in order to simulate + * connection closure caused by idleness or by the network itself. HTTP/2 connections are closed with a {@code GOAWAY} + * frame. Logically, a TCP {@code FIN} on HTTP/1.1 and a {@code GOAWAY} frame on HTTP/2 both indicate that no additional + * requests should be sent on the connection, and this test case investigates under what circumstances we can handle + * that signal before leasing the connection to a new request. + *

+ * The only thing the test actually asserts is that the requests don't time out; the client should never hang due to + * connection closure. Any additional assertions (such as minimum success rates) would be difficult to make reliably, + * since server-initiated connection closure is inherently prone to race conditions, even when testing with a single + * process talking to itself over localhost. However, each test case prints statistics showing the request success rate + * before and after enabling connection validation. This shows the effectiveness of our inactive connection validation + * strategy in mitigating races within the client. + */ +@TestMethodOrder(OrderAnnotation.class) +abstract class AbstractTestConnectionClosureRace { + final AtomicInteger connectionsEstablished = new AtomicInteger(0); + final String scheme; + final HttpVersionPolicy httpVersionPolicy; + + volatile ServerSocket serverSocket; + volatile int port; + volatile Thread serverThread; + + AbstractTestConnectionClosureRace(final String scheme, final HttpVersionPolicy httpVersionPolicy) { + this.scheme = scheme; + this.httpVersionPolicy = httpVersionPolicy; + } + + @BeforeAll + static void newline() { + System.out.println(); + } + + @BeforeEach + void setup() throws Exception { + if ("http".equals(scheme)) { + serverSocket = ServerSocketFactory.getDefault().createServerSocket(0); + } else { + final SSLContext serverSSLContext = SSLTestContexts.createServerSSLContext(); + final SSLServerSocketFactory sslServerSocketFactory = serverSSLContext.getServerSocketFactory(); + serverSocket = sslServerSocketFactory.createServerSocket(0); + } + port = serverSocket.getLocalPort(); + + serverThread = new Thread(this::runServer); + serverThread.setDaemon(true); + serverThread.start(); + } + + @AfterEach + void tearDown() throws Exception { + if (serverSocket != null) { + serverSocket.close(); + } + if (serverThread != null) { + serverThread.interrupt(); + serverThread.join(1000); + } + } + + @Test + @Timeout(5) + @Order(1) + void smokeTest() throws Exception { + try (final CloseableHttpAsyncClient client = asyncClient(false)) { + final SimpleHttpResponse simpleHttpResponse = sendPing(client).get(); + assertEquals(200, simpleHttpResponse.getCode()); + } + } + + @ParameterizedTest(name = "Validation: {0}") + @ValueSource(booleans = { false, true }) + @Timeout(5) + @Order(2) + void testSlowSequentialRequests(final boolean validateConnections) throws Exception { + try (final CloseableHttpAsyncClient client = asyncClient(validateConnections)) { + final List> futures = new ArrayList<>(); + for (int i = 0; i < 10; i++) { + futures.add(sendPing(client)); + Thread.sleep(25); + } + + checkResults(getValidationPrefix(validateConnections) + "Sequential requests (slow)", futures); + } + } + + @ParameterizedTest(name = "Validation: {0}") + @ValueSource(booleans = { false, true }) + @Timeout(10) + @Order(3) + void testRapidSequentialRequests(final boolean validateConnections) throws Exception { + try (final CloseableHttpAsyncClient client = asyncClient(validateConnections)) { + final List> futures = new ArrayList<>(); + for (int i = 0; i < 2500; i++) { + final Future f = sendPing(client); + try { + f.get(); + } catch (final ExecutionException ignore) { + } + futures.add(f); + } + + checkResults(getValidationPrefix(validateConnections) + "Sequential requests (rapid)", futures); + } + } + + @ParameterizedTest(name = "Validation: {0}") + @ValueSource(booleans = { false, true }) + @Timeout(5) + @Order(4) + void testOneLargeBatchOfRequests(final boolean validateConnections) throws Exception { + try (final CloseableHttpAsyncClient client = asyncClient(validateConnections)) { + final List> futures = new ArrayList<>(); + for (int i = 0; i < 30; i++) { + futures.add(sendPing(client)); + } + + checkResults(getValidationPrefix(validateConnections) + "Single large batch", futures); + } + } + + @ParameterizedTest(name = "Validation: {0}") + @ValueSource(booleans = { false, true }) + @Timeout(5) + @Order(5) + void testSpacedOutBatchesOfRequests(final boolean validateConnections) throws Exception { + try (final CloseableHttpAsyncClient client = asyncClient(validateConnections)) { + final List> futures = new ArrayList<>(); + for (int i = 0; i < 5; i++) { + for (int j = 0; j < 3; j++) { + futures.add(sendPing(client)); + } + Thread.sleep(25); + } + + checkResults(getValidationPrefix(validateConnections) + "Multiple small batches", futures); + } + } + + private static String getValidationPrefix(final boolean validateConnections) { + if (validateConnections) { + return "Validation enabled: "; + } else { + return "Validation disabled: "; + } + } + + private void checkResults(final String name, final List> futures) { + int ok = 0, error = 0, notExecuted = 0, closed = 0, reset = 0, other = 0; + for (final Future future : futures) { + try { + future.get(); + ok++; + } catch (final Exception ex) { + error++; + if (ex.getCause() instanceof RequestNotExecutedException) { + notExecuted++; + } else if (ex.getCause() instanceof ConnectionClosedException) { + closed++; + } else if (ex.getCause() instanceof SocketException && ex.getCause().getMessage().contains("reset")) { + reset++; + } else { + other++; + } + } + } + + if (error > 0) { + System.out.printf("%s: %s: %,d succeeded; %,d failed (%.2f%% success rate, %.2f%% retriable)%n", + getClass().getSimpleName().toLowerCase(), name, ok, error, + (double) ok / (ok + error) * 100d, + (double) notExecuted / (ok + error) * 100d); + } else { + System.out.printf("%s: %s: %,d succeeded; %,d failed (%.2f%% success rate)%n", + getClass().getSimpleName().toLowerCase(), name, ok, error, (((double) ok) / (ok + error)) * 100d); + } + if (false) { + System.out.printf(" %,d not executed, %,d closed, %,d reset, %,d other%n", notExecuted, closed, reset, + other); + } + } + + private Future sendPing(final CloseableHttpAsyncClient client) { + final HttpHost target = new HttpHost(scheme, "localhost", port); + final SimpleHttpRequest request = SimpleRequestBuilder.get().setHttpHost(target).setPath("/ping").build(); + + return client.execute(request, null); + } + + private void runServer() { + try { + while (!Thread.currentThread().isInterrupted() && !serverSocket.isClosed()) { + final Socket socket = serverSocket.accept(); + connectionsEstablished.incrementAndGet(); + handleConnection(socket); + } + } catch (final IOException e) { + if (!Thread.currentThread().isInterrupted() && !serverSocket.isClosed()) { + System.err.println("Server error: " + e.getClass() + e.getMessage()); + } + } + } + + private void handleConnection(final Socket socket) throws IOException { + try { + if (socket instanceof SSLSocket) { + ((SSLSocket) socket).startHandshake(); + } + + final InputStream inputStream = socket.getInputStream(); + final OutputStream outputStream = socket.getOutputStream(); + + if (httpVersionPolicy == FORCE_HTTP_2) { + sendHttp2Response(inputStream, outputStream); + } else { + sendHttp1Response(inputStream, outputStream); + } + outputStream.flush(); + socket.close(); + } catch (final SocketException ignore) { + // Connection closure was initiated on the server's end + } catch (final IOException ex) { + if (ex.getMessage() != null && ex.getMessage().startsWith("Connection reset")) { + System.err.println("Server saw connection closed by client"); + return; + } + throw ex; + } + } + + private static void sendHttp1Response(final InputStream inputStream, final OutputStream outputStream) throws IOException { + final byte[] buffer = new byte[4096]; + final int bytesRead = inputStream.read(buffer); + if (bytesRead <= 0) { + return; + } + + final String response = "HTTP/1.1 200 OK\r\n" + + "Content-Type: text/plain\r\n" + + "Content-Length: 2\r\n" + + "\r\n" + + "OK"; + + final byte[] responseBytes = response.getBytes(UTF_8); + outputStream.write(responseBytes); + } + + private static void sendHttp2Response(final InputStream inputStream, final OutputStream outputStream) throws IOException { + final FrameOutputBuffer out = new FrameOutputBuffer(8192); + final FrameInputBuffer frameInputBuffer = new FrameInputBuffer(8192); + final DefaultFrameFactory ff = new DefaultFrameFactory(); + + for (int i = 0; i < 24; i++) inputStream.read(); // Read magic + while (!frameInputBuffer.read(inputStream).isType(FrameType.SETTINGS)) { + // Do nothing + } + out.write(ff.createSettingsAck(), outputStream); + + out.write(ff.createSettings( + new H2Setting(H2Param.HEADER_TABLE_SIZE, 8192), + new H2Setting(H2Param.ENABLE_PUSH, 0), + new H2Setting(H2Param.MAX_CONCURRENT_STREAMS, 1), + new H2Setting(H2Param.INITIAL_WINDOW_SIZE, 65535), + new H2Setting(H2Param.MAX_FRAME_SIZE, 65536), + new H2Setting(H2Param.MAX_HEADER_LIST_SIZE, 16 * 1048576) + ), outputStream); + + while (!frameInputBuffer.read(inputStream).isType(FrameType.HEADERS)) { + // Do nothing + } + + final HPackEncoder hPackEncoder = new HPackEncoder(8192, UTF_8); + final ByteArrayBuffer headerBuffer = new ByteArrayBuffer(8192); + hPackEncoder.encodeHeaders(headerBuffer, Arrays.asList( + new BasicHeader(H2PseudoResponseHeaders.STATUS, Integer.toString(200), false), + new BasicHeader("content-type", "text/plain") + ), true); + + out.write(ff.createHeaders(1, ByteBuffer.wrap(headerBuffer.toByteArray()), true, false), outputStream); + out.write(ff.createData(1, ByteBuffer.wrap("OK".getBytes(UTF_8)), true), outputStream); + out.write(ff.createGoAway(1, H2Error.NO_ERROR, null), outputStream); + } + + private CloseableHttpAsyncClient asyncClient(final boolean validateConnections) throws Exception { + final PoolingAsyncClientConnectionManager connManager = PoolingAsyncClientConnectionManagerBuilder.create() + .setTlsStrategy(new DefaultClientTlsStrategy(SSLTestContexts.createClientSSLContext())) + .setDefaultTlsConfig(TlsConfig.custom() + .setSupportedProtocols(TLS.V_1_2) + .setVersionPolicy(httpVersionPolicy) + .build()) + .build(); + connManager.setDefaultConnectionConfig(getConnectionConfig(validateConnections)); + connManager.setDefaultMaxPerRoute(1); + connManager.setMaxTotal(1); + final CloseableHttpAsyncClient client = HttpAsyncClients.custom() + .setIOReactorConfig(IOReactorConfig.custom() + .setSelectInterval(TimeValue.ofMilliseconds(1000)) + .setIoThreadCount(1) + .build()) + .setConnectionManager(connManager) + .disableAutomaticRetries() + .build(); + client.start(); + return client; + } + + private static ConnectionConfig getConnectionConfig(final boolean validateConnections) { + return ConnectionConfig.custom() + .setTimeToLive(MAX_VALUE) + .setValidateAfterInactivity(validateConnections ? ZERO_MILLISECONDS : NEG_ONE_MILLISECOND) + .build(); + } +} + +@Tag("slow") +public class TestConnectionClosureRace { + @Nested + class Http extends AbstractTestConnectionClosureRace { + public Http() { + super("http", FORCE_HTTP_1); + } + } + + @Nested + class Https extends AbstractTestConnectionClosureRace { + public Https() { + super("https", FORCE_HTTP_1); + } + } + + @Nested + class H2c extends AbstractTestConnectionClosureRace { + public H2c() { + super("http", FORCE_HTTP_2); + } + } + + @Nested + class H2 extends AbstractTestConnectionClosureRace { + public H2() { + super("https", FORCE_HTTP_2); + } + } +} diff --git a/pom.xml b/pom.xml index fac588f7b..ea3089a56 100644 --- a/pom.xml +++ b/pom.xml @@ -356,9 +356,38 @@ + + org.apache.maven.plugins + maven-surefire-plugin + ${hc.surefire.version} + + slow + + + + + slow-tests + + + + + org.apache.maven.plugins + maven-surefire-plugin + ${hc.surefire.version} + + slow + + + + + + + + +