Skip to content

Commit 3efc8de

Browse files
[FIXED] Max streams reached with parallel stream creation (#6502)
Maximum streams reached errors would be returned when creating (the same) streams in parallel: - if max streams = 1, the second create request would fail as the previous create request being inflight made it exceed the limit, even if it was about the same stream - if max streams > 1, if the streams already existed would count the streams twice, once for the streams existing already, and another time due to being inflight Signed-off-by: Maurice van Veen <[email protected]>
2 parents aee8cbc + 7201a1c commit 3efc8de

File tree

2 files changed

+80
-2
lines changed

2 files changed

+80
-2
lines changed

server/jetstream_cluster.go

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6078,7 +6078,14 @@ func (js *jetStream) jsClusteredStreamLimitsCheck(acc *Account, cfg *StreamConfi
60786078
numStreams, reservations := tieredStreamAndReservationCount(asa, tier, cfg)
60796079
// Check for inflight proposals...
60806080
if cc := js.cluster; cc != nil && cc.inflight != nil {
6081-
numStreams += len(cc.inflight[acc.Name])
6081+
streams := cc.inflight[acc.Name]
6082+
numStreams += len(streams)
6083+
// If inflight contains the same stream, don't count toward exceeding maximum.
6084+
if cfg != nil {
6085+
if _, ok := streams[cfg.Name]; ok {
6086+
numStreams--
6087+
}
6088+
}
60826089
}
60836090
if selectedLimits.MaxStreams > 0 && numStreams >= selectedLimits.MaxStreams {
60846091
return NewJSMaximumStreamsLimitError()
@@ -6186,7 +6193,7 @@ func (s *Server) jsClusteredStreamRequest(ci *ClientInfo, acc *Account, subject,
61866193
// On success, add this as an inflight proposal so we can apply limits
61876194
// on concurrent create requests while this stream assignment has
61886195
// possibly not been processed yet.
6189-
if streams, ok := cc.inflight[acc.Name]; ok {
6196+
if streams, ok := cc.inflight[acc.Name]; ok && self == nil {
61906197
streams[cfg.Name] = &inflightInfo{rg, syncSubject}
61916198
}
61926199
}

server/jetstream_cluster_1_test.go

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3198,6 +3198,77 @@ func TestJetStreamClusterAccountInfoAndLimits(t *testing.T) {
31983198
}
31993199
}
32003200

3201+
func TestJetStreamClusterMaxStreamsReached(t *testing.T) {
3202+
c := createJetStreamClusterExplicit(t, "R3S", 3)
3203+
defer c.shutdown()
3204+
3205+
nc, js := jsClientConnect(t, c.randomNonLeader())
3206+
defer nc.Close()
3207+
3208+
// Adjust our limits.
3209+
c.updateLimits("$G", map[string]JetStreamAccountLimits{
3210+
_EMPTY_: {
3211+
MaxMemory: 1024,
3212+
MaxStore: 1024,
3213+
MaxStreams: 1,
3214+
},
3215+
})
3216+
3217+
// Many stream creations in parallel for the same stream should not result in
3218+
// maximum number of streams reached error. All should have a successful response.
3219+
var wg sync.WaitGroup
3220+
for i := 0; i < 15; i++ {
3221+
wg.Add(1)
3222+
go func() {
3223+
defer wg.Done()
3224+
_, err := js.AddStream(&nats.StreamConfig{
3225+
Name: "TEST",
3226+
Subjects: []string{"foo"},
3227+
Replicas: 1,
3228+
})
3229+
require_NoError(t, err)
3230+
}()
3231+
}
3232+
wg.Wait()
3233+
require_NoError(t, js.DeleteStream("TEST"))
3234+
3235+
// Adjust our limits.
3236+
c.updateLimits("$G", map[string]JetStreamAccountLimits{
3237+
_EMPTY_: {
3238+
MaxMemory: 1024,
3239+
MaxStore: 1024,
3240+
MaxStreams: 2,
3241+
},
3242+
})
3243+
3244+
// Setup streams beforehand.
3245+
for d := 0; d < 2; d++ {
3246+
_, err := js.AddStream(&nats.StreamConfig{
3247+
Name: fmt.Sprintf("TEST-%d", d),
3248+
Subjects: []string{fmt.Sprintf("foo.%d", d)},
3249+
Replicas: 1,
3250+
})
3251+
require_NoError(t, err)
3252+
}
3253+
3254+
// Many stream creations in parallel for streams that already exist should not result in
3255+
// maximum number of streams reached error. All should have a successful response.
3256+
for i := 0; i < 15; i++ {
3257+
wg.Add(1)
3258+
d := i % 2
3259+
go func() {
3260+
defer wg.Done()
3261+
_, err := js.AddStream(&nats.StreamConfig{
3262+
Name: fmt.Sprintf("TEST-%d", d),
3263+
Subjects: []string{fmt.Sprintf("foo.%d", d)},
3264+
Replicas: 1,
3265+
})
3266+
require_NoError(t, err)
3267+
}()
3268+
}
3269+
wg.Wait()
3270+
}
3271+
32013272
func TestJetStreamClusterStreamLimits(t *testing.T) {
32023273
c := createJetStreamClusterExplicit(t, "R3S", 3)
32033274
defer c.shutdown()

0 commit comments

Comments
 (0)