Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
52 commits
Select commit Hold shift + click to select a range
833c65b
design v2
yulong-db Apr 18, 2025
289f48c
design v2
yulong-db Apr 18, 2025
2b03e6b
design v2
yulong-db Apr 18, 2025
55a240a
design v2
yulong-db Apr 18, 2025
66ea257
do not forward partial response strategy in proxy store
yuchen-db Apr 19, 2025
8821811
fix coroutine leak
yuchen-db Apr 20, 2025
ba19b89
use defer to unlock
yuchen-db Apr 20, 2025
865c60c
revert close order change
yuchen-db Apr 20, 2025
7a35f32
fix Pull iterator data race (#156)
yuchen-db Apr 21, 2025
08b1526
address feedback
yulong-db Apr 22, 2025
ed15b62
thanos querier aggregation label rewrite design v2 (#154)
yulong-db Apr 23, 2025
e89c45d
cherry pick upstream PR 7945
fpetkovski Nov 28, 2024
5dff045
cherry pick upstream PR 7945 (#158)
yuchen-db Apr 24, 2025
2fb8f8d
use prometheus fork
yuchen-db Apr 24, 2025
a68797a
Replace prometheus/prometheus with yuchen-db/prometheus in go.mod (#159)
yuchen-db Apr 24, 2025
8ff28cd
Limit lazyRespSet memory buffer size
hczhu-db Apr 25, 2025
13c6311
Addressed comments
hczhu-db Apr 28, 2025
6d65034
Fix linter
hczhu-db Apr 28, 2025
2b56361
goformat
hczhu-db Apr 28, 2025
7fa8264
Fix unit tests
hczhu-db Apr 28, 2025
59f5c54
Limit lazyRespSet memory buffer size (#161)
hczhu-db Apr 28, 2025
eafcb9d
update prometheus fork to include more upstream fixes
yuchen-db Apr 30, 2025
18f5c8c
wip
yuchen-db May 1, 2025
146ae2f
add aligned ketama hashring algorithm
yuchen-db May 1, 2025
c6d7c8e
add unit test
yuchen-db May 1, 2025
ec54bde
add sanity check, remove print
yuchen-db May 1, 2025
7e36958
add panic check
yuchen-db May 2, 2025
7878955
fix check
yuchen-db May 2, 2025
bbcb326
Revert "add panic check"
yuchen-db May 2, 2025
b32136b
polish code
yuchen-db May 2, 2025
f2da76b
fix lint
yuchen-db May 2, 2025
40f8359
clean up code
yuchen-db May 2, 2025
e7404dc
Thanos Streamer support ignore warnings from Querier
hczhu-db May 1, 2025
edb1a75
Fix a lazy retrieval timer issue
hczhu-db May 2, 2025
0227197
Add a flag to control client keepalive ping interval
hczhu-db May 2, 2025
2b393c9
Fix linter
hczhu-db May 2, 2025
bdf6675
Update streamer tool
hczhu-db May 2, 2025
c1704dd
clean up code
yuchen-db May 2, 2025
30c03ca
clean up code
yuchen-db May 2, 2025
e0a4a3a
Fix ignore warnings
hczhu-db May 2, 2025
a625125
resolve comments
yuchen-db May 2, 2025
6b16c81
update prometheus fork to include more upstream fixes (#167)
yuchen-db May 2, 2025
e2b2211
Fix issues from lazy retrieval (#168)
hczhu-db May 3, 2025
4c2ae87
Revert "update prometheus fork to include more upstream fixes"
yuchen-db May 3, 2025
f0d3930
Revert "update prometheus fork to include more upstream fixes #167" (…
hczhu-db May 3, 2025
0011ba3
add aligned ketama hashring algorithm (#170)
yuchen-db May 5, 2025
5326676
[ES-1458485] lower no compaction limit
jnyi May 13, 2025
e79dc25
[ES-1458485] lower no compaction limit (#173)
jnyi May 13, 2025
50248ae
do not forward partial response strategy in proxy store (#155)
yuchen-db May 15, 2025
b91ac6b
Add a command flag for lazyRetrievalMaxBufferedResponses in receive
hczhu-db Jun 1, 2025
fa2b9a3
Add a command flag for lazyRetrievalMaxBufferedResponses in receive (…
hczhu-db Jun 2, 2025
77bd1b8
fix dedup data loss
yuchen-db Jun 3, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 21 additions & 1 deletion cmd/thanos/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,14 @@ func registerQuery(app *extkingpin.App) {
enforceTenancy := cmd.Flag("query.enforce-tenancy", "Enforce tenancy on Query APIs. Responses are returned only if the label value of the configured tenant-label-name and the value of the tenant header matches.").Default("false").Bool()
tenantLabel := cmd.Flag("query.tenant-label-name", "Label name to use when enforcing tenancy (if --query.enforce-tenancy is enabled).").Default(tenancy.DefaultTenantLabel).String()

rewriteAggregationLabelTo := cmd.Flag("query.aggregation-label-value-override", "The value override for __rollup__ label for aggregated metrics. If set to x, all queries on aggregated metrics will have a __rollup__=x matcher. Leave empty to disable this behavior. Default is empty.").Default("").String()
rewriteAggregationLabelStrategy := cmd.Flag("query.aggregation-label-strategy", "The strategy to use when rewriting aggregation labels. Used during aggregator migration only.").Default(string(query.NoopLabelRewriter)).Hidden().Enum(string(query.NoopLabelRewriter), string(query.UpsertLabelRewriter), string(query.InsertOnlyLabelRewriter))
rewriteAggregationLabelTo := cmd.Flag("query.aggregation-label-value-override", "The value override for aggregation label. If set to x, all queries on aggregated metrics will have a `__agg_rule_type__=x` matcher. If empty, this behavior is disabled. Default is empty.").Hidden().Default("").String()

lazyRetrievalMaxBufferedResponses := cmd.Flag("query.lazy-retrieval-max-buffered-responses", "The lazy retrieval strategy can buffer up to this number of responses. This is to limit the memory usage. This flag takes effect only when the lazy retrieval strategy is enabled.").
Default("20").Int()

grpcStoreClientKeepAlivePingInterval := extkingpin.ModelDuration(cmd.Flag("query.grcp-store-client-keep-alive-ping-interval", "This value defines how often a store client sends a keepalive ping on an established gRPC stream. 0 means not to set. NB: a client is keeping a long‐running gRPC stream open. It still has active RPCs on the wire—even if Recv() is not called in a while. Setting PermitWithoutStream=false only stops pings when no streams exist; it does not suppress pings during an open stream").
Default("0s"))

var storeRateLimits store.SeriesSelectLimits
storeRateLimits.RegisterFlags(cmd)
Expand Down Expand Up @@ -386,7 +393,10 @@ func registerQuery(app *extkingpin.App) {
*enforceTenancy,
*tenantLabel,
*enableGroupReplicaPartialStrategy,
*rewriteAggregationLabelStrategy,
*rewriteAggregationLabelTo,
*lazyRetrievalMaxBufferedResponses,
time.Duration(*grpcStoreClientKeepAlivePingInterval),
)
})
}
Expand Down Expand Up @@ -471,7 +481,10 @@ func runQuery(
enforceTenancy bool,
tenantLabel string,
groupReplicaPartialResponseStrategy bool,
rewriteAggregationLabelStrategy string,
rewriteAggregationLabelTo string,
lazyRetrievalMaxBufferedResponses int,
grpcStoreClientKeepAlivePingInterval time.Duration,
) error {
comp := component.Query
if alertQueryURL == "" {
Expand All @@ -491,6 +504,11 @@ func runQuery(
if err != nil {
return errors.Wrap(err, "building gRPC client")
}
if grpcStoreClientKeepAlivePingInterval > 0 {
clientParameters := extgrpc.GetDefaultKeepaliveClientParameters()
clientParameters.Time = grpcStoreClientKeepAlivePingInterval
dialOpts = append(dialOpts, grpc.WithKeepaliveParams(clientParameters))
}
if grpcCompression != compressionNone {
dialOpts = append(dialOpts, grpc.WithDefaultCallOptions(grpc.UseCompressor(grpcCompression)))
}
Expand Down Expand Up @@ -559,6 +577,7 @@ func runQuery(
store.WithTSDBSelector(tsdbSelector),
store.WithProxyStoreDebugLogging(debugLogging),
store.WithQuorumChunkDedup(queryDeduplicationFunc == dedup.AlgorithmQuorum),
store.WithLazyRetrievalMaxBufferedResponsesForProxy(lazyRetrievalMaxBufferedResponses),
}

// Parse and sanitize the provided replica labels flags.
Expand Down Expand Up @@ -601,6 +620,7 @@ func runQuery(
opts := query.Options{
GroupReplicaPartialResponseStrategy: groupReplicaPartialResponseStrategy,
DeduplicationFunc: queryDeduplicationFunc,
RewriteAggregationLabelStrategy: rewriteAggregationLabelStrategy,
RewriteAggregationLabelTo: rewriteAggregationLabelTo,
}
level.Info(logger).Log("msg", "databricks querier features", "opts", fmt.Sprintf("%+v", opts))
Expand Down
19 changes: 13 additions & 6 deletions cmd/thanos/receive.go
Original file line number Diff line number Diff line change
Expand Up @@ -414,9 +414,13 @@ func runReceive(
return errors.Wrap(err, "setup gRPC server")
}

if conf.lazyRetrievalMaxBufferedResponses <= 0 {
return errors.New("--receive.lazy-retrieval-max-buffered-responses must be > 0")
}
options := []store.ProxyStoreOption{
store.WithProxyStoreDebugLogging(debugLogging),
store.WithoutDedup(),
store.WithLazyRetrievalMaxBufferedResponsesForProxy(conf.lazyRetrievalMaxBufferedResponses),
}
if matcherConverter != nil {
options = append(options, store.WithProxyStoreMatcherConverter(matcherConverter))
Expand Down Expand Up @@ -996,11 +1000,12 @@ type receiveConfig struct {

asyncForwardWorkerCount uint

numTopMetricsPerTenant int
topMetricsMinimumCardinality uint64
topMetricsUpdateInterval time.Duration
matcherConverterCacheCapacity int
maxPendingGrpcWriteRequests int
numTopMetricsPerTenant int
topMetricsMinimumCardinality uint64
topMetricsUpdateInterval time.Duration
matcherConverterCacheCapacity int
maxPendingGrpcWriteRequests int
lazyRetrievalMaxBufferedResponses int

featureList *[]string
}
Expand Down Expand Up @@ -1049,7 +1054,7 @@ func (rc *receiveConfig) registerFlag(cmd extkingpin.FlagClause) {
hashringAlgorithmsHelptext := strings.Join([]string{string(receive.AlgorithmHashmod), string(receive.AlgorithmKetama)}, ", ")
cmd.Flag("receive.hashrings-algorithm", "The algorithm used when distributing series in the hashrings. Must be one of "+hashringAlgorithmsHelptext+". Will be overwritten by the tenant-specific algorithm in the hashring config.").
Default(string(receive.AlgorithmHashmod)).
EnumVar(&rc.hashringsAlgorithm, string(receive.AlgorithmHashmod), string(receive.AlgorithmKetama))
EnumVar(&rc.hashringsAlgorithm, string(receive.AlgorithmHashmod), string(receive.AlgorithmKetama), string(receive.AlgorithmAlignedKetama))

rc.refreshInterval = extkingpin.ModelDuration(cmd.Flag("receive.hashrings-file-refresh-interval", "Refresh interval to re-read the hashring configuration file. (used as a fallback)").
Default("5m"))
Expand Down Expand Up @@ -1170,6 +1175,8 @@ func (rc *receiveConfig) registerFlag(cmd extkingpin.FlagClause) {
cmd.Flag("receive.max-pending-grcp-write-requests", "Reject right away gRPC write requests when this number of requests are pending. Value 0 disables this feature.").
Default("0").IntVar(&rc.maxPendingGrpcWriteRequests)
rc.featureList = cmd.Flag("enable-feature", "Comma separated experimental feature names to enable. The current list of features is "+metricNamesFilter+".").Default("").Strings()
cmd.Flag("receive.lazy-retrieval-max-buffered-responses", "The lazy retrieval strategy can buffer up to this number of responses. This is to limit the memory usage. This flag takes effect only when the lazy retrieval strategy is enabled.").
Default("20").IntVar(&rc.lazyRetrievalMaxBufferedResponses)
}

// determineMode returns the ReceiverMode that this receiver is configured to run in.
Expand Down
10 changes: 8 additions & 2 deletions cmd/thanos/streamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ type StreamerConfig struct {
storeAddrPort string
streamTimeoutSeconds int
replicaLabel string
ignoreWarnings bool
}

func registerStreamer(app *extkingpin.App) {
Expand All @@ -48,6 +49,7 @@ func registerStreamer(app *extkingpin.App) {
cmd.Flag("store", "Thanos Store API gRPC endpoint").Default("localhost:10901").StringVar(&config.storeAddrPort)
cmd.Flag("stream.timeout_seconds", "One stream's overall timeout in seconds ").Default("36000").IntVar(&config.streamTimeoutSeconds)
cmd.Flag("stream.replica_label", "Drop this replica label from all returns time series and dedup them.").Default("").StringVar(&config.replicaLabel)
cmd.Flag("stream.ignore_warnings", "Don't return an error when a warning response is received. --no-stream.ignore_warnings to disable").Default("true").BoolVar(&config.ignoreWarnings)

hc := &httpConfig{}
hc = hc.registerFlag(cmd)
Expand Down Expand Up @@ -262,10 +264,14 @@ func (s *Streamer) streamOneRequest(request *streamer.StreamerRequest, writer io
return writeResponse(&streamer.StreamerResponse{Err: err.Error()})
}
if warning := response.GetWarning(); warning != "" {
level.Error(s.logger).Log(
"warning", warning,
level.Warn(s.logger).Log(
"msg", "warning response from Store gRPC stream",
"warning", warning,
"ignore_warnings", s.config.ignoreWarnings,
"request_id", request.RequestId)
if s.config.ignoreWarnings {
continue
}
return writeResponse(&streamer.StreamerResponse{Err: fmt.Sprintf("warning response from Store gRPC stream: %s", warning)})
}
seriesResp := response.GetSeries()
Expand Down
30 changes: 24 additions & 6 deletions cmd/thanos/tools_metric.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,12 @@ import (
)

type rawMetricConfig struct {
storeAddr string
metric string
hoursAgo int
skipChunks bool
storeAddr string
metric string
hoursAgo int
skipChunks bool
equalLabelMatcher string
notEqualLabelMatcher string
}

func registerFlags(cmd extkingpin.FlagClause) *rawMetricConfig {
Expand All @@ -37,6 +39,8 @@ func registerFlags(cmd extkingpin.FlagClause) *rawMetricConfig {
cmd.Flag("metric", "The metric name to stream time series for this metric").Default("node_cpu_seconds_total").StringVar(&conf.metric)
cmd.Flag("hours_ago", "Stream the metric from this number of hours ago").Default("16").IntVar(&conf.hoursAgo)
cmd.Flag("skip_chunks", "Skip chunks in the response").Default("false").BoolVar(&conf.skipChunks)
cmd.Flag("eq_label_matcher", "One label matcher using equal").Default("").StringVar(&conf.equalLabelMatcher)
cmd.Flag("neq_label_matcher", "One label matcher using not equal").Default("").StringVar(&conf.notEqualLabelMatcher)
return conf
}

Expand Down Expand Up @@ -72,6 +76,20 @@ func streamMetric(conf *rawMetricConfig, logger log.Logger) error {
labelMatchers := []storepb.LabelMatcher{
{Type: storepb.LabelMatcher_EQ, Name: "__name__", Value: conf.metric},
}
// addMatcher parses one "name=value" flag value and appends it to
// labelMatchers using the given matcher type.
//
// An empty string means the flag was left at its default and is skipped
// silently. A value without "=" or with an empty label name is reported
// and ignored.
addMatcher := func(mtype storepb.LabelMatcher_Type, matcher string) {
	// The flag defaults to "", which simply means "no extra matcher";
	// that must not be reported as an invalid matcher.
	if matcher == "" {
		return
	}
	// Split on the first "=" only: label values may legally contain "=".
	parts := strings.SplitN(matcher, "=", 2)
	if len(parts) != 2 || parts[0] == "" {
		level.Error(logger).Log("msg", "ignoring an invalid label matcher", "matcher", matcher)
		return
	}
	labelMatchers = append(labelMatchers, storepb.LabelMatcher{
		Type:  mtype,
		Name:  parts[0],
		Value: parts[1],
	})
}
addMatcher(storepb.LabelMatcher_EQ, conf.equalLabelMatcher)
addMatcher(storepb.LabelMatcher_NEQ, conf.notEqualLabelMatcher)
storeReq := &storepb.SeriesRequest{
Aggregates: []storepb.Aggr{storepb.Aggr_RAW},
Matchers: labelMatchers,
Expand Down Expand Up @@ -103,9 +121,9 @@ func streamMetric(conf *rawMetricConfig, logger log.Logger) error {
}
series := resPtr.GetSeries()
if series == nil {
return fmt.Errorf("Got a nil series")
return fmt.Errorf("got a nil series")
}
if 0 == (seq % 1000) {
if (seq % 1000) == 0 {
level.Info(logger).Log("msg", "streaming time series", "seq", seq)
}
metric := ""
Expand Down
33 changes: 28 additions & 5 deletions cmd/thanos/tools_streamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,13 @@ import (
)

type streamerToolConfig struct {
socketPath string
metric string
hoursAgo int
skipChunks bool
socketPath string
metric string
hoursAgo int
skipChunks bool
equalLabelMatcher string
notEqualLabelMatcher string
sleepSeconds int
}

func registerStreamerTool(app extkingpin.AppClause) {
Expand All @@ -37,6 +40,9 @@ func registerStreamerTool(app extkingpin.AppClause) {
cmd.Flag("metric", "The metric name to stream time series for this metric").Default("node_cpu_seconds_total").StringVar(&conf.metric)
cmd.Flag("hours_ago", "Stream the metric from this number of hours ago").Default("16").IntVar(&conf.hoursAgo)
cmd.Flag("skip_chunks", "Skip chunks in the response").Default("false").BoolVar(&conf.skipChunks)
cmd.Flag("eq_label_matcher", "One label matcher using equal").Default("").StringVar(&conf.equalLabelMatcher)
cmd.Flag("neq_label_matcher", "One label matcher using not equal").Default("").StringVar(&conf.notEqualLabelMatcher)
cmd.Flag("sleep_seconds_between_calls", "Sleep this amount of seconds between calling two gRPC").Default("0").IntVar(&conf.sleepSeconds)

cmd.Setup(func(
g *run.Group,
Expand Down Expand Up @@ -72,6 +78,20 @@ func runStreamerTool(conf *streamerToolConfig, logger log.Logger) error {
SkipChunks: conf.skipChunks,
Metric: conf.metric,
}
// addMatcher parses one "name=value" flag value and appends it to
// request.LabelMatchers using the given matcher type.
//
// An empty string means the flag was left at its default and is skipped
// silently. A value without "=" or with an empty label name is reported
// and ignored.
addMatcher := func(mtype streamer_pkg.LabelMatcher_Type, matcher string) {
	// The flag defaults to "", which simply means "no extra matcher";
	// that must not be reported as an invalid matcher.
	if matcher == "" {
		return
	}
	// Split on the first "=" only: label values may legally contain "=".
	parts := strings.SplitN(matcher, "=", 2)
	if len(parts) != 2 || parts[0] == "" {
		level.Error(logger).Log("msg", "ignoring an invalid label matcher", "matcher", matcher)
		return
	}
	request.LabelMatchers = append(request.LabelMatchers, streamer_pkg.LabelMatcher{
		Type:  mtype,
		Name:  parts[0],
		Value: parts[1],
	})
}
addMatcher(streamer_pkg.LabelMatcher_EQ, conf.equalLabelMatcher)
addMatcher(streamer_pkg.LabelMatcher_NEQ, conf.notEqualLabelMatcher)

level.Info(logger).Log(
"msg", "sending a socket request to Thanos streamer",
Expand Down Expand Up @@ -120,7 +140,7 @@ func runStreamerTool(conf *streamerToolConfig, logger log.Logger) error {
level.Error(logger).Log("msg", "error in response", "err", resp.Err)
return nil
}
if 0 == (seq % 1000) {
if (seq % 1000) == 0 {
level.Info(logger).Log("msg", "streaming time series", "seq", seq)
}
series := resp.Data
Expand All @@ -147,6 +167,9 @@ func runStreamerTool(conf *streamerToolConfig, logger log.Logger) error {
order++
}
fmt.Printf("\n")
if conf.sleepSeconds > 0 {
time.Sleep(time.Duration(conf.sleepSeconds) * time.Second)
}
}
return fmt.Errorf("unexpected interruption: %w", scanner.Err())
}
7 changes: 5 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ require (
)

require (
capnproto.org/go/capnp/v3 v3.0.1-alpha.2.0.20240830165715-46ccd63a72af
capnproto.org/go/capnp/v3 v3.0.0-alpha.30
github.com/Azure/azure-sdk-for-go/sdk/storage/azdatalake v1.4.0
github.com/cortexproject/promqlsmith v0.0.0-20240506042652-6cfdd9739a5e
github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus v1.0.1
Expand All @@ -135,7 +135,6 @@ require (
github.com/HdrHistogram/hdrhistogram-go v1.1.2 // indirect
github.com/bboreham/go-loser v0.0.0-20230920113527-fcc2c21820a3 // indirect
github.com/cilium/ebpf v0.11.0 // indirect
github.com/colega/zeropool v0.0.0-20230505084239-6fb4a4f75381 // indirect
github.com/containerd/cgroups/v3 v3.0.3 // indirect
github.com/docker/go-units v0.5.0 // indirect
github.com/elastic/go-licenser v0.3.1 // indirect
Expand Down Expand Up @@ -169,6 +168,7 @@ require (
k8s.io/client-go v0.31.0 // indirect
k8s.io/klog/v2 v2.130.1 // indirect
k8s.io/utils v0.0.0-20240711033017-18e509b52bc8 // indirect
zenhack.net/go/util v0.0.0-20230414204917-531d38494cf5 // indirect
)

require (
Expand Down Expand Up @@ -289,6 +289,9 @@ replace (
// Required by Cortex https://github.com/cortexproject/cortex/pull/3051.
github.com/bradfitz/gomemcache => github.com/themihai/gomemcache v0.0.0-20180902122335-24332e2d58ab

// Use Prometheus fork to include some fixes
github.com/prometheus/prometheus => github.com/yuchen-db/prometheus v0.0.0-20250424212119-34c3b01ad3f9

// Pin kuberesolver/v5 to support new grpc version. Need to upgrade kuberesolver version on weaveworks/common.
github.com/sercand/kuberesolver/v4 => github.com/sercand/kuberesolver/v5 v5.1.1

Expand Down
Loading
Loading