Skip to content

Commit d526978

Browse files
Outgoing messages buffer reuse (#150)
* More reasonable defaults * Adds become_leader() for manual leader takeover. * try_become_leader() with Ballot increment. * Changes outgoing_messages() to take a buffer instead of allocating new vec. * `take_outgoing_messages` rename and cleaner buffered message logic. * Crash additional follower in sync tests to ensure entries remain non-decided. * Comment out GithuB action cargo caching * Update docs Co-authored-by: Harald Ng <[email protected]> * Renamed buffered to latest and fixed clippy warnings * Reintroduce GitHub actions cargo caching. * Update omnipaxos/tests/sync_test.rs * Revert "Update omnipaxos/tests/sync_test.rs" This reverts commit 6bfedc1. --------- Co-authored-by: Harald Ng <[email protected]>
1 parent c9d3ad5 commit d526978

File tree

16 files changed

+171
-130
lines changed

16 files changed

+171
-130
lines changed

docs/omnipaxos/communication.md

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,9 @@ By handling incoming messages and local calls such as `append()`, our local `omn
1515

1616
```rust
1717
// send outgoing messages. This should be called periodically, e.g. every ms
18-
for out_msg in omni_paxos.outgoing_messages() {
18+
let mut msg_buffer = Vec::with_capacity(1000);
19+
omni_paxos.take_outgoing_messages(&mut msg_buffer);
20+
for out_msg in msg_buffer.drain(..) {
1921
let receiver = out_msg.get_receiver();
2022
// send out_msg to receiver on network layer
2123
}

examples/dashboard/src/main.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@ fn main() {
6161
omni_paxos: Arc::clone(&omni_paxos),
6262
incoming: receiver_channels.remove(pid).unwrap(),
6363
outgoing: sender_channels.clone(),
64+
message_buffer: vec![],
6465
};
6566
let join_handle = runtime.spawn({
6667
async move {

examples/dashboard/src/server.rs

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,16 @@ pub struct OmniPaxosServer {
1616
pub omni_paxos: Arc<Mutex<OmniPaxosLog>>,
1717
pub incoming: mpsc::Receiver<Message<LogEntry>>,
1818
pub outgoing: HashMap<NodeId, mpsc::Sender<Message<LogEntry>>>,
19+
pub message_buffer: Vec<Message<LogEntry>>,
1920
}
2021

2122
impl OmniPaxosServer {
2223
async fn send_outgoing_msgs(&mut self) {
23-
let messages = self.omni_paxos.lock().unwrap().outgoing_messages();
24-
for msg in messages {
24+
self.omni_paxos
25+
.lock()
26+
.unwrap()
27+
.take_outgoing_messages(&mut self.message_buffer);
28+
for msg in self.message_buffer.drain(..) {
2529
let receiver = msg.get_receiver();
2630
let channel = self
2731
.outgoing

examples/kv_store/src/main.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@ fn main() {
6868
omni_paxos: Arc::clone(&omni_paxos),
6969
incoming: receiver_channels.remove(&pid).unwrap(),
7070
outgoing: sender_channels.clone(),
71+
message_buffer: vec![],
7172
};
7273
let join_handle = runtime.spawn({
7374
async move {

examples/kv_store/src/server.rs

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,12 +15,16 @@ pub struct OmniPaxosServer {
1515
pub omni_paxos: Arc<Mutex<OmniPaxosKV>>,
1616
pub incoming: mpsc::Receiver<Message<KeyValue>>,
1717
pub outgoing: HashMap<NodeId, mpsc::Sender<Message<KeyValue>>>,
18+
pub message_buffer: Vec<Message<KeyValue>>,
1819
}
1920

2021
impl OmniPaxosServer {
2122
async fn send_outgoing_msgs(&mut self) {
22-
let messages = self.omni_paxos.lock().unwrap().outgoing_messages();
23-
for msg in messages {
23+
self.omni_paxos
24+
.lock()
25+
.unwrap()
26+
.take_outgoing_messages(&mut self.message_buffer);
27+
for msg in self.message_buffer.drain(..) {
2428
let receiver = msg.get_receiver();
2529
let channel = self
2630
.outgoing

omnipaxos/src/ballot_leader_election.rs

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -156,9 +156,9 @@ impl BallotLeaderElection {
156156
self.current_ballot.priority = p;
157157
}
158158

159-
/// Returns outgoing messages
160-
pub(crate) fn get_outgoing_msgs(&mut self) -> Vec<BLEMessage> {
161-
std::mem::take(&mut self.outgoing)
159+
/// Returns reference to outgoing messages
160+
pub(crate) fn outgoing_mut(&mut self) -> &mut Vec<BLEMessage> {
161+
&mut self.outgoing
162162
}
163163

164164
/// Handle an incoming message.
@@ -205,7 +205,11 @@ impl BallotLeaderElection {
205205
self.new_hb_round();
206206
if seq_paxos_promise > self.leader {
207207
// Sync leader with Paxos promise in case ballot didn't make it to BLE followers
208+
// or become_leader() was called.
208209
self.leader = seq_paxos_promise;
210+
if seq_paxos_promise.pid == self.pid {
211+
self.current_ballot = seq_paxos_promise;
212+
}
209213
self.happy = true;
210214
}
211215
if self.leader == self.current_ballot {

omnipaxos/src/omni_paxos.rs

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -289,19 +289,10 @@ where
289289
self.seq_paxos.get_promise()
290290
}
291291

292-
/// Returns the outgoing messages from this server. The messages should then be sent via the network implementation.
293-
pub fn outgoing_messages(&mut self) -> Vec<Message<T>> {
294-
let paxos_msgs = self
295-
.seq_paxos
296-
.get_outgoing_msgs()
297-
.into_iter()
298-
.map(|p| Message::SequencePaxos(p));
299-
let ble_msgs = self
300-
.ble
301-
.get_outgoing_msgs()
302-
.into_iter()
303-
.map(|b| Message::BLE(b));
304-
ble_msgs.chain(paxos_msgs).collect()
292+
/// Moves outgoing messages from this server into the buffer. The messages should then be sent via the network implementation.
293+
pub fn take_outgoing_messages(&mut self, buffer: &mut Vec<Message<T>>) {
294+
self.seq_paxos.take_outgoing_msgs(buffer);
295+
buffer.extend(self.ble.outgoing_mut().drain(..).map(|b| Message::BLE(b)));
305296
}
306297

307298
/// Read entry at index `idx` in the log. Returns `None` if `idx` is out of bounds.
@@ -394,6 +385,15 @@ where
394385
}
395386
}
396387

388+
/// Manually attempt to become the leader by incrementing this instance's Ballot. Calling this
389+
/// function may not result in gainig leadership if other instances are competing for
390+
/// leadership with higher Ballots.
391+
pub fn try_become_leader(&mut self) {
392+
let mut my_ballot = self.ble.get_current_ballot();
393+
my_ballot.n += 1;
394+
self.seq_paxos.handle_leader(my_ballot);
395+
}
396+
397397
/*** BLE calls ***/
398398
/// Update the custom priority used in the Ballot for this server. Note that changing the
399399
/// priority triggers a leader re-election.

omnipaxos/src/sequence_paxos/follower.rs

Lines changed: 35 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -42,11 +42,11 @@ where
4242
log_sync,
4343
};
4444
self.cached_promise_message = Some(promise.clone());
45-
self.outgoing.push(PaxosMessage {
45+
self.outgoing.push(Message::SequencePaxos(PaxosMessage {
4646
from: self.pid,
4747
to: from,
4848
msg: PaxosMsg::Promise(promise),
49-
});
49+
}));
5050
}
5151
}
5252

@@ -68,11 +68,11 @@ where
6868
self.current_seq_num = accsync.seq_num;
6969
let cached_idx = self.outgoing.len();
7070
self.latest_accepted_meta = Some((accsync.n, cached_idx));
71-
self.outgoing.push(PaxosMessage {
71+
self.outgoing.push(Message::SequencePaxos(PaxosMessage {
7272
from: self.pid,
7373
to: from,
7474
msg: PaxosMsg::Accepted(accepted),
75-
});
75+
}));
7676
#[cfg(feature = "unicache")]
7777
self.internal_storage.set_unicache(accsync.unicache);
7878
}
@@ -158,27 +158,38 @@ where
158158
}
159159

160160
fn reply_accepted(&mut self, n: Ballot, accepted_idx: usize) {
161-
match &self.latest_accepted_meta {
162-
Some((round, outgoing_idx)) if round == &n => {
163-
let PaxosMessage { msg, .. } = self.outgoing.get_mut(*outgoing_idx).unwrap();
164-
match msg {
165-
PaxosMsg::Accepted(a) => {
166-
a.accepted_idx = accepted_idx;
167-
}
168-
_ => panic!("Cached idx is not an Accepted Message<T>!"),
169-
}
170-
}
171-
_ => {
161+
let latest_accepted = self.get_latest_accepted_message(n);
162+
match latest_accepted {
163+
Some(acc) => acc.accepted_idx = accepted_idx,
164+
None => {
172165
let accepted = Accepted { n, accepted_idx };
173166
let cached_idx = self.outgoing.len();
174167
self.latest_accepted_meta = Some((n, cached_idx));
175-
self.outgoing.push(PaxosMessage {
168+
self.outgoing.push(Message::SequencePaxos(PaxosMessage {
176169
from: self.pid,
177170
to: n.pid,
178171
msg: PaxosMsg::Accepted(accepted),
179-
});
172+
}));
180173
}
181-
};
174+
}
175+
}
176+
177+
fn get_latest_accepted_message(&mut self, n: Ballot) -> Option<&mut Accepted> {
178+
if let Some((ballot, outgoing_idx)) = &self.latest_accepted_meta {
179+
if *ballot == n {
180+
if let Message::SequencePaxos(PaxosMessage {
181+
msg: PaxosMsg::Accepted(a),
182+
..
183+
}) = self.outgoing.get_mut(*outgoing_idx).unwrap()
184+
{
185+
return Some(a);
186+
} else {
187+
#[cfg(feature = "logging")]
188+
debug!(self.logger, "Cached idx is not an Accepted message!");
189+
}
190+
}
191+
}
192+
None
182193
}
183194

184195
/// Also returns whether the message's ballot was promised
@@ -195,11 +206,11 @@ where
195206
my_promise,
196207
message_ballot
197208
);
198-
self.outgoing.push(PaxosMessage {
209+
self.outgoing.push(Message::SequencePaxos(PaxosMessage {
199210
from: self.pid,
200211
to: message_ballot.pid,
201212
msg: PaxosMsg::NotAccepted(not_acc),
202-
});
213+
}));
203214
false
204215
}
205216
std::cmp::Ordering::Less => {
@@ -232,11 +243,11 @@ where
232243
// Resend Promise
233244
match &self.cached_promise_message {
234245
Some(promise) => {
235-
self.outgoing.push(PaxosMessage {
246+
self.outgoing.push(Message::SequencePaxos(PaxosMessage {
236247
from: self.pid,
237248
to: promise.n.pid,
238249
msg: PaxosMsg::Promise(promise.clone()),
239-
});
250+
}));
240251
}
241252
None => {
242253
// Shouldn't be possible to be in prepare phase without having
@@ -262,11 +273,11 @@ where
262273
n: self.get_promise(),
263274
};
264275
for peer in &self.peers {
265-
self.outgoing.push(PaxosMessage {
276+
self.outgoing.push(Message::SequencePaxos(PaxosMessage {
266277
from: self.pid,
267278
to: *peer,
268279
msg: PaxosMsg::PrepareReq(prepreq),
269-
});
280+
}));
270281
}
271282
}
272283

0 commit comments

Comments
 (0)