Skip to content

Commit 5ab3a6a

Browse files
committed
Add per tenant flag to add unit and type labels for rw2 request
Signed-off-by: SungJin1212 <[email protected]>
1 parent db252aa commit 5ab3a6a

File tree

12 files changed

+362
-23
lines changed

12 files changed

+362
-23
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
* [FEATURE] Querier: Support for configuring query optimizers and enabling XFunctions in the Thanos engine. #6873
2525
* [FEATURE] Query Frontend: Add support /api/v1/format_query API for formatting queries. #6893
2626
* [FEATURE] Query Frontend: Add support for /api/v1/parse_query API (experimental) to parse a PromQL expression and return it as a JSON-formatted AST (abstract syntax tree). #6978
27+
* [FEATURE] Distributor: Add a per-tenant flag `-distributor.rw2-enable-type-and-unit-labels` which enables to add `__unit__` and `__type__` labels for remote write v2 requests. #7077
2728
* [ENHANCEMENT] Upgrade the Prometheus version to 3.6.0 and add a `-name-validation-scheme` flag to support UTF-8. #7040 #7056
2829
* [ENHANCEMENT] Distributor: Emit an error with a 400 status code when empty labels are found before the relabelling or label dropping process. #7052
2930
* [ENHANCEMENT] Parquet Storage: Add support for additional sort columns during Parquet file generation #7003

docs/configuration/config-file-reference.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3988,6 +3988,11 @@ The `limits_config` configures default and per-tenant limits imposed by Cortex s
39883988
# CLI flag: -distributor.promote-resource-attributes
39893989
[promote_resource_attributes: <list of string> | default = ]
39903990
3991+
# EXPERIMENTAL: If true, the __type__ and __unit__ labels are added to metrics.
3992+
# This only applies to remote write v2 requests.
3993+
# CLI flag: -distributor.rw2-enable-type-and-unit-labels
3994+
[rw_2_enable_type_and_unit_labels: <boolean> | default = false]
3995+
39913996
# The maximum number of active series per user, per ingester. 0 to disable.
39923997
# CLI flag: -ingester.max-series-per-user
39933998
[max_series_per_user: <int> | default = 5000000]

integration/e2e/util.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -465,18 +465,20 @@ func GenerateSeriesV2(name string, ts time.Time, additionalLabels ...prompb.Labe
465465
st := writev2.NewSymbolTable()
466466
lb := labels.NewScratchBuilder(0)
467467
lb.Add("__name__", name)
468-
469468
for _, label := range additionalLabels {
470469
lb.Add(label.Name, label.Value)
471470
}
471+
472472
series = append(series, writev2.TimeSeries{
473473
// Generate the series
474474
LabelsRefs: st.SymbolizeLabels(lb.Labels(), nil),
475475
Samples: []writev2.Sample{
476476
{Value: value, Timestamp: tsMillis},
477477
},
478478
Metadata: writev2.Metadata{
479-
Type: writev2.Metadata_METRIC_TYPE_GAUGE,
479+
Type: writev2.Metadata_METRIC_TYPE_GAUGE,
480+
HelpRef: 2, // equal to name
481+
UnitRef: 2, // equal to name
480482
},
481483
})
482484
symbols = st.Symbols()

integration/remote_write_v2_test.go

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -205,6 +205,77 @@ func TestIngest_SenderSendPRW2_DistributorNotAllowPRW2(t *testing.T) {
205205
require.Empty(t, result)
206206
}
207207

208+
func TestIngest_EnableTypeAndUnitLabels(t *testing.T) {
209+
const blockRangePeriod = 5 * time.Second
210+
211+
s, err := e2e.NewScenario(networkName)
212+
require.NoError(t, err)
213+
defer s.Close()
214+
215+
// Start dependencies.
216+
consul := e2edb.NewConsulWithName("consul")
217+
require.NoError(t, s.StartAndWaitReady(consul))
218+
219+
flags := mergeFlags(
220+
AlertmanagerLocalFlags(),
221+
map[string]string{
222+
"-store.engine": blocksStorageEngine,
223+
"-blocks-storage.backend": "filesystem",
224+
"-blocks-storage.tsdb.head-compaction-interval": "4m",
225+
"-blocks-storage.bucket-store.sync-interval": "15m",
226+
"-blocks-storage.bucket-store.index-cache.backend": tsdb.IndexCacheBackendInMemory,
227+
"-blocks-storage.bucket-store.bucket-index.enabled": "true",
228+
"-querier.query-store-for-labels-enabled": "true",
229+
"-blocks-storage.tsdb.block-ranges-period": blockRangePeriod.String(),
230+
"-blocks-storage.tsdb.ship-interval": "1s",
231+
"-blocks-storage.tsdb.retention-period": ((blockRangePeriod * 2) - 1).String(),
232+
"-blocks-storage.tsdb.enable-native-histograms": "true",
233+
// Ingester.
234+
"-ring.store": "consul",
235+
"-consul.hostname": consul.NetworkHTTPEndpoint(),
236+
// Distributor.
237+
"-distributor.replication-factor": "1",
238+
"-distributor.remote-writev2-enabled": "true",
239+
"-distributor.rw2-enable-type-and-unit-labels": "true",
240+
// Store-gateway.
241+
"-store-gateway.sharding-enabled": "false",
242+
// alert manager
243+
"-alertmanager.web.external-url": "http://localhost/alertmanager",
244+
},
245+
)
246+
247+
// make alert manager config dir
248+
require.NoError(t, writeFileToSharedDir(s, "alertmanager_configs", []byte{}))
249+
250+
path := path.Join(s.SharedDir(), "cortex-1")
251+
252+
flags = mergeFlags(flags, map[string]string{"-blocks-storage.filesystem.dir": path})
253+
// Start Cortex replicas.
254+
cortex := e2ecortex.NewSingleBinary("cortex", flags, "")
255+
require.NoError(t, s.StartAndWaitReady(cortex))
256+
257+
// Wait until Cortex replicas have updated the ring state.
258+
require.NoError(t, cortex.WaitSumMetrics(e2e.Equals(float64(512)), "cortex_ring_tokens_total"))
259+
260+
c, err := e2ecortex.NewClient(cortex.HTTPEndpoint(), cortex.HTTPEndpoint(), "", "", "user-1")
261+
require.NoError(t, err)
262+
263+
now := time.Now()
264+
265+
// series push
266+
symbols1, series, _ := e2e.GenerateSeriesV2("test_series", now, prompb.Label{Name: "job", Value: "test"}, prompb.Label{Name: "foo", Value: "bar"})
267+
writeStats, err := c.PushV2(symbols1, series)
268+
require.NoError(t, err)
269+
testPushHeader(t, writeStats, 1, 0, 0)
270+
271+
value, err := c.Query("test_series", now)
272+
require.NoError(t, err)
273+
require.Equal(t, model.ValVector, value.Type())
274+
vec := value.(model.Vector)
275+
require.True(t, vec[0].Metric["__unit__"] != "")
276+
require.True(t, vec[0].Metric["__type__"] != "")
277+
}
278+
208279
func TestIngest(t *testing.T) {
209280
const blockRangePeriod = 5 * time.Second
210281

pkg/api/api.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -283,7 +283,7 @@ func (a *API) RegisterRuntimeConfig(runtimeConfigHandler http.HandlerFunc) {
283283
func (a *API) RegisterDistributor(d *distributor.Distributor, pushConfig distributor.Config, overrides *validation.Overrides) {
284284
distributorpb.RegisterDistributorServer(a.server.GRPC, d)
285285

286-
a.RegisterRoute("/api/v1/push", push.Handler(pushConfig.RemoteWriteV2Enabled, pushConfig.MaxRecvMsgSize, a.sourceIPs, a.cfg.wrapDistributorPush(d)), true, "POST")
286+
a.RegisterRoute("/api/v1/push", push.Handler(pushConfig.RemoteWriteV2Enabled, pushConfig.MaxRecvMsgSize, overrides, a.sourceIPs, a.cfg.wrapDistributorPush(d)), true, "POST")
287287
a.RegisterRoute("/api/v1/otlp/v1/metrics", push.OTLPHandler(pushConfig.OTLPMaxRecvMsgSize, overrides, pushConfig.OTLPConfig, a.sourceIPs, a.cfg.wrapDistributorPush(d)), true, "POST")
288288

289289
a.indexPage.AddLink(SectionAdminEndpoints, "/distributor/ring", "Distributor Ring Status")
@@ -295,7 +295,7 @@ func (a *API) RegisterDistributor(d *distributor.Distributor, pushConfig distrib
295295
a.RegisterRoute("/distributor/ha_tracker", d.HATracker, false, "GET")
296296

297297
// Legacy Routes
298-
a.RegisterRoute(path.Join(a.cfg.LegacyHTTPPrefix, "/push"), push.Handler(pushConfig.RemoteWriteV2Enabled, pushConfig.MaxRecvMsgSize, a.sourceIPs, a.cfg.wrapDistributorPush(d)), true, "POST")
298+
a.RegisterRoute(path.Join(a.cfg.LegacyHTTPPrefix, "/push"), push.Handler(pushConfig.RemoteWriteV2Enabled, pushConfig.MaxRecvMsgSize, overrides, a.sourceIPs, a.cfg.wrapDistributorPush(d)), true, "POST")
299299
a.RegisterRoute("/all_user_stats", http.HandlerFunc(d.AllUserStatsHandler), false, "GET")
300300
a.RegisterRoute("/ha-tracker", d.HATracker, false, "GET")
301301
}
@@ -313,7 +313,7 @@ type Ingester interface {
313313
}
314314

315315
// RegisterIngester registers the ingesters HTTP and GRPC service
316-
func (a *API) RegisterIngester(i Ingester, pushConfig distributor.Config) {
316+
func (a *API) RegisterIngester(i Ingester, pushConfig distributor.Config, overrides *validation.Overrides) {
317317
client.RegisterIngesterServer(a.server.GRPC, i)
318318

319319
a.indexPage.AddLink(SectionAdminEndpoints, "/ingester/all_user_stats", "Usage Statistics")
@@ -328,12 +328,12 @@ func (a *API) RegisterIngester(i Ingester, pushConfig distributor.Config) {
328328
a.RegisterRoute("/ingester/renewTokens", http.HandlerFunc(i.RenewTokenHandler), false, "GET", "POST")
329329
a.RegisterRoute("/ingester/all_user_stats", http.HandlerFunc(i.AllUserStatsHandler), false, "GET")
330330
a.RegisterRoute("/ingester/mode", http.HandlerFunc(i.ModeHandler), false, "GET", "POST")
331-
a.RegisterRoute("/ingester/push", push.Handler(pushConfig.RemoteWriteV2Enabled, pushConfig.MaxRecvMsgSize, a.sourceIPs, i.Push), true, "POST") // For testing and debugging.
331+
a.RegisterRoute("/ingester/push", push.Handler(pushConfig.RemoteWriteV2Enabled, pushConfig.MaxRecvMsgSize, overrides, a.sourceIPs, i.Push), true, "POST") // For testing and debugging.
332332

333333
// Legacy Routes
334334
a.RegisterRoute("/flush", http.HandlerFunc(i.FlushHandler), false, "GET", "POST")
335335
a.RegisterRoute("/shutdown", http.HandlerFunc(i.ShutdownHandler), false, "GET", "POST")
336-
a.RegisterRoute("/push", push.Handler(pushConfig.RemoteWriteV2Enabled, pushConfig.MaxRecvMsgSize, a.sourceIPs, i.Push), true, "POST") // For testing and debugging.
336+
a.RegisterRoute("/push", push.Handler(pushConfig.RemoteWriteV2Enabled, pushConfig.MaxRecvMsgSize, overrides, a.sourceIPs, i.Push), true, "POST") // For testing and debugging.
337337
}
338338

339339
func (a *API) RegisterTenantDeletion(api *purger.TenantDeletionAPI) {

pkg/cortex/modules.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -492,7 +492,7 @@ func (t *Cortex) initIngesterService() (serv services.Service, err error) {
492492
}
493493

494494
func (t *Cortex) initIngester() (serv services.Service, err error) {
495-
t.API.RegisterIngester(t.Ingester, t.Cfg.Distributor)
495+
t.API.RegisterIngester(t.Ingester, t.Cfg.Distributor, t.Overrides)
496496

497497
return nil, nil
498498
}

pkg/cortexpb/compatv2.go

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,9 @@
11
package cortexpb
22

3-
import "github.com/prometheus/prometheus/model/labels"
3+
import (
4+
"github.com/prometheus/common/model"
5+
"github.com/prometheus/prometheus/model/labels"
6+
)
47

58
func (e *ExemplarV2) ToLabels(b *labels.ScratchBuilder, symbols []string) labels.Labels {
69
return desymbolizeLabels(b, e.GetLabelsRefs(), symbols)
@@ -20,3 +23,26 @@ func desymbolizeLabels(b *labels.ScratchBuilder, labelRefs []uint32, symbols []s
2023
b.Sort()
2124
return b.Labels()
2225
}
26+
27+
func MetadataV2MetricTypeToMetricType(mt MetadataV2_MetricType) model.MetricType {
28+
switch mt {
29+
case METRIC_TYPE_UNSPECIFIED:
30+
return model.MetricTypeUnknown
31+
case METRIC_TYPE_COUNTER:
32+
return model.MetricTypeCounter
33+
case METRIC_TYPE_GAUGE:
34+
return model.MetricTypeGauge
35+
case METRIC_TYPE_HISTOGRAM:
36+
return model.MetricTypeHistogram
37+
case METRIC_TYPE_GAUGEHISTOGRAM:
38+
return model.MetricTypeGaugeHistogram
39+
case METRIC_TYPE_SUMMARY:
40+
return model.MetricTypeSummary
41+
case METRIC_TYPE_INFO:
42+
return model.MetricTypeInfo
43+
case METRIC_TYPE_STATESET:
44+
return model.MetricTypeStateset
45+
default:
46+
return model.MetricTypeUnknown
47+
}
48+
}

pkg/util/push/push.go

Lines changed: 26 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,14 +9,17 @@ import (
99
"github.com/go-kit/log/level"
1010
"github.com/prometheus/client_golang/exp/api/remote"
1111
"github.com/prometheus/prometheus/model/labels"
12+
"github.com/prometheus/prometheus/schema"
1213
"github.com/prometheus/prometheus/util/compression"
1314
"github.com/weaveworks/common/httpgrpc"
1415
"github.com/weaveworks/common/middleware"
1516

1617
"github.com/cortexproject/cortex/pkg/cortexpb"
18+
"github.com/cortexproject/cortex/pkg/tenant"
1719
"github.com/cortexproject/cortex/pkg/util"
1820
"github.com/cortexproject/cortex/pkg/util/extract"
1921
"github.com/cortexproject/cortex/pkg/util/log"
22+
"github.com/cortexproject/cortex/pkg/util/validation"
2023
)
2124

2225
const (
@@ -36,7 +39,7 @@ const (
3639
type Func func(context.Context, *cortexpb.WriteRequest) (*cortexpb.WriteResponse, error)
3740

3841
// Handler is a http.Handler which accepts WriteRequests.
39-
func Handler(remoteWrite2Enabled bool, maxRecvMsgSize int, sourceIPs *middleware.SourceIPExtractor, push Func) http.Handler {
42+
func Handler(remoteWrite2Enabled bool, maxRecvMsgSize int, overrides *validation.Overrides, sourceIPs *middleware.SourceIPExtractor, push Func) http.Handler {
4043
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
4144
ctx := r.Context()
4245
logger := log.WithContext(ctx, log.Logger)
@@ -78,8 +81,13 @@ func Handler(remoteWrite2Enabled bool, maxRecvMsgSize int, sourceIPs *middleware
7881
}
7982

8083
handlePRW2 := func() {
84+
userID, err := tenant.TenantID(ctx)
85+
if err != nil {
86+
return
87+
}
88+
8189
var req cortexpb.PreallocWriteRequestV2
82-
err := util.ParseProtoReader(ctx, r.Body, int(r.ContentLength), maxRecvMsgSize, &req, util.RawSnappy)
90+
err = util.ParseProtoReader(ctx, r.Body, int(r.ContentLength), maxRecvMsgSize, &req, util.RawSnappy)
8391
if err != nil {
8492
level.Error(logger).Log("err", err.Error())
8593
http.Error(w, err.Error(), http.StatusBadRequest)
@@ -91,7 +99,7 @@ func Handler(remoteWrite2Enabled bool, maxRecvMsgSize int, sourceIPs *middleware
9199
req.Source = cortexpb.API
92100
}
93101

94-
v1Req, err := convertV2RequestToV1(&req)
102+
v1Req, err := convertV2RequestToV1(&req, overrides.RW2EnableTypeAndUnitLabels(userID))
95103
if err != nil {
96104
level.Error(logger).Log("err", err.Error())
97105
http.Error(w, err.Error(), http.StatusBadRequest)
@@ -169,7 +177,7 @@ func setPRW2RespHeader(w http.ResponseWriter, samples, histograms, exemplars int
169177
w.Header().Set(rw20WrittenExemplarsHeader, strconv.FormatInt(exemplars, 10))
170178
}
171179

172-
func convertV2RequestToV1(req *cortexpb.PreallocWriteRequestV2) (cortexpb.PreallocWriteRequest, error) {
180+
func convertV2RequestToV1(req *cortexpb.PreallocWriteRequestV2, enableTypeAndUnitLabels bool) (cortexpb.PreallocWriteRequest, error) {
173181
var v1Req cortexpb.PreallocWriteRequest
174182
v1Timeseries := make([]cortexpb.PreallocTimeseries, 0, len(req.Timeseries))
175183
var v1Metadata []*cortexpb.MetricMetadata
@@ -178,6 +186,20 @@ func convertV2RequestToV1(req *cortexpb.PreallocWriteRequestV2) (cortexpb.Preall
178186
symbols := req.Symbols
179187
for _, v2Ts := range req.Timeseries {
180188
lbs := v2Ts.ToLabels(&b, symbols)
189+
190+
unit := symbols[v2Ts.Metadata.UnitRef]
191+
metricType := v2Ts.Metadata.Type
192+
shouldAttachTypeAndUnitLabels := enableTypeAndUnitLabels && (metricType != cortexpb.METRIC_TYPE_UNSPECIFIED || unit != "")
193+
if shouldAttachTypeAndUnitLabels {
194+
slb := labels.NewScratchBuilder(lbs.Len() + 2) // for __type__ and __unit__
195+
lbs.Range(func(l labels.Label) {
196+
slb.Add(l.Name, l.Value)
197+
})
198+
schema.Metadata{Type: cortexpb.MetadataV2MetricTypeToMetricType(metricType), Unit: unit}.AddToLabels(&slb)
199+
slb.Sort()
200+
lbs = slb.Labels()
201+
}
202+
181203
v1Timeseries = append(v1Timeseries, cortexpb.PreallocTimeseries{
182204
TimeSeries: &cortexpb.TimeSeries{
183205
Labels: cortexpb.FromLabelsToLabelAdapters(lbs),

0 commit comments

Comments
 (0)