@@ -128,6 +128,9 @@ pub struct Raft<T: Storage> {
128
128
/// The list of messages.
129
129
pub msgs : Vec < Message > ,
130
130
131
+ /// The list of heartbeat messages.
132
+ pub heartbeats : Vec < Message > ,
133
+
131
134
/// The leader id
132
135
pub leader_id : u64 ,
133
136
@@ -246,6 +249,7 @@ impl<T: Storage> Raft<T> {
246
249
election_timeout : c. election_tick ,
247
250
votes : Default :: default ( ) ,
248
251
msgs : Default :: default ( ) ,
252
+ heartbeats : Default :: default ( ) ,
249
253
leader_id : Default :: default ( ) ,
250
254
lead_transferee : None ,
251
255
term : Default :: default ( ) ,
@@ -710,10 +714,12 @@ impl<T: Storage> Raft<T> {
710
714
m. set_msg_type ( MessageType :: MsgHeartbeat ) ;
711
715
let commit = cmp:: min ( pr. matched , self . raft_log . committed ) ;
712
716
m. commit = commit;
717
+ m. from = self . id ;
718
+ m. term = self . term ;
713
719
if let Some ( context) = ctx {
714
720
m. context = context;
715
721
}
716
- self . send ( m) ;
722
+ self . heartbeats . push ( m) ;
717
723
}
718
724
719
725
/// Sends RPC, with entries to all peers that are not up-to-date
@@ -2005,7 +2011,16 @@ impl<T: Storage> Raft<T> {
2005
2011
to_send. to = m. from ;
2006
2012
to_send. context = m. take_context ( ) ;
2007
2013
to_send. commit = self . raft_log . committed ;
2008
- self . send ( to_send) ;
2014
+ if self . raft_log . committed > self . raft_log . store . last_index ( ) . unwrap ( )
2015
+ {
2016
+ // If there is some entries that has committed in memory but not persisted, the message
2017
+ // shall not be sent until all entries before committed_index have been persisted.
2018
+ self . send ( to_send) ;
2019
+ } else {
2020
+ to_send. term = self . term ;
2021
+ to_send. from = self . id ;
2022
+ self . heartbeats . push ( to_send) ;
2023
+ }
2009
2024
}
2010
2025
2011
2026
fn handle_snapshot ( & mut self , mut m : Message ) {
0 commit comments