Skip to content

Commit a465169

Browse files
pzhan9facebook-github-bot
authored andcommitted
Add re-ordering buffer to the actor side (#1244)
Summary: In Rust V1, we want to move the re-ordering buffer from [comm actor](https://www.internalfb.com/code/fbsource/[aea72188b23ef1539c1a04996566a8f188e42644]/fbcode/monarch/hyperactor_mesh/src/comm.rs?lines=383-384) to the actor side. The basic idea is: * client assigns the seq no for messages it sends to actors; * the `(client_name, seq_no)` tuple is added to the message's header. * for direct send, client will add that header; * for multicast, a actor->seq map is passed to comm actors, and comm actor will add that header before delivering the message to actors. * Each actor instance maintains its own re-ordering buffer, and use the `seq_no` to buffer and re-order the message, before passing the message to the actor loop. This diff implements the last part, the re-ordering buffer in actor instance. Specifically, this diff wrap the re-ordering buffer under the `OrderChannel` abstraction, so it can fit to the current implementation seamlessly. The change is currently disabled behind the `ENABLE_CLIENT_SEQ_ASSIGNMENT` gate. Reviewed By: mariusae, shayne-fletcher Differential Revision: D82573932
1 parent 0d1f676 commit a465169

File tree

4 files changed

+366
-4
lines changed

4 files changed

+366
-4
lines changed

hyperactor/src/config.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,9 @@ declare_attrs! {
6666
/// Sampling rate for logging message latency
6767
/// Set to 0.01 for 1% sampling, 0.1 for 10% sampling, 0.90 for 90% sampling, etc.
6868
pub attr MESSAGE_LATENCY_SAMPLING_RATE: f32 = 0.01;
69+
70+
/// Whether to enable client sequence assignment.
71+
pub attr ENABLE_CLIENT_SEQ_ASSIGNMENT: bool = false;
6972
}
7073

7174
/// Load configuration from environment variables

hyperactor/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,7 @@ mod init;
8383
pub mod mailbox;
8484
pub mod message;
8585
pub mod metrics;
86+
mod ordering;
8687
pub mod panic_handler;
8788
mod parse;
8889
pub mod proc;

hyperactor/src/ordering.rs

Lines changed: 327 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,327 @@
1+
/*
2+
* Copyright (c) Meta Platforms, Inc. and affiliates.
3+
* All rights reserved.
4+
*
5+
* This source code is licensed under the BSD-style license found in the
6+
* LICENSE file in the root directory of this source tree.
7+
*/
8+
9+
//! This module contains utilities used to help messages are delivered in order
10+
//! for any given sender and receiver actor pair.
11+
12+
use std::collections::HashMap;
13+
use std::ops::DerefMut;
14+
use std::sync::Arc;
15+
use std::sync::Mutex;
16+
17+
use tokio::sync::mpsc;
18+
use tokio::sync::mpsc::error::SendError;
19+
20+
use crate::dashmap::DashMap;
21+
22+
/// A client's re-ordering buffer state.
23+
struct BufferState<T> {
24+
/// the last sequence number sent to receiver for this client. seq starts
25+
/// with 1 and 0 mean no message has been sent.
26+
last_seq: usize,
27+
/// Buffer out-of-order messages in order to ensures messages are delivered
28+
/// strictly in per-client sequence order.
29+
///
30+
/// Map's key is seq_no, value is msg.
31+
buffer: HashMap<usize, T>,
32+
}
33+
34+
impl<T> Default for BufferState<T> {
35+
fn default() -> Self {
36+
Self {
37+
last_seq: 0,
38+
buffer: HashMap::new(),
39+
}
40+
}
41+
}
42+
43+
/// A sender that ensures messages are delivered in per-client sequence order.
44+
pub(crate) struct OrderedSender<T> {
45+
tx: mpsc::UnboundedSender<T>,
46+
// map's key is name client which sens messages through this channel. Map's
47+
// value is the buffer state of that client.
48+
states: Arc<DashMap<String, Arc<Mutex<BufferState<T>>>>>,
49+
pub(crate) enable_buffering: bool,
50+
/// The identify of this object, which is used to distiguish it in debugging.
51+
log_id: String,
52+
}
53+
54+
/// A receiver that receives messages in per-client sequence order.
55+
pub(crate) fn ordered_channel<T>(
56+
log_id: String,
57+
enable_buffering: bool,
58+
) -> (OrderedSender<T>, mpsc::UnboundedReceiver<T>) {
59+
let (tx, rx) = mpsc::unbounded_channel();
60+
(
61+
OrderedSender {
62+
tx,
63+
states: Arc::new(DashMap::new()),
64+
enable_buffering,
65+
log_id,
66+
},
67+
rx,
68+
)
69+
}
70+
71+
#[derive(Debug)]
72+
pub(crate) enum OrderedSenderError<T> {
73+
InvalidZeroSeq(T),
74+
SendError(SendError<T>),
75+
FlushError(anyhow::Error),
76+
}
77+
78+
impl<T> Clone for OrderedSender<T> {
79+
fn clone(&self) -> Self {
80+
Self {
81+
tx: self.tx.clone(),
82+
states: self.states.clone(),
83+
enable_buffering: self.enable_buffering,
84+
log_id: self.log_id.clone(),
85+
}
86+
}
87+
}
88+
89+
impl<T> OrderedSender<T> {
90+
/// Buffer msgs if necessary, and deliver them to receiver based on their
91+
/// seqs in monotonically increasing order. Note seq is scoped by `sender`
92+
/// so the ordering is also scoped by it.
93+
///
94+
/// Locking behavior:
95+
///
96+
/// For the same channel,
97+
/// * Calls from the same client will be serialized with a lock.
98+
/// * calls from different clients will be executed concurrently.
99+
pub(crate) fn send(
100+
&self,
101+
client: String,
102+
seq_no: usize,
103+
msg: T,
104+
) -> Result<(), OrderedSenderError<T>> {
105+
use std::cmp::Ordering;
106+
107+
assert!(self.enable_buffering);
108+
if seq_no == 0 {
109+
return Err(OrderedSenderError::InvalidZeroSeq(msg));
110+
}
111+
112+
// Make sure only this client's state is locked, not all states.
113+
let state = match self.states.get(&client) {
114+
Some(state) => state.value().clone(),
115+
None => self
116+
.states
117+
.entry(client.clone())
118+
.or_default()
119+
.value()
120+
.clone(),
121+
};
122+
let mut state_guard = state.lock().unwrap();
123+
let BufferState { last_seq, buffer } = state_guard.deref_mut();
124+
125+
match seq_no.cmp(&(*last_seq + 1)) {
126+
Ordering::Less => {
127+
tracing::warn!(
128+
"{} duplicate message from {} with seq no: {}",
129+
self.log_id,
130+
client,
131+
seq_no,
132+
);
133+
}
134+
Ordering::Greater => {
135+
// Future message: buffer until the gap is filled.
136+
let old = buffer.insert(seq_no, msg);
137+
assert!(
138+
old.is_none(),
139+
"{}: same seq is insert to buffer twice: {}",
140+
self.log_id,
141+
seq_no
142+
);
143+
}
144+
Ordering::Equal => {
145+
// In-order: deliver, then flush consecutives from buffer until
146+
// it reaches a gap.
147+
self.tx.send(msg).map_err(OrderedSenderError::SendError)?;
148+
*last_seq += 1;
149+
150+
while let Some(m) = buffer.remove(&(*last_seq + 1)) {
151+
match self.tx.send(m) {
152+
Ok(()) => *last_seq += 1,
153+
Err(err) => {
154+
let flush_err = OrderedSenderError::FlushError(anyhow::anyhow!(
155+
"failed to flush buffered message: {}",
156+
err
157+
));
158+
buffer.insert(*last_seq + 1, err.0);
159+
return Err(flush_err);
160+
}
161+
}
162+
}
163+
// We do not remove a client's state even if its buffer becomes
164+
// empty, because in practice, normally that client will send
165+
// message again.
166+
}
167+
}
168+
169+
Ok(())
170+
}
171+
172+
pub(crate) fn direct_send(&self, msg: T) -> Result<(), SendError<T>> {
173+
assert!(!self.enable_buffering);
174+
self.tx.send(msg)
175+
}
176+
}
177+
178+
#[cfg(test)]
179+
mod tests {
180+
use super::*;
181+
182+
fn drain_try_recv<T: std::fmt::Debug + Clone>(rx: &mut mpsc::UnboundedReceiver<T>) -> Vec<T> {
183+
let mut out = Vec::new();
184+
while let Ok(m) = rx.try_recv() {
185+
out.push(m);
186+
}
187+
out
188+
}
189+
190+
#[test]
191+
fn test_ordered_channel_single_client_send_in_order() {
192+
let (tx, mut rx) = ordered_channel::<usize>("test".to_string(), true);
193+
for s in 1..=10 {
194+
tx.send("A".into(), s, s).unwrap();
195+
let got = drain_try_recv(&mut rx);
196+
assert_eq!(got, vec![s]);
197+
}
198+
}
199+
200+
#[test]
201+
fn test_ordered_channel_single_client_send_out_of_order() {
202+
let (tx, mut rx) = ordered_channel::<usize>("test".to_string(), true);
203+
204+
// Send 2 to 4 in descending order: all should buffer until 1 arrives.
205+
for s in (2..=4).rev() {
206+
tx.send("A".into(), s, s).unwrap();
207+
}
208+
209+
// Send 7 to 9 in descending order: all should buffer until 1 - 6 arrives.
210+
for s in (7..=9).rev() {
211+
tx.send("A".into(), s, s).unwrap();
212+
}
213+
214+
assert!(
215+
drain_try_recv(&mut rx).is_empty(),
216+
"nothing should be delivered yet"
217+
);
218+
219+
// Now send 1: should deliver 1 then flush 2 - 4.
220+
tx.send("A".into(), 1, 1).unwrap();
221+
assert_eq!(drain_try_recv(&mut rx), vec![1, 2, 3, 4]);
222+
223+
// Now send 5: should deliver immediately but not flush 7 - 9.
224+
tx.send("A".into(), 5, 5).unwrap();
225+
assert_eq!(drain_try_recv(&mut rx), vec![5]);
226+
227+
// Now send 6: should deliver 6 then flush 7 - 9.
228+
tx.send("A".into(), 6, 6).unwrap();
229+
assert_eq!(drain_try_recv(&mut rx), vec![6, 7, 8, 9]);
230+
231+
// Send 10: should deliver immediately.
232+
tx.send("A".into(), 10, 10).unwrap();
233+
let got = drain_try_recv(&mut rx);
234+
assert_eq!(got, vec![10]);
235+
}
236+
237+
#[test]
238+
fn test_ordered_channel_multi_clients() {
239+
let (tx, mut rx) = ordered_channel::<(String, usize)>("test".to_string(), true);
240+
241+
// A1 -> deliver
242+
tx.send("A".into(), 1, ("A".into(), 1)).unwrap();
243+
assert_eq!(drain_try_recv(&mut rx), vec![("A".into(), 1)]);
244+
// B1 -> deliver
245+
tx.send("B".into(), 1, ("B".into(), 1)).unwrap();
246+
assert_eq!(drain_try_recv(&mut rx), vec![("B".into(), 1)]);
247+
for s in (3..=5).rev() {
248+
// A3-5 -> buffer (waiting for A2)
249+
tx.send("A".into(), s, ("A".into(), s)).unwrap();
250+
// B3-5 -> buffer (waiting for B2)
251+
tx.send("B".into(), s, ("B".into(), s)).unwrap();
252+
}
253+
for s in (7..=9).rev() {
254+
// A7-9 -> buffer (waiting for A1-6)
255+
tx.send("A".into(), s, ("A".into(), s)).unwrap();
256+
// B7-9 -> buffer (waiting for B1-6)
257+
tx.send("B".into(), s, ("B".into(), s)).unwrap();
258+
}
259+
assert!(
260+
drain_try_recv(&mut rx).is_empty(),
261+
"nothing should be delivered yet"
262+
);
263+
264+
// A2 -> deliver A2 then flush A3
265+
tx.send("A".into(), 2, ("A".into(), 2)).unwrap();
266+
assert_eq!(
267+
drain_try_recv(&mut rx),
268+
vec![
269+
("A".into(), 2),
270+
("A".into(), 3),
271+
("A".into(), 4),
272+
("A".into(), 5),
273+
]
274+
);
275+
// B2 -> deliver B2 then flush B3
276+
tx.send("B".into(), 2, ("B".into(), 2)).unwrap();
277+
assert_eq!(
278+
drain_try_recv(&mut rx),
279+
vec![
280+
("B".into(), 2),
281+
("B".into(), 3),
282+
("B".into(), 4),
283+
("B".into(), 5),
284+
]
285+
);
286+
287+
// A6 -> should deliver immediately and flush A7-9
288+
tx.send("A".into(), 6, ("A".into(), 6)).unwrap();
289+
assert_eq!(
290+
drain_try_recv(&mut rx),
291+
vec![
292+
("A".into(), 6),
293+
("A".into(), 7),
294+
("A".into(), 8),
295+
("A".into(), 9)
296+
]
297+
);
298+
// B6 -> should deliver immediately and flush B7-9
299+
tx.send("B".into(), 6, ("B".into(), 6)).unwrap();
300+
assert_eq!(
301+
drain_try_recv(&mut rx),
302+
vec![
303+
("B".into(), 6),
304+
("B".into(), 7),
305+
("B".into(), 8),
306+
("B".into(), 9)
307+
]
308+
);
309+
}
310+
311+
#[test]
312+
fn test_ordered_channel_duplicates() {
313+
let (tx, mut rx) = ordered_channel::<(String, usize)>("test".to_string(), true);
314+
315+
// A1 -> deliver
316+
tx.send("A".into(), 1, ("A".into(), 1)).unwrap();
317+
// duplicate A1 -> drop even if the message is different.
318+
tx.send("A".into(), 1, ("A".into(), 1_000)).unwrap();
319+
// A2 -> deliver
320+
tx.send("A".into(), 2, ("A".into(), 2)).unwrap();
321+
// late A1 duplicate -> drop
322+
tx.send("A".into(), 1, ("A".into(), 1_001)).unwrap();
323+
324+
let got = drain_try_recv(&mut rx);
325+
assert_eq!(got, vec![("A".into(), 1), ("A".into(), 2),]);
326+
}
327+
}

0 commit comments

Comments
 (0)