Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
160 changes: 137 additions & 23 deletions rumqttc/src/v5/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,84 @@ use futures_util::FutureExt;
use tokio::runtime::{self, Runtime};
use tokio::time::timeout;

/// An error returned when a topic string fails validation against the MQTT specification.
#[derive(Debug, thiserror::Error, Clone, PartialEq, Eq)]
#[error("Invalid MQTT topic: '{0}'")]
pub struct InvalidTopic(String);

/// A newtype wrapper that guarantees its inner `String` is a valid MQTT topic.
///
/// This type prevents the cost of repeated validation for topics that are used
/// frequently. It can only be constructed via [`ValidatedTopic::new`], which
/// performs a one-time validation check.
#[derive(Debug, Clone, PartialEq)]
pub struct ValidatedTopic(String);

impl ValidatedTopic {
/// Constructs a new `ValidatedTopic` after validating the input string.
///
/// # Errors
///
/// Returns [`InvalidTopic`] if the topic string does not conform to the MQTT specification.
pub fn new<S: Into<String>>(topic: S) -> Result<Self, InvalidTopic> {
let topic_string = topic.into();
if valid_topic(&topic_string) {
Ok(Self(topic_string))
} else {
Err(InvalidTopic(topic_string))
}
}
}

impl From<ValidatedTopic> for String {
fn from(topic: ValidatedTopic) -> Self {
topic.0
}
}

/// A private module to seal the [`Topic`] trait.
/// Sealing the trait prevents users from implementing [`Topic`]
/// for their own type, which would circumvent validation
mod private {
use super::ValidatedTopic;
pub trait Sealed {}
impl Sealed for ValidatedTopic {}
impl Sealed for String {}
impl<'a> Sealed for &'a str {}
}

/// Abstracts over topic types for publishing (as opposed to filters).
///
/// This sealed trait is implemented for string types (`String`, `&str`) and
/// for [`ValidatedTopic`]. It allows client methods to efficiently handle
/// both pre-validated and unvalidated topic inputs.
pub trait Topic: private::Sealed {
/// Indicates whether the topic requires validation.
const NEEDS_VALIDATION: bool;
fn into_string(self) -> String;
}

impl Topic for ValidatedTopic {
const NEEDS_VALIDATION: bool = false;
fn into_string(self) -> String {
self.0
}
}

impl Topic for String {
const NEEDS_VALIDATION: bool = true;
fn into_string(self) -> String {
self
}
}

impl<'a> Topic for &'a str {
const NEEDS_VALIDATION: bool = true;
fn into_string(self) -> String {
self.to_owned()
}
}

/// Client Error
#[derive(Debug, thiserror::Error)]
pub enum ClientError {
Expand Down Expand Up @@ -80,16 +158,21 @@ impl AsyncClient {
properties: Option<PublishProperties>,
) -> Result<(), ClientError>
where
S: Into<String>,
S: Topic,
P: Into<Bytes>,
{
let topic = topic.into();
let mut publish = Publish::new(&topic, qos, payload, properties);
let topic = topic.into_string();
let mut publish = Publish::new(topic.as_str(), qos, payload, properties);
publish.retain = retain;
let publish = Request::Publish(publish);
if !valid_topic(&topic) {

// This is zero-cost for `ValidatedTopic`,
// `S::NEEDS_VALIDATION` is false, and the entire conditional is
// removed.
if S::NEEDS_VALIDATION && !valid_topic(&topic) {
return Err(ClientError::Request(publish));
}

self.request_tx.send_async(publish).await?;
Ok(())
}
Expand All @@ -103,7 +186,7 @@ impl AsyncClient {
properties: PublishProperties,
) -> Result<(), ClientError>
where
S: Into<String>,
S: Topic,
P: Into<Bytes>,
{
self.handle_publish(topic, qos, retain, payload, Some(properties))
Expand All @@ -118,7 +201,7 @@ impl AsyncClient {
payload: P,
) -> Result<(), ClientError>
where
S: Into<String>,
S: Topic,
P: Into<Bytes>,
{
self.handle_publish(topic, qos, retain, payload, None).await
Expand All @@ -134,16 +217,18 @@ impl AsyncClient {
properties: Option<PublishProperties>,
) -> Result<(), ClientError>
where
S: Into<String>,
S: Topic,
P: Into<Bytes>,
{
let topic = topic.into();
let mut publish = Publish::new(&topic, qos, payload, properties);
let topic = topic.into_string();
let mut publish = Publish::new(topic.as_str(), qos, payload, properties);
publish.retain = retain;
let publish = Request::Publish(publish);
if !valid_topic(&topic) {

if S::NEEDS_VALIDATION && !valid_topic(&topic) {
return Err(ClientError::TryRequest(publish));
}

self.request_tx.try_send(publish)?;
Ok(())
}
Expand All @@ -157,7 +242,7 @@ impl AsyncClient {
properties: PublishProperties,
) -> Result<(), ClientError>
where
S: Into<String>,
S: Topic,
P: Into<Bytes>,
{
self.handle_try_publish(topic, qos, retain, payload, Some(properties))
Expand All @@ -171,7 +256,7 @@ impl AsyncClient {
payload: P,
) -> Result<(), ClientError>
where
S: Into<String>,
S: Topic,
P: Into<Bytes>,
{
self.handle_try_publish(topic, qos, retain, payload, None)
Expand Down Expand Up @@ -505,17 +590,19 @@ impl Client {
properties: Option<PublishProperties>,
) -> Result<(), ClientError>
where
S: Into<String>,
S: Topic,
P: Into<Bytes>,
{
let topic = topic.into();
let mut publish = Publish::new(&topic, qos, payload, properties);
let topic = topic.into_string();
let mut publish = Publish::new(topic.as_str(), qos, payload, properties);
publish.retain = retain;
let publish = Request::Publish(publish);
if !valid_topic(&topic) {
return Err(ClientError::Request(publish));
let request = Request::Publish(publish);

if S::NEEDS_VALIDATION && !valid_topic(&topic) {
return Err(ClientError::Request(request));
}
self.client.request_tx.send(publish)?;

self.client.request_tx.send(request)?;
Ok(())
}

Expand All @@ -528,7 +615,7 @@ impl Client {
properties: PublishProperties,
) -> Result<(), ClientError>
where
S: Into<String>,
S: Topic,
P: Into<Bytes>,
{
self.handle_publish(topic, qos, retain, payload, Some(properties))
Expand All @@ -542,7 +629,7 @@ impl Client {
payload: P,
) -> Result<(), ClientError>
where
S: Into<String>,
S: Topic,
P: Into<Bytes>,
{
self.handle_publish(topic, qos, retain, payload, None)
Expand All @@ -557,7 +644,7 @@ impl Client {
properties: PublishProperties,
) -> Result<(), ClientError>
where
S: Into<String>,
S: Topic,
P: Into<Bytes>,
{
self.client
Expand All @@ -572,7 +659,7 @@ impl Client {
payload: P,
) -> Result<(), ClientError>
where
S: Into<String>,
S: Topic,
P: Into<Bytes>,
{
self.client.try_publish(topic, qos, retain, payload)
Expand Down Expand Up @@ -896,4 +983,31 @@ mod test {
.expect("Should be able to publish");
let _ = rx.try_recv().expect("Should have message");
}

#[test]
fn can_publish_with_validated_topic() {
let (tx, rx) = flume::bounded(1);
let client = Client::from_sender(tx);
let valid_topic = ValidatedTopic::new("hello/world").unwrap();
client
.publish(valid_topic, QoS::ExactlyOnce, false, "good bye")
.expect("Should be able to publish");
let _ = rx.try_recv().expect("Should have message");
}

#[test]
fn validated_topic_ergonomics() {
let valid_topic = ValidatedTopic::new("hello/world").unwrap();
let valid_topic_can_be_cloned = valid_topic.clone();
// ValidatedTopic can be compared
assert_eq!(valid_topic, valid_topic_can_be_cloned);
}

#[test]
fn creating_invalid_validated_topic_fails() {
assert_eq!(
ValidatedTopic::new("a/+/b"),
Err(InvalidTopic("a/+/b".to_string()))
);
}
}