1- mod envelope;
2- mod state;
1+ pub ( crate ) mod envelope;
2+ pub ( crate ) mod state;
33
44pub mod message;
5- pub mod traits;
65
76use std:: sync:: atomic:: AtomicBool ;
87use std:: sync:: Arc ;
@@ -11,14 +10,14 @@ use async_channel::{unbounded, Receiver, Sender};
1110
1211use crate :: error:: { BastionError , Result } ;
1312use crate :: mailbox:: envelope:: Envelope ;
13+ use crate :: mailbox:: message:: Message ;
1414use crate :: mailbox:: state:: MailboxState ;
15- use crate :: mailbox:: traits:: TypedMessage ;
1615
1716/// Struct that represents a message sender.
1817#[ derive( Clone ) ]
1918pub struct MailboxTx < T >
2019where
21- T : TypedMessage ,
20+ T : Message ,
2221{
2322 /// Indicated the transmitter part of the actor's channel
2423 /// which is using for passing messages.
3029
3130impl < T > MailboxTx < T >
3231where
33- T : TypedMessage ,
32+ T : Message ,
3433{
3534 /// Return a new instance of MailboxTx that indicates sender.
3635 pub ( crate ) fn new ( tx : Sender < Envelope < T > > ) -> Self {
@@ -57,110 +56,66 @@ where
5756#[ derive( Clone ) ]
5857pub struct Mailbox < T >
5958where
60- T : TypedMessage ,
59+ T : Message ,
6160{
62- /// User guardian sender
63- user_tx : MailboxTx < T > ,
64- /// User guardian receiver
65- user_rx : Receiver < Envelope < T > > ,
61+ /// Actor guardian sender
62+ actor_tx : MailboxTx < T > ,
63+ /// Actor guardian receiver
64+ actor_rx : Receiver < Envelope < T > > ,
6665 /// System guardian receiver
6766 system_rx : Receiver < Envelope < T > > ,
68- /// The current processing message, received from the
69- /// latest call to the user's queue
70- last_user_message : Option < Envelope < T > > ,
71- /// The current processing message, received from the
72- /// latest call to the system's queue
73- last_system_message : Option < Envelope < T > > ,
7467 /// Mailbox state machine
7568 state : Arc < MailboxState > ,
7669}
7770
7871// TODO: Add calls with recv with timeout
7972impl < T > Mailbox < T >
8073where
81- T : TypedMessage ,
74+ T : Message ,
8275{
8376 /// Creates a new mailbox for the actor.
8477 pub ( crate ) fn new ( system_rx : Receiver < Envelope < T > > ) -> Self {
85- let ( tx, user_rx) = unbounded ( ) ;
86- let user_tx = MailboxTx :: new ( tx) ;
87- let last_user_message = None ;
88- let last_system_message = None ;
78+ let ( tx, actor_rx) = unbounded ( ) ;
79+ let actor_tx = MailboxTx :: new ( tx) ;
8980 let state = Arc :: new ( MailboxState :: new ( ) ) ;
9081
9182 Mailbox {
92- user_tx ,
93- user_rx ,
83+ actor_tx ,
84+ actor_rx ,
9485 system_rx,
95- last_user_message,
96- last_system_message,
9786 state,
9887 }
9988 }
10089
101- /// Forced receive message from user queue
90+ /// Forced receive message from the actor's queue.
10291 pub async fn recv ( & mut self ) -> Envelope < T > {
103- let message = self
104- . user_rx
92+ self . actor_rx
10593 . recv ( )
10694 . await
10795 . map_err ( |e| BastionError :: ChanRecv ( e. to_string ( ) ) )
108- . unwrap ( ) ;
109-
110- self . last_user_message = Some ( message) ;
111- self . last_user_message . clone ( ) . unwrap ( )
96+ . unwrap ( )
11297 }
11398
114- /// Try receiving message from user queue
99+ /// Try receiving message from the actor's queue.
115100 pub async fn try_recv ( & mut self ) -> Result < Envelope < T > > {
116- if self . last_user_message . is_some ( ) {
117- return Err ( BastionError :: UnackedMessage ) ;
118- }
119-
120- match self . user_rx . try_recv ( ) {
121- Ok ( message) => {
122- self . last_user_message = Some ( message) ;
123- Ok ( self . last_user_message . clone ( ) . unwrap ( ) )
124- }
125- Err ( e) => Err ( BastionError :: ChanRecv ( e. to_string ( ) ) ) ,
126- }
101+ self . actor_rx
102+ . try_recv ( )
103+ . map_err ( |e| BastionError :: ChanRecv ( e. to_string ( ) ) )
127104 }
128105
129- /// Forced receive message from system queue
106+ /// Forced receive message from the internal system queue.
130107 pub async fn sys_recv ( & mut self ) -> Envelope < T > {
131- let message = self
132- . system_rx
108+ self . system_rx
133109 . recv ( )
134110 . await
135111 . map_err ( |e| BastionError :: ChanRecv ( e. to_string ( ) ) )
136- . unwrap ( ) ;
137-
138- self . last_system_message = Some ( message) ;
139- self . last_system_message . clone ( ) . unwrap ( )
112+ . unwrap ( )
140113 }
141114
142- /// Try receiving message from system queue
115+ /// Try receiving message from the internal system queue.
143116 pub async fn try_sys_recv ( & mut self ) -> Result < Envelope < T > > {
144- if self . last_system_message . is_some ( ) {
145- return Err ( BastionError :: UnackedMessage ) ;
146- }
147-
148- match self . system_rx . try_recv ( ) {
149- Ok ( message) => {
150- self . last_system_message = Some ( message) ;
151- Ok ( self . last_system_message . clone ( ) . unwrap ( ) )
152- }
153- Err ( e) => Err ( BastionError :: ChanRecv ( e. to_string ( ) ) ) ,
154- }
155- }
156-
157- /// Returns the last retrieved message from the user channel
158- pub async fn get_last_user_message ( & self ) -> Option < Envelope < T > > {
159- self . last_user_message . clone ( )
160- }
161-
162- /// Returns the last retrieved message from the system channel
163- pub async fn get_last_system_message ( & self ) -> Option < Envelope < T > > {
164- self . last_system_message . clone ( )
117+ self . system_rx
118+ . try_recv ( )
119+ . map_err ( |e| BastionError :: ChanRecv ( e. to_string ( ) ) )
165120 }
166121}
0 commit comments