Skip to content

Commit 4b51376

Browse files
author
Devdutt Shenoi
committed
refactor: Pending type to pending request and promise_tx
1 parent 267ae8f commit 4b51376

File tree

7 files changed

+247
-147
lines changed

7 files changed

+247
-147
lines changed

rumqttc/src/client.rs

Lines changed: 44 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ use std::time::Duration;
44

55
use crate::mqttbytes::{v4::*, QoS};
66
use crate::{
7-
valid_filter, valid_topic, AckPromise, ConnectionError, Event, EventLoop, MqttOptions,
7+
valid_filter, valid_topic, AckPromise, ConnectionError, Event, EventLoop, MqttOptions, Pending,
88
PromiseTx, Request,
99
};
1010

@@ -23,15 +23,15 @@ pub enum ClientError {
2323
TryRequest(Request),
2424
}
2525

26-
impl From<SendError<(Request, Option<PromiseTx>)>> for ClientError {
27-
fn from(e: SendError<(Request, Option<PromiseTx>)>) -> Self {
28-
Self::Request(e.into_inner().0)
26+
impl From<SendError<Pending<Request>>> for ClientError {
27+
fn from(e: SendError<Pending<Request>>) -> Self {
28+
Self::Request(e.into_inner().request)
2929
}
3030
}
3131

32-
impl From<TrySendError<(Request, Option<PromiseTx>)>> for ClientError {
33-
fn from(e: TrySendError<(Request, Option<PromiseTx>)>) -> Self {
34-
Self::TryRequest(e.into_inner().0)
32+
impl From<TrySendError<Pending<Request>>> for ClientError {
33+
fn from(e: TrySendError<Pending<Request>>) -> Self {
34+
Self::TryRequest(e.into_inner().request)
3535
}
3636
}
3737

@@ -44,7 +44,7 @@ impl From<TrySendError<(Request, Option<PromiseTx>)>> for ClientError {
4444
/// from the broker, i.e. move ahead.
4545
#[derive(Clone, Debug)]
4646
pub struct AsyncClient {
47-
request_tx: Sender<(Request, Option<PromiseTx>)>,
47+
request_tx: Sender<Pending<Request>>,
4848
}
4949

5050
impl AsyncClient {
@@ -64,7 +64,7 @@ impl AsyncClient {
6464
///
6565
/// This is mostly useful for creating a test instance where you can
6666
/// listen on the corresponding receiver.
67-
pub fn from_senders(request_tx: Sender<(Request, Option<PromiseTx>)>) -> AsyncClient {
67+
pub fn from_senders(request_tx: Sender<Pending<Request>>) -> AsyncClient {
6868
AsyncClient { request_tx }
6969
}
7070

@@ -84,12 +84,11 @@ impl AsyncClient {
8484
let topic = topic.into();
8585
let mut publish = Publish::new(&topic, qos, payload);
8686
publish.retain = retain;
87-
let publish = Request::Publish(publish);
8887
if !valid_topic(&topic) {
89-
return Err(ClientError::Request(publish));
88+
return Err(ClientError::Request(publish.into()));
9089
}
9190
self.request_tx
92-
.send_async((publish, Some(promise_tx)))
91+
.send_async(Pending::new(publish.into(), promise_tx))
9392
.await?;
9493

9594
Ok(promise)
@@ -111,11 +110,11 @@ impl AsyncClient {
111110
let topic = topic.into();
112111
let mut publish = Publish::new(&topic, qos, payload);
113112
publish.retain = retain;
114-
let publish = Request::Publish(publish);
115113
if !valid_topic(&topic) {
116-
return Err(ClientError::TryRequest(publish));
114+
return Err(ClientError::TryRequest(publish.into()));
117115
}
118-
self.request_tx.try_send((publish, Some(promise_tx)))?;
116+
self.request_tx
117+
.try_send(Pending::new(publish.into(), promise_tx))?;
119118

120119
Ok(promise)
121120
}
@@ -125,7 +124,9 @@ impl AsyncClient {
125124
let ack = get_ack_req(publish);
126125

127126
if let Some(ack) = ack {
128-
self.request_tx.send_async((ack, None)).await?;
127+
self.request_tx
128+
.send_async(Pending::no_promises(ack))
129+
.await?;
129130
}
130131

131132
Ok(())
@@ -135,7 +136,7 @@ impl AsyncClient {
135136
pub fn try_ack(&self, publish: &Publish) -> Result<(), ClientError> {
136137
let ack = get_ack_req(publish);
137138
if let Some(ack) = ack {
138-
self.request_tx.try_send((ack, None))?;
139+
self.request_tx.try_send(Pending::no_promises(ack))?;
139140
}
140141

141142
Ok(())
@@ -155,9 +156,8 @@ impl AsyncClient {
155156
let (promise_tx, promise) = PromiseTx::new();
156157
let mut publish = Publish::from_bytes(topic, qos, payload);
157158
publish.retain = retain;
158-
let publish = Request::Publish(publish);
159159
self.request_tx
160-
.send_async((publish, Some(promise_tx)))
160+
.send_async(Pending::new(publish.into(), promise_tx))
161161
.await?;
162162

163163
Ok(promise)
@@ -175,7 +175,7 @@ impl AsyncClient {
175175
return Err(ClientError::Request(subscribe.into()));
176176
}
177177
self.request_tx
178-
.send_async((subscribe.into(), Some(promise_tx)))
178+
.send_async(Pending::new(subscribe.into(), promise_tx))
179179
.await?;
180180

181181
Ok(promise)
@@ -193,7 +193,7 @@ impl AsyncClient {
193193
return Err(ClientError::TryRequest(subscribe.into()));
194194
}
195195
self.request_tx
196-
.try_send((subscribe.into(), Some(promise_tx)))?;
196+
.try_send(Pending::new(subscribe.into(), promise_tx))?;
197197

198198
Ok(promise)
199199
}
@@ -209,7 +209,7 @@ impl AsyncClient {
209209
return Err(ClientError::Request(subscribe.into()));
210210
}
211211
self.request_tx
212-
.send_async((subscribe.into(), Some(promise_tx)))
212+
.send_async(Pending::new(subscribe.into(), promise_tx))
213213
.await?;
214214

215215
Ok(promise)
@@ -226,7 +226,7 @@ impl AsyncClient {
226226
return Err(ClientError::TryRequest(subscribe.into()));
227227
}
228228
self.request_tx
229-
.try_send((subscribe.into(), Some(promise_tx)))?;
229+
.try_send(Pending::new(subscribe.into(), promise_tx))?;
230230

231231
Ok(promise)
232232
}
@@ -236,7 +236,7 @@ impl AsyncClient {
236236
let (promise_tx, promise) = PromiseTx::new();
237237
let unsubscribe = Unsubscribe::new(topic.into());
238238
self.request_tx
239-
.send_async((unsubscribe.into(), Some(promise_tx)))
239+
.send_async(Pending::new(unsubscribe.into(), promise_tx))
240240
.await?;
241241

242242
Ok(promise)
@@ -247,23 +247,25 @@ impl AsyncClient {
247247
let (promise_tx, promise) = PromiseTx::new();
248248
let unsubscribe = Unsubscribe::new(topic.into());
249249
self.request_tx
250-
.try_send((unsubscribe.into(), Some(promise_tx)))?;
250+
.try_send(Pending::new(unsubscribe.into(), promise_tx))?;
251251

252252
Ok(promise)
253253
}
254254

255255
/// Sends a MQTT disconnect to the `EventLoop`
256256
pub async fn disconnect(&self) -> Result<(), ClientError> {
257257
let request = Request::Disconnect(Disconnect);
258-
self.request_tx.send_async((request, None)).await?;
258+
self.request_tx
259+
.send_async(Pending::no_promises(request))
260+
.await?;
259261

260262
Ok(())
261263
}
262264

263265
/// Attempts to send a MQTT disconnect to the `EventLoop`
264266
pub fn try_disconnect(&self) -> Result<(), ClientError> {
265267
let request = Request::Disconnect(Disconnect);
266-
self.request_tx.try_send((request, None))?;
268+
self.request_tx.try_send(Pending::no_promises(request))?;
267269

268270
Ok(())
269271
}
@@ -313,7 +315,7 @@ impl Client {
313315
///
314316
/// This is mostly useful for creating a test instance where you can
315317
/// listen on the corresponding receiver.
316-
pub fn from_sender(request_tx: Sender<(Request, Option<PromiseTx>)>) -> Client {
318+
pub fn from_sender(request_tx: Sender<Pending<Request>>) -> Client {
317319
Client {
318320
client: AsyncClient::from_senders(request_tx),
319321
}
@@ -335,11 +337,13 @@ impl Client {
335337
let topic = topic.into();
336338
let mut publish = Publish::new(&topic, qos, payload);
337339
publish.retain = retain;
338-
let publish = Request::Publish(publish);
340+
let request = Request::Publish(publish);
339341
if !valid_topic(&topic) {
340-
return Err(ClientError::Request(publish));
342+
return Err(ClientError::Request(request));
341343
}
342-
self.client.request_tx.send((publish, Some(promise_tx)))?;
344+
self.client
345+
.request_tx
346+
.send(Pending::new(request, promise_tx))?;
343347

344348
Ok(promise)
345349
}
@@ -363,7 +367,7 @@ impl Client {
363367
let ack = get_ack_req(publish);
364368

365369
if let Some(ack) = ack {
366-
self.client.request_tx.send((ack, None))?;
370+
self.client.request_tx.send(Pending::no_promises(ack))?;
367371
}
368372

369373
Ok(())
@@ -387,7 +391,7 @@ impl Client {
387391
}
388392
self.client
389393
.request_tx
390-
.send((subscribe.into(), Some(promise_tx)))?;
394+
.send(Pending::new(subscribe.into(), promise_tx))?;
391395

392396
Ok(promise)
393397
}
@@ -413,7 +417,7 @@ impl Client {
413417
}
414418
self.client
415419
.request_tx
416-
.send((subscribe.into(), Some(promise_tx)))?;
420+
.send(Pending::new(subscribe.into(), promise_tx))?;
417421

418422
Ok(promise)
419423
}
@@ -429,8 +433,9 @@ impl Client {
429433
pub fn unsubscribe<S: Into<String>>(&self, topic: S) -> Result<AckPromise, ClientError> {
430434
let (promise_tx, promise) = PromiseTx::new();
431435
let unsubscribe = Unsubscribe::new(topic.into());
432-
let request = Request::Unsubscribe(unsubscribe);
433-
self.client.request_tx.send((request, Some(promise_tx)))?;
436+
self.client
437+
.request_tx
438+
.send(Pending::new(unsubscribe.into(), promise_tx))?;
434439

435440
Ok(promise)
436441
}
@@ -444,7 +449,9 @@ impl Client {
444449
pub fn disconnect(&self) -> Result<AckPromise, ClientError> {
445450
let (promise_tx, promise) = PromiseTx::new();
446451
let request = Request::Disconnect(Disconnect);
447-
self.client.request_tx.send((request, Some(promise_tx)))?;
452+
self.client
453+
.request_tx
454+
.send(Pending::new(request, promise_tx))?;
448455

449456
Ok(promise)
450457
}

rumqttc/src/eventloop.rs

Lines changed: 37 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
use crate::{framed::Network, Transport};
2-
use crate::{Incoming, MqttState, NetworkOptions, Packet, PromiseTx, Request, StateError};
2+
use crate::{Incoming, MqttState, NetworkOptions, Packet, Pending, Request, StateError};
33
use crate::{MqttOptions, Outgoing};
44

55
use crate::framed::AsyncReadWrite;
@@ -75,11 +75,11 @@ pub struct EventLoop {
7575
/// Current state of the connection
7676
pub state: MqttState,
7777
/// Request stream
78-
requests_rx: Receiver<(Request, Option<PromiseTx>)>,
78+
requests_rx: Receiver<Pending<Request>>,
7979
/// Requests handle to send requests
80-
pub(crate) requests_tx: Sender<(Request, Option<PromiseTx>)>,
80+
pub(crate) requests_tx: Sender<Pending<Request>>,
8181
/// Pending packets from last session
82-
pub pending: VecDeque<(Request, Option<PromiseTx>)>,
82+
pub pending: VecDeque<Pending<Request>>,
8383
/// Network connection to the broker
8484
pub network: Option<Network>,
8585
/// Keep alive time
@@ -132,7 +132,7 @@ impl EventLoop {
132132
// drain requests from channel which weren't yet received
133133
let mut requests_in_channel: Vec<_> = self.requests_rx.drain().collect();
134134

135-
requests_in_channel.retain(|(request, _)| {
135+
requests_in_channel.retain(|Pending { request, .. }| {
136136
match request {
137137
Request::PubAck(_) => false, // Wait for publish retransmission, else the broker could be confused by an unexpected ack
138138
_ => true,
@@ -184,10 +184,10 @@ impl EventLoop {
184184
/// Select on network and requests and generate keepalive pings when necessary
185185
async fn select(&mut self) -> Result<Event, ConnectionError> {
186186
let network = self.network.as_mut().unwrap();
187+
let network_timeout = Duration::from_secs(self.network_options.connection_timeout());
187188
// let await_acks = self.state.await_acks;
188189
let inflight_full = self.state.inflight >= self.mqtt_options.inflight;
189190
let collision = self.state.collision.is_some();
190-
let network_timeout = Duration::from_secs(self.network_options.connection_timeout());
191191

192192
// Read buffered events from previous polls before calling a new poll
193193
if let Some(event) = self.state.events.pop_front() {
@@ -241,8 +241,8 @@ impl EventLoop {
241241
&self.requests_rx,
242242
self.mqtt_options.pending_throttle
243243
), if !self.pending.is_empty() || (!inflight_full && !collision) => match o {
244-
Ok((request, tx)) => {
245-
if let Some(outgoing) = self.state.handle_outgoing_packet(request, tx)? {
244+
Ok(pending) => {
245+
if let Some(outgoing) = self.state.handle_outgoing_packet(pending)? {
246246
network.write(outgoing).await?;
247247
}
248248
match time::timeout(network_timeout, network.flush()).await {
@@ -257,21 +257,37 @@ impl EventLoop {
257257
// simple. We can change this behavior in future if necessary (to prevent extra pings)
258258
_ = self.keepalive_timeout.as_mut().unwrap_or(&mut no_sleep),
259259
if self.keepalive_timeout.is_some() && !self.mqtt_options.keep_alive.is_zero() => {
260-
let timeout = self.keepalive_timeout.as_mut().unwrap();
261-
timeout.as_mut().reset(Instant::now() + self.mqtt_options.keep_alive);
262-
263-
if let Some(outgoing) = self.state.handle_outgoing_packet(Request::PingReq(PingReq), None)? {
264-
network.write(outgoing).await?;
265-
}
266-
match time::timeout(network_timeout, network.flush()).await {
267-
Ok(inner) => inner?,
268-
Err(_)=> return Err(ConnectionError::FlushTimeout),
269-
};
260+
self.handle_timeout().await?;
270261
Ok(self.state.events.pop_front().unwrap())
271262
}
272263
}
273264
}
274265

266+
async fn handle_timeout(&mut self) -> Result<(), ConnectionError> {
267+
let network = self.network.as_mut().unwrap();
268+
let network_timeout = Duration::from_secs(self.network_options.connection_timeout());
269+
270+
// Set to next timeout instant
271+
self.keepalive_timeout
272+
.as_mut()
273+
.unwrap()
274+
.as_mut()
275+
.reset(Instant::now() + self.mqtt_options.keep_alive);
276+
277+
if let Some(outgoing) = self
278+
.state
279+
.handle_outgoing_packet(Pending::no_promises(Request::PingReq(PingReq)))?
280+
{
281+
network.write(outgoing).await?;
282+
}
283+
match time::timeout(network_timeout, network.flush()).await {
284+
Ok(inner) => inner?,
285+
Err(_) => return Err(ConnectionError::FlushTimeout),
286+
};
287+
288+
Ok(())
289+
}
290+
275291
pub fn network_options(&self) -> NetworkOptions {
276292
self.network_options.clone()
277293
}
@@ -282,10 +298,10 @@ impl EventLoop {
282298
}
283299

284300
async fn next_request(
285-
pending: &mut VecDeque<(Request, Option<PromiseTx>)>,
286-
rx: &Receiver<(Request, Option<PromiseTx>)>,
301+
pending: &mut VecDeque<Pending<Request>>,
302+
rx: &Receiver<Pending<Request>>,
287303
pending_throttle: Duration,
288-
) -> Result<(Request, Option<PromiseTx>), ConnectionError> {
304+
) -> Result<Pending<Request>, ConnectionError> {
289305
if !pending.is_empty() {
290306
time::sleep(pending_throttle).await;
291307
// We must call .pop_front() AFTER sleep() otherwise we would have

0 commit comments

Comments
 (0)