Skip to content

Commit 973bd4a

Browse files
committed
Raft Batching: Maximize batch size
Limit batch size based on the configured max_payload.
1 parent 2866e2e commit 973bd4a

File tree

1 file changed

+26
-9
lines changed

1 file changed

+26
-9
lines changed

server/raft.go

Lines changed: 26 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -2673,7 +2673,7 @@ func (n *raft) runAsLeader() {
26732673

26742674
// Send our batch if we have one.
26752675
if len(batchEntries) > 0 {
2676-
log.Println("Batch:", len(batchEntries), "entries", sz, "bytes")
2676+
log.Println("Batch:", len(batchEntries), "entries", sz, "bytes", "maxBatch", n.maxBatchSize())
26772677
n.sendAppendEntry(batchEntries)
26782678
}
26792679

@@ -2732,31 +2732,48 @@ func (n *raft) runAsLeader() {
27322732
}
27332733
}
27342734

2735+
// Returns the maximum number of bytes we can safely
2736+
// send in a single message.
2737+
func (n *raft) maxBatchSize() int {
2738+
max_payload := MAX_PAYLOAD_SIZE
2739+
if n.s.info.MaxPayload > 0 {
2740+
max_payload = int(n.s.info.MaxPayload)
2741+
}
2742+
if n.acc.mpay > 0 {
2743+
max_payload = int(n.acc.mpay)
2744+
}
2745+
return max_payload - MAX_CONTROL_LINE_SIZE
2746+
}
2747+
27352748
// composeBatch will compose a batch from a set of proposals.
27362749
// It will return a batch of entries to be sent and any new leftovers.
27372750
func (n *raft) composeBatch(allProposals []*proposedEntry) ([]*Entry, []*proposedEntry, int) {
2738-
const maxBatch = 256 * 1024
2739-
const maxEntries = 512
2751+
const maxEntries = math.MaxUint16
2752+
maxBatchSize := n.maxBatchSize()
27402753

27412754
if len(allProposals) == 0 {
27422755
return nil, nil, 0
27432756
}
27442757

2745-
var sz int
27462758
end := 0
2759+
batchSize := int(appendEntryBaseLen)
2760+
27472761
for end < len(allProposals) {
27482762
p := allProposals[end]
2763+
msgSize := len(p.Data) + 1 + 4 // to encode type and size
2764+
27492765
// If we have a snapshot do not batch with anything else.
27502766
if p.Type == EntrySnapshot {
27512767
if end == 0 {
2752-
sz = len(p.Data) + 1
2768+
batchSize += msgSize
27532769
end = 1
27542770
}
27552771
break
27562772
}
2757-
sz += len(p.Data) + 1
2758-
end++
2759-
if sz < maxBatch && end < maxEntries {
2773+
2774+
if end == 0 || (batchSize+msgSize) < maxBatchSize && end < maxEntries {
2775+
batchSize += msgSize
2776+
end++
27602777
continue
27612778
}
27622779
break
@@ -2776,7 +2793,7 @@ func (n *raft) composeBatch(allProposals []*proposedEntry) ([]*Entry, []*propose
27762793
entries[i] = p.Entry
27772794
}
27782795

2779-
return entries, newLeftovers, sz
2796+
return entries, newLeftovers, batchSize
27802797
}
27812798

27822799
// Quorum reports the quorum status. Will be called on former leaders.

0 commit comments

Comments
 (0)