Skip to content

Commit e165052

Browse files
hczhu-db and yuchen-db
authored and committed
Fix issues from lazy retrieval
Fix a lazy retrieval timer issue. Add a flag to control client keepalive ping interval. Fix linter. Update streamer tool. Fix ignore warnings.
1 parent 9973b9e commit e165052

File tree

6 files changed

+81
-14
lines changed

6 files changed

+81
-14
lines changed

cmd/thanos/query.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -251,6 +251,9 @@ func registerQuery(app *extkingpin.App) {
251251
lazyRetrievalMaxBufferedResponses := cmd.Flag("query.lazy-retrieval-max-buffered-responses", "The lazy retrieval strategy can buffer up to this number of responses. This is to limit the memory usage. This flag takes effect only when the lazy retrieval strategy is enabled.").
252252
Default("20").Int()
253253

254+
grpcStoreClientKeepAlivePingInterval := extkingpin.ModelDuration(cmd.Flag("query.grcp-store-client-keep-alive-ping-interval", "This value defines how often a store client sends a keepalive ping on an established gRPC stream. 0 means not to set. NB: a client is keeping a long‐running gRPC stream open. It still has active RPCs on the wire—even if Recv() is not called in a while. Setting PermitWithoutStream=false only stops pings when no streams exist; it does not suppress pings during an open stream").
255+
Default("0s"))
256+
254257
var storeRateLimits store.SeriesSelectLimits
255258
storeRateLimits.RegisterFlags(cmd)
256259

@@ -393,6 +396,7 @@ func registerQuery(app *extkingpin.App) {
393396
*rewriteAggregationLabelStrategy,
394397
*rewriteAggregationLabelTo,
395398
*lazyRetrievalMaxBufferedResponses,
399+
time.Duration(*grpcStoreClientKeepAlivePingInterval),
396400
)
397401
})
398402
}
@@ -480,6 +484,7 @@ func runQuery(
480484
rewriteAggregationLabelStrategy string,
481485
rewriteAggregationLabelTo string,
482486
lazyRetrievalMaxBufferedResponses int,
487+
grpcStoreClientKeepAlivePingInterval time.Duration,
483488
) error {
484489
comp := component.Query
485490
if alertQueryURL == "" {
@@ -499,6 +504,11 @@ func runQuery(
499504
if err != nil {
500505
return errors.Wrap(err, "building gRPC client")
501506
}
507+
if grpcStoreClientKeepAlivePingInterval > 0 {
508+
clientParameters := extgrpc.GetDefaultKeepaliveClientParameters()
509+
clientParameters.Time = grpcStoreClientKeepAlivePingInterval
510+
dialOpts = append(dialOpts, grpc.WithKeepaliveParams(clientParameters))
511+
}
502512
if grpcCompression != compressionNone {
503513
dialOpts = append(dialOpts, grpc.WithDefaultCallOptions(grpc.UseCompressor(grpcCompression)))
504514
}

cmd/thanos/streamer.go

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ type StreamerConfig struct {
3939
storeAddrPort string
4040
streamTimeoutSeconds int
4141
replicaLabel string
42+
ignoreWarnings bool
4243
}
4344

4445
func registerStreamer(app *extkingpin.App) {
@@ -48,6 +49,7 @@ func registerStreamer(app *extkingpin.App) {
4849
cmd.Flag("store", "Thanos Store API gRPC endpoint").Default("localhost:10901").StringVar(&config.storeAddrPort)
4950
cmd.Flag("stream.timeout_seconds", "One stream's overall timeout in seconds ").Default("36000").IntVar(&config.streamTimeoutSeconds)
5051
cmd.Flag("stream.replica_label", "Drop this replica label from all returns time series and dedup them.").Default("").StringVar(&config.replicaLabel)
52+
cmd.Flag("stream.ignore_warnings", "Don't return an error when a warning response is received. --no-stream.ignore_warnings to disable").Default("true").BoolVar(&config.ignoreWarnings)
5153

5254
hc := &httpConfig{}
5355
hc = hc.registerFlag(cmd)
@@ -262,10 +264,14 @@ func (s *Streamer) streamOneRequest(request *streamer.StreamerRequest, writer io
262264
return writeResponse(&streamer.StreamerResponse{Err: err.Error()})
263265
}
264266
if warning := response.GetWarning(); warning != "" {
265-
level.Error(s.logger).Log(
266-
"warning", warning,
267+
level.Warn(s.logger).Log(
267268
"msg", "warning response from Store gRPC stream",
269+
"warning", warning,
270+
"ignore_warnings", s.config.ignoreWarnings,
268271
"request_id", request.RequestId)
272+
if s.config.ignoreWarnings {
273+
continue
274+
}
269275
return writeResponse(&streamer.StreamerResponse{Err: fmt.Sprintf("warning response from Store gRPC stream: %s", warning)})
270276
}
271277
seriesResp := response.GetSeries()

cmd/thanos/tools_metric.go

Lines changed: 24 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -25,10 +25,12 @@ import (
2525
)
2626

2727
type rawMetricConfig struct {
28-
storeAddr string
29-
metric string
30-
hoursAgo int
31-
skipChunks bool
28+
storeAddr string
29+
metric string
30+
hoursAgo int
31+
skipChunks bool
32+
equalLabelMatcher string
33+
notEqualLabelMatcher string
3234
}
3335

3436
func registerFlags(cmd extkingpin.FlagClause) *rawMetricConfig {
@@ -37,6 +39,8 @@ func registerFlags(cmd extkingpin.FlagClause) *rawMetricConfig {
3739
cmd.Flag("metric", "The metric name to stream time series for this metric").Default("node_cpu_seconds_total").StringVar(&conf.metric)
3840
cmd.Flag("hours_ago", "Stream the metric from this number of hours ago").Default("16").IntVar(&conf.hoursAgo)
3941
cmd.Flag("skip_chunks", "Skip chunks in the response").Default("false").BoolVar(&conf.skipChunks)
42+
cmd.Flag("eq_label_matcher", "One label matcher using equal").Default("").StringVar(&conf.equalLabelMatcher)
43+
cmd.Flag("neq_label_matcher", "One label matcher using not equal").Default("").StringVar(&conf.notEqualLabelMatcher)
4044
return conf
4145
}
4246

@@ -72,6 +76,20 @@ func streamMetric(conf *rawMetricConfig, logger log.Logger) error {
7276
labelMatchers := []storepb.LabelMatcher{
7377
{Type: storepb.LabelMatcher_EQ, Name: "__name__", Value: conf.metric},
7478
}
79+
// addMatcher parses one "name=value" flag string and appends it to
// labelMatchers with the given matcher type.
//
// An empty string means the flag was not set and is skipped silently: both
// matcher flags default to "", so treating it as invalid would log an error
// on every run. Only the first "=" separates the name from the value, so a
// value that itself contains "=" is kept intact. Any other input without
// "=" is logged and ignored.
addMatcher := func(mtype storepb.LabelMatcher_Type, matcher string) {
	if matcher == "" {
		return
	}
	parts := strings.SplitN(matcher, "=", 2)
	if len(parts) != 2 {
		level.Error(logger).Log("msg", "ignoring an invalid label matcher", "matcher", matcher)
		return
	}
	labelMatchers = append(labelMatchers, storepb.LabelMatcher{
		Type:  mtype,
		Name:  parts[0],
		Value: parts[1],
	})
}
91+
addMatcher(storepb.LabelMatcher_EQ, conf.equalLabelMatcher)
92+
addMatcher(storepb.LabelMatcher_NEQ, conf.notEqualLabelMatcher)
7593
storeReq := &storepb.SeriesRequest{
7694
Aggregates: []storepb.Aggr{storepb.Aggr_RAW},
7795
Matchers: labelMatchers,
@@ -103,9 +121,9 @@ func streamMetric(conf *rawMetricConfig, logger log.Logger) error {
103121
}
104122
series := resPtr.GetSeries()
105123
if series == nil {
106-
return fmt.Errorf("Got a nil series")
124+
return fmt.Errorf("got a nil series")
107125
}
108-
if 0 == (seq % 1000) {
126+
if (seq % 1000) == 0 {
109127
level.Info(logger).Log("msg", "streaming time series", "seq", seq)
110128
}
111129
metric := ""

cmd/thanos/tools_streamer.go

Lines changed: 28 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -23,10 +23,13 @@ import (
2323
)
2424

2525
type streamerToolConfig struct {
26-
socketPath string
27-
metric string
28-
hoursAgo int
29-
skipChunks bool
26+
socketPath string
27+
metric string
28+
hoursAgo int
29+
skipChunks bool
30+
equalLabelMatcher string
31+
notEqualLabelMatcher string
32+
sleepSeconds int
3033
}
3134

3235
func registerStreamerTool(app extkingpin.AppClause) {
@@ -37,6 +40,9 @@ func registerStreamerTool(app extkingpin.AppClause) {
3740
cmd.Flag("metric", "The metric name to stream time series for this metric").Default("node_cpu_seconds_total").StringVar(&conf.metric)
3841
cmd.Flag("hours_ago", "Stream the metric from this number of hours ago").Default("16").IntVar(&conf.hoursAgo)
3942
cmd.Flag("skip_chunks", "Skip chunks in the response").Default("false").BoolVar(&conf.skipChunks)
43+
cmd.Flag("eq_label_matcher", "One label matcher using equal").Default("").StringVar(&conf.equalLabelMatcher)
44+
cmd.Flag("neq_label_matcher", "One label matcher using not equal").Default("").StringVar(&conf.notEqualLabelMatcher)
45+
cmd.Flag("sleep_seconds_between_calls", "Sleep this amount of seconds between calling two gRPC").Default("0").IntVar(&conf.sleepSeconds)
4046

4147
cmd.Setup(func(
4248
g *run.Group,
@@ -72,6 +78,20 @@ func runStreamerTool(conf *streamerToolConfig, logger log.Logger) error {
7278
SkipChunks: conf.skipChunks,
7379
Metric: conf.metric,
7480
}
81+
// addMatcher parses one "name=value" flag string and appends it to
// request.LabelMatchers with the given matcher type.
//
// An empty string means the flag was not set and is skipped silently: both
// matcher flags default to "", so treating it as invalid would log an error
// on every run. Only the first "=" separates the name from the value, so a
// value that itself contains "=" is kept intact. Any other input without
// "=" is logged and ignored.
addMatcher := func(mtype streamer_pkg.LabelMatcher_Type, matcher string) {
	if matcher == "" {
		return
	}
	parts := strings.SplitN(matcher, "=", 2)
	if len(parts) != 2 {
		level.Error(logger).Log("msg", "ignoring an invalid label matcher", "matcher", matcher)
		return
	}
	request.LabelMatchers = append(request.LabelMatchers, streamer_pkg.LabelMatcher{
		Type:  mtype,
		Name:  parts[0],
		Value: parts[1],
	})
}
93+
addMatcher(streamer_pkg.LabelMatcher_EQ, conf.equalLabelMatcher)
94+
addMatcher(streamer_pkg.LabelMatcher_NEQ, conf.notEqualLabelMatcher)
7595

7696
level.Info(logger).Log(
7797
"msg", "sending a socket request to Thanos streamer",
@@ -120,7 +140,7 @@ func runStreamerTool(conf *streamerToolConfig, logger log.Logger) error {
120140
level.Error(logger).Log("msg", "error in response", "err", resp.Err)
121141
return nil
122142
}
123-
if 0 == (seq % 1000) {
143+
if (seq % 1000) == 0 {
124144
level.Info(logger).Log("msg", "streaming time series", "seq", seq)
125145
}
126146
series := resp.Data
@@ -147,6 +167,9 @@ func runStreamerTool(conf *streamerToolConfig, logger log.Logger) error {
147167
order++
148168
}
149169
fmt.Printf("\n")
170+
if conf.sleepSeconds > 0 {
171+
time.Sleep(time.Duration(conf.sleepSeconds) * time.Second)
172+
}
150173
}
151174
return fmt.Errorf("unexpected interruption: %w", scanner.Err())
152175
}

pkg/extgrpc/client.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,10 @@ func EndpointGroupGRPCOpts() []grpc.DialOption {
4444
}
4545
}
4646

47+
func GetDefaultKeepaliveClientParameters() keepalive.ClientParameters {
48+
return keepalive.ClientParameters{Time: 10 * time.Second, Timeout: 5 * time.Second}
49+
}
50+
4751
// StoreClientGRPCOpts creates gRPC dial options for connecting to a store client.
4852
func StoreClientGRPCOpts(logger log.Logger, reg prometheus.Registerer, tracer opentracing.Tracer, secure, skipVerify bool, cert, key, caCert, serverName string) ([]grpc.DialOption, error) {
4953
grpcMets := grpc_prometheus.NewClientMetrics(
@@ -71,7 +75,7 @@ func StoreClientGRPCOpts(logger log.Logger, reg prometheus.Registerer, tracer op
7175
grpcMets.StreamClientInterceptor(),
7276
tracing.StreamClientInterceptor(tracer),
7377
),
74-
grpc.WithKeepaliveParams(keepalive.ClientParameters{Time: 10 * time.Second, Timeout: 5 * time.Second}),
78+
grpc.WithKeepaliveParams(GetDefaultKeepaliveClientParameters()),
7579
}
7680
if reg != nil {
7781
reg.MustRegister(grpcMets)

pkg/store/proxy_merge.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"context"
88
"fmt"
99
"io"
10+
"math"
1011
"sort"
1112
"sync"
1213
"time"
@@ -451,6 +452,11 @@ func newLazyRespSet(
451452
l.bufferedResponsesMtx.Unlock()
452453
return false
453454
}
455+
if t != nil {
456+
// frameTimeout only applies to the cl.Recv() gRPC call, because the goroutine may be blocked waiting for an empty buffer slot.
457+
// Set the timeout to the largest possible value to avoid triggering it.
458+
t.Reset(time.Duration(math.MaxInt64))
459+
}
454460

455461
numResponses++
456462
bytesProcessed += resp.Size()

0 commit comments

Comments
 (0)