Skip to content

Commit ddc2d28

Browse files
committed
tmp
Signed-off-by: Matt Klein <[email protected]>
1 parent 415236f commit ddc2d28

File tree

8 files changed

+252
-86
lines changed

8 files changed

+252
-86
lines changed

Cargo.lock

Lines changed: 58 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ arbitrary = { version = "1.4.1", features = ["derive"] }
4848
arc-swap = "1.7.1"
4949
assert_matches = "1.5.0"
5050
async-trait = "0.1.85"
51-
axum = { version = "0.8.1", features = ["http2", "macros"] }
51+
axum = { version = "0.8.1", features = ["http2", "macros", "ws"] }
5252
axum-server = { version = "0.7.1", features = ["tls-rustls-no-provider"] }
5353
backoff = "0.4.0"
5454
base64 = "0.22.1"

bd-grpc/src/client.rs

Lines changed: 13 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -11,13 +11,7 @@ use crate::error::{Error, Result};
1111
use crate::service::ServiceMethod;
1212
use crate::status::{Code, Status};
1313
use crate::{
14-
finalize_decompression,
15-
StreamingApi,
16-
StreamingApiReceiver,
17-
CONNECT_PROTOCOL_VERSION,
18-
CONTENT_ENCODING_SNAPPY,
19-
GRPC_STATUS,
20-
TRANSFER_ENCODING_TRAILERS,
14+
finalize_decompression, BodyFramer, SingleFrame, StreamingApi, StreamingApiReceiver, CONNECT_PROTOCOL_VERSION, CONTENT_ENCODING_SNAPPY, GRPC_STATUS, TRANSFER_ENCODING_TRAILERS
2115
};
2216
use assert_matches::debug_assert_matches;
2317
use axum::body::Body;
@@ -32,8 +26,10 @@ use bd_grpc_codec::{
3226
GRPC_ENCODING_HEADER,
3327
};
3428
use bd_time::TimeDurationExt;
29+
use futures::StreamExt;
3530
use http::header::{CONTENT_ENCODING, CONTENT_TYPE, TRANSFER_ENCODING};
3631
use http::{HeaderMap, Uri};
32+
use http_body::Frame;
3733
use http_body_util::{BodyExt, StreamBody};
3834
use hyper::body::Incoming;
3935
use hyper_util::client::legacy::connect::{Connect, HttpConnector};
@@ -296,9 +292,13 @@ impl<C: Connect + Clone + Send + Sync + 'static> Client<C> {
296292
validate: bool,
297293
compression: Option<bd_grpc_codec::Compression>,
298294
optimize_for: OptimizeFor,
299-
) -> Result<StreamingApi<OutgoingType, IncomingType>> {
295+
) -> Result<StreamingApi<OutgoingType, IncomingType, BodyFramer>> {
300296
let (tx, rx) = mpsc::channel(1);
301-
let body = StreamBody::new(ReceiverStream::new(rx));
297+
let body = StreamBody::new(ReceiverStream::new(rx).map(|f| match f {
298+
Ok(SingleFrame::Data(data)) => Ok(Frame::data(data)),
299+
Ok(SingleFrame::Trailers(trailers)) => Ok(Frame::trailers(trailers)),
300+
Err(e) => Err(e),
301+
}));
302302

303303
match compression {
304304
None => {},
@@ -347,7 +347,7 @@ impl<C: Connect + Clone + Send + Sync + 'static> Client<C> {
347347
validate: bool,
348348
optimize_for: OptimizeFor,
349349
connect_protocol: Option<ConnectProtocolType>,
350-
) -> Result<StreamingApiReceiver<IncomingType>> {
350+
) -> Result<StreamingApiReceiver<IncomingType, BodyFramer>> {
351351
let mut encoder = Encoder::new(None);
352352
let response = self
353353
.common_request(
@@ -360,7 +360,9 @@ impl<C: Connect + Clone + Send + Sync + 'static> Client<C> {
360360
let (parts, body) = response.into_parts();
361361
Ok(StreamingApiReceiver::new(
362362
parts.headers,
363-
Body::new(body),
363+
BodyFramer {
364+
body: Body::new(body),
365+
},
364366
validate,
365367
optimize_for,
366368
None,

bd-grpc/src/grpc_test.rs

Lines changed: 11 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -10,22 +10,7 @@ use crate::compression::{Compression, ConnectSafeCompressionLayer};
1010
use crate::connect_protocol::ConnectProtocolType;
1111
use crate::generated::proto::test::{EchoRequest, EchoResponse};
1212
use crate::{
13-
make_server_streaming_router,
14-
make_unary_router,
15-
new_grpc_response,
16-
Code,
17-
Error,
18-
Handler,
19-
Result,
20-
ServerStreamingHandler,
21-
ServiceMethod,
22-
Status,
23-
StreamStats,
24-
StreamingApi,
25-
StreamingApiSender,
26-
CONNECT_PROTOCOL_VERSION,
27-
CONTENT_TYPE,
28-
CONTENT_TYPE_PROTO,
13+
make_server_streaming_router, make_unary_router, new_grpc_response, BodyFramer, Code, Error, Handler, Result, ServerStreamingHandler, ServiceMethod, SingleFrame, Status, StreamStats, StreamingApi, StreamingApiSender, CONNECT_PROTOCOL_VERSION, CONTENT_TYPE, CONTENT_TYPE_PROTO
2914
};
3015
use assert_matches::assert_matches;
3116
use async_trait::async_trait;
@@ -39,8 +24,9 @@ use bd_server_stats::stats::CounterWrapper;
3924
use bd_server_stats::test::util::stats::{self, Helper};
4025
use bd_time::TimeDurationExt;
4126
use bytes::Bytes;
42-
use futures::poll;
27+
use futures::{poll, StreamExt};
4328
use http::{Extensions, HeaderMap};
29+
use http_body::Frame;
4430
use http_body_util::StreamBody;
4531
use parking_lot::Mutex;
4632
use prometheus::labels;
@@ -389,11 +375,17 @@ async fn read_stop() {
389375
let (parts, body) = request.into_parts();
390376
let (response_sender, response_body) = mpsc::channel(1);
391377
let response = new_grpc_response(
392-
Body::new(StreamBody::new(ReceiverStream::new(response_body))),
378+
Body::new(StreamBody::new(ReceiverStream::new(response_body).map(
379+
|f| match f {
380+
Ok(SingleFrame::Data(data)) => Ok(Frame::data(data)),
381+
Ok(SingleFrame::Trailers(trailers)) => Ok(Frame::trailers(trailers)),
382+
Err(e) => Err(e),
383+
},
384+
))),
393385
None,
394386
None,
395387
);
396-
let api = StreamingApi::<EchoResponse, EchoRequest>::new(
388+
let api = StreamingApi::<EchoResponse, EchoRequest, BodyFramer>::new(
397389
response_sender,
398390
parts.headers,
399391
body,

0 commit comments

Comments
 (0)