From ad6e841ea83630443d2e740ce835ca7dd4e759de Mon Sep 17 00:00:00 2001 From: Simone Bordet Date: Thu, 25 Sep 2025 17:31:57 +0200 Subject: [PATCH 1/2] Fixes #763 - Optimize send of DATA frames. Optimized StringContent to immediately send a last DATA frame for the whole string. In this way we avoid sending a last, empty, DATA frame. Signed-off-by: Simone Bordet --- .../client/internal/StringContent.java | 4 +- .../jetty/reactive/client/HTTP2Test.java | 149 ++++++++++++++++++ 2 files changed, 151 insertions(+), 2 deletions(-) create mode 100644 src/test/java/org/eclipse/jetty/reactive/client/HTTP2Test.java diff --git a/src/main/java/org/eclipse/jetty/reactive/client/internal/StringContent.java b/src/main/java/org/eclipse/jetty/reactive/client/internal/StringContent.java index 53799dca..e19161e7 100644 --- a/src/main/java/org/eclipse/jetty/reactive/client/internal/StringContent.java +++ b/src/main/java/org/eclipse/jetty/reactive/client/internal/StringContent.java @@ -18,7 +18,6 @@ import java.nio.ByteBuffer; import java.nio.charset.Charset; import java.util.Objects; - import org.eclipse.jetty.io.Content; import org.eclipse.jetty.reactive.client.ReactiveRequest; import org.reactivestreams.Subscriber; @@ -59,7 +58,8 @@ protected void onRequest(Subscriber subscriber, long n) { switch (state) { case INITIAL: { state = State.CONTENT; - emitOnNext(subscriber, Content.Chunk.from(ByteBuffer.wrap(bytes), false)); + // The whole string is sent at once, so this is the last chunk. + emitOnNext(subscriber, Content.Chunk.from(ByteBuffer.wrap(bytes), true)); break; } case CONTENT: { diff --git a/src/test/java/org/eclipse/jetty/reactive/client/HTTP2Test.java b/src/test/java/org/eclipse/jetty/reactive/client/HTTP2Test.java new file mode 100644 index 00000000..0a3bb016 --- /dev/null +++ b/src/test/java/org/eclipse/jetty/reactive/client/HTTP2Test.java @@ -0,0 +1,149 @@ +/* + * Copyright (c) 2017 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.eclipse.jetty.reactive.client; + +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import org.eclipse.jetty.client.HttpClient; +import org.eclipse.jetty.http.HttpFields; +import org.eclipse.jetty.http.HttpStatus; +import org.eclipse.jetty.http.HttpVersion; +import org.eclipse.jetty.http.MetaData; +import org.eclipse.jetty.http2.api.Stream; +import org.eclipse.jetty.http2.api.server.ServerSessionListener; +import org.eclipse.jetty.http2.client.HTTP2Client; +import org.eclipse.jetty.http2.client.transport.HttpClientTransportOverHTTP2; +import org.eclipse.jetty.http2.frames.HeadersFrame; +import org.eclipse.jetty.http2.server.RawHTTP2ServerConnectionFactory; +import org.eclipse.jetty.io.ClientConnector; +import org.eclipse.jetty.server.Server; +import org.eclipse.jetty.server.ServerConnector; +import org.eclipse.jetty.util.component.LifeCycle; +import org.eclipse.jetty.util.thread.QueuedThreadPool; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Test; +import org.reactivestreams.Publisher; +import org.reactivestreams.Subscriber; +import org.reactivestreams.Subscription; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class HTTP2Test { + private Server server; + private ServerConnector connector; + private HttpClient httpClient; + + private void start(ServerSessionListener listener) throws Exception { + QueuedThreadPool serverThreads = new QueuedThreadPool(); + serverThreads.setName("server"); + server = new Server(serverThreads); + RawHTTP2ServerConnectionFactory h2c = new RawHTTP2ServerConnectionFactory(listener); + connector = new ServerConnector(server, 1, 1, h2c); + server.addConnector(connector); + server.start(); + + QueuedThreadPool clientThreads = new QueuedThreadPool(); + clientThreads.setName("client"); + ClientConnector clientConnector = new ClientConnector(); + clientConnector.setExecutor(clientThreads); + clientConnector.setSelectors(1); + httpClient = new HttpClient(new HttpClientTransportOverHTTP2(new HTTP2Client(clientConnector))); + httpClient.start(); + } + + @AfterEach + public void dispose() { + LifeCycle.stop(httpClient); + LifeCycle.stop(server); + } + + @Test + public void testOptimizeLastEmptyDataFrame() throws Exception { + List datas = new ArrayList<>(); + start(new ServerSessionListener() { + @Override + public Stream.Listener onNewStream(Stream stream, HeadersFrame frame) { + MetaData.Response response = new MetaData.Response(HttpStatus.OK_200, null, HttpVersion.HTTP_2, HttpFields.EMPTY); + stream.headers(new HeadersFrame(stream.getId(), response, null, false)); + stream.demand(); + return new Stream.Listener() { + @Override + public void onDataAvailable(Stream stream) { + Stream.Data data = stream.readData(); + if (data == null) { + stream.demand(); + return; + } + datas.add(data); + stream.data(data.frame()) + .thenRun(() -> + { + data.release(); + if (!data.frame().isEndStream()) { + stream.demand(); + } + }); + } + }; + } + }); + + String content = "hello world"; + String uri = "http://localhost:" + connector.getLocalPort(); + Publisher> publisher = ReactiveRequest.newBuilder(httpClient, uri) + .content(ReactiveRequest.Content.fromString(content, "text/plain", StandardCharsets.UTF_8)) + .build() + .response(ReactiveResponse.Content.asStringResult()); + + CountDownLatch latch = new CountDownLatch(1); + AtomicReference> resultRef = new AtomicReference<>(); + publisher.subscribe(new Subscriber<>() { + @Override + public void onSubscribe(Subscription subscription) { + subscription.request(1); + } + + @Override + public void onNext(ReactiveResponse.Result result) { + resultRef.set(result); + } + + @Override + public void onError(Throwable failure) { + } + + @Override + public void onComplete() { + latch.countDown(); + } + }); + + assertTrue(latch.await(5, TimeUnit.SECONDS)); + ReactiveResponse.Result result = resultRef.get(); + assertNotNull(result); + ReactiveResponse response = result.response(); + assertNotNull(response); + assertEquals(HttpStatus.OK_200, response.getStatus()); + assertEquals(content, result.content()); + assertEquals(1, datas.size()); + } +} From cc58af3bb0303230c871199e5ba0e21cdef6dbb9 Mon Sep 17 00:00:00 2001 From: Simone Bordet Date: Thu, 25 Sep 2025 18:32:55 +0200 Subject: [PATCH 2/2] Further simplifications. Signed-off-by: Simone Bordet --- .../client/internal/StringContent.java | 25 +++---------------- 1 file changed, 3 insertions(+), 22 deletions(-) diff --git a/src/main/java/org/eclipse/jetty/reactive/client/internal/StringContent.java b/src/main/java/org/eclipse/jetty/reactive/client/internal/StringContent.java index e19161e7..9cf7bf2f 100644 --- a/src/main/java/org/eclipse/jetty/reactive/client/internal/StringContent.java +++ b/src/main/java/org/eclipse/jetty/reactive/client/internal/StringContent.java @@ -29,7 +29,6 @@ public class StringContent extends AbstractSinglePublisher implem private final String mediaType; private final Charset encoding; private final byte[] bytes; - private State state = State.INITIAL; public StringContent(String string, String mediaType, Charset encoding) { this.mediaType = Objects.requireNonNull(mediaType); @@ -49,31 +48,13 @@ public String getContentType() { @Override public boolean rewind() { - state = State.INITIAL; return true; } @Override protected void onRequest(Subscriber subscriber, long n) { - switch (state) { - case INITIAL: { - state = State.CONTENT; - // The whole string is sent at once, so this is the last chunk. - emitOnNext(subscriber, Content.Chunk.from(ByteBuffer.wrap(bytes), true)); - break; - } - case CONTENT: { - state = State.COMPLETE; - emitOnComplete(subscriber); - break; - } - default: { - break; - } - } - } - - private enum State { - INITIAL, CONTENT, COMPLETE + // The whole string is sent at once, so this is the last chunk. + emitOnNext(subscriber, Content.Chunk.from(ByteBuffer.wrap(bytes), true)); + emitOnComplete(subscriber); } }