Skip to content
Open
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion rumqttc/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ To update your code simply remove `Key::ECC()` or `Key::RSA()` from the initiali
`rusttls-pemfile` to `2.0.0`, `async-tungstenite` to `0.24.0`, `ws_stream_tungstenite` to `0.12.0`
and `http` to `1.0.0`. This is a breaking change as types from some of these crates are part of
the public API.
- `publish` / `subscribe` / `unsubscribe` methods on `AsyncClient` and `Client` now return a `PkidPromise` which resolves into the identifier value chosen by the `EventLoop` when handling the packet.
- `publish` / `subscribe` / `unsubscribe` methods on `AsyncClient` and `Client` now return a `NoticeFuture` which is noticed after the packet is released (sent in QoS0, ACKed in QoS1, COMPed in QoS2).

### Deprecated

Expand Down
1 change: 1 addition & 0 deletions rumqttc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ bytes = "1.5"
log = "0.4"
flume = { version = "0.11", default-features = false, features = ["async"] }
thiserror = "1"
linked-hash-map = "0.5"

# Optional
# rustls
Expand Down
65 changes: 65 additions & 0 deletions rumqttc/examples/ack_notif_v5.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
use tokio::task::{self, JoinSet};

use rumqttc::v5::{AsyncClient, MqttOptions, mqttbytes::QoS};
use std::error::Error;
use std::time::Duration;

#[tokio::main(flavor = "current_thread")]
async fn main() -> Result<(), Box<dyn Error>> {
pretty_env_logger::init();
// color_backtrace::install();

let mut mqttoptions = MqttOptions::new("test-1", "localhost", 1883);
mqttoptions.set_keep_alive(Duration::from_secs(5));

let (client, mut eventloop) = AsyncClient::new(mqttoptions, 10);
task::spawn(async move {
loop {
let event = eventloop.poll().await;
match &event {
Ok(v) => {
println!("Event = {v:?}");
}
Err(e) => {
println!("Error = {e:?}");
}
}
}
});

// Subscribe and wait for broker acknowledgement
client
.subscribe("hello/world", QoS::AtMostOnce)
.await
.unwrap()
.wait_async()
.await
.unwrap();

// Publish and spawn wait for notification
let mut set = JoinSet::new();

let future = client
.publish("hello/world", QoS::AtMostOnce, false, vec![1; 1024])
.await
.unwrap();
set.spawn(future.wait_async());

let future = client
.publish("hello/world", QoS::AtLeastOnce, false, vec![1; 1024])
.await
.unwrap();
set.spawn(future.wait_async());

let future = client
.publish("hello/world", QoS::ExactlyOnce, false, vec![1; 1024])
.await
.unwrap();
set.spawn(future.wait_async());

while let Some(res) = set.join_next().await {
println!("Acknoledged = {:?}", res?);
}

Ok(())
}
70 changes: 0 additions & 70 deletions rumqttc/examples/pkid_promise.rs

This file was deleted.

70 changes: 0 additions & 70 deletions rumqttc/examples/pkid_promise_v5.rs

This file was deleted.

22 changes: 16 additions & 6 deletions rumqttc/src/eventloop.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use crate::{framed::Network, Transport};
use crate::{Incoming, MqttState, NetworkOptions, Packet, Request, StateError};
use crate::{MqttOptions, Outgoing};
use crate::NoticeError;

use crate::framed::AsyncReadWrite;
use crate::mqttbytes::v4::*;
Expand Down Expand Up @@ -149,13 +150,22 @@ impl EventLoop {
Ok(inner) => inner?,
Err(_) => return Err(ConnectionError::NetworkTimeout),
};
// Last session might contain packets which aren't acked. If it's a new session, clear the pending packets.
if !connack.session_present {
for request in self.pending.drain(..) {
// If the request is a publish request, send an error to the future that is waiting for the ack.
if let Request::Publish(Some(tx), _) = request {
tx.error(NoticeError::SessionReset)
}
}
}
self.network = Some(network);

if self.keepalive_timeout.is_none() && !self.mqtt_options.keep_alive.is_zero() {
self.keepalive_timeout = Some(Box::pin(time::sleep(self.mqtt_options.keep_alive)));
}

return Ok(Event::Incoming(connack));
return Ok(Event::Incoming(Packet::ConnAck(connack)));
}

match self.select().await {
Expand Down Expand Up @@ -294,14 +304,14 @@ impl EventLoop {
async fn connect(
mqtt_options: &MqttOptions,
network_options: NetworkOptions,
) -> Result<(Network, Incoming), ConnectionError> {
) -> Result<(Network, ConnAck), ConnectionError> {
// connect to the broker
let mut network = network_connect(mqtt_options, network_options).await?;

// make MQTT connection request (which internally awaits for ack)
let packet = mqtt_connect(mqtt_options, &mut network).await?;
let connack = mqtt_connect(mqtt_options, &mut network).await?;

Ok((network, packet))
Ok((network, connack))
}

pub(crate) async fn socket_connect(
Expand Down Expand Up @@ -469,7 +479,7 @@ async fn network_connect(
async fn mqtt_connect(
options: &MqttOptions,
network: &mut Network,
) -> Result<Incoming, ConnectionError> {
) -> Result<ConnAck, ConnectionError> {
let keep_alive = options.keep_alive().as_secs() as u16;
let clean_session = options.clean_session();
let last_will = options.last_will();
Expand All @@ -486,7 +496,7 @@ async fn mqtt_connect(
// validate connack
match network.read().await? {
Incoming::ConnAck(connack) if connack.code == ConnectReturnCode::Success => {
Ok(Packet::ConnAck(connack))
Ok(connack)
}
Incoming::ConnAck(connack) => Err(ConnectionError::ConnectionRefused(connack.code)),
packet => Err(ConnectionError::NotConnAck(packet)),
Expand Down
14 changes: 12 additions & 2 deletions rumqttc/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,9 @@ pub use proxy::{Proxy, ProxyAuth, ProxyType};

pub type Incoming = Packet;

use v5::mqttbytes::v5::{SubscribeReasonCode as V5SubscribeReasonCode, UnsubAckReason};
use v5::mqttbytes::v5::{SubscribeReasonCode as V5SubscribeReasonCode,
UnsubAckReason,
PubAckReason, PubRecReason, PubCompReason};

#[derive(Debug, thiserror::Error)]
pub enum NoticeError {
Expand All @@ -168,6 +170,14 @@ pub enum NoticeError {
V5Subscribe(V5SubscribeReasonCode),
#[error(" v5 Unsubscription Failure Reason: {0:?}")]
V5Unsubscribe(UnsubAckReason),
#[error(" v5 Publish Ack Failure Reason Code: {0:?}")]
V5PubAck(PubAckReason),
#[error(" v5 Publish Rec Failure Reason Code: {0:?}")]
V5PubRec(PubRecReason),
#[error(" v5 Publish Comp Failure Reason Code: {0:?}")]
V5PubComp(PubCompReason),
#[error(" Dropped due to session reconnect with previous state expire/lost")]
SessionReset,
}

impl From<oneshot::error::RecvError> for NoticeError {
Expand Down Expand Up @@ -245,7 +255,7 @@ pub enum Request {
PubAck(PubAck),
PubRec(PubRec),
PubComp(PubComp),
PubRel(Option<NoticeTx>, PubRel),
PubRel(PubRel),
PingReq(PingReq),
PingResp(PingResp),
Subscribe(Option<NoticeTx>, Subscribe),
Expand Down
Loading