Skip to content

Commit 284ffc5

Browse files
pzhan9facebook-github-bot
authored andcommitted
Add re-ordering buffer to the actor side (#1244)
Summary: In Rust V1, we want to move the re-ordering buffer from [comm actor](https://www.internalfb.com/code/fbsource/[aea72188b23ef1539c1a04996566a8f188e42644]/fbcode/monarch/hyperactor_mesh/src/comm.rs?lines=383-384) to the actor side. The basic idea is: * client assigns the seq no for messages it sends to actors; * the `(client_name, seq_no)` tuple is added to the message's header. * for direct send, client will add that header; * for multicast, a actor->seq map is passed to comm actors, and comm actor will add that header before delivering the message to actors. * Each actor instance maintains its own re-ordering buffer, and use the `seq_no` to buffer and re-order the message, before passing the message to the actor loop. This diff implements the last part, the re-ordering buffer in actor instance. Specifically, this diff wrap the re-ordering buffer under the `OrderChannel` abstraction, so it can fit to the current implementation seamlessly. The change is currently disabled behind the `ENABLE_CLIENT_SEQ_ASSIGNMENT` gate. Reviewed By: mariusae, shayne-fletcher Differential Revision: D82573932
1 parent 3c0c5a4 commit 284ffc5

File tree

4 files changed

+383
-4
lines changed

4 files changed

+383
-4
lines changed

hyperactor/src/config.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,9 @@ declare_attrs! {
6666
/// Sampling rate for logging message latency
6767
/// Set to 0.01 for 1% sampling, 0.1 for 10% sampling, 0.90 for 90% sampling, etc.
6868
pub attr MESSAGE_LATENCY_SAMPLING_RATE: f32 = 0.01;
69+
70+
/// Whether to enable client sequence assignment.
71+
pub attr ENABLE_CLIENT_SEQ_ASSIGNMENT: bool = false;
6972
}
7073

7174
/// Load configuration from environment variables

hyperactor/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,7 @@ mod init;
8383
pub mod mailbox;
8484
pub mod message;
8585
pub mod metrics;
86+
mod ordering;
8687
pub mod panic_handler;
8788
mod parse;
8889
pub mod proc;

hyperactor/src/ordering.rs

Lines changed: 344 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,344 @@
1+
/*
2+
* Copyright (c) Meta Platforms, Inc. and affiliates.
3+
* All rights reserved.
4+
*
5+
* This source code is licensed under the BSD-style license found in the
6+
* LICENSE file in the root directory of this source tree.
7+
*/
8+
9+
//! This module contains utilities used to help messages are delivered in order
10+
//! for any given sender and receiver actor pair.
11+
12+
use std::collections::HashMap;
13+
use std::ops::DerefMut;
14+
use std::sync::Arc;
15+
use std::sync::Mutex;
16+
17+
use tokio::sync::mpsc;
18+
use tokio::sync::mpsc::error::SendError;
19+
20+
use crate::dashmap::DashMap;
21+
22+
/// A client's re-ordering buffer state.
23+
struct BufferState<T> {
24+
/// the last sequence number sent to receiver for this client. seq starts
25+
/// with 1 and 0 mean no message has been sent.
26+
last_seq: usize,
27+
/// Buffer out-of-order messages in order to ensures messages are delivered
28+
/// strictly in per-client sequence order.
29+
///
30+
/// Map's key is seq_no, value is msg.
31+
buffer: HashMap<usize, T>,
32+
}
33+
34+
impl<T> Default for BufferState<T> {
35+
fn default() -> Self {
36+
Self {
37+
last_seq: 0,
38+
buffer: HashMap::new(),
39+
}
40+
}
41+
}
42+
43+
/// A sender that ensures messages are delivered in per-client sequence order.
44+
pub(crate) struct OrderedSender<T> {
45+
tx: mpsc::UnboundedSender<T>,
46+
// map's key is name client which sens messages through this channel. Map's
47+
// value is the buffer state of that client.
48+
states: Arc<DashMap<String, Arc<Mutex<BufferState<T>>>>>,
49+
pub(crate) enable_buffering: bool,
50+
/// The identify of this object, which is used to distiguish it in debugging.
51+
log_id: String,
52+
}
53+
54+
/// A receiver that receives messages in per-client sequence order.
55+
pub(crate) fn ordered_channel<T>(
56+
log_id: String,
57+
enable_buffering: bool,
58+
) -> (OrderedSender<T>, mpsc::UnboundedReceiver<T>) {
59+
let (tx, rx) = mpsc::unbounded_channel();
60+
(
61+
OrderedSender {
62+
tx,
63+
states: Arc::new(DashMap::new()),
64+
enable_buffering,
65+
log_id,
66+
},
67+
rx,
68+
)
69+
}
70+
71+
#[derive(Debug)]
72+
pub(crate) enum OrderedSenderError<T> {
73+
InvalidZeroSeq(T),
74+
SendError(SendError<T>),
75+
FlushError(anyhow::Error),
76+
}
77+
78+
impl<T> Clone for OrderedSender<T> {
79+
fn clone(&self) -> Self {
80+
Self {
81+
tx: self.tx.clone(),
82+
states: self.states.clone(),
83+
enable_buffering: self.enable_buffering,
84+
log_id: self.log_id.clone(),
85+
}
86+
}
87+
}
88+
89+
impl<T> OrderedSender<T> {
90+
/// Buffer msgs if necessary, and deliver them to receiver based on their
91+
/// seqs in monotonically increasing order. Note seq is scoped by `sender`
92+
/// so the ordering is also scoped by it.
93+
///
94+
/// Locking behavior:
95+
///
96+
/// For the same channel,
97+
/// * Calls from the same client will be serialized with a lock.
98+
/// * calls from different clients will be executed concurrently.
99+
pub(crate) fn send(
100+
&self,
101+
client: String,
102+
seq_no: usize,
103+
msg: T,
104+
) -> Result<(), OrderedSenderError<T>> {
105+
use std::cmp::Ordering;
106+
107+
assert!(self.enable_buffering);
108+
if seq_no == 0 {
109+
return Err(OrderedSenderError::InvalidZeroSeq(msg));
110+
}
111+
112+
// Make sure only this client's state is locked, not all states.
113+
let state = match self.states.get(&client) {
114+
Some(state) => state.value().clone(),
115+
None => self
116+
.states
117+
.entry(client.clone())
118+
.or_default()
119+
.value()
120+
.clone(),
121+
};
122+
let mut state_guard = state.lock().unwrap();
123+
let BufferState { last_seq, buffer } = state_guard.deref_mut();
124+
125+
match seq_no.cmp(&(*last_seq + 1)) {
126+
Ordering::Less => {
127+
tracing::warn!(
128+
"{} duplicate message from {} with seq no: {}",
129+
self.log_id,
130+
client,
131+
seq_no,
132+
);
133+
}
134+
Ordering::Greater => {
135+
// Future message: buffer until the gap is filled.
136+
let old = buffer.insert(seq_no, msg);
137+
assert!(
138+
old.is_none(),
139+
"{}: same seq is insert to buffer twice: {}",
140+
self.log_id,
141+
seq_no
142+
);
143+
}
144+
Ordering::Equal => {
145+
// In-order: deliver, then flush consecutives from buffer until
146+
// it reaches a gap.
147+
self.tx.send(msg).map_err(OrderedSenderError::SendError)?;
148+
*last_seq += 1;
149+
150+
while let Some(m) = buffer.remove(&(*last_seq + 1)) {
151+
match self.tx.send(m) {
152+
Ok(()) => *last_seq += 1,
153+
Err(err) => {
154+
let flush_err = OrderedSenderError::FlushError(anyhow::anyhow!(
155+
"failed to flush buffered message: {}",
156+
err
157+
));
158+
buffer.insert(*last_seq + 1, err.0);
159+
return Err(flush_err);
160+
}
161+
}
162+
}
163+
// We do not remove a client's state even if its buffer becomes
164+
// empty. This is because a duplicate message might arrive after
165+
// the buffer became empty. Removing the state would cause the
166+
// duplicate message to be delivered.
167+
}
168+
}
169+
170+
Ok(())
171+
}
172+
173+
pub(crate) fn direct_send(&self, msg: T) -> Result<(), SendError<T>> {
174+
assert!(!self.enable_buffering);
175+
self.tx.send(msg)
176+
}
177+
}
178+
179+
#[cfg(test)]
180+
mod tests {
181+
use super::*;
182+
183+
fn drain_try_recv<T: std::fmt::Debug + Clone>(rx: &mut mpsc::UnboundedReceiver<T>) -> Vec<T> {
184+
let mut out = Vec::new();
185+
while let Ok(m) = rx.try_recv() {
186+
out.push(m);
187+
}
188+
out
189+
}
190+
191+
#[test]
192+
fn test_ordered_channel_single_client_send_in_order() {
193+
let (tx, mut rx) = ordered_channel::<usize>("test".to_string(), true);
194+
for s in 1..=10 {
195+
tx.send("A".into(), s, s).unwrap();
196+
let got = drain_try_recv(&mut rx);
197+
assert_eq!(got, vec![s]);
198+
}
199+
}
200+
201+
#[test]
202+
fn test_ordered_channel_single_client_send_out_of_order() {
203+
let (tx, mut rx) = ordered_channel::<usize>("test".to_string(), true);
204+
205+
// Send 2 to 4 in descending order: all should buffer until 1 arrives.
206+
for s in (2..=4).rev() {
207+
tx.send("A".into(), s, s).unwrap();
208+
}
209+
210+
// Send 7 to 9 in descending order: all should buffer until 1 - 6 arrives.
211+
for s in (7..=9).rev() {
212+
tx.send("A".into(), s, s).unwrap();
213+
}
214+
215+
assert!(
216+
drain_try_recv(&mut rx).is_empty(),
217+
"nothing should be delivered yet"
218+
);
219+
220+
// Now send 1: should deliver 1 then flush 2 - 4.
221+
tx.send("A".into(), 1, 1).unwrap();
222+
assert_eq!(drain_try_recv(&mut rx), vec![1, 2, 3, 4]);
223+
224+
// Now send 5: should deliver immediately but not flush 7 - 9.
225+
tx.send("A".into(), 5, 5).unwrap();
226+
assert_eq!(drain_try_recv(&mut rx), vec![5]);
227+
228+
// Now send 6: should deliver 6 then flush 7 - 9.
229+
tx.send("A".into(), 6, 6).unwrap();
230+
assert_eq!(drain_try_recv(&mut rx), vec![6, 7, 8, 9]);
231+
232+
// Send 10: should deliver immediately.
233+
tx.send("A".into(), 10, 10).unwrap();
234+
let got = drain_try_recv(&mut rx);
235+
assert_eq!(got, vec![10]);
236+
}
237+
238+
#[test]
239+
fn test_ordered_channel_multi_clients() {
240+
let (tx, mut rx) = ordered_channel::<(String, usize)>("test".to_string(), true);
241+
242+
// A1 -> deliver
243+
tx.send("A".into(), 1, ("A".into(), 1)).unwrap();
244+
assert_eq!(drain_try_recv(&mut rx), vec![("A".into(), 1)]);
245+
// B1 -> deliver
246+
tx.send("B".into(), 1, ("B".into(), 1)).unwrap();
247+
assert_eq!(drain_try_recv(&mut rx), vec![("B".into(), 1)]);
248+
for s in (3..=5).rev() {
249+
// A3-5 -> buffer (waiting for A2)
250+
tx.send("A".into(), s, ("A".into(), s)).unwrap();
251+
// B3-5 -> buffer (waiting for B2)
252+
tx.send("B".into(), s, ("B".into(), s)).unwrap();
253+
}
254+
for s in (7..=9).rev() {
255+
// A7-9 -> buffer (waiting for A1-6)
256+
tx.send("A".into(), s, ("A".into(), s)).unwrap();
257+
// B7-9 -> buffer (waiting for B1-6)
258+
tx.send("B".into(), s, ("B".into(), s)).unwrap();
259+
}
260+
assert!(
261+
drain_try_recv(&mut rx).is_empty(),
262+
"nothing should be delivered yet"
263+
);
264+
265+
// A2 -> deliver A2 then flush A3
266+
tx.send("A".into(), 2, ("A".into(), 2)).unwrap();
267+
assert_eq!(
268+
drain_try_recv(&mut rx),
269+
vec![
270+
("A".into(), 2),
271+
("A".into(), 3),
272+
("A".into(), 4),
273+
("A".into(), 5),
274+
]
275+
);
276+
// B2 -> deliver B2 then flush B3
277+
tx.send("B".into(), 2, ("B".into(), 2)).unwrap();
278+
assert_eq!(
279+
drain_try_recv(&mut rx),
280+
vec![
281+
("B".into(), 2),
282+
("B".into(), 3),
283+
("B".into(), 4),
284+
("B".into(), 5),
285+
]
286+
);
287+
288+
// A6 -> should deliver immediately and flush A7-9
289+
tx.send("A".into(), 6, ("A".into(), 6)).unwrap();
290+
assert_eq!(
291+
drain_try_recv(&mut rx),
292+
vec![
293+
("A".into(), 6),
294+
("A".into(), 7),
295+
("A".into(), 8),
296+
("A".into(), 9)
297+
]
298+
);
299+
// B6 -> should deliver immediately and flush B7-9
300+
tx.send("B".into(), 6, ("B".into(), 6)).unwrap();
301+
assert_eq!(
302+
drain_try_recv(&mut rx),
303+
vec![
304+
("B".into(), 6),
305+
("B".into(), 7),
306+
("B".into(), 8),
307+
("B".into(), 9)
308+
]
309+
);
310+
}
311+
312+
#[test]
313+
fn test_ordered_channel_duplicates() {
314+
fn verify_empty_buffers<T>(states: &DashMap<String, Arc<Mutex<BufferState<T>>>>) {
315+
for entry in states.iter() {
316+
assert!(entry.value().lock().unwrap().buffer.is_empty());
317+
}
318+
}
319+
320+
let (tx, mut rx) = ordered_channel::<(String, usize)>("test".to_string(), true);
321+
// A1 -> deliver
322+
tx.send("A".into(), 1, ("A".into(), 1)).unwrap();
323+
assert_eq!(drain_try_recv(&mut rx), vec![("A".into(), 1)]);
324+
verify_empty_buffers(&tx.states);
325+
// duplicate A1 -> drop even if the message is different.
326+
tx.send("A".into(), 1, ("A".into(), 1_000)).unwrap();
327+
assert!(
328+
drain_try_recv(&mut rx).is_empty(),
329+
"nothing should be delivered yet"
330+
);
331+
verify_empty_buffers(&tx.states);
332+
// A2 -> deliver
333+
tx.send("A".into(), 2, ("A".into(), 2)).unwrap();
334+
assert_eq!(drain_try_recv(&mut rx), vec![("A".into(), 2)]);
335+
verify_empty_buffers(&tx.states);
336+
// late A1 duplicate -> drop
337+
tx.send("A".into(), 1, ("A".into(), 1_001)).unwrap();
338+
assert!(
339+
drain_try_recv(&mut rx).is_empty(),
340+
"nothing should be delivered yet"
341+
);
342+
verify_empty_buffers(&tx.states);
343+
}
344+
}

0 commit comments

Comments
 (0)