@@ -16,6 +16,84 @@ use futures_util::FutureExt;
16
16
use tokio:: runtime:: { self , Runtime } ;
17
17
use tokio:: time:: timeout;
18
18
19
+ /// An error returned when a topic string fails validation against the MQTT specification.
20
+ #[ derive( Debug , thiserror:: Error , Clone , PartialEq , Eq ) ]
21
+ #[ error( "Invalid MQTT topic: '{0}'" ) ]
22
+ pub struct InvalidTopic ( String ) ;
23
+
24
+ /// A newtype wrapper that guarantees its inner `String` is a valid MQTT topic.
25
+ ///
26
+ /// This type prevents the cost of repeated validation for topics that are used
27
+ /// frequently. It can only be constructed via [`ValidatedTopic::new`], which
28
+ /// performs a one-time validation check.
29
+ #[ derive( Debug , Clone , PartialEq ) ]
30
+ pub struct ValidatedTopic ( String ) ;
31
+
32
+ impl ValidatedTopic {
33
+ /// Constructs a new `ValidatedTopic` after validating the input string.
34
+ ///
35
+ /// # Errors
36
+ ///
37
+ /// Returns [`InvalidTopic`] if the topic string does not conform to the MQTT specification.
38
+ pub fn new < S : Into < String > > ( topic : S ) -> Result < Self , InvalidTopic > {
39
+ let topic_string = topic. into ( ) ;
40
+ if valid_topic ( & topic_string) {
41
+ Ok ( Self ( topic_string) )
42
+ } else {
43
+ Err ( InvalidTopic ( topic_string) )
44
+ }
45
+ }
46
+ }
47
+
48
+ impl From < ValidatedTopic > for String {
49
+ fn from ( topic : ValidatedTopic ) -> Self {
50
+ topic. 0
51
+ }
52
+ }
53
+
54
+ /// A private module to seal the [`Topic`] trait.
55
+ /// Sealing the trait prevents users from implementing [`Topic`]
56
+ /// for their own type, which would circumvent validation
57
+ mod private {
58
+ use super :: ValidatedTopic ;
59
+ pub trait Sealed { }
60
+ impl Sealed for ValidatedTopic { }
61
+ impl Sealed for String { }
62
+ impl < ' a > Sealed for & ' a str { }
63
+ }
64
+
65
+ /// Abstracts over topic types for publishing (as opposed to filters).
66
+ ///
67
+ /// This sealed trait is implemented for string types (`String`, `&str`) and
68
+ /// for [`ValidatedTopic`]. It allows client methods to efficiently handle
69
+ /// both pre-validated and unvalidated topic inputs.
70
+ pub trait Topic : private:: Sealed {
71
+ /// Indicates whether the topic requires validation.
72
+ const NEEDS_VALIDATION : bool ;
73
+ fn into_string ( self ) -> String ;
74
+ }
75
+
76
+ impl Topic for ValidatedTopic {
77
+ const NEEDS_VALIDATION : bool = false ;
78
+ fn into_string ( self ) -> String {
79
+ self . 0
80
+ }
81
+ }
82
+
83
+ impl Topic for String {
84
+ const NEEDS_VALIDATION : bool = true ;
85
+ fn into_string ( self ) -> String {
86
+ self
87
+ }
88
+ }
89
+
90
+ impl < ' a > Topic for & ' a str {
91
+ const NEEDS_VALIDATION : bool = true ;
92
+ fn into_string ( self ) -> String {
93
+ self . to_owned ( )
94
+ }
95
+ }
96
+
19
97
/// Client Error
20
98
#[ derive( Debug , thiserror:: Error ) ]
21
99
pub enum ClientError {
@@ -80,16 +158,21 @@ impl AsyncClient {
80
158
properties : Option < PublishProperties > ,
81
159
) -> Result < ( ) , ClientError >
82
160
where
83
- S : Into < String > ,
161
+ S : Topic ,
84
162
P : Into < Bytes > ,
85
163
{
86
- let topic = topic. into ( ) ;
87
- let mut publish = Publish :: new ( & topic, qos, payload, properties) ;
164
+ let topic = topic. into_string ( ) ;
165
+ let mut publish = Publish :: new ( topic. as_str ( ) , qos, payload, properties) ;
88
166
publish. retain = retain;
89
167
let publish = Request :: Publish ( publish) ;
90
- if !valid_topic ( & topic) {
168
+
169
+ // This is zero-cost for `ValidatedTopic`,
170
+ // `S::NEEDS_VALIDATION` is false, and the entire conditional is
171
+ // removed.
172
+ if S :: NEEDS_VALIDATION && !valid_topic ( & topic) {
91
173
return Err ( ClientError :: Request ( publish) ) ;
92
174
}
175
+
93
176
self . request_tx . send_async ( publish) . await ?;
94
177
Ok ( ( ) )
95
178
}
@@ -103,7 +186,7 @@ impl AsyncClient {
103
186
properties : PublishProperties ,
104
187
) -> Result < ( ) , ClientError >
105
188
where
106
- S : Into < String > ,
189
+ S : Topic ,
107
190
P : Into < Bytes > ,
108
191
{
109
192
self . handle_publish ( topic, qos, retain, payload, Some ( properties) )
@@ -118,7 +201,7 @@ impl AsyncClient {
118
201
payload : P ,
119
202
) -> Result < ( ) , ClientError >
120
203
where
121
- S : Into < String > ,
204
+ S : Topic ,
122
205
P : Into < Bytes > ,
123
206
{
124
207
self . handle_publish ( topic, qos, retain, payload, None ) . await
@@ -134,16 +217,18 @@ impl AsyncClient {
134
217
properties : Option < PublishProperties > ,
135
218
) -> Result < ( ) , ClientError >
136
219
where
137
- S : Into < String > ,
220
+ S : Topic ,
138
221
P : Into < Bytes > ,
139
222
{
140
- let topic = topic. into ( ) ;
141
- let mut publish = Publish :: new ( & topic, qos, payload, properties) ;
223
+ let topic = topic. into_string ( ) ;
224
+ let mut publish = Publish :: new ( topic. as_str ( ) , qos, payload, properties) ;
142
225
publish. retain = retain;
143
226
let publish = Request :: Publish ( publish) ;
144
- if !valid_topic ( & topic) {
227
+
228
+ if S :: NEEDS_VALIDATION && !valid_topic ( & topic) {
145
229
return Err ( ClientError :: TryRequest ( publish) ) ;
146
230
}
231
+
147
232
self . request_tx . try_send ( publish) ?;
148
233
Ok ( ( ) )
149
234
}
@@ -157,7 +242,7 @@ impl AsyncClient {
157
242
properties : PublishProperties ,
158
243
) -> Result < ( ) , ClientError >
159
244
where
160
- S : Into < String > ,
245
+ S : Topic ,
161
246
P : Into < Bytes > ,
162
247
{
163
248
self . handle_try_publish ( topic, qos, retain, payload, Some ( properties) )
@@ -171,7 +256,7 @@ impl AsyncClient {
171
256
payload : P ,
172
257
) -> Result < ( ) , ClientError >
173
258
where
174
- S : Into < String > ,
259
+ S : Topic ,
175
260
P : Into < Bytes > ,
176
261
{
177
262
self . handle_try_publish ( topic, qos, retain, payload, None )
@@ -505,17 +590,19 @@ impl Client {
505
590
properties : Option < PublishProperties > ,
506
591
) -> Result < ( ) , ClientError >
507
592
where
508
- S : Into < String > ,
593
+ S : Topic ,
509
594
P : Into < Bytes > ,
510
595
{
511
- let topic = topic. into ( ) ;
512
- let mut publish = Publish :: new ( & topic, qos, payload, properties) ;
596
+ let topic = topic. into_string ( ) ;
597
+ let mut publish = Publish :: new ( topic. as_str ( ) , qos, payload, properties) ;
513
598
publish. retain = retain;
514
- let publish = Request :: Publish ( publish) ;
515
- if !valid_topic ( & topic) {
516
- return Err ( ClientError :: Request ( publish) ) ;
599
+ let request = Request :: Publish ( publish) ;
600
+
601
+ if S :: NEEDS_VALIDATION && !valid_topic ( & topic) {
602
+ return Err ( ClientError :: Request ( request) ) ;
517
603
}
518
- self . client . request_tx . send ( publish) ?;
604
+
605
+ self . client . request_tx . send ( request) ?;
519
606
Ok ( ( ) )
520
607
}
521
608
@@ -528,7 +615,7 @@ impl Client {
528
615
properties : PublishProperties ,
529
616
) -> Result < ( ) , ClientError >
530
617
where
531
- S : Into < String > ,
618
+ S : Topic ,
532
619
P : Into < Bytes > ,
533
620
{
534
621
self . handle_publish ( topic, qos, retain, payload, Some ( properties) )
@@ -542,7 +629,7 @@ impl Client {
542
629
payload : P ,
543
630
) -> Result < ( ) , ClientError >
544
631
where
545
- S : Into < String > ,
632
+ S : Topic ,
546
633
P : Into < Bytes > ,
547
634
{
548
635
self . handle_publish ( topic, qos, retain, payload, None )
@@ -557,7 +644,7 @@ impl Client {
557
644
properties : PublishProperties ,
558
645
) -> Result < ( ) , ClientError >
559
646
where
560
- S : Into < String > ,
647
+ S : Topic ,
561
648
P : Into < Bytes > ,
562
649
{
563
650
self . client
@@ -572,7 +659,7 @@ impl Client {
572
659
payload : P ,
573
660
) -> Result < ( ) , ClientError >
574
661
where
575
- S : Into < String > ,
662
+ S : Topic ,
576
663
P : Into < Bytes > ,
577
664
{
578
665
self . client . try_publish ( topic, qos, retain, payload)
@@ -896,4 +983,31 @@ mod test {
896
983
. expect ( "Should be able to publish" ) ;
897
984
let _ = rx. try_recv ( ) . expect ( "Should have message" ) ;
898
985
}
986
+
987
+ #[ test]
988
+ fn can_publish_with_validated_topic ( ) {
989
+ let ( tx, rx) = flume:: bounded ( 1 ) ;
990
+ let client = Client :: from_sender ( tx) ;
991
+ let valid_topic = ValidatedTopic :: new ( "hello/world" ) . unwrap ( ) ;
992
+ client
993
+ . publish ( valid_topic, QoS :: ExactlyOnce , false , "good bye" )
994
+ . expect ( "Should be able to publish" ) ;
995
+ let _ = rx. try_recv ( ) . expect ( "Should have message" ) ;
996
+ }
997
+
998
+ #[ test]
999
+ fn validated_topic_ergonomics ( ) {
1000
+ let valid_topic = ValidatedTopic :: new ( "hello/world" ) . unwrap ( ) ;
1001
+ let valid_topic_can_be_cloned = valid_topic. clone ( ) ;
1002
+ // ValidatedTopic can be compared
1003
+ assert_eq ! ( valid_topic, valid_topic_can_be_cloned) ;
1004
+ }
1005
+
1006
+ #[ test]
1007
+ fn creating_invalid_validated_topic_fails ( ) {
1008
+ assert_eq ! (
1009
+ ValidatedTopic :: new( "a/+/b" ) ,
1010
+ Err ( InvalidTopic ( "a/+/b" . to_string( ) ) )
1011
+ ) ;
1012
+ }
899
1013
}
0 commit comments