11use alloy_json_rpc:: {
2- transform_response, try_deserialize_ok, Request , RequestPacket , ResponsePacket , RpcParam ,
3- RpcResult , RpcReturn ,
2+ transform_response, try_deserialize_ok, Request , ResponsePacket , RpcParam , RpcReturn ,
43} ;
54use alloy_transport:: { BoxTransport , IntoBoxTransport , RpcFut , TransportError , TransportResult } ;
65use core:: panic;
7- use futures:: FutureExt ;
8- use serde_json:: value:: RawValue ;
96use std:: {
107 fmt,
11- future:: Future ,
8+ future:: { Future , IntoFuture } ,
129 marker:: PhantomData ,
13- pin:: Pin ,
14- task:: { self , ready, Poll :: Ready } ,
1510} ;
1611use tower:: Service ;
1712
18- /// The states of the [`RpcCall`] future.
19- #[ must_use = "futures do nothing unless you `.await` or poll them" ]
20- #[ pin_project:: pin_project( project = CallStateProj ) ]
21- enum CallState < Params >
22- where
23- Params : RpcParam ,
24- {
25- Prepared {
26- request : Option < Request < Params > > ,
27- connection : BoxTransport ,
28- } ,
29- AwaitingResponse {
30- #[ pin]
31- fut : <BoxTransport as Service < RequestPacket > >:: Future ,
32- } ,
33- Complete ,
34- }
35-
36- impl < Params > Clone for CallState < Params >
37- where
38- Params : RpcParam ,
39- {
40- fn clone ( & self ) -> Self {
41- match self {
42- Self :: Prepared { request, connection } => {
43- Self :: Prepared { request : request. clone ( ) , connection : connection. clone ( ) }
44- }
45- _ => panic ! ( "cloned after dispatch" ) ,
46- }
47- }
48- }
49-
50- impl < Params > fmt:: Debug for CallState < Params >
51- where
52- Params : RpcParam ,
53- {
54- fn fmt ( & self , f : & mut fmt:: Formatter < ' _ > ) -> fmt:: Result {
55- f. write_str ( match self {
56- Self :: Prepared { .. } => "Prepared" ,
57- Self :: AwaitingResponse { .. } => "AwaitingResponse" ,
58- Self :: Complete => "Complete" ,
59- } )
60- }
61- }
62-
63- impl < Params > Future for CallState < Params >
64- where
65- Params : RpcParam ,
66- {
67- type Output = TransportResult < Box < RawValue > > ;
68-
69- fn poll ( mut self : Pin < & mut Self > , cx : & mut task:: Context < ' _ > ) -> task:: Poll < Self :: Output > {
70- loop {
71- match self . as_mut ( ) . project ( ) {
72- CallStateProj :: Prepared { connection, request } => {
73- if let Err ( e) =
74- task:: ready!( Service :: <RequestPacket >:: poll_ready( connection, cx) )
75- {
76- self . set ( Self :: Complete ) ;
77- return Ready ( RpcResult :: Err ( e) ) ;
78- }
79-
80- let request = request. take ( ) . expect ( "no request" ) ;
81- debug ! ( method=%request. meta. method, id=%request. meta. id, "sending request" ) ;
82- trace ! ( params_ty=%std:: any:: type_name:: <Params >( ) , ?request, "full request" ) ;
83- let request = request. serialize ( ) ;
84- let fut = match request {
85- Ok ( request) => {
86- trace ! ( request=%request. serialized( ) , "serialized request" ) ;
87- connection. call ( request. into ( ) )
88- }
89- Err ( err) => {
90- trace ! ( ?err, "failed to serialize request" ) ;
91- self . set ( Self :: Complete ) ;
92- return Ready ( RpcResult :: Err ( TransportError :: ser_err ( err) ) ) ;
93- }
94- } ;
95- self . set ( Self :: AwaitingResponse { fut } ) ;
96- }
97- CallStateProj :: AwaitingResponse { fut } => {
98- let res = match task:: ready!( fut. poll( cx) ) {
99- Ok ( ResponsePacket :: Single ( res) ) => Ready ( transform_response ( res) ) ,
100- Err ( e) => Ready ( RpcResult :: Err ( e) ) ,
101- _ => panic ! ( "received batch response from single request" ) ,
102- } ;
103- self . set ( Self :: Complete ) ;
104- return res;
105- }
106- CallStateProj :: Complete => {
107- panic ! ( "Polled after completion" ) ;
108- }
109- }
110- }
111- }
112- }
113-
11413/// A prepared, but unsent, RPC call.
11514///
11615/// This is a future that will send the request when polled. It contains a
@@ -130,26 +29,21 @@ where
13029/// batch request must immediately erase the `Param` type to allow batching of
13130/// requests with different `Param` types, while the `RpcCall` may do so lazily.
13231#[ must_use = "futures do nothing unless you `.await` or poll them" ]
133- #[ pin_project:: pin_project]
13432#[ derive( Clone ) ]
135- pub struct RpcCall < Params , Resp , Output = Resp , Map = fn ( Resp ) -> Output >
136- where
137- Params : RpcParam ,
138- Map : FnOnce ( Resp ) -> Output ,
139- {
140- #[ pin]
141- state : CallState < Params > ,
142- map : Option < Map > ,
33+ pub struct RpcCall < Params , Resp , Output = Resp , Map = fn ( Resp ) -> Output > {
34+ request : Request < Params > ,
35+ connection : BoxTransport ,
36+ map : Map ,
14337 _pd : core:: marker:: PhantomData < fn ( ) -> ( Resp , Output ) > ,
14438}
14539
146- impl < Params , Resp , Output , Map > core :: fmt:: Debug for RpcCall < Params , Resp , Output , Map >
40+ impl < Params , Resp , Output , Map > fmt:: Debug for RpcCall < Params , Resp , Output , Map >
14741where
14842 Params : RpcParam ,
14943 Map : FnOnce ( Resp ) -> Output ,
15044{
151- fn fmt ( & self , f : & mut core :: fmt:: Formatter < ' _ > ) -> core :: fmt:: Result {
152- f. debug_struct ( "RpcCall" ) . field ( "state" , & self . state ) . finish ( )
45+ fn fmt ( & self , f : & mut fmt:: Formatter < ' _ > ) -> fmt:: Result {
46+ f. debug_struct ( "RpcCall" ) . finish_non_exhaustive ( )
15347 }
15448}
15549
@@ -158,13 +52,11 @@ where
15852 Params : RpcParam ,
15953{
16054 #[ doc( hidden) ]
161- pub fn new ( req : Request < Params > , connection : impl IntoBoxTransport ) -> Self {
55+ pub fn new ( request : Request < Params > , connection : impl IntoBoxTransport ) -> Self {
16256 Self {
163- state : CallState :: Prepared {
164- request : Some ( req) ,
165- connection : connection. into_box_transport ( ) ,
166- } ,
167- map : Some ( std:: convert:: identity) ,
57+ request,
58+ connection : connection. into_box_transport ( ) ,
59+ map : std:: convert:: identity,
16860 _pd : PhantomData ,
16961 }
17062 }
@@ -193,24 +85,16 @@ where
19385 where
19486 NewMap : FnOnce ( Resp ) -> NewOutput ,
19587 {
196- RpcCall { state : self . state , map : Some ( map) , _pd : PhantomData }
88+ RpcCall { request : self . request , connection : self . connection , map, _pd : PhantomData }
19789 }
19890
19991 /// Returns `true` if the request is a subscription.
200- ///
201- /// # Panics
202- ///
203- /// Panics if called after the request has been sent.
20492 pub fn is_subscription ( & self ) -> bool {
20593 self . request ( ) . meta . is_subscription ( )
20694 }
20795
20896 /// Set the request to be a non-standard subscription (i.e. not
20997 /// "eth_subscribe").
210- ///
211- /// # Panics
212- ///
213- /// Panics if called after the request has been sent.
21498 pub fn set_is_subscription ( & mut self ) {
21599 self . request_mut ( ) . meta . set_is_subscription ( ) ;
216100 }
@@ -224,49 +108,28 @@ where
224108 ///
225109 /// This is useful for modifying the params after the request has been
226110 /// prepared.
227- ///
228- /// # Panics
229- ///
230- /// Panics if called after the request has been sent.
231111 pub fn params ( & mut self ) -> & mut Params {
232112 & mut self . request_mut ( ) . params
233113 }
234114
235115 /// Returns a reference to the request.
236- ///
237- /// # Panics
238- ///
239- /// Panics if called after the request has been sent.
240116 pub fn request ( & self ) -> & Request < Params > {
241- let CallState :: Prepared { request, .. } = & self . state else {
242- panic ! ( "Cannot get request after request has been sent" ) ;
243- } ;
244- request. as_ref ( ) . expect ( "no request in prepared" )
117+ & self . request
245118 }
246119
247120 /// Returns a mutable reference to the request.
248- ///
249- /// # Panics
250- ///
251- /// Panics if called after the request has been sent.
252121 pub fn request_mut ( & mut self ) -> & mut Request < Params > {
253- let CallState :: Prepared { request, .. } = & mut self . state else {
254- panic ! ( "Cannot get request after request has been sent" ) ;
255- } ;
256- request. as_mut ( ) . expect ( "no request in prepared" )
122+ & mut self . request
257123 }
258124
259125 /// Map the params of the request into a new type.
260126 pub fn map_params < NewParams : RpcParam > (
261127 self ,
262- map : impl Fn ( Params ) -> NewParams ,
128+ map : impl FnOnce ( Params ) -> NewParams ,
263129 ) -> RpcCall < NewParams , Resp , Output , Map > {
264- let CallState :: Prepared { request, connection } = self . state else {
265- panic ! ( "Cannot get request after request has been sent" ) ;
266- } ;
267- let request = request. expect ( "no request in prepared" ) . map_params ( map) ;
268130 RpcCall {
269- state : CallState :: Prepared { request : Some ( request) , connection } ,
131+ request : self . request . map_params ( map) ,
132+ connection : self . connection ,
270133 map : self . map ,
271134 _pd : PhantomData ,
272135 }
@@ -285,13 +148,9 @@ where
285148 ///
286149 /// Panics if called after the request has been polled.
287150 pub fn into_owned_params ( self ) -> RpcCall < Params :: Owned , Resp , Output , Map > {
288- let CallState :: Prepared { request, connection } = self . state else {
289- panic ! ( "Cannot get params after request has been sent" ) ;
290- } ;
291- let request = request. expect ( "no request in prepared" ) . into_owned_params ( ) ;
292-
293151 RpcCall {
294- state : CallState :: Prepared { request : Some ( request) , connection } ,
152+ request : self . request . into_owned_params ( ) ,
153+ connection : self . connection ,
295154 map : self . map ,
296155 _pd : PhantomData ,
297156 }
@@ -302,30 +161,37 @@ impl<'a, Params, Resp, Output, Map> RpcCall<Params, Resp, Output, Map>
302161where
303162 Params : RpcParam + ' a ,
304163 Resp : RpcReturn ,
305- Output : ' static ,
164+ Output : ' a ,
306165 Map : FnOnce ( Resp ) -> Output + Send + ' a ,
307166{
308167 /// Convert this future into a boxed, pinned future, erasing its type.
309168 pub fn boxed ( self ) -> RpcFut < ' a , Output > {
310- Box :: pin ( self )
169+ self . into_future ( )
170+ }
171+
172+ async fn do_call ( self ) -> TransportResult < Output > {
173+ let Self { request, mut connection, map, _pd : PhantomData } = self ;
174+ std:: future:: poll_fn ( |cx| connection. poll_ready ( cx) ) . await ?;
175+ let serialized_request = request. serialize ( ) . map_err ( TransportError :: ser_err) ?;
176+ let response_packet = connection. call ( serialized_request. into ( ) ) . await ?;
177+ let ResponsePacket :: Single ( response) = response_packet else {
178+ panic ! ( "received batch response from single request" )
179+ } ;
180+ try_deserialize_ok ( transform_response ( response) ) . map ( map)
311181 }
312182}
313183
314- impl < Params , Resp , Output , Map > Future for RpcCall < Params , Resp , Output , Map >
184+ impl < ' a , Params , Resp , Output , Map > IntoFuture for RpcCall < Params , Resp , Output , Map >
315185where
316- Params : RpcParam ,
186+ Params : RpcParam + ' a ,
317187 Resp : RpcReturn ,
318- Output : ' static ,
319- Map : FnOnce ( Resp ) -> Output ,
188+ Output : ' a ,
189+ Map : FnOnce ( Resp ) -> Output + Send + ' a ,
320190{
321- type Output = TransportResult < Output > ;
322-
323- fn poll ( self : Pin < & mut Self > , cx : & mut task:: Context < ' _ > ) -> task:: Poll < Self :: Output > {
324- trace ! ( ?self . state, "polling RpcCall" ) ;
325-
326- let this = self . get_mut ( ) ;
327- let resp = try_deserialize_ok ( ready ! ( this. state. poll_unpin( cx) ) ) ;
191+ type IntoFuture = RpcFut < ' a , Output > ;
192+ type Output = <Self :: IntoFuture as Future >:: Output ;
328193
329- Ready ( resp. map ( this. map . take ( ) . expect ( "polled after completion" ) ) )
194+ fn into_future ( self ) -> Self :: IntoFuture {
195+ Box :: pin ( self . do_call ( ) )
330196 }
331197}
0 commit comments