Commit 2bd5404

Reduce apm internal metrics overhead (CSI-1279) (#42063)
### What does this PR do?

Skip rarely used internal metrics when their value is zero. These metrics provide _some_ debugging value, but they carry no signal when `0`, so we can stop sending them and shave a small amount of overhead. DM me for exact cost calculations.

### Motivation

Reduce overhead.

### Describe how you validated your changes

Ran locally and observed that these metrics no longer appear while the other ones still do. Also covered by unit tests.

### Additional Notes

Co-authored-by: andrew.glaude <[email protected]>
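
Below is a minimal, self-contained sketch of the gating idea, for illustration only; it is not the agent's code. The real change adds a `publishNonZeroStats` helper in `pkg/trace/info/stats.go` (see the diff below), while the sketch's `counter` interface, `stdoutCounter` type, and `publishNonZero` function are stand-ins. A counter is emitted only when it is non-zero, or when the operator opts back in via `apm_config.send_all_internal_stats` (`DD_APM_SEND_ALL_INTERNAL_STATS`).

```go
package main

import "fmt"

// counter mirrors the shape of the statsd Count call used by the receiver stats.
type counter interface {
	Count(name string, value int64, tags []string, rate float64) error
}

// stdoutCounter is a stand-in client that just prints what would be sent.
type stdoutCounter struct{}

func (stdoutCounter) Count(name string, value int64, tags []string, rate float64) error {
	fmt.Printf("count %s=%d tags=%v rate=%v\n", name, value, tags, rate)
	return nil
}

// publishNonZero emits the metric only when it carries signal (value > 0),
// or when sendAllStats restores the old always-publish behavior.
func publishNonZero(c counter, tags []string, value int64, name string, sendAllStats bool) {
	if sendAllStats || value > 0 {
		_ = c.Count(name, value, tags, 1)
	}
}

func main() {
	c := stdoutCounter{}
	tags := []string{"lang:go"}
	publishNonZero(c, tags, 0, "datadog.trace_agent.receiver.spans_filtered", false)  // skipped
	publishNonZero(c, tags, 8, "datadog.trace_agent.receiver.traces_filtered", false) // published
	publishNonZero(c, tags, 0, "datadog.trace_agent.receiver.events_sampled", true)   // published via opt-in
}
```
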
1 parent 608e800

File tree: 12 files changed, +102 -45 lines

comp/trace/config/setup.go

Lines changed: 1 addition & 0 deletions
@@ -663,6 +663,7 @@ func applyDatadogConfig(c *config.AgentConfig, core corecompcfg.Component) error
     if k := "apm_config.enable_v1_trace_endpoint"; core.IsSet(k) {
         c.EnableV1TraceEndpoint = core.GetBool("apm_config.enable_v1_trace_endpoint")
     }
+    c.SendAllInternalStats = core.GetBool("apm_config.send_all_internal_stats") // default is false
     c.DebugServerPort = core.GetInt("apm_config.debug.port")
     return nil
 }

pkg/config/setup/apm.go

Lines changed: 1 addition & 0 deletions
@@ -188,6 +188,7 @@ func setupAPM(config pkgconfigmodel.Setup) {
     config.BindEnvAndSetDefault("apm_config.debug.port", 5012, "DD_APM_DEBUG_PORT")
     config.BindEnvAndSetDefault("apm_config.debug_v1_payloads", false, "DD_APM_DEBUG_V1_PAYLOADS")
     config.BindEnvAndSetDefault("apm_config.enable_v1_trace_endpoint", false, "DD_APM_ENABLE_V1_TRACE_ENDPOINT")
+    config.BindEnvAndSetDefault("apm_config.send_all_internal_stats", false, "DD_APM_SEND_ALL_INTERNAL_STATS")
     config.BindEnv("apm_config.features", "DD_APM_FEATURES") //nolint:forbidigo // TODO: replace by 'SetDefaultAndBindEnv'
     config.ParseEnvAsStringSlice("apm_config.features", func(s string) []string {
         // Either commas or spaces can be used as separators.

pkg/serverless/invocationlifecycle/trace.go

Lines changed: 1 addition & 1 deletion
@@ -201,7 +201,7 @@ func (lp *LifecycleProcessor) processTrace(spans []*pb.Span) {
     }

     lp.ProcessTrace(&api.Payload{
-        Source: info.NewReceiverStats().GetTagStats(info.Tags{}),
+        Source: info.NewReceiverStats(true).GetTagStats(info.Tags{}),
         TracerPayload: tracerPayload,
     })
 }

pkg/serverless/trace/cold_start_span_creator.go

Lines changed: 1 addition & 1 deletion
@@ -168,7 +168,7 @@ func (c *ColdStartSpanCreator) processSpan(coldStartSpan *pb.Span) {
     }

     c.TraceAgent.Process(&api.Payload{
-        Source: info.NewReceiverStats().GetTagStats(info.Tags{}),
+        Source: info.NewReceiverStats(true).GetTagStats(info.Tags{}),
         TracerPayload: tracerPayload,
     })
 }

pkg/trace/agent/agent_test.go

Lines changed: 14 additions & 14 deletions
@@ -187,7 +187,7 @@ func TestStopWaits(t *testing.T) {
     // Use select to avoid blocking if channel is closed
     payload := &api.Payload{
         TracerPayload: testutil.TracerPayloadWithChunk(testutil.TraceChunkWithSpan(span)),
-        Source: info.NewReceiverStats().GetTagStats(info.Tags{}),
+        Source: info.NewReceiverStats(true).GetTagStats(info.Tags{}),
     }

     select {
@@ -247,7 +247,7 @@ func TestProcess(t *testing.T) {

     agnt.Process(&api.Payload{
         TracerPayload: testutil.TracerPayloadWithChunk(testutil.TraceChunkWithSpan(span)),
-        Source: info.NewReceiverStats().GetTagStats(info.Tags{}),
+        Source: info.NewReceiverStats(true).GetTagStats(info.Tags{}),
     })

     assert := assert.New(t)
@@ -277,7 +277,7 @@ func TestProcess(t *testing.T) {

     agnt.Process(&api.Payload{
         TracerPayload: testutil.TracerPayloadWithChunk(testutil.TraceChunkWithSpan(span)),
-        Source: info.NewReceiverStats().GetTagStats(info.Tags{}),
+        Source: info.NewReceiverStats(true).GetTagStats(info.Tags{}),
     })

     assert := assert.New(t)
@@ -317,7 +317,7 @@ func TestProcess(t *testing.T) {
     }
     agnt.Process(&api.Payload{
         TracerPayload: testutil.TracerPayloadWithChunk(testutil.TraceChunkWithSpan(span)),
-        Source: info.NewReceiverStats().GetTagStats(info.Tags{}),
+        Source: info.NewReceiverStats(true).GetTagStats(info.Tags{}),
     })

     assert.Equal(t, 5555.0, span.Metrics["request.secret"])
@@ -2120,7 +2120,7 @@ func TestProbSamplerSetsChunkPriority(t *testing.T) {
         conf: cfg,
     }

-    keep, _ := a.traceSampling(now, info.NewReceiverStats().GetTagStats(info.Tags{}), &pt)
+    keep, _ := a.traceSampling(now, info.NewReceiverStats(true).GetTagStats(info.Tags{}), &pt)
     assert.True(t, keep)
     // In order to ensure intake keeps this chunk we must override whatever priority was previously set on this chunk
     // This is especially an issue for incoming OTLP spans where the chunk priority may have the "unset" value of -128
@@ -2373,14 +2373,14 @@ func TestSampleTrace(t *testing.T) {
         }
         a.SamplerMetrics.Add(a.NoPrioritySampler, a.ErrorsSampler, a.PrioritySampler, a.RareSampler)
         tt.expectStatsd(statsd)
-        keep, _ := a.traceSampling(now, info.NewReceiverStats().GetTagStats(info.Tags{}), &tt.trace)
+        keep, _ := a.traceSampling(now, info.NewReceiverStats(true).GetTagStats(info.Tags{}), &tt.trace)
         metrics.Report()
         assert.Equal(t, tt.keep, keep)
         assert.Equal(t, !tt.keep, tt.trace.TraceChunk.DroppedTrace)
         cfg.Features["error_rare_sample_tracer_drop"] = struct{}{}
         defer delete(cfg.Features, "error_rare_sample_tracer_drop")
         tt.expectStatsdWithFeature(statsd)
-        keep, _ = a.traceSampling(now, info.NewReceiverStats().GetTagStats(info.Tags{}), &tt.trace)
+        keep, _ = a.traceSampling(now, info.NewReceiverStats(true).GetTagStats(info.Tags{}), &tt.trace)
         metrics.Report()
         assert.Equal(t, tt.keepWithFeature, keep)
         assert.Equal(t, !tt.keepWithFeature, tt.trace.TraceChunk.DroppedTrace)
@@ -2505,12 +2505,12 @@ func TestSample(t *testing.T) {
            conf: cfg,
        }
        t.Run(name, func(t *testing.T) {
-           keep, _ := a.sample(now, info.NewReceiverStats().GetTagStats(info.Tags{}), &tt.trace)
+           keep, _ := a.sample(now, info.NewReceiverStats(true).GetTagStats(info.Tags{}), &tt.trace)
            assert.Equal(t, tt.keep, keep)
            assert.Equal(t, !tt.keep, tt.trace.TraceChunk.DroppedTrace)
            cfg.Features["error_rare_sample_tracer_drop"] = struct{}{}
            defer delete(cfg.Features, "error_rare_sample_tracer_drop")
-           keep, _ = a.sample(now, info.NewReceiverStats().GetTagStats(info.Tags{}), &tt.trace)
+           keep, _ = a.sample(now, info.NewReceiverStats(true).GetTagStats(info.Tags{}), &tt.trace)
            assert.Equal(t, tt.keepWithFeature, keep)
            assert.Equal(t, !tt.keepWithFeature, tt.trace.TraceChunk.DroppedTrace)
        })
@@ -2542,7 +2542,7 @@ func TestSampleManualUserDropNoAnalyticsEvents(t *testing.T) {
         SamplerMetrics: sampler.NewMetrics(statsd),
         conf: cfg,
     }
-    keep, _ := a.sample(now, info.NewReceiverStats().GetTagStats(info.Tags{}), &pt)
+    keep, _ := a.sample(now, info.NewReceiverStats(true).GetTagStats(info.Tags{}), &pt)
     assert.False(t, keep)
     assert.Empty(t, pt.Root.Metrics["_dd.analyzed"])
 }
@@ -2610,7 +2610,7 @@ func TestPartialSamplingFree(t *testing.T) {
     assert.Greater(t, m.HeapInuse, uint64(50*1e6))
     agnt.Process(&api.Payload{
         TracerPayload: tracerPayload,
-        Source: info.NewReceiverStats().GetTagStats(info.Tags{}),
+        Source: info.NewReceiverStats(true).GetTagStats(info.Tags{}),
     })
     runtime.GC()
     runtime.ReadMemStats(&m)
@@ -2827,7 +2827,7 @@ func runTraceProcessingBenchmark(b *testing.B, c *config.AgentConfig) {
     for i := 0; i < b.N; i++ {
         ta.Process(&api.Payload{
             TracerPayload: testutil.TracerPayloadWithChunk(testutil.RandomTraceChunk(10, 8)),
-            Source: info.NewReceiverStats().GetTagStats(info.Tags{}),
+            Source: info.NewReceiverStats(true).GetTagStats(info.Tags{}),
         })
     }
 }
@@ -3468,7 +3468,7 @@ func TestSampleWithPriorityNone(t *testing.T) {
     }
     // before := traceutil.CopyTraceChunk(pt.TraceChunk)
     before := pt.TraceChunk.ShallowCopy()
-    keep, numEvents := agnt.sample(time.Now(), info.NewReceiverStats().GetTagStats(info.Tags{}), &pt)
+    keep, numEvents := agnt.sample(time.Now(), info.NewReceiverStats(true).GetTagStats(info.Tags{}), &pt)
     assert.True(t, keep) // Score Sampler should keep the trace.
     assert.False(t, pt.TraceChunk.DroppedTrace)
     assert.Equal(t, before, pt.TraceChunk)
@@ -3827,7 +3827,7 @@ func TestSingleSpanPlusAnalyticsEvents(t *testing.T) {
     var b bytes.Buffer
     oldLogger := log.SetLogger(log.NewBufferLogger(&b))
     defer func() { log.SetLogger(oldLogger) }()
-    keep, numEvents := traceAgent.sample(time.Now(), info.NewReceiverStats().GetTagStats(info.Tags{}), payload)
+    keep, numEvents := traceAgent.sample(time.Now(), info.NewReceiverStats(true).GetTagStats(info.Tags{}), payload)
     assert.Equal(t, "[WARN] Detected both analytics events AND single span sampling in the same trace. Single span sampling wins because App Analytics is deprecated.", b.String())
     assert.False(t, keep) //The sampling decision was FALSE but the trace itself is marked as not dropped
     assert.False(t, payload.TraceChunk.DroppedTrace)

pkg/trace/api/api.go

Lines changed: 2 additions & 2 deletions
@@ -150,7 +150,7 @@ func NewHTTPReceiver(
     containerIDProvider := NewIDProvider(conf.ContainerProcRoot, conf.ContainerIDFromOriginInfo)
     telemetryForwarder := NewTelemetryForwarder(conf, containerIDProvider, statsd)
     return &HTTPReceiver{
-        Stats: info.NewReceiverStats(),
+        Stats: info.NewReceiverStats(conf.SendAllInternalStats),

         out: out,
         outV1: outV1,
@@ -912,7 +912,7 @@ func (r *HTTPReceiver) loop() {
     defer close(r.exit)

     var lastLog time.Time
-    accStats := info.NewReceiverStats()
+    accStats := info.NewReceiverStats(r.conf.SendAllInternalStats)

     t := time.NewTicker(5 * time.Second)
     defer t.Stop()

pkg/trace/config/config.go

Lines changed: 3 additions & 0 deletions
@@ -555,6 +555,9 @@ type AgentConfig struct {

     // EnableV1TraceEndpoint enables the V1 trace endpoint, it is hidden by default
     EnableV1TraceEndpoint bool
+
+    // SendAllInternalStats enables all internal stats to be published, otherwise some less-frequently-used stats will be omitted when zero to save costs
+    SendAllInternalStats bool
 }

 // RemoteClient client is used to APM Sampling Updates from a remote source.

pkg/trace/info/info_test.go

Lines changed: 1 addition & 1 deletion
@@ -396,7 +396,7 @@ func TestInfoReceiverStats(t *testing.T) {
     conf := testInit(t, nil)
     assert.NotNil(conf)

-    stats := NewReceiverStats()
+    stats := NewReceiverStats(true)
     t1 := &TagStats{
         Tags{Lang: "python"},
         Stats{},

pkg/trace/info/stats.go

Lines changed: 21 additions & 17 deletions
@@ -23,12 +23,14 @@ import (
 // ReceiverStats is used to store all the stats per tags.
 type ReceiverStats struct {
     sync.RWMutex
-    Stats map[Tags]*TagStats
+    Stats map[Tags]*TagStats
+    SendAllStats bool // if true, all stats will be published even those stats that are zero, otherwise some less-frequently-used stats will be omitted when zero to save costs
 }

 // NewReceiverStats returns a new ReceiverStats
-func NewReceiverStats() *ReceiverStats {
-    return &ReceiverStats{sync.RWMutex{}, map[Tags]*TagStats{}}
+// sendAllStats is a boolean that controls whether all stats will be published even those stats that are zero, otherwise some less-frequently-used stats will be omitted when zero to save costs
+func NewReceiverStats(sendAllStats bool) *ReceiverStats {
+    return &ReceiverStats{sync.RWMutex{}, map[Tags]*TagStats{}, sendAllStats}
 }

 // GetTagStats returns the struct in which the stats will be stored depending of their tags.
@@ -58,7 +60,7 @@ func (rs *ReceiverStats) Acc(recent *ReceiverStats) {
 func (rs *ReceiverStats) PublishAndReset(statsd statsd.ClientInterface) {
     rs.RLock()
     for _, tagStats := range rs.Stats {
-        tagStats.publishAndReset(statsd)
+        tagStats.publishAndReset(statsd, rs.SendAllStats)
     }
     rs.RUnlock()
 }
@@ -120,7 +122,15 @@ func (ts *TagStats) AsTags() []string {
     return (&ts.Tags).toArray()
 }

-func (ts *TagStats) publishAndReset(statsd statsd.ClientInterface) {
+// publishNonZeroStats publishes a metric if the value is greater than 0 or if sendAllStats is true
+// This saves costs significantly for metrics that are frequently just zero
+func publishNonZeroStats(statsd statsd.ClientInterface, tags []string, value int64, metricName string, sendAllStats bool) {
+    if sendAllStats || value > 0 {
+        _ = statsd.Count(metricName, value, tags, 1)
+    }
+}
+
+func (ts *TagStats) publishAndReset(statsd statsd.ClientInterface, sendAllStats bool) {
     // Atomically load and reset any metrics used multiple times from ts
     tracesReceived := ts.TracesReceived.Swap(0)

@@ -129,8 +139,7 @@ func (ts *TagStats) publishAndReset(statsd statsd.ClientInterface) {

     _ = statsd.Count("datadog.trace_agent.receiver.trace", tracesReceived, tags, 1)
     _ = statsd.Count("datadog.trace_agent.receiver.traces_received", tracesReceived, tags, 1)
-    _ = statsd.Count("datadog.trace_agent.receiver.traces_filtered",
-        ts.TracesFiltered.Swap(0), tags, 1)
+    publishNonZeroStats(statsd, tags, ts.TracesFiltered.Swap(0), "datadog.trace_agent.receiver.traces_filtered", sendAllStats)
     _ = statsd.Count("datadog.trace_agent.receiver.traces_priority",
         ts.TracesPriorityNone.Swap(0), append(tags, "priority:none"), 1)
     _ = statsd.Count("datadog.trace_agent.receiver.traces_bytes",
@@ -139,20 +148,15 @@ func (ts *TagStats) publishAndReset(statsd statsd.ClientInterface) {
         ts.SpansReceived.Swap(0), tags, 1)
     _ = statsd.Count("datadog.trace_agent.receiver.spans_dropped",
         ts.SpansDropped.Swap(0), tags, 1)
-    _ = statsd.Count("datadog.trace_agent.receiver.spans_filtered",
-        ts.SpansFiltered.Swap(0), tags, 1)
-    _ = statsd.Count("datadog.trace_agent.receiver.events_extracted",
-        ts.EventsExtracted.Swap(0), tags, 1)
-    _ = statsd.Count("datadog.trace_agent.receiver.events_sampled",
-        ts.EventsSampled.Swap(0), tags, 1)
+    publishNonZeroStats(statsd, tags, ts.SpansFiltered.Swap(0), "datadog.trace_agent.receiver.spans_filtered", sendAllStats)
+    publishNonZeroStats(statsd, tags, ts.EventsExtracted.Swap(0), "datadog.trace_agent.receiver.events_extracted", sendAllStats)
+    publishNonZeroStats(statsd, tags, ts.EventsSampled.Swap(0), "datadog.trace_agent.receiver.events_sampled", sendAllStats)
     _ = statsd.Count("datadog.trace_agent.receiver.payload_accepted",
         ts.PayloadAccepted.Swap(0), tags, 1)
     _ = statsd.Count("datadog.trace_agent.receiver.payload_refused",
         ts.PayloadRefused.Swap(0), tags, 1)
-    _ = statsd.Count("datadog.trace_agent.receiver.client_dropped_p0_spans",
-        ts.ClientDroppedP0Spans.Swap(0), tags, 1)
-    _ = statsd.Count("datadog.trace_agent.receiver.client_dropped_p0_traces",
-        ts.ClientDroppedP0Traces.Swap(0), tags, 1)
+    publishNonZeroStats(statsd, tags, ts.ClientDroppedP0Spans.Swap(0), "datadog.trace_agent.receiver.client_dropped_p0_spans", sendAllStats)
+    publishNonZeroStats(statsd, tags, ts.ClientDroppedP0Traces.Swap(0), "datadog.trace_agent.receiver.client_dropped_p0_traces", sendAllStats)

     for reason, counter := range ts.TracesDropped.tagCounters() {
         count := counter.Swap(0)
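
As a rough illustration of why the gating above saves anything (this assumes the standard DogStatsD wire format and is not code from this diff): every `statsd.Count` call is serialized into a datagram on each flush even when the value is `0`, so skipping the call drops both the datagram and the downstream zero-valued point. The `countDatagram` helper below is hypothetical.

```go
package main

import (
	"fmt"
	"strings"
)

// countDatagram renders a DogStatsD count datagram ("name:value|c|#tags")
// roughly the way a statsd client would before putting it on the wire.
func countDatagram(name string, value int64, tags []string) string {
	return fmt.Sprintf("%s:%d|c|#%s", name, value, strings.Join(tags, ","))
}

func main() {
	tags := []string{"lang:go", "endpoint_version:v0.4"}
	// Before this change, a line like this was emitted on every flush even
	// when the counter had nothing to report:
	fmt.Println(countDatagram("datadog.trace_agent.receiver.spans_filtered", 0, tags))
	// After this change, zero values for the rarely-used counters are skipped
	// unless apm_config.send_all_internal_stats is true.
}
```
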

pkg/trace/info/stats_test.go

Lines changed: 36 additions & 1 deletion
@@ -246,7 +246,7 @@ func TestReceiverStats(t *testing.T) {
     })

     t.Run("update", func(t *testing.T) {
-        stats := NewReceiverStats()
+        stats := NewReceiverStats(false)
         newstats := testStats()
         stats.Acc(newstats)
         assert.EqualValues(t, stats, newstats)
@@ -270,6 +270,41 @@
     })
 }

+func TestReceiverStatsZeroValueStatsNotPublished(t *testing.T) {
+    statsclient := &teststatsd.Client{}
+    tags := Tags{
+        Lang: "go",
+        LangVersion: "1.12",
+        LangVendor: "gov",
+        Interpreter: "gcc",
+        TracerVersion: "1.33",
+        EndpointVersion: "v0.4",
+        Service: "service",
+    }
+    testStats := func() *ReceiverStats {
+        stats := NewStats()
+        stats.TracesReceived.Store(1)
+        stats.TracesFiltered.Store(8)
+        return &ReceiverStats{
+            Stats: map[Tags]*TagStats{
+                tags: {
+                    Tags: tags,
+                    Stats: stats,
+                },
+            },
+            SendAllStats: false,
+        }
+    }
+
+    t.Run("PublishAndReset", func(t *testing.T) {
+        rs := testStats()
+        rs.PublishAndReset(statsclient)
+        // Non-zero stats for some fields aren't published so only 9 here
+        assert.EqualValues(t, 9, len(statsclient.CountCalls))
+        assertStatsAreReset(t, rs)
+    })
+}
+
 func assertStatsAreReset(t *testing.T, rs *ReceiverStats) {
     for _, tagstats := range rs.Stats {
         stats := tagstats.Stats
