Commit 512d1fc

Raft batching: Reduce lock contention on raft mutex

This is an attempt to reduce contention between Propose() and sendAppendEntry(). Change Propose() to acquire a read lock on Raft, and avoid locking Raft during storeToWAL() (which potentially does IO and may take a long time). This works as long as sendAppendEntry() is called only from the Raft goroutine, except when the entry does not need to be stored to the Raft log. The rest of the changes enforce that requirement:

* Change EntryLeaderTransfer so that it is not stored to the Raft log.
* Push EntryPeerState and EntrySnapshot entries through the proposal queue.
* Make sure EntrySnapshot entries skip the leader check; to guarantee that, make sure they are not batched together with other entries.

For performance testing only at this point.

1 parent 2a19475 commit 512d1fc
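
The diff below applies this to the real raft type. As a rough illustration of the intended contract, here is a minimal, self-contained Go sketch (hypothetical node/entry types and a plain channel standing in for the ipQueue-backed proposal queue, not the actual NATS server code): Propose() only reads shared state under a read lock and enqueues, while a single goroutine drains the queue and performs the WAL write outside the mutex.

// Minimal, self-contained sketch (simplified, hypothetical types, not the
// actual NATS server code) of the locking pattern this commit moves toward:
// Propose() takes only a read lock, snapshots the fields it needs, and pushes
// onto a proposal queue; a single Raft goroutine drains the queue and is the
// only writer of the log, so the WAL write happens without holding the mutex.
package main

import (
	"errors"
	"fmt"
	"sync"
)

var errNotLeader = errors.New("not leader")

type entry struct{ data []byte }

type node struct {
	mu     sync.RWMutex
	leader bool
	werr   error      // sticky write error, mirrors n.werr
	prop   chan entry // stands in for the ipQueue-backed proposal queue
}

// Propose only reads shared state under a read lock and never does IO,
// so it no longer contends with the goroutine doing WAL writes.
func (n *node) Propose(data []byte) error {
	n.mu.RLock()
	leader, werr, prop := n.leader, n.werr, n.prop
	n.mu.RUnlock()

	if !leader {
		return errNotLeader
	}
	if werr != nil {
		return werr
	}
	prop <- entry{data: data}
	return nil
}

// run models the Raft goroutine: the only place entries are stored,
// mirroring the rule that sendAppendEntry() must be called from the
// Raft goroutine whenever the entry has to be written to the log.
func (n *node) run() {
	for e := range n.prop {
		// Store to the WAL outside the mutex (this may do IO)...
		fmt.Printf("storing %q to the WAL\n", e.data)
		// ...then take the write lock only briefly to publish bookkeeping
		// (pterm/pindex/bytes in the real code).
		n.mu.Lock()
		n.mu.Unlock()
	}
}

func main() {
	n := &node{leader: true, prop: make(chan entry, 16)}
	_ = n.Propose([]byte("hello"))
	close(n.prop)
	n.run() // drain synchronously for the example
}

In the same spirit, the real change has sendAppendEntry() re-acquire the write lock only after storeToWAL() returns, to publish pterm, pindex and the byte count.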

7 files changed: 134 additions & 76 deletions

server/filestore.go

Lines changed: 4 additions & 0 deletions

@@ -8195,6 +8195,10 @@ func fileStoreMsgSizeEstimate(slen, maxPayload int) uint64 {
     return uint64(emptyRecordLen + slen + 4 + maxPayload)
 }
 
+func (fs *fileStore) MsgSize(msg []byte) uint64 {
+    return fileStoreMsgSizeRaw(0, 0, len(msg))
+}
+
 // ResetState resets any state that's temporary. For example when changing leaders.
 func (fs *fileStore) ResetState() {
     fs.mu.Lock()

server/jetstream_cluster_1_test.go

Lines changed: 1 addition & 1 deletion

@@ -6973,7 +6973,7 @@ func TestJetStreamClusterCatchupMustStallWhenBehindOnApplies(t *testing.T) {
     n := mset.node.(*raft)
     n.Lock()
     ae := n.buildAppendEntry(nil)
-    err = n.storeToWAL(ae)
+    _, _, err = n.storeToWAL(ae)
     n.Unlock()
     index, commit, applied := n.Progress()
     require_NoError(t, err)

server/jetstream_cluster_4_test.go

Lines changed: 1 addition & 1 deletion

@@ -4269,7 +4269,7 @@ func TestJetStreamClusterConsumerDontSendSnapshotOnLeaderChange(t *testing.T) {
     rn.Lock()
     entries := []*Entry{{EntryNormal, updateDeliveredBuffer()}, {EntryNormal, updateAcksBuffer()}}
     ae := encode(t, rn.buildAppendEntry(entries))
-    err = rn.storeToWAL(ae)
+    _, _, err = rn.storeToWAL(ae)
     minPindex := rn.pindex
     rn.Unlock()
     require_NoError(t, err)

server/memstore.go

Lines changed: 4 additions & 0 deletions

@@ -2068,6 +2068,10 @@ func memStoreMsgSize(subj string, hdr, msg []byte) uint64 {
     return memStoreMsgSizeRaw(len(subj), len(hdr), len(msg))
 }
 
+func (ms *memStore) MsgSize(msg []byte) uint64 {
+    return memStoreMsgSizeRaw(0, 0, len(msg))
+}
+
 // ResetState resets any state that's temporary. For example when changing leaders.
 func (ms *memStore) ResetState() {
     ms.mu.Lock()

server/raft.go

Lines changed: 116 additions & 70 deletions

@@ -91,6 +91,7 @@ type RaftNode interface {
 type WAL interface {
     Type() StorageType
     StoreMsg(subj string, hdr, msg []byte, ttl int64) (uint64, int64, error)
+    MsgSize(msg []byte) uint64
     LoadMsg(index uint64, sm *StoreMsg) (*StoreMsg, error)
     RemoveMsg(index uint64) (bool, error)
     Compact(index uint64) (uint64, error)

@@ -867,19 +868,22 @@ func (s *Server) transferRaftLeaders() bool {
 // Propose will propose a new entry to the group.
 // This should only be called on the leader.
 func (n *raft) Propose(data []byte) error {
-    n.Lock()
-    defer n.Unlock()
-    // Check state under lock, we might not be leader anymore.
-    if state := n.State(); state != Leader {
+    n.RLock()
+    state := n.State()
+    writeError := n.werr
+    prop := n.prop
+    n.RUnlock()
+
+    if state != Leader {
         n.debug("Proposal ignored, not leader (state: %v)", state)
         return errNotLeader
     }
 
-    // Error if we had a previous write error.
-    if werr := n.werr; werr != nil {
-        return werr
+    if writeError != nil {
+        return writeError
     }
-    n.prop.push(newProposedEntry(newEntry(EntryNormal, data), _EMPTY_))
+
+    prop.push(newProposedEntry(newEntry(EntryNormal, data), _EMPTY_))
     return nil
 }

@@ -1014,10 +1018,9 @@ func (n *raft) AdjustBootClusterSize(csz int) error {
 // Must be the leader.
 func (n *raft) AdjustClusterSize(csz int) error {
     n.Lock()
-    defer n.Unlock()
-
     // Check state under lock, we might not be leader anymore.
     if n.State() != Leader {
+        n.Unlock()
         return errNotLeader
     }
     // Same floor as bootstrap.

@@ -1029,8 +1032,10 @@
     // a quorum.
     n.csz = csz
     n.qn = n.csz/2 + 1
+    n.Unlock()
 
-    n.sendPeerState()
+    n.prop.push(newProposedEntry(
+        newEntry(EntryPeerState, encodePeerState(n.currentPeerState())), _EMPTY_))
     return nil
 }

@@ -1213,10 +1218,8 @@ func (n *raft) encodeSnapshot(snap *snapshot) []byte {
 // Should only be used when the upper layers know this is most recent.
 // Used when restoring streams, moving a stream from R1 to R>1, etc.
 func (n *raft) SendSnapshot(data []byte) error {
-    n.Lock()
-    defer n.Unlock()
-    // Don't check if we're leader before sending and storing, this is used on scaleup.
-    n.sendAppendEntryLocked([]*Entry{{EntrySnapshot, data}}, false)
+    // TODO Need to copy data?
+    n.prop.push(newProposedEntry(newEntry(EntrySnapshot, data), _EMPTY_))
     return nil
 }

@@ -1714,6 +1717,8 @@ func (n *raft) StepDown(preferred ...string) error {
     // Send the append entry directly rather than via the proposals queue,
     // as we will switch to follower state immediately and will blow away
     // the contents of the proposal queue in the process.
+    // Also, we won't store the entry in the Raft log, so it is OK to call
+    // into sendAppendEntry() directly from here.
     if maybeLeader != noLeader {
         n.debug("Selected %q for new leader, stepping down due to leadership transfer", maybeLeader)
         ae := newEntry(EntryLeaderTransfer, []byte(maybeLeader))

@@ -1829,13 +1834,16 @@ func (n *raft) Peers() []*Peer {
 // Update and propose our known set of peers.
 func (n *raft) ProposeKnownPeers(knownPeers []string) {
     n.Lock()
-    defer n.Unlock()
     // If we are the leader update and send this update out.
     if n.State() != Leader {
+        n.Unlock()
         return
     }
     n.updateKnownPeersLocked(knownPeers)
-    n.sendPeerState()
+    n.Unlock()
+
+    n.prop.push(newProposedEntry(
+        newEntry(EntryPeerState, encodePeerState(n.currentPeerState())), _EMPTY_))
 }
 
 // Update our known set of peers.

@@ -2630,12 +2638,11 @@ func (n *raft) runAsLeader() {
         n.unsubscribe(rpsub)
         n.Unlock()
     }()
-
-    // To send out our initial peer state.
-    n.sendPeerState()
     n.Unlock()
 
     var propBatch []*proposedEntry
+    n.sendAppendEntry(
+        []*Entry{{EntryPeerState, encodePeerState(n.currentPeerState())}})
 
     hb := time.NewTicker(hbInterval)
     defer hb.Stop()

@@ -2739,6 +2746,14 @@ func (n *raft) composeBatch(allProposals []*proposedEntry) ([]*Entry, []*propose
     end := 0
     for end < len(allProposals) {
         p := allProposals[end]
+        // If we have a snapshot do not batch with anything else.
+        if p.Type == EntrySnapshot {
+            if end == 0 {
+                sz = len(p.Data) + 1
+                end = 1
+            }
+            break
+        }
         sz += len(p.Data) + 1
         end++
         if sz < maxBatch && end < maxEntries {

@@ -3812,13 +3827,24 @@ CONTINUE:
     if ae.shouldStore() {
         // Only store if an original which will have sub != nil
         if sub != nil {
-            if err := n.storeToWAL(ae); err != nil {
+            n.Unlock()
+            size, seq, err := n.storeToWAL(ae)
+            n.Lock()
+            if err != nil {
                 if err != ErrStoreClosed {
                     n.warn("Error storing entry to WAL: %v", err)
                 }
+                if err == errEntryStoreFailed {
+                    n.resetWAL()
+                    n.cancelCatchup()
+                }
                 n.Unlock()
                 return
             }
+            n.bytes += size
+            n.pterm = ae.term
+            n.pindex = seq
+            n.active = time.Now()
             n.cachePendingEntry(ae)
             n.resetInitializing()
         } else {

@@ -3979,50 +4005,54 @@ func (n *raft) buildAppendEntry(entries []*Entry) *appendEntry {
     return newAppendEntry(n.id, n.term, n.commit, n.pterm, n.pindex, entries)
 }
 
-// Determine if we should store an entry. This stops us from storing
-// heartbeat messages.
+// Determine if we should store an entry.
+// This stops us from storing heartbeat and leader transfer messages.
 func (ae *appendEntry) shouldStore() bool {
-    return ae != nil && len(ae.entries) > 0
+    if ae == nil {
+        return false
+    }
+    l := len(ae.entries)
+    if l == 0 {
+        return false
+    }
+    if l == 1 {
+        return ae.entries[0].Type != EntryLeaderTransfer
+    }
+    return true
+}
+
+func (ae *appendEntry) shouldCheckLeader() bool {
+    if ae != nil && len(ae.entries) == 1 &&
+        ae.entries[0].Type == EntrySnapshot {
+        return true
+    }
+    return false
 }
 
 // Store our append entry to our WAL.
-// lock should be held.
-func (n *raft) storeToWAL(ae *appendEntry) error {
+// Returns the number of bytes written and the sequence number
+// assigned to the message.
+func (n *raft) storeToWAL(ae *appendEntry) (uint64, uint64, error) {
     if ae == nil {
-        return fmt.Errorf("raft: Missing append entry for storage")
+        return 0, 0, fmt.Errorf("raft: Missing append entry for storage")
     }
+
     if n.werr != nil {
-        return n.werr
+        return 0, 0, n.werr
     }
 
     seq, _, err := n.wal.StoreMsg(_EMPTY_, nil, ae.buf, 0)
     if err != nil {
-        n.setWriteErrLocked(err)
-        return err
+        return 0, 0, err
     }
-
     // Sanity checking for now.
     if index := ae.pindex + 1; index != seq {
         n.warn("Wrong index, ae is %+v, index stored was %d, n.pindex is %d, will reset", ae, seq, n.pindex)
-        if n.State() == Leader {
-            n.stepdownLocked(n.selectNextLeader())
-        }
-        // Reset and cancel any catchup.
-        n.resetWAL()
-        n.cancelCatchup()
-        return errEntryStoreFailed
+        return 0, 0, errEntryStoreFailed
     }
 
-    var sz uint64
-    if n.wtype == FileStorage {
-        sz = fileStoreMsgSize(_EMPTY_, nil, ae.buf)
-    } else {
-        sz = memStoreMsgSize(_EMPTY_, nil, ae.buf)
-    }
-    n.bytes += sz
-    n.pterm = ae.term
-    n.pindex = seq
-    return nil
+    sz := n.wal.MsgSize(ae.buf)
+    return sz, seq, nil
 }
 
 const (

@@ -4031,19 +4061,25 @@ const (
     paeWarnModulo = 5_000
 )
 
+// sendAppendEntry builds an appendEntry and stores it to the WAL,
+// before sending it to the followers.
+// It is expected for this method to be called from Raft's main
+// goroutine, unless the appendEntry does not need to be stored
+// to the WAL (heartbeat or EntryLeaderTransfer).
 func (n *raft) sendAppendEntry(entries []*Entry) {
-    n.Lock()
-    defer n.Unlock()
-    n.sendAppendEntryLocked(entries, true)
-}
-func (n *raft) sendAppendEntryLocked(entries []*Entry, checkLeader bool) {
-    // Safeguard against sending an append entry right after a stepdown from a different goroutine.
-    // Specifically done while holding the lock to not race.
-    if checkLeader && n.State() != Leader {
+    // Safeguard against sending an append entry right after a stepdown
+    // from a different goroutine. Specifically done while holding the
+    // lock to not race.
+    n.RLock()
+    state := n.State()
+    ae := n.buildAppendEntry(entries)
+    n.RUnlock()
+
+    if ae.shouldCheckLeader() && state != Leader {
         n.debug("Not sending append entry, not leader")
+        ae.returnToPool()
         return
     }
-    ae := n.buildAppendEntry(entries)
 
     var err error
     var scratch [1024]byte

@@ -4055,12 +4091,30 @@ func (n *raft) sendAppendEntryLocked(entries []*Entry, checkLeader bool) {
     // If we have entries store this in our wal.
     shouldStore := ae.shouldStore()
     if shouldStore {
-        if err := n.storeToWAL(ae); err != nil {
+        size, seq, err := n.storeToWAL(ae)
+        n.Lock()
+        if err != nil {
+            n.setWriteErrLocked(err)
+            if err == errEntryStoreFailed {
+                if n.State() == Leader {
+                    n.stepdownLocked(n.selectNextLeader())
+                }
+                // are we sure we want this?
+                n.resetWAL()
+                n.cancelCatchup()
+            }
+            n.Unlock()
             return
         }
+
+        n.bytes += size
+        n.pterm = ae.term
+        n.pindex = seq
         n.active = time.Now()
         n.cachePendingEntry(ae)
+        n.Unlock()
     }
+
     n.sendRPC(n.asubj, n.areply, ae.buf)
     if !shouldStore {
         ae.returnToPool()

@@ -4149,23 +4203,15 @@ func (n *raft) peerNames() []string {
 
 func (n *raft) currentPeerState() *peerState {
     n.RLock()
-    ps := n.currentPeerStateLocked()
-    n.RUnlock()
-    return ps
-}
-
-func (n *raft) currentPeerStateLocked() *peerState {
+    defer n.RUnlock()
     return &peerState{n.peerNames(), n.csz, n.extSt}
 }
 
-// sendPeerState will send our current peer state to the cluster.
-// Lock should be held.
-func (n *raft) sendPeerState() {
-    n.sendAppendEntryLocked([]*Entry{{EntryPeerState, encodePeerState(n.currentPeerStateLocked())}}, true)
-}
-
 // Send a heartbeat.
 func (n *raft) sendHeartbeat() {
+    // OK to call sendAppendEntry() directly here.
+    // No need to push heartbeats into prop queue
+    // because we don't store those into the log.
     n.sendAppendEntry(nil)
 }