diff --git a/httpclient5-testing/src/test/java/org/apache/hc/client5/testing/async/TestConnectionClosureRace.java b/httpclient5-testing/src/test/java/org/apache/hc/client5/testing/async/TestConnectionClosureRace.java
new file mode 100644
index 000000000..e579febf7
--- /dev/null
+++ b/httpclient5-testing/src/test/java/org/apache/hc/client5/testing/async/TestConnectionClosureRace.java
@@ -0,0 +1,448 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation. For more
+ * information on the Apache Software Foundation, please see
+ * .
+ *
+ */
+
+package org.apache.hc.client5.testing.async;
+
+import org.apache.hc.client5.http.async.methods.SimpleHttpRequest;
+import org.apache.hc.client5.http.async.methods.SimpleHttpResponse;
+import org.apache.hc.client5.http.async.methods.SimpleRequestBuilder;
+import org.apache.hc.client5.http.config.ConnectionConfig;
+import org.apache.hc.client5.http.config.TlsConfig;
+import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient;
+import org.apache.hc.client5.http.impl.async.HttpAsyncClients;
+import org.apache.hc.client5.http.impl.nio.PoolingAsyncClientConnectionManager;
+import org.apache.hc.client5.http.impl.nio.PoolingAsyncClientConnectionManagerBuilder;
+import org.apache.hc.client5.http.ssl.DefaultClientTlsStrategy;
+import org.apache.hc.client5.testing.SSLTestContexts;
+import org.apache.hc.core5.http.ConnectionClosedException;
+import org.apache.hc.core5.http.HttpHost;
+import org.apache.hc.core5.http.RequestNotExecutedException;
+import org.apache.hc.core5.http.message.BasicHeader;
+import org.apache.hc.core5.http.ssl.TLS;
+import org.apache.hc.core5.http2.H2Error;
+import org.apache.hc.core5.http2.H2PseudoResponseHeaders;
+import org.apache.hc.core5.http2.HttpVersionPolicy;
+import org.apache.hc.core5.http2.config.H2Param;
+import org.apache.hc.core5.http2.config.H2Setting;
+import org.apache.hc.core5.http2.frame.DefaultFrameFactory;
+import org.apache.hc.core5.http2.frame.FrameType;
+import org.apache.hc.core5.http2.hpack.HPackEncoder;
+import org.apache.hc.core5.http2.impl.io.FrameInputBuffer;
+import org.apache.hc.core5.http2.impl.io.FrameOutputBuffer;
+import org.apache.hc.core5.reactor.IOReactorConfig;
+import org.apache.hc.core5.util.ByteArrayBuffer;
+import org.apache.hc.core5.util.TimeValue;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.MethodOrderer.OrderAnnotation;
+import org.junit.jupiter.api.Nested;
+import org.junit.jupiter.api.Order;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestMethodOrder;
+import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+import javax.net.ServerSocketFactory;
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLServerSocketFactory;
+import javax.net.ssl.SSLSocket;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.net.SocketException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.hc.core5.http2.HttpVersionPolicy.FORCE_HTTP_1;
+import static org.apache.hc.core5.http2.HttpVersionPolicy.FORCE_HTTP_2;
+import static org.apache.hc.core5.util.TimeValue.MAX_VALUE;
+import static org.apache.hc.core5.util.TimeValue.NEG_ONE_MILLISECOND;
+import static org.apache.hc.core5.util.TimeValue.ZERO_MILLISECONDS;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/**
+ * This test exercises a race condition between client connection reuse and server-initiated connection closure. The
+ * test matrix consists of two protocols (HTTP/1.1 and HTTP/2), two connection layers (TCP and TLS 1.2), and two,
+ * settings for {@link ConnectionConfig#getValidateAfterInactivity}: -1 ms (never validate) and 0 ms (always validate).
+ *
+ * The tests work by sending simple ping requests to a test server over a single connection. Requests are batched in
+ * various ways and sent at various rates. The test server closes the connection immediately upon sending a response.
+ * HTTP/1.1 connections are closed silently (without any {@code Connection: close} header), in order to simulate
+ * connection closure caused by idleness or by the network itself. HTTP/2 connections are closed with a {@code GOAWAY}
+ * frame. Logically, a TCP {@code FIN} on HTTP/1.1 and a {@code GOAWAY} frame on HTTP/2 both indicate that no additional
+ * requests should be sent on the connection, and this test case investigates under what circumstances we can handle
+ * that signal before leasing the connection to a new request.
+ *
+ * The only thing the test actually asserts is that the requests don't time out; the client should never hang due to
+ * connection closure. Any additional assertions (such as minimum success rates) would be difficult to make reliably,
+ * since server-initiated connection closure is inherently prone to race conditions, even when testing with a single
+ * process talking to itself over localhost. However, each test case prints statistics showing the request success rate
+ * before and after enabling connection validation. This shows the effectiveness of our inactive connection validation
+ * strategy in mitigating races within the client.
+ */
+@TestMethodOrder(OrderAnnotation.class)
+abstract class AbstractTestConnectionClosureRace {
+ final AtomicInteger connectionsEstablished = new AtomicInteger(0);
+ final String scheme;
+ final HttpVersionPolicy httpVersionPolicy;
+
+ volatile ServerSocket serverSocket;
+ volatile int port;
+ volatile Thread serverThread;
+
+ AbstractTestConnectionClosureRace(final String scheme, final HttpVersionPolicy httpVersionPolicy) {
+ this.scheme = scheme;
+ this.httpVersionPolicy = httpVersionPolicy;
+ }
+
+ @BeforeAll
+ static void newline() {
+ System.out.println();
+ }
+
+ @BeforeEach
+ void setup() throws Exception {
+ if ("http".equals(scheme)) {
+ serverSocket = ServerSocketFactory.getDefault().createServerSocket(0);
+ } else {
+ final SSLContext serverSSLContext = SSLTestContexts.createServerSSLContext();
+ final SSLServerSocketFactory sslServerSocketFactory = serverSSLContext.getServerSocketFactory();
+ serverSocket = sslServerSocketFactory.createServerSocket(0);
+ }
+ port = serverSocket.getLocalPort();
+
+ serverThread = new Thread(this::runServer);
+ serverThread.setDaemon(true);
+ serverThread.start();
+ }
+
+ @AfterEach
+ void tearDown() throws Exception {
+ if (serverSocket != null) {
+ serverSocket.close();
+ }
+ if (serverThread != null) {
+ serverThread.interrupt();
+ serverThread.join(1000);
+ }
+ }
+
+ @Test
+ @Timeout(5)
+ @Order(1)
+ void smokeTest() throws Exception {
+ try (final CloseableHttpAsyncClient client = asyncClient(false)) {
+ final SimpleHttpResponse simpleHttpResponse = sendPing(client).get();
+ assertEquals(200, simpleHttpResponse.getCode());
+ }
+ }
+
+ @ParameterizedTest(name = "Validation: {0}")
+ @ValueSource(booleans = { false, true })
+ @Timeout(5)
+ @Order(2)
+ void testSlowSequentialRequests(final boolean validateConnections) throws Exception {
+ try (final CloseableHttpAsyncClient client = asyncClient(validateConnections)) {
+ final List> futures = new ArrayList<>();
+ for (int i = 0; i < 10; i++) {
+ futures.add(sendPing(client));
+ Thread.sleep(25);
+ }
+
+ checkResults(getValidationPrefix(validateConnections) + "Sequential requests (slow)", futures);
+ }
+ }
+
+ @ParameterizedTest(name = "Validation: {0}")
+ @ValueSource(booleans = { false, true })
+ @Timeout(10)
+ @Order(3)
+ void testRapidSequentialRequests(final boolean validateConnections) throws Exception {
+ try (final CloseableHttpAsyncClient client = asyncClient(validateConnections)) {
+ final List> futures = new ArrayList<>();
+ for (int i = 0; i < 2500; i++) {
+ final Future f = sendPing(client);
+ try {
+ f.get();
+ } catch (final ExecutionException ignore) {
+ }
+ futures.add(f);
+ }
+
+ checkResults(getValidationPrefix(validateConnections) + "Sequential requests (rapid)", futures);
+ }
+ }
+
+ @ParameterizedTest(name = "Validation: {0}")
+ @ValueSource(booleans = { false, true })
+ @Timeout(5)
+ @Order(4)
+ void testOneLargeBatchOfRequests(final boolean validateConnections) throws Exception {
+ try (final CloseableHttpAsyncClient client = asyncClient(validateConnections)) {
+ final List> futures = new ArrayList<>();
+ for (int i = 0; i < 30; i++) {
+ futures.add(sendPing(client));
+ }
+
+ checkResults(getValidationPrefix(validateConnections) + "Single large batch", futures);
+ }
+ }
+
+ @ParameterizedTest(name = "Validation: {0}")
+ @ValueSource(booleans = { false, true })
+ @Timeout(5)
+ @Order(5)
+ void testSpacedOutBatchesOfRequests(final boolean validateConnections) throws Exception {
+ try (final CloseableHttpAsyncClient client = asyncClient(validateConnections)) {
+ final List> futures = new ArrayList<>();
+ for (int i = 0; i < 5; i++) {
+ for (int j = 0; j < 3; j++) {
+ futures.add(sendPing(client));
+ }
+ Thread.sleep(25);
+ }
+
+ checkResults(getValidationPrefix(validateConnections) + "Multiple small batches", futures);
+ }
+ }
+
+ private static String getValidationPrefix(final boolean validateConnections) {
+ if (validateConnections) {
+ return "Validation enabled: ";
+ } else {
+ return "Validation disabled: ";
+ }
+ }
+
+ private void checkResults(final String name, final List> futures) {
+ int ok = 0, error = 0, notExecuted = 0, closed = 0, reset = 0, other = 0;
+ for (final Future future : futures) {
+ try {
+ future.get();
+ ok++;
+ } catch (final Exception ex) {
+ error++;
+ if (ex.getCause() instanceof RequestNotExecutedException) {
+ notExecuted++;
+ } else if (ex.getCause() instanceof ConnectionClosedException) {
+ closed++;
+ } else if (ex.getCause() instanceof SocketException && ex.getCause().getMessage().contains("reset")) {
+ reset++;
+ } else {
+ other++;
+ }
+ }
+ }
+
+ if (error > 0) {
+ System.out.printf("%s: %s: %,d succeeded; %,d failed (%.2f%% success rate, %.2f%% retriable)%n",
+ getClass().getSimpleName().toLowerCase(), name, ok, error,
+ (double) ok / (ok + error) * 100d,
+ (double) notExecuted / (ok + error) * 100d);
+ } else {
+ System.out.printf("%s: %s: %,d succeeded; %,d failed (%.2f%% success rate)%n",
+ getClass().getSimpleName().toLowerCase(), name, ok, error, (((double) ok) / (ok + error)) * 100d);
+ }
+ if (false) {
+ System.out.printf(" %,d not executed, %,d closed, %,d reset, %,d other%n", notExecuted, closed, reset,
+ other);
+ }
+ }
+
+ private Future sendPing(final CloseableHttpAsyncClient client) {
+ final HttpHost target = new HttpHost(scheme, "localhost", port);
+ final SimpleHttpRequest request = SimpleRequestBuilder.get().setHttpHost(target).setPath("/ping").build();
+
+ return client.execute(request, null);
+ }
+
+ private void runServer() {
+ try {
+ while (!Thread.currentThread().isInterrupted() && !serverSocket.isClosed()) {
+ final Socket socket = serverSocket.accept();
+ connectionsEstablished.incrementAndGet();
+ handleConnection(socket);
+ }
+ } catch (final IOException e) {
+ if (!Thread.currentThread().isInterrupted() && !serverSocket.isClosed()) {
+ System.err.println("Server error: " + e.getClass() + e.getMessage());
+ }
+ }
+ }
+
+ private void handleConnection(final Socket socket) throws IOException {
+ try {
+ if (socket instanceof SSLSocket) {
+ ((SSLSocket) socket).startHandshake();
+ }
+
+ final InputStream inputStream = socket.getInputStream();
+ final OutputStream outputStream = socket.getOutputStream();
+
+ if (httpVersionPolicy == FORCE_HTTP_2) {
+ sendHttp2Response(inputStream, outputStream);
+ } else {
+ sendHttp1Response(inputStream, outputStream);
+ }
+ outputStream.flush();
+ socket.close();
+ } catch (final SocketException ignore) {
+ // Connection closure was initiated on the server's end
+ } catch (final IOException ex) {
+ if (ex.getMessage() != null && ex.getMessage().startsWith("Connection reset")) {
+ System.err.println("Server saw connection closed by client");
+ return;
+ }
+ throw ex;
+ }
+ }
+
+ private static void sendHttp1Response(final InputStream inputStream, final OutputStream outputStream) throws IOException {
+ final byte[] buffer = new byte[4096];
+ final int bytesRead = inputStream.read(buffer);
+ if (bytesRead <= 0) {
+ return;
+ }
+
+ final String response = "HTTP/1.1 200 OK\r\n" +
+ "Content-Type: text/plain\r\n" +
+ "Content-Length: 2\r\n" +
+ "\r\n" +
+ "OK";
+
+ final byte[] responseBytes = response.getBytes(UTF_8);
+ outputStream.write(responseBytes);
+ }
+
+ private static void sendHttp2Response(final InputStream inputStream, final OutputStream outputStream) throws IOException {
+ final FrameOutputBuffer out = new FrameOutputBuffer(8192);
+ final FrameInputBuffer frameInputBuffer = new FrameInputBuffer(8192);
+ final DefaultFrameFactory ff = new DefaultFrameFactory();
+
+ for (int i = 0; i < 24; i++) inputStream.read(); // Read magic
+ while (!frameInputBuffer.read(inputStream).isType(FrameType.SETTINGS)) {
+ // Do nothing
+ }
+ out.write(ff.createSettingsAck(), outputStream);
+
+ out.write(ff.createSettings(
+ new H2Setting(H2Param.HEADER_TABLE_SIZE, 8192),
+ new H2Setting(H2Param.ENABLE_PUSH, 0),
+ new H2Setting(H2Param.MAX_CONCURRENT_STREAMS, 1),
+ new H2Setting(H2Param.INITIAL_WINDOW_SIZE, 65535),
+ new H2Setting(H2Param.MAX_FRAME_SIZE, 65536),
+ new H2Setting(H2Param.MAX_HEADER_LIST_SIZE, 16 * 1048576)
+ ), outputStream);
+
+ while (!frameInputBuffer.read(inputStream).isType(FrameType.HEADERS)) {
+ // Do nothing
+ }
+
+ final HPackEncoder hPackEncoder = new HPackEncoder(8192, UTF_8);
+ final ByteArrayBuffer headerBuffer = new ByteArrayBuffer(8192);
+ hPackEncoder.encodeHeaders(headerBuffer, Arrays.asList(
+ new BasicHeader(H2PseudoResponseHeaders.STATUS, Integer.toString(200), false),
+ new BasicHeader("content-type", "text/plain")
+ ), true);
+
+ out.write(ff.createHeaders(1, ByteBuffer.wrap(headerBuffer.toByteArray()), true, false), outputStream);
+ out.write(ff.createData(1, ByteBuffer.wrap("OK".getBytes(UTF_8)), true), outputStream);
+ out.write(ff.createGoAway(1, H2Error.NO_ERROR, null), outputStream);
+ }
+
+ private CloseableHttpAsyncClient asyncClient(final boolean validateConnections) throws Exception {
+ final PoolingAsyncClientConnectionManager connManager = PoolingAsyncClientConnectionManagerBuilder.create()
+ .setTlsStrategy(new DefaultClientTlsStrategy(SSLTestContexts.createClientSSLContext()))
+ .setDefaultTlsConfig(TlsConfig.custom()
+ .setSupportedProtocols(TLS.V_1_2)
+ .setVersionPolicy(httpVersionPolicy)
+ .build())
+ .build();
+ connManager.setDefaultConnectionConfig(getConnectionConfig(validateConnections));
+ connManager.setDefaultMaxPerRoute(1);
+ connManager.setMaxTotal(1);
+ final CloseableHttpAsyncClient client = HttpAsyncClients.custom()
+ .setIOReactorConfig(IOReactorConfig.custom()
+ .setSelectInterval(TimeValue.ofMilliseconds(1000))
+ .setIoThreadCount(1)
+ .build())
+ .setConnectionManager(connManager)
+ .disableAutomaticRetries()
+ .build();
+ client.start();
+ return client;
+ }
+
+ private static ConnectionConfig getConnectionConfig(final boolean validateConnections) {
+ return ConnectionConfig.custom()
+ .setTimeToLive(MAX_VALUE)
+ .setValidateAfterInactivity(validateConnections ? ZERO_MILLISECONDS : NEG_ONE_MILLISECOND)
+ .build();
+ }
+}
+
+@Tag("slow")
+public class TestConnectionClosureRace {
+ @Nested
+ class Http extends AbstractTestConnectionClosureRace {
+ public Http() {
+ super("http", FORCE_HTTP_1);
+ }
+ }
+
+ @Nested
+ class Https extends AbstractTestConnectionClosureRace {
+ public Https() {
+ super("https", FORCE_HTTP_1);
+ }
+ }
+
+ @Nested
+ class H2c extends AbstractTestConnectionClosureRace {
+ public H2c() {
+ super("http", FORCE_HTTP_2);
+ }
+ }
+
+ @Nested
+ class H2 extends AbstractTestConnectionClosureRace {
+ public H2() {
+ super("https", FORCE_HTTP_2);
+ }
+ }
+}
diff --git a/pom.xml b/pom.xml
index fac588f7b..ea3089a56 100644
--- a/pom.xml
+++ b/pom.xml
@@ -356,9 +356,38 @@
+
+ org.apache.maven.plugins
+ maven-surefire-plugin
+ ${hc.surefire.version}
+
+ slow
+
+
+
+
+ slow-tests
+
+
+
+
+ org.apache.maven.plugins
+ maven-surefire-plugin
+ ${hc.surefire.version}
+
+ slow
+
+
+
+
+
+
+
+
+