diff --git a/cmd/thanos/query.go b/cmd/thanos/query.go index c67d25ccb8e..a1e7c37161b 100644 --- a/cmd/thanos/query.go +++ b/cmd/thanos/query.go @@ -245,7 +245,14 @@ func registerQuery(app *extkingpin.App) { enforceTenancy := cmd.Flag("query.enforce-tenancy", "Enforce tenancy on Query APIs. Responses are returned only if the label value of the configured tenant-label-name and the value of the tenant header matches.").Default("false").Bool() tenantLabel := cmd.Flag("query.tenant-label-name", "Label name to use when enforcing tenancy (if --query.enforce-tenancy is enabled).").Default(tenancy.DefaultTenantLabel).String() - rewriteAggregationLabelTo := cmd.Flag("query.aggregation-label-value-override", "The value override for __rollup__ label for aggregated metrics. If set to x, all queries on aggregated metrics will have a __rollup__=x matcher. Leave empty to disable this behavior. Default is empty.").Default("").String() + rewriteAggregationLabelStrategy := cmd.Flag("query.aggregation-label-strategy", "The strategy to use when rewriting aggregation labels. Used during aggregator migration only.").Default(string(query.NoopLabelRewriter)).Hidden().Enum(string(query.NoopLabelRewriter), string(query.UpsertLabelRewriter), string(query.InsertOnlyLabelRewriter)) + rewriteAggregationLabelTo := cmd.Flag("query.aggregation-label-value-override", "The value override for the aggregation label. If set to x, all queries on aggregated metrics will have a `__agg_rule_type__=x` matcher. If empty, this behavior is disabled. Default is empty.").Hidden().Default("").String() + + lazyRetrievalMaxBufferedResponses := cmd.Flag("query.lazy-retrieval-max-buffered-responses", "The lazy retrieval strategy can buffer up to this number of responses. This is to limit the memory usage. This flag takes effect only when the lazy retrieval strategy is enabled."). + Default("20").Int() + + grpcStoreClientKeepAlivePingInterval := extkingpin.ModelDuration(cmd.Flag("query.grpc-store-client-keep-alive-ping-interval", "How often a store client sends a keepalive ping on an established gRPC stream. 0 leaves the default keepalive parameters unchanged. Note that a client holding a long-running gRPC stream open still has active RPCs on the wire even if Recv() has not been called for a while; setting PermitWithoutStream=false only stops pings when no streams exist and does not suppress pings during an open stream.").
+ Default("0s")) var storeRateLimits store.SeriesSelectLimits storeRateLimits.RegisterFlags(cmd) @@ -386,7 +393,10 @@ func registerQuery(app *extkingpin.App) { *enforceTenancy, *tenantLabel, *enableGroupReplicaPartialStrategy, + *rewriteAggregationLabelStrategy, *rewriteAggregationLabelTo, + *lazyRetrievalMaxBufferedResponses, + time.Duration(*grpcStoreClientKeepAlivePingInterval), ) }) } @@ -471,7 +481,10 @@ func runQuery( enforceTenancy bool, tenantLabel string, groupReplicaPartialResponseStrategy bool, + rewriteAggregationLabelStrategy string, rewriteAggregationLabelTo string, + lazyRetrievalMaxBufferedResponses int, + grpcStoreClientKeepAlivePingInterval time.Duration, ) error { comp := component.Query if alertQueryURL == "" { @@ -491,6 +504,11 @@ func runQuery( if err != nil { return errors.Wrap(err, "building gRPC client") } + if grpcStoreClientKeepAlivePingInterval > 0 { + clientParameters := extgrpc.GetDefaultKeepaliveClientParameters() + clientParameters.Time = grpcStoreClientKeepAlivePingInterval + dialOpts = append(dialOpts, grpc.WithKeepaliveParams(clientParameters)) + } if grpcCompression != compressionNone { dialOpts = append(dialOpts, grpc.WithDefaultCallOptions(grpc.UseCompressor(grpcCompression))) } @@ -559,6 +577,7 @@ func runQuery( store.WithTSDBSelector(tsdbSelector), store.WithProxyStoreDebugLogging(debugLogging), store.WithQuorumChunkDedup(queryDeduplicationFunc == dedup.AlgorithmQuorum), + store.WithLazyRetrievalMaxBufferedResponsesForProxy(lazyRetrievalMaxBufferedResponses), } // Parse and sanitize the provided replica labels flags. @@ -601,6 +620,7 @@ func runQuery( opts := query.Options{ GroupReplicaPartialResponseStrategy: groupReplicaPartialResponseStrategy, DeduplicationFunc: queryDeduplicationFunc, + RewriteAggregationLabelStrategy: rewriteAggregationLabelStrategy, RewriteAggregationLabelTo: rewriteAggregationLabelTo, } level.Info(logger).Log("msg", "databricks querier features", "opts", fmt.Sprintf("%+v", opts)) diff --git a/cmd/thanos/receive.go b/cmd/thanos/receive.go index a9173b34882..bde6319fec0 100644 --- a/cmd/thanos/receive.go +++ b/cmd/thanos/receive.go @@ -414,9 +414,13 @@ func runReceive( return errors.Wrap(err, "setup gRPC server") } + if conf.lazyRetrievalMaxBufferedResponses <= 0 { + return errors.New("--receive.lazy-retrieval-max-buffered-responses must be > 0") + } options := []store.ProxyStoreOption{ store.WithProxyStoreDebugLogging(debugLogging), store.WithoutDedup(), + store.WithLazyRetrievalMaxBufferedResponsesForProxy(conf.lazyRetrievalMaxBufferedResponses), } if matcherConverter != nil { options = append(options, store.WithProxyStoreMatcherConverter(matcherConverter)) @@ -996,11 +1000,12 @@ type receiveConfig struct { asyncForwardWorkerCount uint - numTopMetricsPerTenant int - topMetricsMinimumCardinality uint64 - topMetricsUpdateInterval time.Duration - matcherConverterCacheCapacity int - maxPendingGrpcWriteRequests int + numTopMetricsPerTenant int + topMetricsMinimumCardinality uint64 + topMetricsUpdateInterval time.Duration + matcherConverterCacheCapacity int + maxPendingGrpcWriteRequests int + lazyRetrievalMaxBufferedResponses int featureList *[]string } @@ -1049,7 +1054,7 @@ func (rc *receiveConfig) registerFlag(cmd extkingpin.FlagClause) { hashringAlgorithmsHelptext := strings.Join([]string{string(receive.AlgorithmHashmod), string(receive.AlgorithmKetama)}, ", ") cmd.Flag("receive.hashrings-algorithm", "The algorithm used when distributing series in the hashrings. Must be one of "+hashringAlgorithmsHelptext+". 
Will be overwritten by the tenant-specific algorithm in the hashring config."). Default(string(receive.AlgorithmHashmod)). - EnumVar(&rc.hashringsAlgorithm, string(receive.AlgorithmHashmod), string(receive.AlgorithmKetama)) + EnumVar(&rc.hashringsAlgorithm, string(receive.AlgorithmHashmod), string(receive.AlgorithmKetama), string(receive.AlgorithmAlignedKetama)) rc.refreshInterval = extkingpin.ModelDuration(cmd.Flag("receive.hashrings-file-refresh-interval", "Refresh interval to re-read the hashring configuration file. (used as a fallback)"). Default("5m")) @@ -1170,6 +1175,8 @@ func (rc *receiveConfig) registerFlag(cmd extkingpin.FlagClause) { cmd.Flag("receive.max-pending-grcp-write-requests", "Reject right away gRPC write requests when this number of requests are pending. Value 0 disables this feature."). Default("0").IntVar(&rc.maxPendingGrpcWriteRequests) rc.featureList = cmd.Flag("enable-feature", "Comma separated experimental feature names to enable. The current list of features is "+metricNamesFilter+".").Default("").Strings() + cmd.Flag("receive.lazy-retrieval-max-buffered-responses", "The lazy retrieval strategy can buffer up to this number of responses. This is to limit the memory usage. This flag takes effect only when the lazy retrieval strategy is enabled."). + Default("20").IntVar(&rc.lazyRetrievalMaxBufferedResponses) } // determineMode returns the ReceiverMode that this receiver is configured to run in. diff --git a/cmd/thanos/streamer.go b/cmd/thanos/streamer.go index 0ba89e130b6..0537b55c061 100644 --- a/cmd/thanos/streamer.go +++ b/cmd/thanos/streamer.go @@ -39,6 +39,7 @@ type StreamerConfig struct { storeAddrPort string streamTimeoutSeconds int replicaLabel string + ignoreWarnings bool } func registerStreamer(app *extkingpin.App) { @@ -48,6 +49,7 @@ func registerStreamer(app *extkingpin.App) { cmd.Flag("store", "Thanos Store API gRPC endpoint").Default("localhost:10901").StringVar(&config.storeAddrPort) cmd.Flag("stream.timeout_seconds", "One stream's overall timeout in seconds ").Default("36000").IntVar(&config.streamTimeoutSeconds) cmd.Flag("stream.replica_label", "Drop this replica label from all returns time series and dedup them.").Default("").StringVar(&config.replicaLabel) + cmd.Flag("stream.ignore_warnings", "Don't return an error when a warning response is received. 
--no-stream.ignore_warnings to disable").Default("true").BoolVar(&config.ignoreWarnings) hc := &httpConfig{} hc = hc.registerFlag(cmd) @@ -262,10 +264,14 @@ func (s *Streamer) streamOneRequest(request *streamer.StreamerRequest, writer io return writeResponse(&streamer.StreamerResponse{Err: err.Error()}) } if warning := response.GetWarning(); warning != "" { - level.Error(s.logger).Log( - "warning", warning, + level.Warn(s.logger).Log( "msg", "warning response from Store gRPC stream", + "warning", warning, + "ignore_warnings", s.config.ignoreWarnings, "request_id", request.RequestId) + if s.config.ignoreWarnings { + continue + } return writeResponse(&streamer.StreamerResponse{Err: fmt.Sprintf("warning response from Store gRPC stream: %s", warning)}) } seriesResp := response.GetSeries() diff --git a/cmd/thanos/tools_metric.go b/cmd/thanos/tools_metric.go index eba14c389f8..dc70593ad28 100644 --- a/cmd/thanos/tools_metric.go +++ b/cmd/thanos/tools_metric.go @@ -25,10 +25,12 @@ import ( ) type rawMetricConfig struct { - storeAddr string - metric string - hoursAgo int - skipChunks bool + storeAddr string + metric string + hoursAgo int + skipChunks bool + equalLabelMatcher string + notEqualLabelMatcher string } func registerFlags(cmd extkingpin.FlagClause) *rawMetricConfig { @@ -37,6 +39,8 @@ func registerFlags(cmd extkingpin.FlagClause) *rawMetricConfig { cmd.Flag("metric", "The metric name to stream time series for this metric").Default("node_cpu_seconds_total").StringVar(&conf.metric) cmd.Flag("hours_ago", "Stream the metric from this number of hours ago").Default("16").IntVar(&conf.hoursAgo) cmd.Flag("skip_chunks", "Skip chunks in the response").Default("false").BoolVar(&conf.skipChunks) + cmd.Flag("eq_label_matcher", "One label matcher using equal").Default("").StringVar(&conf.equalLabelMatcher) + cmd.Flag("neq_label_matcher", "One label matcher using not equal").Default("").StringVar(&conf.notEqualLabelMatcher) return conf } @@ -72,6 +76,20 @@ func streamMetric(conf *rawMetricConfig, logger log.Logger) error { labelMatchers := []storepb.LabelMatcher{ {Type: storepb.LabelMatcher_EQ, Name: "__name__", Value: conf.metric}, } + addMatcher := func(mtype storepb.LabelMatcher_Type, matcher string) { + parts := strings.Split(matcher, "=") + if len(parts) != 2 { + level.Error(logger).Log("msg", "ignoring an invalid label matcher", "matcher", matcher) + return + } + labelMatchers = append(labelMatchers, storepb.LabelMatcher{ + Type: mtype, + Name: parts[0], + Value: parts[1], + }) + } + addMatcher(storepb.LabelMatcher_EQ, conf.equalLabelMatcher) + addMatcher(storepb.LabelMatcher_NEQ, conf.notEqualLabelMatcher) storeReq := &storepb.SeriesRequest{ Aggregates: []storepb.Aggr{storepb.Aggr_RAW}, Matchers: labelMatchers, @@ -103,9 +121,9 @@ func streamMetric(conf *rawMetricConfig, logger log.Logger) error { } series := resPtr.GetSeries() if series == nil { - return fmt.Errorf("Got a nil series") + return fmt.Errorf("got a nil series") } - if 0 == (seq % 1000) { + if (seq % 1000) == 0 { level.Info(logger).Log("msg", "streaming time series", "seq", seq) } metric := "" diff --git a/cmd/thanos/tools_streamer.go b/cmd/thanos/tools_streamer.go index bd486177ae4..258f98e6835 100644 --- a/cmd/thanos/tools_streamer.go +++ b/cmd/thanos/tools_streamer.go @@ -23,10 +23,13 @@ import ( ) type streamerToolConfig struct { - socketPath string - metric string - hoursAgo int - skipChunks bool + socketPath string + metric string + hoursAgo int + skipChunks bool + equalLabelMatcher string + notEqualLabelMatcher string + 
sleepSeconds int } func registerStreamerTool(app extkingpin.AppClause) { @@ -37,6 +40,9 @@ func registerStreamerTool(app extkingpin.AppClause) { cmd.Flag("metric", "The metric name to stream time series for this metric").Default("node_cpu_seconds_total").StringVar(&conf.metric) cmd.Flag("hours_ago", "Stream the metric from this number of hours ago").Default("16").IntVar(&conf.hoursAgo) cmd.Flag("skip_chunks", "Skip chunks in the response").Default("false").BoolVar(&conf.skipChunks) + cmd.Flag("eq_label_matcher", "One label matcher using equal").Default("").StringVar(&conf.equalLabelMatcher) + cmd.Flag("neq_label_matcher", "One label matcher using not equal").Default("").StringVar(&conf.notEqualLabelMatcher) + cmd.Flag("sleep_seconds_between_calls", "Sleep this amount of seconds between calling two gRPC").Default("0").IntVar(&conf.sleepSeconds) cmd.Setup(func( g *run.Group, @@ -72,6 +78,20 @@ func runStreamerTool(conf *streamerToolConfig, logger log.Logger) error { SkipChunks: conf.skipChunks, Metric: conf.metric, } + addMatcher := func(mtype streamer_pkg.LabelMatcher_Type, matcher string) { + parts := strings.Split(matcher, "=") + if len(parts) != 2 { + level.Error(logger).Log("msg", "ignoring an invalid label matcher", "matcher", matcher) + return + } + request.LabelMatchers = append(request.LabelMatchers, streamer_pkg.LabelMatcher{ + Type: mtype, + Name: parts[0], + Value: parts[1], + }) + } + addMatcher(streamer_pkg.LabelMatcher_EQ, conf.equalLabelMatcher) + addMatcher(streamer_pkg.LabelMatcher_NEQ, conf.notEqualLabelMatcher) level.Info(logger).Log( "msg", "sending a socket request to Thanos streamer", @@ -120,7 +140,7 @@ func runStreamerTool(conf *streamerToolConfig, logger log.Logger) error { level.Error(logger).Log("msg", "error in response", "err", resp.Err) return nil } - if 0 == (seq % 1000) { + if (seq % 1000) == 0 { level.Info(logger).Log("msg", "streaming time series", "seq", seq) } series := resp.Data @@ -147,6 +167,9 @@ func runStreamerTool(conf *streamerToolConfig, logger log.Logger) error { order++ } fmt.Printf("\n") + if conf.sleepSeconds > 0 { + time.Sleep(time.Duration(conf.sleepSeconds) * time.Second) + } } return fmt.Errorf("unexpected interruption: %w", scanner.Err()) } diff --git a/go.mod b/go.mod index f08a49b2abc..cd1f8d888fe 100644 --- a/go.mod +++ b/go.mod @@ -112,7 +112,7 @@ require ( ) require ( - capnproto.org/go/capnp/v3 v3.0.1-alpha.2.0.20240830165715-46ccd63a72af + capnproto.org/go/capnp/v3 v3.0.0-alpha.30 github.com/Azure/azure-sdk-for-go/sdk/storage/azdatalake v1.4.0 github.com/cortexproject/promqlsmith v0.0.0-20240506042652-6cfdd9739a5e github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus v1.0.1 @@ -135,7 +135,6 @@ require ( github.com/HdrHistogram/hdrhistogram-go v1.1.2 // indirect github.com/bboreham/go-loser v0.0.0-20230920113527-fcc2c21820a3 // indirect github.com/cilium/ebpf v0.11.0 // indirect - github.com/colega/zeropool v0.0.0-20230505084239-6fb4a4f75381 // indirect github.com/containerd/cgroups/v3 v3.0.3 // indirect github.com/docker/go-units v0.5.0 // indirect github.com/elastic/go-licenser v0.3.1 // indirect @@ -169,6 +168,7 @@ require ( k8s.io/client-go v0.31.0 // indirect k8s.io/klog/v2 v2.130.1 // indirect k8s.io/utils v0.0.0-20240711033017-18e509b52bc8 // indirect + zenhack.net/go/util v0.0.0-20230414204917-531d38494cf5 // indirect ) require ( @@ -289,6 +289,9 @@ replace ( // Required by Cortex https://github.com/cortexproject/cortex/pull/3051. 
github.com/bradfitz/gomemcache => github.com/themihai/gomemcache v0.0.0-20180902122335-24332e2d58ab + // Use Prometheus fork to include some fixes + github.com/prometheus/prometheus => github.com/yuchen-db/prometheus v0.0.0-20250424212119-34c3b01ad3f9 + // Pin kuberesolver/v5 to support new grpc version. Need to upgrade kuberesolver version on weaveworks/common. github.com/sercand/kuberesolver/v4 => github.com/sercand/kuberesolver/v5 v5.1.1 diff --git a/go.sum b/go.sum index e397c08d1dc..688bdcd6574 100644 --- a/go.sum +++ b/go.sum @@ -1,5 +1,5 @@ -capnproto.org/go/capnp/v3 v3.0.1-alpha.2.0.20240830165715-46ccd63a72af h1:A5wxH0ZidOtYYUGjhtBaRuB87M73bGfc06uWB8sHpg0= -capnproto.org/go/capnp/v3 v3.0.1-alpha.2.0.20240830165715-46ccd63a72af/go.mod h1:2vT5D2dtG8sJGEoEKU17e+j7shdaYp1Myl8X03B3hmc= +capnproto.org/go/capnp/v3 v3.0.0-alpha.30 h1:iABQan/YiHFCgSXym5aNj27osapnEgAk4WaWYqb4sQM= +capnproto.org/go/capnp/v3 v3.0.0-alpha.30/go.mod h1:+ysMHvOh1EWNOyorxJWs1omhRFiDoKxKkWQACp54jKM= cloud.google.com/go v0.34.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= cloud.google.com/go v0.38.0/go.mod h1:990N+gfupTy94rShfmMCWGDn0LpTmnzTp2qbd1dvSRU= cloud.google.com/go v0.44.1/go.mod h1:iSa0KzasP4Uvy3f1mN/7PiObzGgflwredwwASm/v6AU= @@ -1506,8 +1506,6 @@ github.com/cncf/xds/go v0.0.0-20231128003011-0fa0005c9caa/go.mod h1:x/1Gn8zydmfq github.com/cncf/xds/go v0.0.0-20240423153145-555b57ec207b h1:ga8SEFjZ60pxLcmhnThWgvH2wg8376yUJmPhEH4H3kw= github.com/cncf/xds/go v0.0.0-20240423153145-555b57ec207b/go.mod h1:W+zGtBO5Y1IgJhy4+A9GOqVhqLpfZi+vwmdNXUehLA8= github.com/codahale/hdrhistogram v0.0.0-20161010025455-3a0bb77429bd/go.mod h1:sE/e/2PUdi/liOCUjSTXgM1o87ZssimdTWN964YiIeI= -github.com/colega/zeropool v0.0.0-20230505084239-6fb4a4f75381 h1:d5EKgQfRQvO97jnISfR89AiCCCJMwMFoSxUiU0OGCRU= -github.com/colega/zeropool v0.0.0-20230505084239-6fb4a4f75381/go.mod h1:OU76gHeRo8xrzGJU3F3I1CqX1ekM8dfJw0+wPeMwnp0= github.com/containerd/cgroups/v3 v3.0.3 h1:S5ByHZ/h9PMe5IOQoN7E+nMc2UcLEM/V48DGDJ9kip0= github.com/containerd/cgroups/v3 v3.0.3/go.mod h1:8HBe7V3aWGLFPd/k03swSIsGjZhHI2WzJmticMgVuz0= github.com/coreos/go-systemd/v22 v22.4.0/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc= @@ -2116,8 +2114,8 @@ github.com/ovh/go-ovh v1.6.0 h1:ixLOwxQdzYDx296sXcgS35TOPEahJkpjMGtzPadCjQI= github.com/ovh/go-ovh v1.6.0/go.mod h1:cTVDnl94z4tl8pP1uZ/8jlVxntjSIf09bNcQ5TJSC7c= github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58 h1:onHthvaw9LFnH4t2DcNVpwGmV9E1BkGknEliJkfwQj0= github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58/go.mod h1:DXv8WO4yhMYhSNPKjeNKa5WY9YCIEBRbNzFFPJbWO6Y= -github.com/philhofer/fwd v1.1.2 h1:bnDivRJ1EWPjUIRXV5KfORO897HTbpFAQddBdE8t7Gw= -github.com/philhofer/fwd v1.1.2/go.mod h1:qkPdfjR2SIEbspLqpe1tO4n5yICnr2DY7mqEx2tUTP0= +github.com/philhofer/fwd v1.1.1 h1:GdGcTjf5RNAxwS4QLsiMzJYj5KEvPJD3Abr261yRQXQ= +github.com/philhofer/fwd v1.1.1/go.mod h1:gk3iGcWd9+svBvR0sR+KPcfE+RNWozjowpeBVG3ZVNU= github.com/phpdave11/gofpdf v1.4.2/go.mod h1:zpO6xFn9yxo3YLyMvW8HcKWVdbNqgIfOOp2dXMnm1mY= github.com/phpdave11/gofpdi v1.0.12/go.mod h1:vBmVV0Do6hSBHC8uKUQ71JGW+ZGQq74llk/7bXwjDoI= github.com/phpdave11/gofpdi v1.0.13/go.mod h1:vBmVV0Do6hSBHC8uKUQ71JGW+ZGQq74llk/7bXwjDoI= @@ -2191,8 +2189,6 @@ github.com/prometheus/procfs v0.8.0/go.mod h1:z7EfXMXOkbkqb9IINtpCn86r/to3BnA0ua github.com/prometheus/procfs v0.9.0/go.mod h1:+pB4zwohETzFnmlpe6yd2lSc+0/46IYZRB/chUwxUZY= github.com/prometheus/procfs v0.15.1 h1:YagwOFzUgYfKKHX6Dr+sHT7km/hxC76UB0learggepc= github.com/prometheus/procfs v0.15.1/go.mod 
h1:fB45yRUv8NstnjriLhBQLuOUt+WW4BsoGhij/e3PBqk= -github.com/prometheus/prometheus v0.55.1-0.20241102120812-a6fd22b9d2c8 h1:hCxAh6+hxwy7dqUPE5ndnilMeCWrqQkJVjPDXtiYRVo= -github.com/prometheus/prometheus v0.55.1-0.20241102120812-a6fd22b9d2c8/go.mod h1:GGS7QlWKCqCbcEzWsVahYIfQwiGhcExkarHyLJTsv6I= github.com/redis/go-redis/v9 v9.7.0 h1:HhLSs+B6O021gwzl+locl0zEDnyNkxMtf/Z3NNBMa9E= github.com/redis/go-redis/v9 v9.7.0/go.mod h1:f6zhXITC7JUJIlPEiBOTXxJgPLdZcA93GewI7inzyWw= github.com/redis/rueidis v1.0.45-alpha.1 h1:69Bu1l7gVC/qDYuGGwMwGg2rjOjSyxESol/Zila62gY= @@ -2276,10 +2272,8 @@ github.com/thanos-io/promql-engine v0.0.0-20241106100125-097e6e9f425a h1:BhWU58V github.com/thanos-io/promql-engine v0.0.0-20241106100125-097e6e9f425a/go.mod h1:wx0JlRZtsB2S10JYUgeg5GqLfMxw31SzArP+28yyE00= github.com/themihai/gomemcache v0.0.0-20180902122335-24332e2d58ab h1:7ZR3hmisBWw77ZpO1/o86g+JV3VKlk3d48jopJxzTjU= github.com/themihai/gomemcache v0.0.0-20180902122335-24332e2d58ab/go.mod h1:eheTFp954zcWZXCU8d0AT76ftsQOTo4DTqkN/h3k1MY= -github.com/tinylib/msgp v1.1.9 h1:SHf3yoO2sGA0veCJeCBYLHuttAVFHGm2RHgNodW7wQU= -github.com/tinylib/msgp v1.1.9/go.mod h1:BCXGB54lDD8qUEPmiG0cQQUANC4IUQyB2ItS2UDlO/k= -github.com/tj/assert v0.0.3 h1:Df/BlaZ20mq6kuai7f5z2TvPFiwC3xaWJSDQNiIS3Rk= -github.com/tj/assert v0.0.3/go.mod h1:Ne6X72Q+TB1AteidzQncjw9PabbMp4PBMZ1k+vd1Pvk= +github.com/tinylib/msgp v1.1.5 h1:2gXmtWueD2HefZHQe1QOy9HVzmFrLOVvsXwXBQ0ayy0= +github.com/tinylib/msgp v1.1.5/go.mod h1:eQsjooMTnV42mHu917E26IogZ2930nFyBQdofk10Udg= github.com/tklauser/go-sysconf v0.3.4/go.mod h1:Cl2c8ZRWfHD5IrfHo9VN+FX9kCFjIOyVklgXycLB6ek= github.com/tklauser/go-sysconf v0.3.10 h1:IJ1AZGZRWbY8T5Vfk04D9WOA5WSejdflXxP03OUqALw= github.com/tklauser/go-sysconf v0.3.10/go.mod h1:C8XykCvCb+Gn0oNCWPIlcb0RuglQTYaQ2hGm7jmxEFk= @@ -2301,6 +2295,8 @@ github.com/weaveworks/promrus v1.2.0/go.mod h1:SaE82+OJ91yqjrE1rsvBWVzNZKcHYFtMU github.com/x448/float16 v0.8.4 h1:qLwI1I70+NjRFUR3zs1JPUCgaCXSh3SW62uAKT1mSBM= github.com/x448/float16 v0.8.4/go.mod h1:14CWIYCyZA/cWjXOioeEpHeN/83MdbZDRQHoFcYsOfg= github.com/xhit/go-str2duration v1.2.0/go.mod h1:3cPSlfZlUHVlneIVfePFWcJZsuwf+P1v2SRTV4cUmp4= +github.com/yuchen-db/prometheus v0.0.0-20250424212119-34c3b01ad3f9 h1:hNgEopLMPL0Ol+MOif0rqebCYAtqiBeR54lmt+GyhsA= +github.com/yuchen-db/prometheus v0.0.0-20250424212119-34c3b01ad3f9/go.mod h1:GGS7QlWKCqCbcEzWsVahYIfQwiGhcExkarHyLJTsv6I= github.com/yuin/goldmark v1.1.25/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.1.32/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= @@ -3350,3 +3346,5 @@ sigs.k8s.io/structured-merge-diff/v4 v4.4.1 h1:150L+0vs/8DA78h1u02ooW1/fFq/Lwr+s sigs.k8s.io/structured-merge-diff/v4 v4.4.1/go.mod h1:N8hJocpFajUSSeSJ9bOZ77VzejKZaXsTtZo4/u7Io08= sigs.k8s.io/yaml v1.4.0 h1:Mk1wCc2gy/F0THH0TAp1QYyJNzRm2KCLy3o5ASXVI5E= sigs.k8s.io/yaml v1.4.0/go.mod h1:Ejl7/uTz7PSA4eKMyQCUTnhZYNmLIl+5c2lQPGR2BPY= +zenhack.net/go/util v0.0.0-20230414204917-531d38494cf5 h1:yksDCGMVzyn3vlyf0GZ3huiF5FFaMGQpQ3UJvR0EoGA= +zenhack.net/go/util v0.0.0-20230414204917-531d38494cf5/go.mod h1:1LtNdPAs8WH+BTcQiZAOo2MIKD/5jyK/u7sZ9ZPe5SE= diff --git a/pkg/compact/overlapping.go b/pkg/compact/overlapping.go index b3cb6e232d5..acf2685e15f 100644 --- a/pkg/compact/overlapping.go +++ b/pkg/compact/overlapping.go @@ -23,7 +23,7 @@ const ( overlappingReason = "blocks-overlapping" symbolTableSizeExceedsError = "symbol table size exceeds" - symbolTableSizeLimit = 1024 * 
1024 + symbolTableSizeLimit = 512 * 1024 // lower this limits ) type OverlappingCompactionLifecycleCallback struct { diff --git a/pkg/dedup/quorum_iter.go b/pkg/dedup/quorum_iter.go index fcef658c52d..c5d8dc30177 100644 --- a/pkg/dedup/quorum_iter.go +++ b/pkg/dedup/quorum_iter.go @@ -18,21 +18,33 @@ type quorumSeries struct { lset labels.Labels replicas []storage.Series - isCounter bool + disablePenalty bool + isCounter bool } func NewQuorumSeries(lset labels.Labels, replicas []storage.Series, f string) storage.Series { return &quorumSeries{ - lset: lset, - replicas: replicas, + lset: lset, + replicas: replicas, + disablePenalty: true, // Default to no penalty for receiver-only setups + isCounter: isCounter(f), + } +} - isCounter: isCounter(f), +// NewQuorumSeriesWithPenalty creates a quorum series with configurable penalty behavior +func NewQuorumSeriesWithPenalty(lset labels.Labels, replicas []storage.Series, f string, disablePenalty bool) storage.Series { + return &quorumSeries{ + lset: lset, + replicas: replicas, + disablePenalty: disablePenalty, + isCounter: isCounter(f), } } func (m *quorumSeries) Labels() labels.Labels { return m.lset } + func (m *quorumSeries) Iterator(_ chunkenc.Iterator) chunkenc.Iterator { iters := make([]adjustableSeriesIterator, 0, len(m.replicas)) oks := make([]bool, 0, len(m.replicas)) @@ -48,10 +60,11 @@ func (m *quorumSeries) Iterator(_ chunkenc.Iterator) chunkenc.Iterator { oks = append(oks, ok) } return &quorumSeriesIterator{ - iters: iters, - oks: oks, - lastT: math.MinInt64, - lastIter: nil, // behavior is undefined if At() is called before Next(), here we panic if it happens. + iters: iters, + oks: oks, + lastT: math.MinInt64, + lastIter: nil, // behavior is undefined if At() is called before Next(), here we panic if it happens. + disablePenalty: m.disablePenalty, } } @@ -89,17 +102,22 @@ type quorumSeriesIterator struct { lastT int64 lastV float64 lastIter adjustableSeriesIterator + + disablePenalty bool } func (m *quorumSeriesIterator) Next() chunkenc.ValueType { - // m.lastIter points to the last iterator that has the latest timestamp. - // m.lastT always aligns with m.lastIter unless when m.lastIter is nil. - // m.lastIter is nil only in the following cases: - // 1. Next()/Seek() is never called. m.lastT is math.MinInt64 in this case. - // 2. The iterator runs out of values. m.lastT is the last timestamp in this case. 
+ if m.disablePenalty { + return m.nextWithoutPenalty() + } + return m.nextWithPenalty() +} + +func (m *quorumSeriesIterator) nextWithPenalty() chunkenc.ValueType { + // Original penalty-based algorithm for backward compatibility minT := int64(math.MaxInt64) var lastIter adjustableSeriesIterator - quoramValue := NewQuorumValuePicker(0.0) + quorumValue := NewQuorumValuePicker(0.0) for i, it := range m.iters { if !m.oks[i] { continue @@ -114,9 +132,9 @@ func (m *quorumSeriesIterator) Next() chunkenc.ValueType { if t < minT { minT = t lastIter = it - quoramValue = NewQuorumValuePicker(v) + quorumValue = NewQuorumValuePicker(v) } else if t == minT { - if quoramValue.addValue(v) { + if quorumValue.addValue(v) { lastIter = it } } @@ -126,7 +144,48 @@ func (m *quorumSeriesIterator) Next() chunkenc.ValueType { if m.lastIter == nil { return chunkenc.ValNone } - m.lastV = quoramValue.currentValue + m.lastV = quorumValue.currentValue + m.lastT = minT + return chunkenc.ValFloat +} + +func (m *quorumSeriesIterator) nextWithoutPenalty() chunkenc.ValueType { + // Find minimum timestamp across all active iterators without applying penalties + minT := int64(math.MaxInt64) + var lastIter adjustableSeriesIterator + quorumValue := NewQuorumValuePicker(0.0) + + for i, it := range m.iters { + if !m.oks[i] { + continue + } + t, v := it.At() + if t <= m.lastT { + // Move to next value if current is not newer + m.oks[i] = it.Next() != chunkenc.ValNone + if m.oks[i] { + it.adjustAtValue(m.lastV) + t, v = it.At() + } else { + continue + } + } + if t < minT { + minT = t + lastIter = it + quorumValue = NewQuorumValuePicker(v) + } else if t == minT { + if quorumValue.addValue(v) { + lastIter = it + } + } + } + + m.lastIter = lastIter + if m.lastIter == nil { + return chunkenc.ValNone + } + m.lastV = quorumValue.currentValue m.lastT = minT return chunkenc.ValFloat } @@ -143,7 +202,7 @@ func (m *quorumSeriesIterator) Seek(t int64) chunkenc.ValueType { } func (m *quorumSeriesIterator) At() (t int64, v float64) { - return m.lastIter.At() + return m.lastT, m.lastV } func (m *quorumSeriesIterator) AtHistogram(h *histogram.Histogram) (int64, *histogram.Histogram) { diff --git a/pkg/dedup/quorum_iter_test.go b/pkg/dedup/quorum_iter_test.go index 1b77c00ce2e..9b59e461f00 100644 --- a/pkg/dedup/quorum_iter_test.go +++ b/pkg/dedup/quorum_iter_test.go @@ -11,6 +11,7 @@ import ( "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/tsdb/chunkenc" + "github.com/prometheus/prometheus/util/annotations" ) func TestIteratorEdgeCases(t *testing.T) { @@ -391,15 +392,61 @@ func TestMergedSeriesIterator(t *testing.T) { }, }, }, + { + name: "All replicas have different non-overlapping timestamps", + input: []series{ + { + lset: labels.FromStrings("metric", "test"), + samples: []sample{{t: 10000, f: 1.0}}, + }, + { + lset: labels.FromStrings("metric", "test"), + samples: []sample{{t: 20000, f: 2.0}}, + }, + { + lset: labels.FromStrings("metric", "test"), + samples: []sample{{t: 30000, f: 3.0}}, + }, + }, + exp: []series{ + { + lset: labels.FromStrings("metric", "test"), + samples: []sample{{t: 10000, f: 1.0}, {t: 20000, f: 2.0}, {t: 30000, f: 3.0}}, + }, + }, + }, } { t.Run(tcase.name, func(t *testing.T) { - // If it is a counter then pass a function which expects a counter. // If it is a counter then pass a function which expects a counter. 
f := "" if tcase.isCounter { f = "rate" } - dedupSet := NewSeriesSet(&mockedSeriesSet{series: tcase.input}, f, AlgorithmQuorum) + + // For tests that expect penalty behavior, use penalty-based algorithm + expectsPenalty := tcase.name == "ignore sampling interval too small" || + tcase.name == "Regression test against 2401" || + tcase.name == "Regression test with no counter adjustment" + + var dedupSet storage.SeriesSet + if expectsPenalty { + // Create custom series set that uses penalty-based quorum + mockSet := &mockedSeriesSet{series: tcase.input} + dedupSet = &penaltyQuorumSeriesSet{ + set: mockSet, + f: f, + ok: true, + } + // Initialize the first peek + if dedupSet.(*penaltyQuorumSeriesSet).set.Next() { + dedupSet.(*penaltyQuorumSeriesSet).peek = dedupSet.(*penaltyQuorumSeriesSet).set.At() + } else { + dedupSet.(*penaltyQuorumSeriesSet).ok = false + } + } else { + dedupSet = NewSeriesSet(&mockedSeriesSet{series: tcase.input}, f, AlgorithmQuorum) + } + var ats []storage.Series for dedupSet.Next() { ats = append(ats, dedupSet.At()) @@ -415,3 +462,322 @@ func TestMergedSeriesIterator(t *testing.T) { }) } } + +func TestQuorumDataLossScenarios(t *testing.T) { + cases := []struct { + name string + input []series + expected []series + }{ + { + name: "One replica has unique data at beginning should not be lost", + input: []series{ + { + lset: labels.FromStrings("metric", "test"), + samples: []sample{{t: 1000, f: 1.0}, {t: 15000, f: 2.0}}, + }, + { + lset: labels.FromStrings("metric", "test"), + samples: []sample{{t: 10000, f: 1.5}, {t: 20000, f: 2.5}}, + }, + { + lset: labels.FromStrings("metric", "test"), + samples: []sample{{t: 30000, f: 3.0}}, + }, + }, + expected: []series{ + { + lset: labels.FromStrings("metric", "test"), + samples: []sample{{t: 1000, f: 1.0}, {t: 10000, f: 1.5}, {t: 15000, f: 2.0}, {t: 20000, f: 2.5}, {t: 30000, f: 3.0}}, + }, + }, + }, + { + name: "One replica has unique data in middle gap should not be lost", + input: []series{ + { + lset: labels.FromStrings("metric", "test"), + samples: []sample{{t: 10000, f: 1.0}, {t: 40000, f: 4.0}}, + }, + { + lset: labels.FromStrings("metric", "test"), + samples: []sample{{t: 10000, f: 1.0}, {t: 20000, f: 2.0}, {t: 30000, f: 3.0}, {t: 40000, f: 4.0}}, + }, + { + lset: labels.FromStrings("metric", "test"), + samples: []sample{{t: 10000, f: 1.0}, {t: 40000, f: 4.0}}, + }, + }, + expected: []series{ + { + lset: labels.FromStrings("metric", "test"), + samples: []sample{{t: 10000, f: 1.0}, {t: 20000, f: 2.0}, {t: 30000, f: 3.0}, {t: 40000, f: 4.0}}, + }, + }, + }, + { + name: "Single replica with data when others are empty should not be lost", + input: []series{ + { + lset: labels.FromStrings("metric", "test"), + samples: []sample{{t: 10000, f: 1.0}, {t: 20000, f: 2.0}}, + }, + { + lset: labels.FromStrings("metric", "test"), + samples: []sample{}, + }, + { + lset: labels.FromStrings("metric", "test"), + samples: []sample{}, + }, + }, + expected: []series{ + { + lset: labels.FromStrings("metric", "test"), + samples: []sample{{t: 10000, f: 1.0}, {t: 20000, f: 2.0}}, + }, + }, + }, + { + name: "All replicas have different non-overlapping timestamps", + input: []series{ + { + lset: labels.FromStrings("metric", "test"), + samples: []sample{{t: 10000, f: 1.0}}, + }, + { + lset: labels.FromStrings("metric", "test"), + samples: []sample{{t: 20000, f: 2.0}}, + }, + { + lset: labels.FromStrings("metric", "test"), + samples: []sample{{t: 30000, f: 3.0}}, + }, + }, + expected: []series{ + { + lset: labels.FromStrings("metric", "test"), + 
samples: []sample{{t: 10000, f: 1.0}, {t: 20000, f: 2.0}, {t: 30000, f: 3.0}}, + }, + }, + }, + { + name: "Penalty logic should not cause data loss with small gaps", + input: []series{ + { + lset: labels.FromStrings("metric", "test"), + samples: []sample{{t: 10000, f: 1.0}, {t: 12000, f: 1.2}}, + }, + { + lset: labels.FromStrings("metric", "test"), + samples: []sample{{t: 10000, f: 1.0}, {t: 11000, f: 1.1}, {t: 12000, f: 1.2}}, + }, + { + lset: labels.FromStrings("metric", "test"), + samples: []sample{{t: 10000, f: 1.0}, {t: 12000, f: 1.2}}, + }, + }, + expected: []series{ + { + lset: labels.FromStrings("metric", "test"), + samples: []sample{{t: 10000, f: 1.0}, {t: 11000, f: 1.1}, {t: 12000, f: 1.2}}, + }, + }, + }, + } + + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + dedupSet := NewSeriesSet(&mockedSeriesSet{series: tc.input}, "", AlgorithmQuorum) + var result []storage.Series + for dedupSet.Next() { + result = append(result, dedupSet.At()) + } + testutil.Ok(t, dedupSet.Err()) + testutil.Equals(t, len(tc.expected), len(result)) + + for i, s := range result { + testutil.Equals(t, tc.expected[i].lset, s.Labels()) + actual := expandSeries(t, s.Iterator(nil)) + testutil.Equals(t, tc.expected[i].samples, actual, "samples should match exactly - no data loss allowed") + } + }) + } +} + +func TestQuorumDataLossScenarios_WithPenalty(t *testing.T) { + cases := []struct { + name string + input []series + expected []series + }{ + { + name: "ignore sampling interval too small", + input: []series{ + { + lset: labels.Labels{{Name: "a", Value: "1"}}, + samples: []sample{ + {10000, 8.0}, + {20000, 9.0}, + {50001, 9 + 1.0}, + {60000, 9 + 2.0}, + {70000, 9 + 3.0}, + {80000, 9 + 4.0}, + {90000, 9 + 5.0}, + {100000, 9 + 6.0}, + }, + }, { + lset: labels.Labels{{Name: "a", Value: "1"}}, + samples: []sample{ + {10001, 8.0}, // Penalty 5000 will be added. + {45001, 8 + 1.0}, // Smaller timestamp, this will be chosen. CurrValue = 8.5 which is smaller than last chosen value. 
+ {55001, 8 + 2.0}, + {65001, 8 + 3.0}, + }, + }, + }, + expected: []series{ + { + lset: labels.Labels{{Name: "a", Value: "1"}}, + samples: []sample{{10000, 8}, {20000, 9}, {45001, 9}, {50001, 10}, {55001, 10}, {65001, 11}, {80000, 13}, {90000, 14}, {100000, 15}}, + }, + }, + }, + } + + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + // Use penalty-based algorithm + mockSet := &mockedSeriesSet{series: tc.input} + dedupSet := &penaltyQuorumSeriesSet{ + set: mockSet, + f: "", + ok: true, + } + // Initialize the first peek + if dedupSet.set.Next() { + dedupSet.peek = dedupSet.set.At() + } else { + dedupSet.ok = false + } + + var result []storage.Series + for dedupSet.Next() { + result = append(result, dedupSet.At()) + } + testutil.Ok(t, dedupSet.Err()) + testutil.Equals(t, len(tc.expected), len(result)) + + for i, s := range result { + testutil.Equals(t, tc.expected[i].lset, s.Labels()) + actual := expandSeries(t, s.Iterator(nil)) + testutil.Equals(t, tc.expected[i].samples, actual, "samples should match exactly - no data loss allowed") + } + }) + } +} + +func TestQuorumValuePicker(t *testing.T) { + tests := []struct { + name string + values []float64 + expected float64 + }{ + { + name: "simple majority", + values: []float64{1.0, 1.0, 2.0}, + expected: 1.0, + }, + { + name: "no clear majority", + values: []float64{1.0, 2.0, 3.0}, + expected: 3.0, // Last value wins when no majority + }, + { + name: "single value", + values: []float64{42.0}, + expected: 42.0, + }, + { + name: "all same values", + values: []float64{5.0, 5.0, 5.0}, + expected: 5.0, + }, + { + name: "alternating values", + values: []float64{1.0, 2.0, 1.0, 2.0, 1.0}, + expected: 1.0, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if len(tt.values) == 0 { + return + } + picker := NewQuorumValuePicker(tt.values[0]) + for i := 1; i < len(tt.values); i++ { + picker.addValue(tt.values[i]) + } + testutil.Equals(t, tt.expected, picker.currentValue) + }) + } +} + +type penaltyQuorumSeriesSet struct { + set storage.SeriesSet + f string + + replicas []storage.Series + lset labels.Labels + peek storage.Series + ok bool +} + +func (s *penaltyQuorumSeriesSet) Next() bool { + if !s.ok { + return false + } + s.replicas = s.replicas[:0] + + // Set the label set we are currently gathering to the peek element. + s.lset = s.peek.Labels() + s.replicas = append(s.replicas[:0], s.peek) + + return s.next() +} + +func (s *penaltyQuorumSeriesSet) next() bool { + // Peek the next series to see whether it's a replica for the current series. + s.ok = s.set.Next() + if !s.ok { + // There's no next series, the current replicas are the last element. + return len(s.replicas) > 0 + } + s.peek = s.set.At() + nextLset := s.peek.Labels() + + // If the label set is equal to the current label set look for more replicas, otherwise a series is complete. 
+ if !labels.Equal(s.lset, nextLset) { + return true + } + + s.replicas = append(s.replicas, s.peek) + return s.next() +} + +func (s *penaltyQuorumSeriesSet) At() storage.Series { + if len(s.replicas) == 1 { + return s.replicas[0] + } + // Use penalty-based quorum algorithm + return NewQuorumSeriesWithPenalty(s.lset, s.replicas, s.f, false) +} + +func (s *penaltyQuorumSeriesSet) Err() error { + return s.set.Err() +} + +func (s *penaltyQuorumSeriesSet) Warnings() annotations.Annotations { + return s.set.Warnings() +} diff --git a/pkg/extgrpc/client.go b/pkg/extgrpc/client.go index a6dcc7b633d..8bf1521c503 100644 --- a/pkg/extgrpc/client.go +++ b/pkg/extgrpc/client.go @@ -44,6 +44,10 @@ func EndpointGroupGRPCOpts() []grpc.DialOption { } } +func GetDefaultKeepaliveClientParameters() keepalive.ClientParameters { + return keepalive.ClientParameters{Time: 10 * time.Second, Timeout: 5 * time.Second} +} + // StoreClientGRPCOpts creates gRPC dial options for connecting to a store client. func StoreClientGRPCOpts(logger log.Logger, reg prometheus.Registerer, tracer opentracing.Tracer, secure, skipVerify bool, cert, key, caCert, serverName string) ([]grpc.DialOption, error) { grpcMets := grpc_prometheus.NewClientMetrics( @@ -71,7 +75,7 @@ func StoreClientGRPCOpts(logger log.Logger, reg prometheus.Registerer, tracer op grpcMets.StreamClientInterceptor(), tracing.StreamClientInterceptor(tracer), ), - grpc.WithKeepaliveParams(keepalive.ClientParameters{Time: 10 * time.Second, Timeout: 5 * time.Second}), + grpc.WithKeepaliveParams(GetDefaultKeepaliveClientParameters()), } if reg != nil { reg.MustRegister(grpcMets) diff --git a/pkg/query/aggregation_label_rewriter.go b/pkg/query/aggregation_label_rewriter.go index b856f58e42f..e17d8d3915f 100644 --- a/pkg/query/aggregation_label_rewriter.go +++ b/pkg/query/aggregation_label_rewriter.go @@ -14,15 +14,26 @@ import ( "github.com/prometheus/prometheus/model/labels" ) +// RewriterStrategy defines the strategy used by the AggregationLabelRewriter. +type RewriterStrategy string + const ( - aggregationLabelName = "__rollup__" + // NoopLabelRewriter is a no-op strategy that basically disables the rewriter. + NoopLabelRewriter RewriterStrategy = "noop" + // UpsertLabelRewriter is a strategy that upserts the aggregation label. + UpsertLabelRewriter RewriterStrategy = "upsert" + // InsertOnlyLabelRewriter is a strategy that only inserts the aggregation label if it does not exist. 
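+	// For illustration (see the rewriter tests later in this diff): if the query already carries an __agg_rule_type__ matcher, insert-only leaves that matcher untouched, while upsert replaces it with a regexp matcher for the configured override value.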
+ InsertOnlyLabelRewriter RewriterStrategy = "insert-only" ) -type AggregationLabelRewriter struct { - logger log.Logger - metrics *aggregationLabelRewriterMetrics +const ( + aggregationLabelName = "__agg_rule_type__" +) - enabled bool +type AggregationLabelRewriter struct { + logger log.Logger + metrics *aggregationLabelRewriterMetrics + strategy RewriterStrategy desiredLabelValue string } @@ -70,16 +81,19 @@ func newAggregationLabelRewriterMetrics(reg prometheus.Registerer, desiredLabelV func NewNopAggregationLabelRewriter() *AggregationLabelRewriter { return &AggregationLabelRewriter{ - enabled: false, + strategy: NoopLabelRewriter, } } -func NewAggregationLabelRewriter(logger log.Logger, reg prometheus.Registerer, desiredLabelValue string) *AggregationLabelRewriter { +func NewAggregationLabelRewriter(logger log.Logger, reg prometheus.Registerer, strategy RewriterStrategy, desiredLabelValue string) *AggregationLabelRewriter { if logger == nil { logger = log.NewNopLogger() } + if desiredLabelValue == "" { + strategy = NoopLabelRewriter + } return &AggregationLabelRewriter{ - enabled: desiredLabelValue != "", + strategy: strategy, logger: logger, metrics: newAggregationLabelRewriterMetrics(reg, desiredLabelValue), desiredLabelValue: desiredLabelValue, @@ -87,7 +101,7 @@ func NewAggregationLabelRewriter(logger log.Logger, reg prometheus.Registerer, d } func (a *AggregationLabelRewriter) Rewrite(ms []*labels.Matcher) []*labels.Matcher { - if !a.enabled { + if a.strategy == NoopLabelRewriter { return ms } @@ -123,12 +137,18 @@ func (a *AggregationLabelRewriter) Rewrite(ms []*labels.Matcher) []*labels.Match aggregationLabelIndex = i } } + + if aggregationLabelMatcher != nil && a.strategy == InsertOnlyLabelRewriter { + needsRewrite = false + skipReason = "insert-only" + } + // After the for loop, if needsRewrite is false, no need to do anything // but if it is true, we either append or modify an aggregation label if needsRewrite { newMatcher := &labels.Matcher{ Name: aggregationLabelName, - Type: labels.MatchEqual, + Type: labels.MatchRegexp, Value: a.desiredLabelValue, } if aggregationLabelMatcher != nil { diff --git a/pkg/query/aggregation_label_rewriter_test.go b/pkg/query/aggregation_label_rewriter_test.go index 038c3239a2e..d216d063e39 100644 --- a/pkg/query/aggregation_label_rewriter_test.go +++ b/pkg/query/aggregation_label_rewriter_test.go @@ -17,6 +17,7 @@ func TestAggregationLabelRewriter_Rewrite(t *testing.T) { for _, tc := range []struct { name string desiredLabelValue string // Empty means disabled + strategy RewriterStrategy inputMatchers []*labels.Matcher expectedMatchers []*labels.Matcher expectedSkipCount float64 @@ -25,7 +26,19 @@ func TestAggregationLabelRewriter_Rewrite(t *testing.T) { }{ { name: "disabled rewriter should not modify label matchers", + desiredLabelValue: "v1", + strategy: NoopLabelRewriter, + inputMatchers: []*labels.Matcher{ + labels.MustNewMatcher(labels.MatchEqual, "__name__", "test:sum"), + }, + expectedMatchers: []*labels.Matcher{ + labels.MustNewMatcher(labels.MatchEqual, "__name__", "test:sum"), + }, + }, + { + name: "no desired label value makes a disabled rewriter and should not modify label matchers", desiredLabelValue: "", + strategy: UpsertLabelRewriter, inputMatchers: []*labels.Matcher{ labels.MustNewMatcher(labels.MatchEqual, "__name__", "test:sum"), }, @@ -36,31 +49,48 @@ func TestAggregationLabelRewriter_Rewrite(t *testing.T) { { name: "should add label for aggregated metric if no existing aggregation label", desiredLabelValue: "5m", + 
strategy: UpsertLabelRewriter, inputMatchers: []*labels.Matcher{ labels.MustNewMatcher(labels.MatchEqual, "__name__", "test:sum"), }, expectedMatchers: []*labels.Matcher{ labels.MustNewMatcher(labels.MatchEqual, "__name__", "test:sum"), - labels.MustNewMatcher(labels.MatchEqual, "__rollup__", "5m"), + labels.MustNewMatcher(labels.MatchRegexp, aggregationLabelName, "5m"), }, expectedAddCount: 1, }, { - name: "should rewrite existing aggregation label for aggregated metric", + name: "should rewrite existing equal aggregation label for aggregated metric", + desiredLabelValue: "5m", + strategy: UpsertLabelRewriter, + inputMatchers: []*labels.Matcher{ + labels.MustNewMatcher(labels.MatchEqual, "__name__", "test:sum"), + labels.MustNewMatcher(labels.MatchEqual, aggregationLabelName, "1h"), + }, + expectedMatchers: []*labels.Matcher{ + labels.MustNewMatcher(labels.MatchEqual, "__name__", "test:sum"), + labels.MustNewMatcher(labels.MatchRegexp, aggregationLabelName, "5m"), + }, + expectedRewriteMap: map[string]float64{"1h": 1}, + }, + { + name: "should rewrite existing regex aggregation label for aggregated metric", desiredLabelValue: "5m", + strategy: UpsertLabelRewriter, inputMatchers: []*labels.Matcher{ labels.MustNewMatcher(labels.MatchEqual, "__name__", "test:sum"), - labels.MustNewMatcher(labels.MatchEqual, "__rollup__", "1h"), + labels.MustNewMatcher(labels.MatchRegexp, aggregationLabelName, "1h"), }, expectedMatchers: []*labels.Matcher{ labels.MustNewMatcher(labels.MatchEqual, "__name__", "test:sum"), - labels.MustNewMatcher(labels.MatchEqual, "__rollup__", "5m"), + labels.MustNewMatcher(labels.MatchRegexp, aggregationLabelName, "5m"), }, expectedRewriteMap: map[string]float64{"1h": 1}, }, { name: "should skip non-aggregated metric", desiredLabelValue: "5m", + strategy: UpsertLabelRewriter, inputMatchers: []*labels.Matcher{ labels.MustNewMatcher(labels.MatchEqual, "__name__", "test_metric"), }, @@ -72,6 +102,7 @@ func TestAggregationLabelRewriter_Rewrite(t *testing.T) { { name: "should skip non-equal name matcher", desiredLabelValue: "5m", + strategy: UpsertLabelRewriter, inputMatchers: []*labels.Matcher{ labels.MustNewMatcher(labels.MatchRegexp, "__name__", "test:sum"), }, @@ -83,6 +114,7 @@ func TestAggregationLabelRewriter_Rewrite(t *testing.T) { { name: "should skip when no name matcher", desiredLabelValue: "5m", + strategy: UpsertLabelRewriter, inputMatchers: []*labels.Matcher{ labels.MustNewMatcher(labels.MatchEqual, "job", "prometheus"), }, @@ -91,12 +123,27 @@ func TestAggregationLabelRewriter_Rewrite(t *testing.T) { }, expectedSkipCount: 1, }, + { + name: "if insert only, should NOT rewrite existing aggregation label for aggregated metric", + desiredLabelValue: "5m", + strategy: InsertOnlyLabelRewriter, + inputMatchers: []*labels.Matcher{ + labels.MustNewMatcher(labels.MatchEqual, "__name__", "test:sum"), + labels.MustNewMatcher(labels.MatchEqual, aggregationLabelName, "1h"), + }, + expectedMatchers: []*labels.Matcher{ + labels.MustNewMatcher(labels.MatchEqual, "__name__", "test:sum"), + labels.MustNewMatcher(labels.MatchEqual, aggregationLabelName, "1h"), + }, + expectedSkipCount: 1, + }, } { t.Run(tc.name, func(t *testing.T) { reg := prometheus.NewRegistry() rewriter := NewAggregationLabelRewriter( nil, reg, + tc.strategy, tc.desiredLabelValue, ) diff --git a/pkg/query/querier.go b/pkg/query/querier.go index 953aa135beb..1d8d9cb6d3f 100644 --- a/pkg/query/querier.go +++ b/pkg/query/querier.go @@ -61,6 +61,7 @@ type QueryableCreator func( type Options struct { 
GroupReplicaPartialResponseStrategy bool DeduplicationFunc string + RewriteAggregationLabelStrategy string RewriteAggregationLabelTo string } @@ -94,6 +95,7 @@ func NewQueryableCreatorWithOptions( aggregationLabelRewriter := NewAggregationLabelRewriter( logger, extprom.WrapRegistererWithPrefix("aggregation_label_rewriter_", reg), + RewriterStrategy(opts.RewriteAggregationLabelStrategy), opts.RewriteAggregationLabelTo, ) return func( diff --git a/pkg/receive/aligned_hashring.go b/pkg/receive/aligned_hashring.go new file mode 100644 index 00000000000..30203fbb90d --- /dev/null +++ b/pkg/receive/aligned_hashring.go @@ -0,0 +1,153 @@ +// Copyright (c) The Thanos Authors. +// Licensed under the Apache License 2.0. + +package receive + +import ( + "fmt" + "sort" + "strconv" + + "github.com/cespare/xxhash" + "github.com/pkg/errors" + + "github.com/thanos-io/thanos/pkg/strutil" +) + +// groupByAZ groups endpoints by Availability Zone and sorts them by their inferred ordinal in a k8s statefulset. +// It returns a 2D slice where each inner slice represents an AZ (sorted alphabetically) +// and contains endpoints sorted by ordinal. All inner slices are truncated to the +// length of the largest common sequence of ordinals starting from 0 across all AZs. +// All endpoint addresses must be valid k8s DNS names with 0-index ordinals at the end of the pod name. +func groupByAZ(endpoints []Endpoint) ([][]Endpoint, error) { + if len(endpoints) == 0 { + return nil, errors.New("no endpoints provided") + } + + // Group endpoints by AZ and then by ordinal. + azEndpoints := make(map[string]map[int]Endpoint) + for _, ep := range endpoints { + ordinal, err := strutil.ExtractPodOrdinal(ep.Address) + if err != nil { + return nil, errors.Wrapf(err, "failed to extract ordinal from address %s", ep.Address) + } + if _, ok := azEndpoints[ep.AZ]; !ok { + azEndpoints[ep.AZ] = make(map[int]Endpoint) + } + if _, exists := azEndpoints[ep.AZ][ordinal]; exists { + return nil, fmt.Errorf("duplicate endpoint ordinal %d for address %s in AZ %s", ordinal, ep.Address, ep.AZ) + } + azEndpoints[ep.AZ][ordinal] = ep + } + + // Get sorted list of AZ names. + sortedAZs := make([]string, 0, len(azEndpoints)) + for az := range azEndpoints { + sortedAZs = append(sortedAZs, az) + } + sort.Strings(sortedAZs) + + // Determine the maximum common ordinal across all AZs. + maxCommonOrdinal := -1 + for i := 0; ; i++ { + presentInAllAZs := true + for _, az := range sortedAZs { + if _, ok := azEndpoints[az][i]; !ok { + presentInAllAZs = false + if i == 0 { + return nil, fmt.Errorf("AZ %q is missing endpoint with ordinal 0", az) + } + break + } + } + if !presentInAllAZs { + maxCommonOrdinal = i - 1 + break + } + } + if maxCommonOrdinal < 0 { + return nil, errors.New("no common endpoints with ordinal 0 found across all AZs") + } + numAZs := len(sortedAZs) + result := make([][]Endpoint, numAZs) + for i, az := range sortedAZs { + result[i] = make([]Endpoint, 0, maxCommonOrdinal+1) + for j := 0; j <= maxCommonOrdinal; j++ { + result[i] = append(result[i], azEndpoints[az][j]) + } + } + return result, nil +} + +// newAlignedKetamaHashring creates a Ketama hash ring where replicas are strictly aligned across Availability Zones. +// Each section on the hash ring corresponds to a primary endpoint (taken from the first AZ) and its +// aligned replicas in other AZs (endpoints with the same ordinal). The hash for a section is calculated +// based *only* on the primary endpoint's address. 
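+// For illustration: with replication factor 3 and AZs zone-a, zone-b and zone-c each holding pod-0 and pod-1, every section hashed from pod-0 in zone-a maps to the replica set {pod-0/zone-a, pod-0/zone-b, pod-0/zone-c}, so a series that hashes to pod-1 in one AZ is always replicated to pod-1 in the other AZs.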
+func newAlignedKetamaHashring(endpoints []Endpoint, sectionsPerNode int, replicationFactor uint64) (*ketamaHashring, error) { + if replicationFactor == 0 { + return nil, errors.New("replication factor cannot be zero") + } + if sectionsPerNode <= 0 { + return nil, errors.New("sections per node must be positive") + } + groupedEndpoints, err := groupByAZ(endpoints) + if err != nil { + return nil, errors.Wrap(err, "failed to group endpoints by AZ") + } + numAZs := len(groupedEndpoints) + if numAZs == 0 { + return nil, errors.New("no endpoint groups found after grouping by AZ") + } + if uint64(numAZs) != replicationFactor { + return nil, fmt.Errorf("number of AZs (%d) must equal replication factor (%d)", numAZs, replicationFactor) + } + numEndpointsPerAZ := len(groupedEndpoints[0]) + if numEndpointsPerAZ == 0 { + return nil, errors.New("AZ groups are empty after grouping") + } + totalEndpoints := numAZs * numEndpointsPerAZ + flatEndpoints := make([]Endpoint, 0, totalEndpoints) + for azIndex := 0; azIndex < numAZs; azIndex++ { + flatEndpoints = append(flatEndpoints, groupedEndpoints[azIndex]...) + } + hasher := xxhash.New() + ringSections := make(sections, 0, numEndpointsPerAZ*sectionsPerNode) + + // Iterate through primary endpoints (those in the first AZ) to define sections. + for primaryOrdinalIndex := 0; primaryOrdinalIndex < numEndpointsPerAZ; primaryOrdinalIndex++ { + primaryEndpoint := groupedEndpoints[0][primaryOrdinalIndex] + for sectionIndex := 1; sectionIndex <= sectionsPerNode; sectionIndex++ { + hasher.Reset() + _, _ = hasher.Write([]byte(primaryEndpoint.Address + ":" + strconv.Itoa(sectionIndex))) + sectionHash := hasher.Sum64() + sec := §ion{ + hash: sectionHash, + az: primaryEndpoint.AZ, + endpointIndex: uint64(primaryOrdinalIndex), + replicas: make([]uint64, 0, replicationFactor), + } + + // Find indices of all replicas (including primary) in the flat list and verify alignment. + for azIndex := 0; azIndex < numAZs; azIndex++ { + replicaFlatIndex := azIndex*numEndpointsPerAZ + primaryOrdinalIndex + replicaEndpoint := flatEndpoints[replicaFlatIndex] + replicaOrdinal, err := strutil.ExtractPodOrdinal(replicaEndpoint.Address) + if err != nil { + return nil, errors.Wrapf(err, "failed to extract ordinal from replica endpoint %s in AZ %s", replicaEndpoint.Address, replicaEndpoint.AZ) + } + if replicaOrdinal != primaryOrdinalIndex { + return nil, fmt.Errorf("ordinal mismatch for primary endpoint %s (ordinal %d): replica %s in AZ %s has ordinal %d", + primaryEndpoint.Address, primaryOrdinalIndex, replicaEndpoint.Address, replicaEndpoint.AZ, replicaOrdinal) + } + sec.replicas = append(sec.replicas, uint64(replicaFlatIndex)) + } + ringSections = append(ringSections, sec) + } + } + sort.Sort(ringSections) + return &ketamaHashring{ + endpoints: flatEndpoints, + sections: ringSections, + numEndpoints: uint64(totalEndpoints), + }, nil +} diff --git a/pkg/receive/aligned_hashring_test.go b/pkg/receive/aligned_hashring_test.go new file mode 100644 index 00000000000..c5e9feb611e --- /dev/null +++ b/pkg/receive/aligned_hashring_test.go @@ -0,0 +1,378 @@ +// Copyright (c) The Thanos Authors. +// Licensed under the Apache License 2.0. 
+ +package receive + +import ( + "sort" + "strconv" + "strings" + "testing" + + "github.com/efficientgo/core/testutil" + "github.com/stretchr/testify/require" + "github.com/thanos-io/thanos/pkg/store/labelpb" + "github.com/thanos-io/thanos/pkg/store/storepb/prompb" + "github.com/thanos-io/thanos/pkg/strutil" +) + +// podDNS creates a DNS-like string for testing endpoint addresses. +func podDNS(name string, ordinal int) string { + return name + "-" + strconv.Itoa(ordinal) + ".test-svc.test-namespace.svc.cluster.local" +} + +func TestGroupByAZ(t *testing.T) { + // Test setup endpoints. + ep0a := Endpoint{Address: podDNS("pod", 0), AZ: "zone-a"} + ep1a := Endpoint{Address: podDNS("pod", 1), AZ: "zone-a"} + ep2a := Endpoint{Address: podDNS("pod", 2), AZ: "zone-a"} + ep0b := Endpoint{Address: podDNS("pod", 0), AZ: "zone-b"} + ep1b := Endpoint{Address: podDNS("pod", 1), AZ: "zone-b"} + ep0c := Endpoint{Address: podDNS("pod", 0), AZ: "zone-c"} + ep1c := Endpoint{Address: podDNS("pod", 1), AZ: "zone-c"} + invalidEp := Endpoint{Address: "invalid-address-format", AZ: "zone-a"} + duplicateEp0a := Endpoint{Address: podDNS("anotherpod", 0), AZ: "zone-a"} // Same ordinal (0) as ep0a in zone-a. + + testCases := map[string]struct { + inputEndpoints []Endpoint + expectedResult [][]Endpoint + expectError bool + errorContains string + }{ + "error on empty input": { + inputEndpoints: []Endpoint{}, + expectedResult: nil, + expectError: true, + errorContains: "no endpoints provided", + }, + "single AZ, multiple endpoints": { + inputEndpoints: []Endpoint{ep1a, ep0a, ep2a}, + expectedResult: [][]Endpoint{ + {ep0a, ep1a, ep2a}, + }, + expectError: false, + }, + "multiple AZs, balanced and ordered": { + inputEndpoints: []Endpoint{ep1a, ep0b, ep0a, ep1b}, + expectedResult: [][]Endpoint{ + {ep0a, ep1a}, + {ep0b, ep1b}, + }, + expectError: false, + }, + "multiple AZs, different counts, stops at first missing ordinal > 0": { + inputEndpoints: []Endpoint{ep1a, ep0b, ep0a, ep1b, ep2a, ep0c}, + expectedResult: [][]Endpoint{ + {ep0a}, + {ep0b}, + {ep0c}, + }, + expectError: false, + }, + "error if ordinal 0 missing in any AZ": { + inputEndpoints: []Endpoint{ep1a, ep2a, ep1b}, + expectedResult: nil, + expectError: true, + errorContains: "missing endpoint with ordinal 0", + }, + "error if ordinal 0 missing in only one AZ": { + inputEndpoints: []Endpoint{ep0a, ep1a, ep1b}, + expectedResult: nil, + expectError: true, + errorContains: `AZ "zone-b" is missing endpoint with ordinal 0`, + }, + "error on invalid address format": { + inputEndpoints: []Endpoint{ep0a, invalidEp, ep0b}, + expectedResult: nil, + expectError: true, + errorContains: "failed to extract ordinal from address invalid-address-format", + }, + "error on duplicate ordinal within an AZ": { + inputEndpoints: []Endpoint{ep0a, ep1a, ep0b, duplicateEp0a}, + expectedResult: nil, + expectError: true, + errorContains: "duplicate endpoint ordinal 0 for address " + duplicateEp0a.Address + " in AZ zone-a", + }, + "AZ sorting check": { + inputEndpoints: []Endpoint{ep0b, ep0c, ep0a}, + expectedResult: [][]Endpoint{ + {ep0a}, + {ep0b}, + {ep0c}, + }, + expectError: false, + }, + "multiple AZs, stops correctly when next ordinal missing everywhere": { + inputEndpoints: []Endpoint{ep1a, ep0b, ep0a, ep1b, ep0c, ep1c}, + expectedResult: [][]Endpoint{ + {ep0a, ep1a}, + {ep0b, ep1b}, + {ep0c, ep1c}, + }, + expectError: false, + }, + } + + for tcName, tc := range testCases { + t.Run(tcName, func(t *testing.T) { + result, err := groupByAZ(tc.inputEndpoints) + + if tc.expectError { + 
testutil.NotOk(t, err) + if tc.errorContains != "" { + testutil.Assert(t, strings.Contains(err.Error(), tc.errorContains), "Expected error message to contain '%s', but got: %v", tc.errorContains, err) + } + testutil.Assert(t, result == nil, "Expected nil result on error, got: %v", result) + } else { + testutil.Ok(t, err) + testutil.Equals(t, tc.expectedResult, result) + + // Verify outer slice (AZs) is sorted alphabetically. + if err == nil && len(result) > 1 { + azOrderCorrect := sort.SliceIsSorted(result, func(i, j int) bool { + return result[i][0].AZ < result[j][0].AZ + }) + testutil.Assert(t, azOrderCorrect, "Outer slice is not sorted by AZ") + } + } + }) + } +} + +func TestAlignedKetamaHashringGet(t *testing.T) { + t.Parallel() + + ep0a := Endpoint{Address: podDNS("pod", 0), AZ: "zone-a"} + ep1a := Endpoint{Address: podDNS("pod", 1), AZ: "zone-a"} + ep0b := Endpoint{Address: podDNS("pod", 0), AZ: "zone-b"} + ep1b := Endpoint{Address: podDNS("pod", 1), AZ: "zone-b"} + ep0c := Endpoint{Address: podDNS("pod", 0), AZ: "zone-c"} + ep1c := Endpoint{Address: podDNS("pod", 1), AZ: "zone-c"} + invalidEp := Endpoint{Address: "invalid-address", AZ: "zone-a"} + duplicateEp0a := Endpoint{Address: podDNS("anotherpod", 0), AZ: "zone-a"} + + tsForReplicaTest := &prompb.TimeSeries{ + Labels: []labelpb.ZLabel{{Name: "test", Value: "replica-routing"}}, + } + + testCases := map[string]struct { + inputEndpoints []Endpoint + replicationFactor uint64 + sectionsPerNode int + + tenant string + ts *prompb.TimeSeries + n uint64 + expectedEndpoint Endpoint + + expectConstructorError bool + constructorErrorContains string + expectGetNError bool + getNErrorContains string + }{ + "valid 2 AZs, RF=2, get replica 0": { + inputEndpoints: []Endpoint{ep1a, ep0b, ep0a, ep1b}, + replicationFactor: 2, + sectionsPerNode: SectionsPerNode, + tenant: "tenant1", + ts: tsForReplicaTest, + n: 0, + expectedEndpoint: ep0a, + expectConstructorError: false, + expectGetNError: false, + }, + "valid 2 AZs, RF=2, get replica 1": { + inputEndpoints: []Endpoint{ep1a, ep0b, ep0a, ep1b}, + replicationFactor: 2, + sectionsPerNode: SectionsPerNode, + tenant: "tenant1", + ts: tsForReplicaTest, + n: 1, + expectedEndpoint: ep0b, + expectConstructorError: false, + expectGetNError: false, + }, + "valid 3 AZs, RF=3, get replica 0": { + inputEndpoints: []Endpoint{ep1a, ep0b, ep0a, ep1b, ep0c, ep1c}, + replicationFactor: 3, + sectionsPerNode: SectionsPerNode, + tenant: "tenant1", + ts: tsForReplicaTest, + n: 0, + expectedEndpoint: ep0a, + expectConstructorError: false, + expectGetNError: false, + }, + "valid 3 AZs, RF=3, get replica 1": { + inputEndpoints: []Endpoint{ep1a, ep0b, ep0a, ep1b, ep0c, ep1c}, + replicationFactor: 3, + sectionsPerNode: SectionsPerNode, + tenant: "tenant1", + ts: tsForReplicaTest, + n: 1, + expectedEndpoint: ep0b, + expectConstructorError: false, + expectGetNError: false, + }, + "valid 3 AZs, RF=3, get replica 2": { + inputEndpoints: []Endpoint{ep1a, ep0b, ep0a, ep1b, ep0c, ep1c}, + replicationFactor: 3, + sectionsPerNode: SectionsPerNode, + tenant: "tenant1", + ts: tsForReplicaTest, + n: 2, + expectedEndpoint: ep0c, + expectConstructorError: false, + expectGetNError: false, + }, + "error: empty input": { + inputEndpoints: []Endpoint{}, + replicationFactor: 1, + sectionsPerNode: SectionsPerNode, + expectConstructorError: true, + constructorErrorContains: "no endpoints provided", + }, + "error: invalid address": { + inputEndpoints: []Endpoint{ep0a, invalidEp, ep0b}, + replicationFactor: 2, + sectionsPerNode: SectionsPerNode, + 
expectConstructorError: true, + constructorErrorContains: "failed to extract ordinal from address invalid-address", + }, + "error: duplicate ordinal": { + inputEndpoints: []Endpoint{ep0a, ep1a, ep0b, duplicateEp0a}, + replicationFactor: 2, + sectionsPerNode: SectionsPerNode, + expectConstructorError: true, + constructorErrorContains: "duplicate endpoint", + }, + "error: missing ordinal 0": { + inputEndpoints: []Endpoint{ep1a, ep1b}, + replicationFactor: 2, + sectionsPerNode: SectionsPerNode, + expectConstructorError: true, + constructorErrorContains: "failed to group endpoints by AZ: AZ \"zone-a\" is missing endpoint with ordinal 0", + }, + "error: AZ count != RF (too few AZs)": { + inputEndpoints: []Endpoint{ep0a, ep1a}, + replicationFactor: 2, + sectionsPerNode: SectionsPerNode, + expectConstructorError: true, + constructorErrorContains: "number of AZs (1) must equal replication factor (2)", + }, + "error: AZ count != RF (too many AZs)": { + inputEndpoints: []Endpoint{ep0a, ep1a, ep0b, ep1b, ep0c, ep1c}, + replicationFactor: 2, + sectionsPerNode: SectionsPerNode, + expectConstructorError: true, + constructorErrorContains: "number of AZs (3) must equal replication factor (2)", + }, + "constructor success with unbalanced AZs (uses common subset)": { + inputEndpoints: []Endpoint{ep0a, ep1a, ep0b}, + replicationFactor: 2, + sectionsPerNode: SectionsPerNode, + expectConstructorError: false, + }, + "error: GetN index out of bounds (n >= numEndpoints)": { + inputEndpoints: []Endpoint{ep1a, ep0b, ep0a, ep1b}, + replicationFactor: 2, + sectionsPerNode: SectionsPerNode, + tenant: "tenant1", + ts: tsForReplicaTest, + n: 4, + expectConstructorError: false, + expectGetNError: true, + getNErrorContains: "insufficient nodes; have 4, want 5", + }, + } + + for tcName, tc := range testCases { + t.Run(tcName, func(t *testing.T) { + hashRing, err := newAlignedKetamaHashring(tc.inputEndpoints, tc.sectionsPerNode, tc.replicationFactor) + + if tc.expectConstructorError { + require.Error(t, err, "Expected constructor error") + require.Nil(t, hashRing, "Hashring should be nil on constructor error") + if tc.constructorErrorContains != "" { + require.Contains(t, err.Error(), tc.constructorErrorContains, "Constructor error message mismatch") + } + return + } + + require.NoError(t, err, "Expected constructor to succeed") + require.NotNil(t, hashRing, "Hashring should not be nil on successful construction") + + if tc.ts == nil && !tc.expectGetNError { + return + } + if tc.ts == nil && tc.expectGetNError { + tc.ts = &prompb.TimeSeries{Labels: []labelpb.ZLabel{{Name: "dummy", Value: "dummy"}}} + } + + result, getNErr := hashRing.GetN(tc.tenant, tc.ts, tc.n) + if tc.expectGetNError { + require.Error(t, getNErr, "Expected GetN error") + if tc.getNErrorContains != "" { + require.Contains(t, getNErr.Error(), tc.getNErrorContains, "GetN error message mismatch") + } + } else { + require.NoError(t, getNErr, "Expected GetN to succeed") + testutil.Equals(t, tc.expectedEndpoint, result, "GetN returned unexpected endpoint") + } + }) + } +} + +func TestAlignedKetamaHashringReplicaOrdinals(t *testing.T) { + t.Parallel() + + var endpoints []Endpoint + for i := 0; i < 20; i++ { + endpoints = append(endpoints, Endpoint{Address: podDNS("pod", i), AZ: "zone-a"}) + } + for i := 0; i < 20; i++ { + endpoints = append(endpoints, Endpoint{Address: podDNS("pod", i), AZ: "zone-b"}) + } + for i := 0; i < 20; i++ { + endpoints = append(endpoints, Endpoint{Address: podDNS("pod", i), AZ: "zone-c"}) + } + replicationFactor := uint64(3) + 
sectionsPerNode := 10 + + hashRing, err := newAlignedKetamaHashring(endpoints, sectionsPerNode, replicationFactor) + require.NoError(t, err, "Aligned hashring constructor failed") + require.NotNil(t, hashRing, "Hashring should not be nil") + require.NotEmpty(t, hashRing.sections, "Hashring should contain sections") + + // Verify that all replicas within a section have the same ordinal. + for i, s := range hashRing.sections { + if len(s.replicas) == 0 { + continue + } + + expectedOrdinal := -1 + + for replicaNum, replicaIndex := range s.replicas { + require.Less(t, int(replicaIndex), len(hashRing.endpoints), + "Section %d (hash %d), Replica %d: index %d out of bounds for endpoints list (len %d)", + i, s.hash, replicaNum, replicaIndex, len(hashRing.endpoints)) + + endpoint := hashRing.endpoints[replicaIndex] + ordinal, err := strutil.ExtractPodOrdinal(endpoint.Address) + require.NoError(t, err, + "Section %d (hash %d), Replica %d: failed to extract ordinal from address %s", + i, s.hash, replicaNum, endpoint.Address) + + if expectedOrdinal == -1 { + expectedOrdinal = ordinal + } else { + require.Equal(t, expectedOrdinal, ordinal, + "Section %d (hash %d), Replica %d (%s): Mismatched ordinal. Expected %d, got %d. Replicas in section: %v", + i, s.hash, replicaNum, endpoint.Address, expectedOrdinal, ordinal, s.replicas) + } + } + if len(s.replicas) > 0 { + require.NotEqual(t, -1, expectedOrdinal, "Section %d (hash %d): Failed to determine expected ordinal for replicas %v", i, s.hash, s.replicas) + } + } +} diff --git a/pkg/receive/hashring.go b/pkg/receive/hashring.go index e3ff6ededbe..19fd19262fb 100644 --- a/pkg/receive/hashring.go +++ b/pkg/receive/hashring.go @@ -29,8 +29,9 @@ import ( type HashringAlgorithm string const ( - AlgorithmHashmod HashringAlgorithm = "hashmod" - AlgorithmKetama HashringAlgorithm = "ketama" + AlgorithmHashmod HashringAlgorithm = "hashmod" + AlgorithmKetama HashringAlgorithm = "ketama" + AlgorithmAlignedKetama HashringAlgorithm = "aligned_ketama" // SectionsPerNode is the number of sections in the ring assigned to each node // in the ketama hashring. A higher number yields a better series distribution, @@ -375,6 +376,8 @@ func newHashring(algorithm HashringAlgorithm, endpoints []Endpoint, replicationF return newSimpleHashring(endpoints) case AlgorithmKetama: return newKetamaHashring(endpoints, SectionsPerNode, replicationFactor) + case AlgorithmAlignedKetama: + return newAlignedKetamaHashring(endpoints, SectionsPerNode, replicationFactor) default: l := log.NewNopLogger() level.Warn(l).Log("msg", "Unrecognizable hashring algorithm. 
Fall back to hashmod algorithm.", diff --git a/pkg/receive/writecapnp/client.go b/pkg/receive/writecapnp/client.go index 0a20d90d44d..3cd9f2d0820 100644 --- a/pkg/receive/writecapnp/client.go +++ b/pkg/receive/writecapnp/client.go @@ -69,22 +69,9 @@ func (r *RemoteWriteClient) writeWithReconnect(ctx context.Context, numReconnect if err := r.connect(ctx); err != nil { return nil, err } - arena := capnp.SingleSegment(nil) - defer arena.Release() result, release := r.writer.Write(ctx, func(params Writer_write_Params) error { - _, seg, err := capnp.NewMessage(arena) - if err != nil { - return err - } - wr, err := NewRootWriteRequest(seg) - if err != nil { - return err - } - if err := params.SetWr(wr); err != nil { - return err - } - wr, err = params.Wr() + wr, err := params.NewWr() if err != nil { return err } diff --git a/pkg/receive/writecapnp/marshal.go b/pkg/receive/writecapnp/marshal.go index 2d42d60b849..efc1a8ef038 100644 --- a/pkg/receive/writecapnp/marshal.go +++ b/pkg/receive/writecapnp/marshal.go @@ -6,6 +6,8 @@ package writecapnp import ( "capnproto.org/go/capnp/v3" + "github.com/pkg/errors" + "github.com/thanos-io/thanos/pkg/store/labelpb" "github.com/thanos-io/thanos/pkg/store/storepb/prompb" ) @@ -46,7 +48,7 @@ func Build(tenant string, tsreq []prompb.TimeSeries) (WriteRequest, error) { func BuildInto(wr WriteRequest, tenant string, tsreq []prompb.TimeSeries) error { if err := wr.SetTenant(tenant); err != nil { - return err + return errors.Wrap(err, "set tenant") } series, err := wr.NewTimeSeries(int32(len(tsreq))) @@ -59,27 +61,30 @@ func BuildInto(wr WriteRequest, tenant string, tsreq []prompb.TimeSeries) error lblsc, err := tsc.NewLabels(int32(len(ts.Labels))) if err != nil { - return err + return errors.Wrap(err, "new labels") } if err := marshalLabels(lblsc, ts.Labels, builder); err != nil { - return err + return errors.Wrap(err, "marshal labels") } if err := marshalSamples(tsc, ts.Samples); err != nil { - return err + return errors.Wrap(err, "marshal samples") } if err := marshalHistograms(tsc, ts.Histograms); err != nil { - return err + return errors.Wrap(err, "marshal histograms") } if err := marshalExemplars(tsc, ts.Exemplars, builder); err != nil { - return err + return errors.Wrap(err, "marshal exemplars") } } symbols, err := wr.NewSymbols() if err != nil { - return err + return errors.Wrap(err, "new symbols") } - return marshalSymbols(builder, symbols) + if err := marshalSymbols(builder, symbols); err != nil { + return errors.Wrap(err, "marshal symbols") + } + return nil } func marshalSymbols(builder *symbolsBuilder, symbols Symbols) error { diff --git a/pkg/store/bucket.go b/pkg/store/bucket.go index d9940221ffd..7417b496420 100644 --- a/pkg/store/bucket.go +++ b/pkg/store/bucket.go @@ -422,6 +422,8 @@ type BucketStore struct { enabledLazyExpandedPostings bool sortingStrategy sortingStrategy + // This field limits memory usage when the lazy retrieval strategy (newLazyRespSet) is used. + lazyRetrievalMaxBufferedResponses int blockEstimatedMaxSeriesFunc BlockEstimator blockEstimatedMaxChunkFunc BlockEstimator @@ -561,6 +563,14 @@ func WithDontResort(true bool) BucketStoreOption { } } +func WithLazyRetrievalMaxBufferedResponsesForBucket(n int) BucketStoreOption { + return func(s *BucketStore) { + if n > 0 { + s.lazyRetrievalMaxBufferedResponses = n + } + } +} + // WithIndexHeaderLazyDownloadStrategy specifies what block to lazy download its index header. // Only used when lazy mmap is enabled at the same time.
func WithIndexHeaderLazyDownloadStrategy(strategy indexheader.LazyDownloadIndexHeaderFunc) BucketStoreOption { @@ -597,23 +607,24 @@ func NewBucketStore( b := make([]byte, 0, initialBufSize) return &b }}, - chunkPool: pool.NoopPool[byte]{}, - blocks: map[ulid.ULID]*bucketBlock{}, - blockSets: map[uint64]*bucketBlockSet{}, - blockSyncConcurrency: blockSyncConcurrency, - queryGate: gate.NewNoop(), - chunksLimiterFactory: chunksLimiterFactory, - seriesLimiterFactory: seriesLimiterFactory, - bytesLimiterFactory: bytesLimiterFactory, - partitioner: partitioner, - enableCompatibilityLabel: enableCompatibilityLabel, - postingOffsetsInMemSampling: postingOffsetsInMemSampling, - enableSeriesResponseHints: enableSeriesResponseHints, - enableChunkHashCalculation: enableChunkHashCalculation, - seriesBatchSize: SeriesBatchSize, - sortingStrategy: sortingStrategyStore, - indexHeaderLazyDownloadStrategy: indexheader.AlwaysEagerDownloadIndexHeader, - requestLoggerFunc: NoopRequestLoggerFunc, + chunkPool: pool.NoopPool[byte]{}, + blocks: map[ulid.ULID]*bucketBlock{}, + blockSets: map[uint64]*bucketBlockSet{}, + blockSyncConcurrency: blockSyncConcurrency, + queryGate: gate.NewNoop(), + chunksLimiterFactory: chunksLimiterFactory, + seriesLimiterFactory: seriesLimiterFactory, + bytesLimiterFactory: bytesLimiterFactory, + partitioner: partitioner, + enableCompatibilityLabel: enableCompatibilityLabel, + postingOffsetsInMemSampling: postingOffsetsInMemSampling, + enableSeriesResponseHints: enableSeriesResponseHints, + enableChunkHashCalculation: enableChunkHashCalculation, + seriesBatchSize: SeriesBatchSize, + sortingStrategy: sortingStrategyStore, + lazyRetrievalMaxBufferedResponses: 1, + indexHeaderLazyDownloadStrategy: indexheader.AlwaysEagerDownloadIndexHeader, + requestLoggerFunc: NoopRequestLoggerFunc, } for _, option := range options { @@ -1613,6 +1624,11 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, seriesSrv storepb.Store nil, ) } else { + lazyRetrievalMaxBufferedResponses := s.lazyRetrievalMaxBufferedResponses + if lazyRetrievalMaxBufferedResponses < 1 { + // Some unit and e2e tests hit this path. + lazyRetrievalMaxBufferedResponses = 1 + } resp = newLazyRespSet( span, 10*time.Minute, @@ -1623,6 +1639,7 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, seriesSrv storepb.Store shardMatcher, false, s.metrics.emptyPostingCount.WithLabelValues(tenant), + lazyRetrievalMaxBufferedResponses, ) } diff --git a/pkg/store/bucket_test.go b/pkg/store/bucket_test.go index e8dffd093b1..8027c093de7 100644 --- a/pkg/store/bucket_test.go +++ b/pkg/store/bucket_test.go @@ -1780,12 +1780,13 @@ func TestBucketSeries_OneBlock_InMemIndexCacheSegfault(t *testing.T) { b1.meta.ULID: b1, b2.meta.ULID: b2, }, - queryGate: gate.NewNoop(), - chunksLimiterFactory: NewChunksLimiterFactory(0), - seriesLimiterFactory: NewSeriesLimiterFactory(0), - bytesLimiterFactory: NewBytesLimiterFactory(0), - seriesBatchSize: SeriesBatchSize, - requestLoggerFunc: NoopRequestLoggerFunc, + queryGate: gate.NewNoop(), + chunksLimiterFactory: NewChunksLimiterFactory(0), + seriesLimiterFactory: NewSeriesLimiterFactory(0), + bytesLimiterFactory: NewBytesLimiterFactory(0), + seriesBatchSize: SeriesBatchSize, + requestLoggerFunc: NoopRequestLoggerFunc, + lazyRetrievalMaxBufferedResponses: 1, } t.Run("invoke series for one block. 
Fill the cache on the way.", func(t *testing.T) { diff --git a/pkg/store/proxy.go b/pkg/store/proxy.go index 47e73a3013a..2c602e4db78 100644 --- a/pkg/store/proxy.go +++ b/pkg/store/proxy.go @@ -93,14 +93,15 @@ type ProxyStore struct { selectorLabels labels.Labels buffers sync.Pool - responseTimeout time.Duration - metrics *proxyStoreMetrics - retrievalStrategy RetrievalStrategy - debugLogging bool - tsdbSelector *TSDBSelector - quorumChunkDedup bool - enableDedup bool - matcherConverter *storepb.MatcherConverter + responseTimeout time.Duration + metrics *proxyStoreMetrics + retrievalStrategy RetrievalStrategy + debugLogging bool + tsdbSelector *TSDBSelector + quorumChunkDedup bool + enableDedup bool + matcherConverter *storepb.MatcherConverter + lazyRetrievalMaxBufferedResponses int } type proxyStoreMetrics struct { @@ -137,6 +138,12 @@ func RegisterStoreServer(storeSrv storepb.StoreServer, logger log.Logger) func(* // BucketStoreOption are functions that configure BucketStore. type ProxyStoreOption func(s *ProxyStore) +func WithLazyRetrievalMaxBufferedResponsesForProxy(buferSize int) ProxyStoreOption { + return func(s *ProxyStore) { + s.lazyRetrievalMaxBufferedResponses = buferSize + } +} + // WithProxyStoreDebugLogging toggles debug logging. func WithProxyStoreDebugLogging(enable bool) ProxyStoreOption { return func(s *ProxyStore) { @@ -347,6 +354,11 @@ func (s *ProxyStore) Series(originalRequest *storepb.SeriesRequest, srv storepb. ShardInfo: originalRequest.ShardInfo, WithoutReplicaLabels: originalRequest.WithoutReplicaLabels, } + if originalRequest.PartialResponseStrategy == storepb.PartialResponseStrategy_GROUP_REPLICA { + // Do not forward this field as it might cause data loss. + r.PartialResponseDisabled = true + r.PartialResponseStrategy = storepb.PartialResponseStrategy_ABORT + } storeResponses := make([]respSet, 0, len(stores)) @@ -385,24 +397,23 @@ func (s *ProxyStore) Series(originalRequest *storepb.SeriesRequest, srv storepb. } } defer logGroupReplicaErrors() - for _, st := range stores { st := st - respSet, err := newAsyncRespSet(ctx, st, r, s.responseTimeout, s.retrievalStrategy, &s.buffers, r.ShardInfo, reqLogger, s.metrics.emptyStreamResponses) + respSet, err := newAsyncRespSet(ctx, st, r, s.responseTimeout, s.retrievalStrategy, &s.buffers, r.ShardInfo, reqLogger, s.metrics.emptyStreamResponses, s.lazyRetrievalMaxBufferedResponses) if err != nil { level.Warn(s.logger).Log("msg", "Store failure", "group", st.GroupKey(), "replica", st.ReplicaKey(), "err", err) s.metrics.storeFailureCount.WithLabelValues(st.GroupKey(), st.ReplicaKey()).Inc() bumpCounter(st.GroupKey(), st.ReplicaKey(), failedStores) totalFailedStores++ - if r.PartialResponseStrategy == storepb.PartialResponseStrategy_GROUP_REPLICA { + if originalRequest.PartialResponseStrategy == storepb.PartialResponseStrategy_GROUP_REPLICA { if checkGroupReplicaErrors(st, err) != nil { return err } continue } level.Error(reqLogger).Log("err", err) - if !r.PartialResponseDisabled || r.PartialResponseStrategy == storepb.PartialResponseStrategy_WARN { + if !originalRequest.PartialResponseDisabled || originalRequest.PartialResponseStrategy == storepb.PartialResponseStrategy_WARN { if err := srv.Send(storepb.NewWarnSeriesResponse(err)); err != nil { return err } @@ -438,7 +449,7 @@ func (s *ProxyStore) Series(originalRequest *storepb.SeriesRequest, srv storepb. 
level.Error(s.logger).Log("msg", "Store failure with warning", "warning", warning) // Don't have group/replica keys here, so we can't attribute the warning to a specific store. s.metrics.storeFailureCount.WithLabelValues("", "").Inc() - if r.PartialResponseStrategy == storepb.PartialResponseStrategy_GROUP_REPLICA { + if originalRequest.PartialResponseStrategy == storepb.PartialResponseStrategy_GROUP_REPLICA { // The first error message is from AWS S3 and the second one is from Azure Blob Storage. if strings.Contains(resp.GetWarning(), "The specified key does not exist") || strings.Contains(resp.GetWarning(), "The specified blob does not exist") { level.Warn(s.logger).Log("msg", "Ignore 'the specified key/blob does not exist' error from Store") @@ -459,7 +470,7 @@ func (s *ProxyStore) Series(originalRequest *storepb.SeriesRequest, srv storepb. } firstWarning = &warning } - } else if r.PartialResponseDisabled || r.PartialResponseStrategy == storepb.PartialResponseStrategy_ABORT { + } else if originalRequest.PartialResponseDisabled || originalRequest.PartialResponseStrategy == storepb.PartialResponseStrategy_ABORT { return status.Error(codes.Aborted, resp.GetWarning()) } } diff --git a/pkg/store/proxy_merge.go b/pkg/store/proxy_merge.go index 378ac789881..abbc2e6c58a 100644 --- a/pkg/store/proxy_merge.go +++ b/pkg/store/proxy_merge.go @@ -7,6 +7,7 @@ import ( "context" "fmt" "io" + "math" "sort" "sync" "time" @@ -255,8 +256,20 @@ type lazyRespSet struct { frameTimeout time.Duration // Internal bookkeeping. - dataOrFinishEvent *sync.Cond - bufferedResponses []*storepb.SeriesResponse + dataOrFinishEvent *sync.Cond + // This event firing means the buffer has a slot for more data. + bufferSlotEvent *sync.Cond + fixedBufferSize int + // This a ring buffer of size fixedBufferSize. + // A ring buffer of size N can hold N - 1 elements at most in order to distinguish being empty from being full. + bufferedResponses []*storepb.SeriesResponse + // ringHead points to the first element in the ring buffer. + // ringTail points to the slot after the last element in the ring buffer. + // if ringHead == ringTail then the buffer is empty. + // if ringHead == (ringTail + 1) % fixedBufferSize then the buffer is full. + ringHead int + ringTail int + closed bool bufferedResponsesMtx *sync.Mutex lastResp *storepb.SeriesResponse @@ -266,24 +279,32 @@ type lazyRespSet struct { shardMatcher *storepb.ShardMatcher } +func (l *lazyRespSet) isEmpty() bool { + return l.ringHead == l.ringTail +} + +func (l *lazyRespSet) isFull() bool { + return (l.ringTail+1)%l.fixedBufferSize == l.ringHead +} + func (l *lazyRespSet) Empty() bool { l.bufferedResponsesMtx.Lock() defer l.bufferedResponsesMtx.Unlock() // NOTE(GiedriusS): need to wait here for at least one // response so that we could build the heap properly. 
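The ring-buffer bookkeeping described in the struct comments above (a buffer of fixedBufferSize slots that holds at most fixedBufferSize-1 responses, with ringHead == ringTail meaning empty and (ringTail+1) % fixedBufferSize == ringHead meaning full) can be summarized by the following minimal sketch. The boundedRing type and its methods are illustrative only and are not part of this patch; the real lazyRespSet blocks on the bufferSlotEvent and dataOrFinishEvent conditions instead of returning "full"/"empty" to the caller.

package main

import "fmt"

// boundedRing is a minimal sketch of the head/tail bookkeeping used by lazyRespSet.
// A ring with N slots stores at most N-1 elements so that head == tail always means
// "empty" and (tail+1)%N == head always means "full".
type boundedRing[T any] struct {
	buf  []T
	head int // index of the first buffered element
	tail int // index of the slot after the last buffered element
}

// newBoundedRing allocates capacity+1 slots, mirroring the fixedBufferSize++ in newLazyRespSet.
func newBoundedRing[T any](capacity int) *boundedRing[T] {
	return &boundedRing[T]{buf: make([]T, capacity+1)}
}

func (r *boundedRing[T]) empty() bool { return r.head == r.tail }
func (r *boundedRing[T]) full() bool  { return (r.tail+1)%len(r.buf) == r.head }

// push reports whether the element fit; the real code waits on bufferSlotEvent when full.
func (r *boundedRing[T]) push(v T) bool {
	if r.full() {
		return false
	}
	r.buf[r.tail] = v
	r.tail = (r.tail + 1) % len(r.buf)
	return true
}

// pop reports whether an element was available; the real code waits on dataOrFinishEvent when empty.
func (r *boundedRing[T]) pop() (T, bool) {
	var zero T
	if r.empty() {
		return zero, false
	}
	v := r.buf[r.head]
	r.head = (r.head + 1) % len(r.buf)
	return v, true
}

func main() {
	r := newBoundedRing[string](2) // can buffer up to 2 responses
	fmt.Println(r.push("a"), r.push("b"), r.push("c")) // true true false: the third push finds the ring full
	v, ok := r.pop()
	fmt.Println(v, ok) // a true
}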
- if l.noMoreData && len(l.bufferedResponses) == 0 { + if l.noMoreData && l.isEmpty() { return true } - for len(l.bufferedResponses) == 0 { + for l.isEmpty() { l.dataOrFinishEvent.Wait() - if l.noMoreData && len(l.bufferedResponses) == 0 { + if l.noMoreData && l.isEmpty() { break } } - return len(l.bufferedResponses) == 0 && l.noMoreData + return l.isEmpty() && l.noMoreData } // Next either blocks until more data is available or reads @@ -295,23 +316,24 @@ func (l *lazyRespSet) Next() bool { l.initialized = true - if l.noMoreData && len(l.bufferedResponses) == 0 { + if l.noMoreData && l.isEmpty() { l.lastResp = nil return false } - for len(l.bufferedResponses) == 0 { + for l.isEmpty() { l.dataOrFinishEvent.Wait() - if l.noMoreData && len(l.bufferedResponses) == 0 { + if l.noMoreData && l.isEmpty() { break } } - if len(l.bufferedResponses) > 0 { - l.lastResp = l.bufferedResponses[0] + if !l.isEmpty() { + l.lastResp = l.bufferedResponses[l.ringHead] if l.initialized { - l.bufferedResponses = l.bufferedResponses[1:] + l.ringHead = (l.ringHead + 1) % l.fixedBufferSize + l.bufferSlotEvent.Signal() } return true } @@ -338,8 +360,12 @@ func newLazyRespSet( shardMatcher *storepb.ShardMatcher, applySharding bool, emptyStreamResponses prometheus.Counter, + fixedBufferSize int, ) respSet { - bufferedResponses := []*storepb.SeriesResponse{} + // A ring buffer of size N can hold N - 1 elements at most in order to distinguish being empty from being full. + // That's why the size is increased by 1 internally. + fixedBufferSize++ + bufferedResponses := make([]*storepb.SeriesResponse, fixedBufferSize) bufferedResponsesMtx := &sync.Mutex{} dataAvailable := sync.NewCond(bufferedResponsesMtx) @@ -351,9 +377,14 @@ func newLazyRespSet( closeSeries: closeSeries, span: span, dataOrFinishEvent: dataAvailable, + bufferSlotEvent: sync.NewCond(bufferedResponsesMtx), bufferedResponsesMtx: bufferedResponsesMtx, bufferedResponses: bufferedResponses, shardMatcher: shardMatcher, + fixedBufferSize: fixedBufferSize, + ringHead: 0, + ringTail: 0, + closed: false, } respSet.storeLabels = make(map[string]struct{}) for _, ls := range storeLabelSets { @@ -406,16 +437,26 @@ func newLazyRespSet( } else { rerr = errors.Wrapf(err, "receive series from %s", st) } - l.span.SetTag("err", rerr.Error()) l.bufferedResponsesMtx.Lock() - l.bufferedResponses = append(l.bufferedResponses, storepb.NewWarnSeriesResponse(rerr)) + for l.isFull() && !l.closed { + l.bufferSlotEvent.Wait() + } + if !l.closed { + l.bufferedResponses[l.ringTail] = storepb.NewWarnSeriesResponse(rerr) + l.ringTail = (l.ringTail + 1) % l.fixedBufferSize + } l.noMoreData = true l.dataOrFinishEvent.Signal() l.bufferedResponsesMtx.Unlock() return false } + if t != nil { + // frameTimeout only applies to the cl.Recv() gRPC call, because the goroutine may otherwise block waiting for a free buffer slot. + // Reset the timer to the largest possible value to avoid triggering it.
+ t.Reset(time.Duration(math.MaxInt64)) + } numResponses++ bytesProcessed += resp.Size() @@ -429,8 +470,14 @@ func newLazyRespSet( } l.bufferedResponsesMtx.Lock() - l.bufferedResponses = append(l.bufferedResponses, resp) - l.dataOrFinishEvent.Signal() + for l.isFull() && !l.closed { + l.bufferSlotEvent.Wait() + } + if !l.closed { + l.bufferedResponses[l.ringTail] = resp + l.ringTail = (l.ringTail + 1) % l.fixedBufferSize + l.dataOrFinishEvent.Signal() + } l.bufferedResponsesMtx.Unlock() return true } @@ -474,6 +521,7 @@ func newAsyncRespSet( shardInfo *storepb.ShardInfo, logger log.Logger, emptyStreamResponses prometheus.Counter, + lazyRetrievalMaxBufferedResponses int, ) (respSet, error) { var ( @@ -525,6 +573,11 @@ func newAsyncRespSet( switch retrievalStrategy { case LazyRetrieval: span.SetTag("retrival_strategy", LazyRetrieval) + if lazyRetrievalMaxBufferedResponses < 1 { + // Some unit and e2e tests hit this path. + lazyRetrievalMaxBufferedResponses = 1 + } + return newLazyRespSet( span, frameTimeout, @@ -535,6 +588,7 @@ func newAsyncRespSet( shardMatcher, applySharding, emptyStreamResponses, + lazyRetrievalMaxBufferedResponses, ), nil case EagerRetrieval: span.SetTag("retrival_strategy", EagerRetrieval) @@ -560,6 +614,8 @@ func (l *lazyRespSet) Close() { defer l.bufferedResponsesMtx.Unlock() l.closeSeries() + l.closed = true + l.bufferSlotEvent.Signal() l.noMoreData = true l.dataOrFinishEvent.Signal() diff --git a/pkg/store/proxy_test.go b/pkg/store/proxy_test.go index 7767a3e6427..94bfb0d0bb0 100644 --- a/pkg/store/proxy_test.go +++ b/pkg/store/proxy_test.go @@ -1587,6 +1587,10 @@ func TestProxyStore_SeriesSlowStores(t *testing.T) { expectedWarningsLen: 2, }, } { + + options := []ProxyStoreOption{ + WithLazyRetrievalMaxBufferedResponsesForProxy(1), + } if ok := t.Run(tc.title, func(t *testing.T) { for _, strategy := range []RetrievalStrategy{EagerRetrieval, LazyRetrieval} { if ok := t.Run(string(strategy), func(t *testing.T) { @@ -1596,6 +1600,7 @@ func TestProxyStore_SeriesSlowStores(t *testing.T) { component.Query, tc.selectorLabels, 4*time.Second, strategy, + options..., ) ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) diff --git a/pkg/store/storepb/inprocess.go b/pkg/store/storepb/inprocess.go index 0c3e7641baa..d76301b42c2 100644 --- a/pkg/store/storepb/inprocess.go +++ b/pkg/store/storepb/inprocess.go @@ -7,6 +7,7 @@ import ( "context" "io" "iter" + "sync" "google.golang.org/grpc" ) @@ -38,6 +39,7 @@ type inProcessClient struct { ctx context.Context next func() (*SeriesResponse, error, bool) stop func() + mu sync.Mutex } func newInProcessClient(ctx context.Context, next func() (*SeriesResponse, error, bool), stop func()) *inProcessClient { @@ -45,10 +47,13 @@ func newInProcessClient(ctx context.Context, next func() (*SeriesResponse, error ctx: ctx, next: next, stop: stop, + mu: sync.Mutex{}, } } func (c *inProcessClient) Recv() (*SeriesResponse, error) { + c.mu.Lock() + defer c.mu.Unlock() resp, err, ok := c.next() if err != nil { c.stop() @@ -68,6 +73,8 @@ func (c *inProcessClient) Context() context.Context { } func (c *inProcessClient) CloseSend() error { + c.mu.Lock() + defer c.mu.Unlock() c.stop() return nil } diff --git a/pkg/strutil/k8s.go b/pkg/strutil/k8s.go new file mode 100644 index 00000000000..28144bda7f3 --- /dev/null +++ b/pkg/strutil/k8s.go @@ -0,0 +1,31 @@ +// Copyright (c) The Thanos Authors. +// Licensed under the Apache License 2.0. 
+ +package strutil + +import ( + "fmt" + "regexp" + "strconv" + "strings" +) + +// podOrdinalRegex is pre-compiled because ExtractPodOrdinal may be called frequently. +// It strictly matches fully qualified in-cluster StatefulSet pod DNS names. +var podOrdinalRegex = regexp.MustCompile(`^.*\-(\d+)\..+\.svc\.cluster\.local$`) + +// ExtractPodOrdinal extracts the ordinal number from a Kubernetes StatefulSet pod DNS name. +// The DNS name should be in the format <pod-name>.<service>.<namespace>.svc.cluster.local[:port], +// where <pod-name> is typically <statefulset-name>-<ordinal> or a hyphenated name ending with -<ordinal>. +func ExtractPodOrdinal(dnsName string) (int, error) { + dnsWithoutPort := strings.Split(dnsName, ":")[0] + matches := podOrdinalRegex.FindStringSubmatch(dnsWithoutPort) + if len(matches) != 2 { + return -1, fmt.Errorf("invalid DNS name format: %s", dnsName) + } + ordinal, err := strconv.Atoi(matches[1]) + if err != nil { + return -1, fmt.Errorf("failed to parse ordinal number: %v", err) + } + return ordinal, nil +} diff --git a/pkg/strutil/k8s_test.go b/pkg/strutil/k8s_test.go new file mode 100644 index 00000000000..583f205006d --- /dev/null +++ b/pkg/strutil/k8s_test.go @@ -0,0 +1,92 @@ +// Copyright (c) The Thanos Authors. +// Licensed under the Apache License 2.0. + +package strutil + +import ( + "testing" + + "github.com/efficientgo/core/testutil" +) + +func TestExtractPodOrdinal(t *testing.T) { + testCases := map[string]struct { + inputDNS string + expectedOrdinal int + expectError bool + }{ + "valid DNS without port": { + inputDNS: "thanos-receive-0.thanos-receive-headless.monitoring.svc.cluster.local", + expectedOrdinal: 0, + expectError: false, + }, + "valid DNS with port": { + inputDNS: "thanos-receive-10.thanos-receive-headless.monitoring.svc.cluster.local:10901", + expectedOrdinal: 10, + expectError: false, + }, + "valid DNS with multiple hyphens in name": { + inputDNS: "my-complex-statefulset-name-5.headless.prod.svc.cluster.local", + expectedOrdinal: 5, + expectError: false, + }, + "invalid DNS format - wrong domain": { + inputDNS: "thanos-receive-0.thanos-receive-headless.monitoring.svc.cluster.example.com", + expectedOrdinal: -1, + expectError: true, + }, + "invalid DNS format - missing service/namespace": { + inputDNS: "thanos-receive-0.svc.cluster.local", + expectedOrdinal: -1, + expectError: true, + }, + "invalid DNS format - no ordinal": { + inputDNS: "thanos-receive.thanos-receive-headless.monitoring.svc.cluster.local", + expectedOrdinal: -1, + expectError: true, + }, + "invalid DNS format - non-numeric ordinal": { + inputDNS: "thanos-receive-abc.thanos-receive-headless.monitoring.svc.cluster.local", + expectedOrdinal: -1, + expectError: true, + }, + "invalid DNS format - ordinal not at the end of pod name part": { + inputDNS: "thanos-receive-0-backup.thanos-receive-headless.monitoring.svc.cluster.local", + expectedOrdinal: -1, // The regex expects \d+ followed by a dot.
+ expectError: true, + }, + "empty input string": { + inputDNS: "", + expectedOrdinal: -1, + expectError: true, + }, + "just hostname": { + inputDNS: "my-hostname-1", + expectedOrdinal: -1, + expectError: true, + }, + "just hostname with port": { + inputDNS: "my-hostname-1:8080", + expectedOrdinal: -1, + expectError: true, + }, + "DNS like string but not matching pattern": { + inputDNS: "pod-1.service.namespace", + expectedOrdinal: -1, + expectError: true, + }, + } + + for tcName, tc := range testCases { + t.Run(tcName, func(t *testing.T) { + ordinal, err := ExtractPodOrdinal(tc.inputDNS) + + if tc.expectError { + testutil.NotOk(t, err) + } else { + testutil.Ok(t, err) + testutil.Equals(t, tc.expectedOrdinal, ordinal) + } + }) + } +}
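To round out the tests above, here is a hedged usage sketch of how ExtractPodOrdinal supports the ordinal-aligned grouping that the aligned_ketama hashring relies on: endpoints are bucketed per AZ and sorted by StatefulSet ordinal, so index i in every AZ refers to pods with the same ordinal. The groupByAZ helper, the local endpoint struct, and the sample addresses are illustrative assumptions, not the actual newAlignedKetamaHashring implementation (which additionally rejects missing or duplicate ordinals and requires the AZ count to equal the replication factor).

package main

import (
	"fmt"
	"sort"

	"github.com/thanos-io/thanos/pkg/strutil"
)

// endpoint mirrors only the receive.Endpoint fields needed for this sketch.
type endpoint struct {
	Address string
	AZ      string
}

// groupByAZ buckets endpoints per AZ and sorts each bucket by pod ordinal, so the
// i-th entry of every AZ refers to pods with the same StatefulSet ordinal.
func groupByAZ(endpoints []endpoint) (map[string][]endpoint, error) {
	byAZ := map[string][]endpoint{}
	for _, ep := range endpoints {
		if _, err := strutil.ExtractPodOrdinal(ep.Address); err != nil {
			return nil, fmt.Errorf("failed to extract ordinal from address %s: %w", ep.Address, err)
		}
		byAZ[ep.AZ] = append(byAZ[ep.AZ], ep)
	}
	for az := range byAZ {
		eps := byAZ[az]
		sort.Slice(eps, func(i, j int) bool {
			oi, _ := strutil.ExtractPodOrdinal(eps[i].Address)
			oj, _ := strutil.ExtractPodOrdinal(eps[j].Address)
			return oi < oj
		})
	}
	return byAZ, nil
}

func main() {
	eps := []endpoint{
		{Address: "thanos-receive-1.headless.ns.svc.cluster.local:10901", AZ: "zone-a"},
		{Address: "thanos-receive-0.headless.ns.svc.cluster.local:10901", AZ: "zone-a"},
		{Address: "thanos-receive-0.headless.ns.svc.cluster.local:10901", AZ: "zone-b"},
		{Address: "thanos-receive-1.headless.ns.svc.cluster.local:10901", AZ: "zone-b"},
	}
	byAZ, err := groupByAZ(eps)
	if err != nil {
		panic(err)
	}
	// Replica n of a hash section would map to the same ordinal in the n-th AZ,
	// which is the alignment property TestAlignedKetamaHashringReplicaOrdinals verifies.
	fmt.Println(byAZ["zone-a"][0].Address, byAZ["zone-b"][0].Address)
}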