Skip to content

Commit b7a4936

Browse files
committed
Make Raft groups quiet when idle
This patch adds the ability to make Raft groups quiescent, so that leaders stop sending heartbeats after a long idle time. It works as follows: - When a leader detects a long idle time it sends a Quiesce message to its followers, and stops sending heartbeats. - Upon receiving a Quiesce message, followers no longer expect heartbeats from the leader, by stopping their election timers. - A leader or follower unquiesces upon receiving any kind of request: a new proposal, append entries, vote requests, and so on. Potential advantages and use cases: - Fewer messages over the network and less ticking in deployments with a large number of inactive Raft groups - Read-only streams Disadvantages: - Failures may go unnoticed for a long time (Could be avoided if the Raft layer could be notified of dropped connections?) - The first request to a quiesced group may experience higher latency should nodes find out that they need to go through a new election before the proposal can be pushed forward. This is a proof of concept; it is incomplete and meant for experimentation only. Signed-off-by: Daniele Sciascia <[email protected]>
1 parent d034f64 commit b7a4936

File tree

3 files changed

+249
-17
lines changed

3 files changed

+249
-17
lines changed

server/jetstream_api.go

Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -198,6 +198,10 @@ const (
198198
JSApiStreamRemovePeer = "$JS.API.STREAM.PEER.REMOVE.*"
199199
JSApiStreamRemovePeerT = "$JS.API.STREAM.PEER.REMOVE.%s"
200200

201+
// JSApiStreamLeaderQuiesce is the endpoint to have stream leader quiesce.
202+
// Will return JSON response.
203+
JSApiStreamLeaderQuiesce = "$JS.API.STREAM.LEADER.QUIESCE.*"
204+
201205
// JSApiStreamLeaderStepDown is the endpoint to have stream leader stepdown.
202206
// Will return JSON response.
203207
JSApiStreamLeaderStepDown = "$JS.API.STREAM.LEADER.STEPDOWN.*"
@@ -612,6 +616,14 @@ type JSApiStreamRemovePeerResponse struct {
612616

613617
const JSApiStreamRemovePeerResponseType = "io.nats.jetstream.api.v1.stream_remove_peer_response"
614618

619+
// JSApiStreamLeaderQuiesceResponse is the response to a stream leader quiesce request.
620+
type JSApiStreamLeaderQuiesceResponse struct {
621+
ApiResponse
622+
Success bool `json:"success,omitempty"`
623+
}
624+
625+
const JSApiStreamLeaderQuiesceResponseType = "io.nats.jetstream.api.v1.stream_leader_quiesce_response"
626+
615627
// JSApiStreamLeaderStepDownResponse is the response to a leader stepdown request.
616628
type JSApiStreamLeaderStepDownResponse struct {
617629
ApiResponse
@@ -1006,6 +1018,7 @@ func (s *Server) setJetStreamExportSubs() error {
10061018
{JSApiStreamSnapshot, s.jsStreamSnapshotRequest},
10071019
{JSApiStreamRestore, s.jsStreamRestoreRequest},
10081020
{JSApiStreamRemovePeer, s.jsStreamRemovePeerRequest},
1021+
{JSApiStreamLeaderQuiesce, s.jsStreamLeaderQuiesceRequest},
10091022
{JSApiStreamLeaderStepDown, s.jsStreamLeaderStepDownRequest},
10101023
{JSApiConsumerLeaderStepDown, s.jsConsumerLeaderStepDownRequest},
10111024
{JSApiMsgDelete, s.jsMsgDeleteRequest},
@@ -2267,6 +2280,93 @@ func (s *Server) jsStreamInfoRequest(sub *subscription, c *client, a *Account, s
22672280
s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp))
22682281
}
22692282

2283+
// Request a stream leader to quiesce.
2284+
func (s *Server) jsStreamLeaderQuiesceRequest(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) {
2285+
if c == nil || !s.JetStreamEnabled() {
2286+
return
2287+
}
2288+
2289+
ci, acc, hdr, msg, err := s.getRequestInfo(c, rmsg)
2290+
if err != nil {
2291+
s.Warnf(badAPIRequestT, msg)
2292+
return
2293+
}
2294+
2295+
// Have extra token for this one.
2296+
name := tokenAt(subject, 6)
2297+
2298+
var resp = JSApiStreamLeaderStepDownResponse{ApiResponse: ApiResponse{Type: JSApiStreamLeaderQuiesceResponseType}}
2299+
if errorOnRequiredApiLevel(hdr) {
2300+
resp.Error = NewJSRequiredApiLevelError()
2301+
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
2302+
return
2303+
}
2304+
2305+
js, cc := s.getJetStreamCluster()
2306+
if js == nil || cc == nil {
2307+
return
2308+
}
2309+
if js.isLeaderless() {
2310+
resp.Error = NewJSClusterNotAvailError()
2311+
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
2312+
return
2313+
}
2314+
2315+
js.mu.RLock()
2316+
isLeader, sa := cc.isLeader(), js.streamAssignment(acc.Name, name)
2317+
js.mu.RUnlock()
2318+
2319+
if isLeader && sa == nil {
2320+
resp.Error = NewJSStreamNotFoundError()
2321+
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
2322+
return
2323+
} else if sa == nil {
2324+
return
2325+
}
2326+
2327+
if hasJS, doErr := acc.checkJetStream(); !hasJS {
2328+
if doErr {
2329+
resp.Error = NewJSNotEnabledForAccountError()
2330+
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
2331+
}
2332+
return
2333+
}
2334+
2335+
// Check to see if we are a member of the group and if the group has no leader.
2336+
if js.isGroupLeaderless(sa.Group) {
2337+
resp.Error = NewJSClusterNotAvailError()
2338+
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
2339+
return
2340+
}
2341+
2342+
// We have the stream assigned and a leader, so only the stream leader should answer.
2343+
if !acc.JetStreamIsStreamLeader(name) {
2344+
return
2345+
}
2346+
2347+
mset, err := acc.lookupStream(name)
2348+
if err != nil || mset == nil {
2349+
resp.Error = NewJSStreamNotFoundError(Unless(err))
2350+
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
2351+
return
2352+
}
2353+
2354+
node := mset.raftNode()
2355+
if node == nil {
2356+
resp.Success = true
2357+
s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp))
2358+
return
2359+
}
2360+
2361+
err = node.Quiesce()
2362+
if err != nil {
2363+
resp.Error = NewJSRaftGeneralError(err, Unless(err))
2364+
} else {
2365+
resp.Success = true
2366+
}
2367+
s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp))
2368+
}
2369+
22702370
// Request to have a stream leader stepdown.
22712371
func (s *Server) jsStreamLeaderStepDownRequest(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) {
22722372
if c == nil || !s.JetStreamEnabled() {

server/raft.go

Lines changed: 116 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,7 @@ type RaftNode interface {
8282
Delete()
8383
RecreateInternalSubs() error
8484
IsSystemAccount() bool
85+
Quiesce() error
8586
}
8687

8788
type WAL interface {
@@ -225,6 +226,9 @@ type raft struct {
225226
observer bool // The node is observing, i.e. not able to become leader
226227
initializing bool // The node is new, and "empty log" checks can be temporarily relaxed.
227228
scaleUp bool // The node is part of a scale up, puts us in observer mode until the log contains data.
229+
230+
quiesce chan bool // Channel to notify leader loop to quiesce
231+
quiesced bool // The node is quiesced
228232
}
229233

230234
type proposedEntry struct {
@@ -260,6 +264,7 @@ const (
260264
lostQuorumCheckIntervalDefault = hbIntervalDefault * 10 // 10 seconds
261265
observerModeIntervalDefault = 48 * time.Hour
262266
peerRemoveTimeoutDefault = 5 * time.Minute
267+
quiesceIntervalDefault = 15 * time.Minute
263268
)
264269

265270
var (
@@ -272,6 +277,7 @@ var (
272277
lostQuorumCheck = lostQuorumCheckIntervalDefault
273278
observerModeInterval = observerModeIntervalDefault
274279
peerRemoveTimeout = peerRemoveTimeoutDefault
280+
quiesceInterval = quiesceIntervalDefault
275281
)
276282

277283
type RaftConfig struct {
@@ -426,6 +432,7 @@ func (s *Server) initRaftNode(accName string, cfg *RaftConfig, labels pprofLabel
426432
leadc: make(chan bool, 32),
427433
observer: cfg.Observer,
428434
extSt: ps.domainExt,
435+
quiesce: make(chan bool),
429436
}
430437

431438
// Setup our internal subscriptions for proposals, votes and append entries.
@@ -1601,6 +1608,44 @@ func (n *raft) selectNextLeader() string {
16011608
return nextLeader
16021609
}
16031610

1611+
func (n *raft) Quiesce() error {
1612+
if n.State() != Leader {
1613+
return errNotLeader
1614+
}
1615+
n.quiesce <- true
1616+
return nil
1617+
}
1618+
1619+
// Return true if the node can be quiesced
1620+
func (n *raft) mayQuiesce() bool {
1621+
n.RLock()
1622+
defer n.RUnlock()
1623+
// TODO this test should be strengthened:
1624+
// must check that followers are up-to-date
1625+
return !n.quiesced && n.State() == Leader && n.hasQuorumLocked()
1626+
}
1627+
1628+
func (n *raft) doQuiesce() bool {
1629+
if n.mayQuiesce() {
1630+
n.sendQuiesce()
1631+
n.setQuiesced(true)
1632+
return true
1633+
}
1634+
return false
1635+
}
1636+
1637+
func (n *raft) isQuiesced() bool {
1638+
n.RLock()
1639+
defer n.RUnlock()
1640+
return n.quiesced
1641+
}
1642+
1643+
func (n *raft) setQuiesced(quiesced bool) {
1644+
n.Lock()
1645+
defer n.Unlock()
1646+
n.quiesced = quiesced
1647+
}
1648+
16041649
// StepDown will have a leader stepdown and optionally do a leader transfer.
16051650
func (n *raft) StepDown(preferred ...string) error {
16061651
if n.State() != Leader {
@@ -2140,8 +2185,13 @@ func (n *raft) runAsFollower() {
21402185

21412186
select {
21422187
case <-n.entry.ch:
2188+
wasQuiesced := n.isQuiesced()
21432189
// New append entries have arrived over the network.
21442190
n.processAppendEntries()
2191+
if !wasQuiesced && n.isQuiesced() {
2192+
// Avoid unquiescing immediately
2193+
continue
2194+
}
21452195
case <-n.s.quitCh:
21462196
// The server is shutting down.
21472197
return
@@ -2188,6 +2238,11 @@ func (n *raft) runAsFollower() {
21882238
n.processVoteRequest(voteReq)
21892239
}
21902240
}
2241+
2242+
if n.isQuiesced() {
2243+
n.setQuiesced(false)
2244+
n.debug("Follower unquiesced")
2245+
}
21912246
}
21922247
}
21932248

@@ -2308,6 +2363,7 @@ const (
23082363
EntryRemovePeer
23092364
EntryLeaderTransfer
23102365
EntrySnapshot
2366+
EntryQuiesce
23112367
)
23122368

23132369
func (t EntryType) String() string {
@@ -2326,6 +2382,8 @@ func (t EntryType) String() string {
23262382
return "LeaderTransfer"
23272383
case EntrySnapshot:
23282384
return "Snapshot"
2385+
case EntryQuiesce:
2386+
return "Quiesce"
23292387
}
23302388
return fmt.Sprintf("Unknown [%d]", uint8(t))
23312389
}
@@ -2585,10 +2643,15 @@ func (n *raft) runAsLeader() {
25852643
n.sendPeerState()
25862644

25872645
hb := time.NewTicker(hbInterval)
2588-
defer hb.Stop()
2589-
25902646
lq := time.NewTicker(lostQuorumCheck)
2591-
defer lq.Stop()
2647+
qu := time.NewTicker(quiesceInterval)
2648+
2649+
stopTicking := func() {
2650+
hb.Stop()
2651+
lq.Stop()
2652+
qu.Stop()
2653+
}
2654+
defer stopTicking()
25922655

25932656
for n.State() == Leader {
25942657
select {
@@ -2602,6 +2665,12 @@ func (n *raft) runAsLeader() {
26022665
n.processAppendEntryResponse(ar)
26032666
}
26042667
n.resp.recycle(&ars)
2668+
// TODO follower could avoid sending a response
2669+
// for EntryQuiesce
2670+
if n.isQuiesced() {
2671+
// Avoid unquiescing immediately
2672+
continue
2673+
}
26052674
case <-n.prop.ch:
26062675
const maxBatch = 256 * 1024
26072676
const maxEntries = 512
@@ -2664,15 +2733,31 @@ func (n *raft) runAsLeader() {
26642733
}
26652734
case <-n.entry.ch:
26662735
n.processAppendEntries()
2736+
case <-qu.C:
2737+
if time.Since(n.active) > quiesceInterval && n.doQuiesce() {
2738+
stopTicking()
2739+
continue
2740+
}
2741+
case <-n.quiesce:
2742+
if n.doQuiesce() {
2743+
stopTicking()
2744+
continue
2745+
}
2746+
}
2747+
2748+
// Any interaction unquiesces the leader
2749+
if n.isQuiesced() {
2750+
hb.Reset(hbInterval)
2751+
lq.Reset(lostQuorumInterval)
2752+
qu.Reset(quiesceInterval)
2753+
n.setQuiesced(false)
2754+
n.debug("Leader unquiesced")
26672755
}
26682756
}
26692757
}
26702758

2671-
// Quorum reports the quorum status. Will be called on former leaders.
2672-
func (n *raft) Quorum() bool {
2673-
n.RLock()
2674-
defer n.RUnlock()
2675-
2759+
// Return true if leader believes it still has a quorum.
2760+
func (n *raft) hasQuorumLocked() bool {
26762761
nc := 0
26772762
for id, peer := range n.peers {
26782763
if id == n.id || time.Since(peer.ts) < lostQuorumInterval {
@@ -2684,6 +2769,13 @@ func (n *raft) Quorum() bool {
26842769
return false
26852770
}
26862771

2772+
// Quorum reports the quorum status. Will be called on former leaders.
2773+
func (n *raft) Quorum() bool {
2774+
n.RLock()
2775+
defer n.RUnlock()
2776+
return n.hasQuorumLocked()
2777+
}
2778+
26872779
func (n *raft) lostQuorum() bool {
26882780
n.RLock()
26892781
defer n.RUnlock()
@@ -2698,15 +2790,7 @@ func (n *raft) lostQuorumLocked() bool {
26982790
return false
26992791
}
27002792

2701-
nc := 0
2702-
for id, peer := range n.peers {
2703-
if id == n.id || time.Since(peer.ts) < lostQuorumInterval {
2704-
if nc++; nc >= n.qn {
2705-
return false
2706-
}
2707-
}
2708-
}
2709-
return true
2793+
return !n.hasQuorumLocked()
27102794
}
27112795

27122796
// Check for being not active in terms of sending entries.
@@ -3719,6 +3803,11 @@ CONTINUE:
37193803
// Check to see if we have any related entries to process here.
37203804
for _, e := range ae.entries {
37213805
switch e.Type {
3806+
case EntryQuiesce:
3807+
if isNew && n.State() == Follower {
3808+
n.elect.Stop()
3809+
n.quiesced = true
3810+
}
37223811
case EntryLeaderTransfer:
37233812
// Only process these if they are new, so no replays or catchups.
37243813
if isNew {
@@ -3870,6 +3959,11 @@ func (n *raft) buildAppendEntry(entries []*Entry) *appendEntry {
38703959
// Determine if we should store an entry. This stops us from storing
38713960
// heartbeat messages.
38723961
func (ae *appendEntry) shouldStore() bool {
3962+
if len(ae.entries) == 1 {
3963+
if e := ae.entries[0]; e.Type == EntryQuiesce {
3964+
return false
3965+
}
3966+
}
38733967
return ae != nil && len(ae.entries) > 0
38743968
}
38753969

@@ -4033,6 +4127,11 @@ func (n *raft) sendHeartbeat() {
40334127
n.sendAppendEntry(nil)
40344128
}
40354129

4130+
// Tell the cluster to quiesce the current term
4131+
func (n *raft) sendQuiesce() {
4132+
n.sendAppendEntry([]*Entry{{EntryQuiesce, nil}})
4133+
}
4134+
40364135
type voteRequest struct {
40374136
term uint64
40384137
lastTerm uint64

0 commit comments

Comments
 (0)