Skip to content

Commit ad6e841

Browse files
committed
Fixes #763 - Optimize send of DATA frames.
Optimized StringContent to immediately send a last DATA frame for the whole string. In this way we avoid sending a last, empty, DATA frame. Signed-off-by: Simone Bordet <[email protected]>
1 parent 6b483e4 commit ad6e841

File tree

2 files changed

+151
-2
lines changed

2 files changed

+151
-2
lines changed

src/main/java/org/eclipse/jetty/reactive/client/internal/StringContent.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
import java.nio.ByteBuffer;
1919
import java.nio.charset.Charset;
2020
import java.util.Objects;
21-
2221
import org.eclipse.jetty.io.Content;
2322
import org.eclipse.jetty.reactive.client.ReactiveRequest;
2423
import org.reactivestreams.Subscriber;
@@ -59,7 +58,8 @@ protected void onRequest(Subscriber<? super Content.Chunk> subscriber, long n) {
5958
switch (state) {
6059
case INITIAL: {
6160
state = State.CONTENT;
62-
emitOnNext(subscriber, Content.Chunk.from(ByteBuffer.wrap(bytes), false));
61+
// The whole string is sent at once, so this is the last chunk.
62+
emitOnNext(subscriber, Content.Chunk.from(ByteBuffer.wrap(bytes), true));
6363
break;
6464
}
6565
case CONTENT: {
Lines changed: 149 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,149 @@
1+
/*
2+
* Copyright (c) 2017 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package org.eclipse.jetty.reactive.client;
17+
18+
import java.nio.charset.StandardCharsets;
19+
import java.util.ArrayList;
20+
import java.util.List;
21+
import java.util.concurrent.CountDownLatch;
22+
import java.util.concurrent.TimeUnit;
23+
import java.util.concurrent.atomic.AtomicReference;
24+
import org.eclipse.jetty.client.HttpClient;
25+
import org.eclipse.jetty.http.HttpFields;
26+
import org.eclipse.jetty.http.HttpStatus;
27+
import org.eclipse.jetty.http.HttpVersion;
28+
import org.eclipse.jetty.http.MetaData;
29+
import org.eclipse.jetty.http2.api.Stream;
30+
import org.eclipse.jetty.http2.api.server.ServerSessionListener;
31+
import org.eclipse.jetty.http2.client.HTTP2Client;
32+
import org.eclipse.jetty.http2.client.transport.HttpClientTransportOverHTTP2;
33+
import org.eclipse.jetty.http2.frames.HeadersFrame;
34+
import org.eclipse.jetty.http2.server.RawHTTP2ServerConnectionFactory;
35+
import org.eclipse.jetty.io.ClientConnector;
36+
import org.eclipse.jetty.server.Server;
37+
import org.eclipse.jetty.server.ServerConnector;
38+
import org.eclipse.jetty.util.component.LifeCycle;
39+
import org.eclipse.jetty.util.thread.QueuedThreadPool;
40+
import org.junit.jupiter.api.AfterEach;
41+
import org.junit.jupiter.api.Test;
42+
import org.reactivestreams.Publisher;
43+
import org.reactivestreams.Subscriber;
44+
import org.reactivestreams.Subscription;
45+
46+
import static org.junit.jupiter.api.Assertions.assertEquals;
47+
import static org.junit.jupiter.api.Assertions.assertNotNull;
48+
import static org.junit.jupiter.api.Assertions.assertTrue;
49+
50+
public class HTTP2Test {
51+
private Server server;
52+
private ServerConnector connector;
53+
private HttpClient httpClient;
54+
55+
private void start(ServerSessionListener listener) throws Exception {
56+
QueuedThreadPool serverThreads = new QueuedThreadPool();
57+
serverThreads.setName("server");
58+
server = new Server(serverThreads);
59+
RawHTTP2ServerConnectionFactory h2c = new RawHTTP2ServerConnectionFactory(listener);
60+
connector = new ServerConnector(server, 1, 1, h2c);
61+
server.addConnector(connector);
62+
server.start();
63+
64+
QueuedThreadPool clientThreads = new QueuedThreadPool();
65+
clientThreads.setName("client");
66+
ClientConnector clientConnector = new ClientConnector();
67+
clientConnector.setExecutor(clientThreads);
68+
clientConnector.setSelectors(1);
69+
httpClient = new HttpClient(new HttpClientTransportOverHTTP2(new HTTP2Client(clientConnector)));
70+
httpClient.start();
71+
}
72+
73+
@AfterEach
74+
public void dispose() {
75+
LifeCycle.stop(httpClient);
76+
LifeCycle.stop(server);
77+
}
78+
79+
@Test
80+
public void testOptimizeLastEmptyDataFrame() throws Exception {
81+
List<Stream.Data> datas = new ArrayList<>();
82+
start(new ServerSessionListener() {
83+
@Override
84+
public Stream.Listener onNewStream(Stream stream, HeadersFrame frame) {
85+
MetaData.Response response = new MetaData.Response(HttpStatus.OK_200, null, HttpVersion.HTTP_2, HttpFields.EMPTY);
86+
stream.headers(new HeadersFrame(stream.getId(), response, null, false));
87+
stream.demand();
88+
return new Stream.Listener() {
89+
@Override
90+
public void onDataAvailable(Stream stream) {
91+
Stream.Data data = stream.readData();
92+
if (data == null) {
93+
stream.demand();
94+
return;
95+
}
96+
datas.add(data);
97+
stream.data(data.frame())
98+
.thenRun(() ->
99+
{
100+
data.release();
101+
if (!data.frame().isEndStream()) {
102+
stream.demand();
103+
}
104+
});
105+
}
106+
};
107+
}
108+
});
109+
110+
String content = "hello world";
111+
String uri = "http://localhost:" + connector.getLocalPort();
112+
Publisher<ReactiveResponse.Result<String>> publisher = ReactiveRequest.newBuilder(httpClient, uri)
113+
.content(ReactiveRequest.Content.fromString(content, "text/plain", StandardCharsets.UTF_8))
114+
.build()
115+
.response(ReactiveResponse.Content.asStringResult());
116+
117+
CountDownLatch latch = new CountDownLatch(1);
118+
AtomicReference<ReactiveResponse.Result<String>> resultRef = new AtomicReference<>();
119+
publisher.subscribe(new Subscriber<>() {
120+
@Override
121+
public void onSubscribe(Subscription subscription) {
122+
subscription.request(1);
123+
}
124+
125+
@Override
126+
public void onNext(ReactiveResponse.Result<String> result) {
127+
resultRef.set(result);
128+
}
129+
130+
@Override
131+
public void onError(Throwable failure) {
132+
}
133+
134+
@Override
135+
public void onComplete() {
136+
latch.countDown();
137+
}
138+
});
139+
140+
assertTrue(latch.await(5, TimeUnit.SECONDS));
141+
ReactiveResponse.Result<String> result = resultRef.get();
142+
assertNotNull(result);
143+
ReactiveResponse response = result.response();
144+
assertNotNull(response);
145+
assertEquals(HttpStatus.OK_200, response.getStatus());
146+
assertEquals(content, result.content());
147+
assertEquals(1, datas.size());
148+
}
149+
}

0 commit comments

Comments
 (0)