Skip to content

Commit 9973b9e

Browse files
hczhu-dbyuchen-db
authored andcommitted
Limit lazyRespSet memory buffer size
Addressed comments Fix linter goformat Fix unit tests
1 parent 1950324 commit 9973b9e

File tree

6 files changed

+135
-50
lines changed

6 files changed

+135
-50
lines changed

cmd/thanos/query.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -248,6 +248,9 @@ func registerQuery(app *extkingpin.App) {
248248
rewriteAggregationLabelStrategy := cmd.Flag("query.aggregation-label-strategy", "The strategy to use when rewriting aggregation labels. Used during aggregator migration only.").Default(string(query.NoopLabelRewriter)).Hidden().Enum(string(query.NoopLabelRewriter), string(query.UpsertLabelRewriter), string(query.InsertOnlyLabelRewriter))
249249
rewriteAggregationLabelTo := cmd.Flag("query.aggregation-label-value-override", "The value override for aggregation label. If set to x, all queries on aggregated metrics will have a `__agg_rule_type__=x` matcher. If empty, this behavior is disabled. Default is empty.").Hidden().Default("").String()
250250

251+
lazyRetrievalMaxBufferedResponses := cmd.Flag("query.lazy-retrieval-max-buffered-responses", "The lazy retrieval strategy can buffer up to this number of responses. This is to limit the memory usage. This flag takes effect only when the lazy retrieval strategy is enabled.").
252+
Default("20").Int()
253+
251254
var storeRateLimits store.SeriesSelectLimits
252255
storeRateLimits.RegisterFlags(cmd)
253256

@@ -389,6 +392,7 @@ func registerQuery(app *extkingpin.App) {
389392
*enableGroupReplicaPartialStrategy,
390393
*rewriteAggregationLabelStrategy,
391394
*rewriteAggregationLabelTo,
395+
*lazyRetrievalMaxBufferedResponses,
392396
)
393397
})
394398
}
@@ -475,6 +479,7 @@ func runQuery(
475479
groupReplicaPartialResponseStrategy bool,
476480
rewriteAggregationLabelStrategy string,
477481
rewriteAggregationLabelTo string,
482+
lazyRetrievalMaxBufferedResponses int,
478483
) error {
479484
comp := component.Query
480485
if alertQueryURL == "" {
@@ -562,6 +567,7 @@ func runQuery(
562567
store.WithTSDBSelector(tsdbSelector),
563568
store.WithProxyStoreDebugLogging(debugLogging),
564569
store.WithQuorumChunkDedup(queryDeduplicationFunc == dedup.AlgorithmQuorum),
570+
store.WithLazyRetrievalMaxBufferedResponsesForProxy(lazyRetrievalMaxBufferedResponses),
565571
}
566572

567573
// Parse and sanitize the provided replica labels flags.

pkg/store/bucket.go

Lines changed: 34 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -422,6 +422,8 @@ type BucketStore struct {
422422
enabledLazyExpandedPostings bool
423423

424424
sortingStrategy sortingStrategy
425+
// This flag limits memory usage when lazy retrieval strategy, newLazyRespSet(), is used.
426+
lazyRetrievalMaxBufferedResponses int
425427

426428
blockEstimatedMaxSeriesFunc BlockEstimator
427429
blockEstimatedMaxChunkFunc BlockEstimator
@@ -561,6 +563,14 @@ func WithDontResort(true bool) BucketStoreOption {
561563
}
562564
}
563565

566+
func WithLazyRetrievalMaxBufferedResponsesForBucket(n int) BucketStoreOption {
567+
return func(s *BucketStore) {
568+
if true {
569+
s.lazyRetrievalMaxBufferedResponses = n
570+
}
571+
}
572+
}
573+
564574
// WithIndexHeaderLazyDownloadStrategy specifies what block to lazy download its index header.
565575
// Only used when lazy mmap is enabled at the same time.
566576
func WithIndexHeaderLazyDownloadStrategy(strategy indexheader.LazyDownloadIndexHeaderFunc) BucketStoreOption {
@@ -597,23 +607,24 @@ func NewBucketStore(
597607
b := make([]byte, 0, initialBufSize)
598608
return &b
599609
}},
600-
chunkPool: pool.NoopPool[byte]{},
601-
blocks: map[ulid.ULID]*bucketBlock{},
602-
blockSets: map[uint64]*bucketBlockSet{},
603-
blockSyncConcurrency: blockSyncConcurrency,
604-
queryGate: gate.NewNoop(),
605-
chunksLimiterFactory: chunksLimiterFactory,
606-
seriesLimiterFactory: seriesLimiterFactory,
607-
bytesLimiterFactory: bytesLimiterFactory,
608-
partitioner: partitioner,
609-
enableCompatibilityLabel: enableCompatibilityLabel,
610-
postingOffsetsInMemSampling: postingOffsetsInMemSampling,
611-
enableSeriesResponseHints: enableSeriesResponseHints,
612-
enableChunkHashCalculation: enableChunkHashCalculation,
613-
seriesBatchSize: SeriesBatchSize,
614-
sortingStrategy: sortingStrategyStore,
615-
indexHeaderLazyDownloadStrategy: indexheader.AlwaysEagerDownloadIndexHeader,
616-
requestLoggerFunc: NoopRequestLoggerFunc,
610+
chunkPool: pool.NoopPool[byte]{},
611+
blocks: map[ulid.ULID]*bucketBlock{},
612+
blockSets: map[uint64]*bucketBlockSet{},
613+
blockSyncConcurrency: blockSyncConcurrency,
614+
queryGate: gate.NewNoop(),
615+
chunksLimiterFactory: chunksLimiterFactory,
616+
seriesLimiterFactory: seriesLimiterFactory,
617+
bytesLimiterFactory: bytesLimiterFactory,
618+
partitioner: partitioner,
619+
enableCompatibilityLabel: enableCompatibilityLabel,
620+
postingOffsetsInMemSampling: postingOffsetsInMemSampling,
621+
enableSeriesResponseHints: enableSeriesResponseHints,
622+
enableChunkHashCalculation: enableChunkHashCalculation,
623+
seriesBatchSize: SeriesBatchSize,
624+
sortingStrategy: sortingStrategyStore,
625+
lazyRetrievalMaxBufferedResponses: 1,
626+
indexHeaderLazyDownloadStrategy: indexheader.AlwaysEagerDownloadIndexHeader,
627+
requestLoggerFunc: NoopRequestLoggerFunc,
617628
}
618629

619630
for _, option := range options {
@@ -1613,6 +1624,11 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, seriesSrv storepb.Store
16131624
nil,
16141625
)
16151626
} else {
1627+
lazyRetrievalMaxBufferedResponses := s.lazyRetrievalMaxBufferedResponses
1628+
if lazyRetrievalMaxBufferedResponses < 1 {
1629+
// Some unit and e2e tests hit this path.
1630+
lazyRetrievalMaxBufferedResponses = 1
1631+
}
16161632
resp = newLazyRespSet(
16171633
span,
16181634
10*time.Minute,
@@ -1623,6 +1639,7 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, seriesSrv storepb.Store
16231639
shardMatcher,
16241640
false,
16251641
s.metrics.emptyPostingCount.WithLabelValues(tenant),
1642+
lazyRetrievalMaxBufferedResponses,
16261643
)
16271644
}
16281645

pkg/store/bucket_test.go

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1780,12 +1780,13 @@ func TestBucketSeries_OneBlock_InMemIndexCacheSegfault(t *testing.T) {
17801780
b1.meta.ULID: b1,
17811781
b2.meta.ULID: b2,
17821782
},
1783-
queryGate: gate.NewNoop(),
1784-
chunksLimiterFactory: NewChunksLimiterFactory(0),
1785-
seriesLimiterFactory: NewSeriesLimiterFactory(0),
1786-
bytesLimiterFactory: NewBytesLimiterFactory(0),
1787-
seriesBatchSize: SeriesBatchSize,
1788-
requestLoggerFunc: NoopRequestLoggerFunc,
1783+
queryGate: gate.NewNoop(),
1784+
chunksLimiterFactory: NewChunksLimiterFactory(0),
1785+
seriesLimiterFactory: NewSeriesLimiterFactory(0),
1786+
bytesLimiterFactory: NewBytesLimiterFactory(0),
1787+
seriesBatchSize: SeriesBatchSize,
1788+
requestLoggerFunc: NoopRequestLoggerFunc,
1789+
lazyRetrievalMaxBufferedResponses: 1,
17891790
}
17901791

17911792
t.Run("invoke series for one block. Fill the cache on the way.", func(t *testing.T) {

pkg/store/proxy.go

Lines changed: 16 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -93,14 +93,15 @@ type ProxyStore struct {
9393
selectorLabels labels.Labels
9494
buffers sync.Pool
9595

96-
responseTimeout time.Duration
97-
metrics *proxyStoreMetrics
98-
retrievalStrategy RetrievalStrategy
99-
debugLogging bool
100-
tsdbSelector *TSDBSelector
101-
quorumChunkDedup bool
102-
enableDedup bool
103-
matcherConverter *storepb.MatcherConverter
96+
responseTimeout time.Duration
97+
metrics *proxyStoreMetrics
98+
retrievalStrategy RetrievalStrategy
99+
debugLogging bool
100+
tsdbSelector *TSDBSelector
101+
quorumChunkDedup bool
102+
enableDedup bool
103+
matcherConverter *storepb.MatcherConverter
104+
lazyRetrievalMaxBufferedResponses int
104105
}
105106

106107
type proxyStoreMetrics struct {
@@ -137,6 +138,12 @@ func RegisterStoreServer(storeSrv storepb.StoreServer, logger log.Logger) func(*
137138
// BucketStoreOption are functions that configure BucketStore.
138139
type ProxyStoreOption func(s *ProxyStore)
139140

141+
func WithLazyRetrievalMaxBufferedResponsesForProxy(buferSize int) ProxyStoreOption {
142+
return func(s *ProxyStore) {
143+
s.lazyRetrievalMaxBufferedResponses = buferSize
144+
}
145+
}
146+
140147
// WithProxyStoreDebugLogging toggles debug logging.
141148
func WithProxyStoreDebugLogging(enable bool) ProxyStoreOption {
142149
return func(s *ProxyStore) {
@@ -390,11 +397,10 @@ func (s *ProxyStore) Series(originalRequest *storepb.SeriesRequest, srv storepb.
390397
}
391398
}
392399
defer logGroupReplicaErrors()
393-
394400
for _, st := range stores {
395401
st := st
396402

397-
respSet, err := newAsyncRespSet(ctx, st, r, s.responseTimeout, s.retrievalStrategy, &s.buffers, r.ShardInfo, reqLogger, s.metrics.emptyStreamResponses)
403+
respSet, err := newAsyncRespSet(ctx, st, r, s.responseTimeout, s.retrievalStrategy, &s.buffers, r.ShardInfo, reqLogger, s.metrics.emptyStreamResponses, s.lazyRetrievalMaxBufferedResponses)
398404
if err != nil {
399405
level.Warn(s.logger).Log("msg", "Store failure", "group", st.GroupKey(), "replica", st.ReplicaKey(), "err", err)
400406
s.metrics.storeFailureCount.WithLabelValues(st.GroupKey(), st.ReplicaKey()).Inc()

pkg/store/proxy_merge.go

Lines changed: 67 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -255,8 +255,20 @@ type lazyRespSet struct {
255255
frameTimeout time.Duration
256256

257257
// Internal bookkeeping.
258-
dataOrFinishEvent *sync.Cond
259-
bufferedResponses []*storepb.SeriesResponse
258+
dataOrFinishEvent *sync.Cond
259+
// This event firing means the buffer has a slot for more data.
260+
bufferSlotEvent *sync.Cond
261+
fixedBufferSize int
262+
// This a ring buffer of size fixedBufferSize.
263+
// A ring buffer of size N can hold N - 1 elements at most in order to distinguish being empty from being full.
264+
bufferedResponses []*storepb.SeriesResponse
265+
// ringHead points to the first element in the ring buffer.
266+
// ringTail points to the slot after the last element in the ring buffer.
267+
// if ringHead == ringTail then the buffer is empty.
268+
// if ringHead == (ringTail + 1) % fixedBufferSize then the buffer is full.
269+
ringHead int
270+
ringTail int
271+
closed bool
260272
bufferedResponsesMtx *sync.Mutex
261273
lastResp *storepb.SeriesResponse
262274

@@ -266,24 +278,32 @@ type lazyRespSet struct {
266278
shardMatcher *storepb.ShardMatcher
267279
}
268280

281+
func (l *lazyRespSet) isEmpty() bool {
282+
return l.ringHead == l.ringTail
283+
}
284+
285+
func (l *lazyRespSet) isFull() bool {
286+
return (l.ringTail+1)%l.fixedBufferSize == l.ringHead
287+
}
288+
269289
func (l *lazyRespSet) Empty() bool {
270290
l.bufferedResponsesMtx.Lock()
271291
defer l.bufferedResponsesMtx.Unlock()
272292

273293
// NOTE(GiedriusS): need to wait here for at least one
274294
// response so that we could build the heap properly.
275-
if l.noMoreData && len(l.bufferedResponses) == 0 {
295+
if l.noMoreData && l.isEmpty() {
276296
return true
277297
}
278298

279-
for len(l.bufferedResponses) == 0 {
299+
for l.isEmpty() {
280300
l.dataOrFinishEvent.Wait()
281-
if l.noMoreData && len(l.bufferedResponses) == 0 {
301+
if l.noMoreData && l.isEmpty() {
282302
break
283303
}
284304
}
285305

286-
return len(l.bufferedResponses) == 0 && l.noMoreData
306+
return l.isEmpty() && l.noMoreData
287307
}
288308

289309
// Next either blocks until more data is available or reads
@@ -295,23 +315,24 @@ func (l *lazyRespSet) Next() bool {
295315

296316
l.initialized = true
297317

298-
if l.noMoreData && len(l.bufferedResponses) == 0 {
318+
if l.noMoreData && l.isEmpty() {
299319
l.lastResp = nil
300320

301321
return false
302322
}
303323

304-
for len(l.bufferedResponses) == 0 {
324+
for l.isEmpty() {
305325
l.dataOrFinishEvent.Wait()
306-
if l.noMoreData && len(l.bufferedResponses) == 0 {
326+
if l.noMoreData && l.isEmpty() {
307327
break
308328
}
309329
}
310330

311-
if len(l.bufferedResponses) > 0 {
312-
l.lastResp = l.bufferedResponses[0]
331+
if !l.isEmpty() {
332+
l.lastResp = l.bufferedResponses[l.ringHead]
313333
if l.initialized {
314-
l.bufferedResponses = l.bufferedResponses[1:]
334+
l.ringHead = (l.ringHead + 1) % l.fixedBufferSize
335+
l.bufferSlotEvent.Signal()
315336
}
316337
return true
317338
}
@@ -338,8 +359,12 @@ func newLazyRespSet(
338359
shardMatcher *storepb.ShardMatcher,
339360
applySharding bool,
340361
emptyStreamResponses prometheus.Counter,
362+
fixedBufferSize int,
341363
) respSet {
342-
bufferedResponses := []*storepb.SeriesResponse{}
364+
// A ring buffer of size N can hold N - 1 elements at most in order to distinguish being empty from being full.
365+
// That's why the size is increased by 1 internally.
366+
fixedBufferSize++
367+
bufferedResponses := make([]*storepb.SeriesResponse, fixedBufferSize)
343368
bufferedResponsesMtx := &sync.Mutex{}
344369
dataAvailable := sync.NewCond(bufferedResponsesMtx)
345370

@@ -351,9 +376,14 @@ func newLazyRespSet(
351376
closeSeries: closeSeries,
352377
span: span,
353378
dataOrFinishEvent: dataAvailable,
379+
bufferSlotEvent: sync.NewCond(bufferedResponsesMtx),
354380
bufferedResponsesMtx: bufferedResponsesMtx,
355381
bufferedResponses: bufferedResponses,
356382
shardMatcher: shardMatcher,
383+
fixedBufferSize: fixedBufferSize,
384+
ringHead: 0,
385+
ringTail: 0,
386+
closed: false,
357387
}
358388
respSet.storeLabels = make(map[string]struct{})
359389
for _, ls := range storeLabelSets {
@@ -406,11 +436,16 @@ func newLazyRespSet(
406436
} else {
407437
rerr = errors.Wrapf(err, "receive series from %s", st)
408438
}
409-
410439
l.span.SetTag("err", rerr.Error())
411440

412441
l.bufferedResponsesMtx.Lock()
413-
l.bufferedResponses = append(l.bufferedResponses, storepb.NewWarnSeriesResponse(rerr))
442+
for l.isFull() && !l.closed {
443+
l.bufferSlotEvent.Wait()
444+
}
445+
if !l.closed {
446+
l.bufferedResponses[l.ringTail] = storepb.NewWarnSeriesResponse(rerr)
447+
l.ringTail = (l.ringTail + 1) % l.fixedBufferSize
448+
}
414449
l.noMoreData = true
415450
l.dataOrFinishEvent.Signal()
416451
l.bufferedResponsesMtx.Unlock()
@@ -429,8 +464,14 @@ func newLazyRespSet(
429464
}
430465

431466
l.bufferedResponsesMtx.Lock()
432-
l.bufferedResponses = append(l.bufferedResponses, resp)
433-
l.dataOrFinishEvent.Signal()
467+
for l.isFull() && !l.closed {
468+
l.bufferSlotEvent.Wait()
469+
}
470+
if !l.closed {
471+
l.bufferedResponses[l.ringTail] = resp
472+
l.ringTail = (l.ringTail + 1) % l.fixedBufferSize
473+
l.dataOrFinishEvent.Signal()
474+
}
434475
l.bufferedResponsesMtx.Unlock()
435476
return true
436477
}
@@ -474,6 +515,7 @@ func newAsyncRespSet(
474515
shardInfo *storepb.ShardInfo,
475516
logger log.Logger,
476517
emptyStreamResponses prometheus.Counter,
518+
lazyRetrievalMaxBufferedResponses int,
477519
) (respSet, error) {
478520

479521
var (
@@ -525,6 +567,11 @@ func newAsyncRespSet(
525567
switch retrievalStrategy {
526568
case LazyRetrieval:
527569
span.SetTag("retrival_strategy", LazyRetrieval)
570+
if lazyRetrievalMaxBufferedResponses < 1 {
571+
// Some unit and e2e tests hit this path.
572+
lazyRetrievalMaxBufferedResponses = 1
573+
}
574+
528575
return newLazyRespSet(
529576
span,
530577
frameTimeout,
@@ -535,6 +582,7 @@ func newAsyncRespSet(
535582
shardMatcher,
536583
applySharding,
537584
emptyStreamResponses,
585+
lazyRetrievalMaxBufferedResponses,
538586
), nil
539587
case EagerRetrieval:
540588
span.SetTag("retrival_strategy", EagerRetrieval)
@@ -560,6 +608,8 @@ func (l *lazyRespSet) Close() {
560608
defer l.bufferedResponsesMtx.Unlock()
561609

562610
l.closeSeries()
611+
l.closed = true
612+
l.bufferSlotEvent.Signal()
563613
l.noMoreData = true
564614
l.dataOrFinishEvent.Signal()
565615

0 commit comments

Comments
 (0)