Skip to content

Commit 7fdbf97

Browse files
authored
sync recent changes with upstream (#4)
Add a `regulate_cache_delay` option, which specifies the amount of time after which the processor starts regulating cache sizes. This can be used to avoid regulating cache sizes on an initially empty cache, which can cause unnecessary cache size adjustments and high memory usage. More in the docs. Also emit a span for not-sampled traces: you can now configure specific policies to emit a single span in place of the trace. This is useful if someone goes searching for a dropped trace in the system: the span that appears carries the policy name, which tells them why the trace was dropped.
1 parent c75119f commit 7fdbf97

File tree

12 files changed

+117
-24
lines changed

12 files changed

+117
-24
lines changed

internal/ptraceutil/go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
module github.com/atlassian-labs/atlassian-sampling-processor/internal/ptraceutil
22

3-
go 1.23.4
3+
go 1.23.5
44

55
require (
66
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest v0.115.0

pkg/processor/atlassiansamplingprocessor/DESIGN.md

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ code significantly and potentially lead to bugs, as experienced in the upstream
4141

4242
This is, of course, a trade-off. The processing throughput is limited by the capacity of a single goroutine,
4343
creating a potential bottleneck. This can be alleviated by deploying more instances of the processor with reduced
44-
memory allocation per instance (e.g., more pods, each with less memory). If the bottleneck becomes a significant issue,
44+
memory allocation per instance (e.g., more nodes, each with less memory). If the bottleneck becomes a significant issue,
4545
a future enhancement could involve sharding the processor. This would involve splitting the processing workload by trace
4646
ID and maintaining separate caches and states for each shard.
4747

@@ -107,7 +107,8 @@ they can only access cached metadata from the cache. This restriction is in plac
107107
1. **Cache Compression:** It allows for the compression of spans within the cache. We are guaranteed that a compressed
108108
blob is decompressed at most once, which occurs in the case it is sampled and transmitted.
109109
Restricting the reading of cached spans allows for this optimisation.
110-
1.**Performance Efficiency:** It ensures that policy evaluation remains efficient, adhering to O(n) complexity,
110+
111+
2. **Performance Efficiency:** It ensures that policy evaluation remains efficient, adhering to O(n) complexity,
111112
where n represents the number of spans in the current arriving batch. This prevents the policies from becoming slow.
112113

113114
## Caches

pkg/processor/atlassiansamplingprocessor/README.md

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ The `primary_cache_size` value should be greater than 0, and should be set to a
4444
`processor_atlassian_sampling_trace_eviction_time` metric to tune how long you would like your traces to stay pending
4545
in memory before being considered not-sampled.
4646

47-
The primary cache size is initially set to 80% of the `primary_cache_size` value.
47+
The primary cache size is initially set to 60% of the `primary_cache_size` value.
4848
It is automatically adjusted depending on heap memory usage at runtime, but will not exceed the `primary_cache_size` value.
4949

5050
### `secondary_cache_size`
@@ -103,6 +103,9 @@ used as the final decision.
103103

104104
Policies include a `name`, `type`, and then further configuration depending on what the `type` was.
105105

106+
`emit_single_span_for_not_sampled` is an optional field that can be set to `true` for a policy. If set, the processor will emit a single span for a trace that is not sampled, instead of dropping the trace entirely.
107+
This span will have the same trace ID as the original trace. The span will have the name `TRACE NOT SAMPLED`, with the policy name recorded in its `sampling.policy` attribute.
108+
106109
Current supported policy types are:
107110

108111
- `span_count` - samples the trace if it meets a minimum amount of spans.

pkg/processor/atlassiansamplingprocessor/config.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package atlassiansamplingprocessor // import "github.com/atlassian-labs/atlassia
22

33
import (
44
"errors"
5+
"time"
56

67
"go.opentelemetry.io/collector/component"
78
"go.uber.org/multierr"
@@ -17,6 +18,11 @@ type Config struct {
1718
// A good starting point to set this is about 75% of overall memory resource allocation.
1819
TargetHeapBytes uint64 `mapstructure:"target_heap_bytes"`
1920

21+
// RegulateCacheDelay is the amount of time after which the processor starts regulating cache sizes based on the set TargetHeapBytes (if specified).
22+
// It is optional and defaults to 0s.
23+
// This can be used to avoid regulating cache sizes on an initial empty cache, as it can cause unnecessary cache size adjustments and high memory usage.
24+
RegulateCacheDelay time.Duration `mapstructure:"regulate_cache_delay"`
25+
2026
// PrimaryCacheSize sets the initial and maximum size of the primary cache that holds non-low priority traces.
2127
PrimaryCacheSize int `mapstructure:"primary_cache_size"`
2228

pkg/processor/atlassiansamplingprocessor/config_policy.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,9 @@ type SharedPolicyConfig struct {
3131
Name string `mapstructure:"name"`
3232
// Type of the policy this will be used to match the proper configuration of the policy.
3333
Type PolicyType `mapstructure:"type"`
34+
// EmitSingleSpanForNotSampled, when enabled, emits a single span with the same trace ID as the trace that was dropped.
35+
// When people search for the dropped trace, this span comes up named "TRACE NOT SAMPLED", with the sampling policy name attached as an attribute.
36+
EmitSingleSpanForNotSampled bool `mapstructure:"emit_single_span_for_not_sampled"`
3437
// Configs for probabilistic sampling policy evaluator.
3538
ProbabilisticConfig `mapstructure:"probabilistic"`
3639
// Configs for span count filter sampling policy evaluator.

pkg/processor/atlassiansamplingprocessor/config_policy_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ func TestPolicyCreationFromConfig(t *testing.T) {
4747
assert.Equal(t, RemoteProbabilistic, policies[8].policyType)
4848
assert.Equal(t, "test-policy-10", policies[9].name)
4949
assert.Equal(t, Downgrader, policies[9].policyType)
50+
assert.True(t, policies[9].emitSingleSpanForNotSampled)
5051

5152
assert.Equal(t, 10, len(policies), "wrong number of assertions")
5253
}

pkg/processor/atlassiansamplingprocessor/config_test.go

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ package atlassiansamplingprocessor
99
import (
1010
"path/filepath"
1111
"testing"
12+
"time"
1213

1314
"github.com/stretchr/testify/assert"
1415
"github.com/stretchr/testify/require"
@@ -33,12 +34,16 @@ func TestLoadConfig(t *testing.T) {
3334
require.NoError(t, err)
3435
require.NoError(t, sub.Unmarshal(cfg))
3536

37+
delay, err := time.ParseDuration("5m")
38+
require.NoError(t, err)
39+
3640
assert.Equal(t,
3741
cfg,
3842
&Config{
3943
PrimaryCacheSize: 1000,
4044
SecondaryCacheSize: 100,
4145
TargetHeapBytes: 100_000_000,
46+
RegulateCacheDelay: delay,
4247
DecisionCacheCfg: DecisionCacheCfg{SampledCacheSize: 1000, NonSampledCacheSize: 10000},
4348
CompressionEnabled: true,
4449
PolicyConfig: []PolicyConfig{
@@ -151,8 +156,9 @@ func TestLoadConfig(t *testing.T) {
151156
},
152157
{
153158
SharedPolicyConfig: SharedPolicyConfig{
154-
Name: "test-policy-10",
155-
Type: "downgrader",
159+
Name: "test-policy-10",
160+
Type: "downgrader",
161+
EmitSingleSpanForNotSampled: true,
156162
},
157163
DowngraderConfig: DowngraderConfig{
158164
DowngradeTo: "NotSampled",

pkg/processor/atlassiansamplingprocessor/go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
module github.com/atlassian-labs/atlassian-sampling-processor/pkg/processor/atlassiansamplingprocessor
22

3-
go 1.23.4
3+
go 1.23.5
44

55
require (
66
github.com/atlassian-labs/atlassian-sampling-processor/internal/ptraceutil v0.7.6

pkg/processor/atlassiansamplingprocessor/policy.go

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@ type policy struct {
1616
name string
1717
// type is used to identify this policy instance's policyType
1818
policyType PolicyType
19+
// emitSingleSpanForNotSampled is used to determine if a single span should be emitted for a trace that is not sampled.
20+
emitSingleSpanForNotSampled bool
1921
// evaluator that decides if a trace is sampled or not by this policy instance.
2022
evaluator evaluators.PolicyEvaluator
2123
// attribute to use in the telemetry to denote the policy.
@@ -38,10 +40,11 @@ func newPolicies(cfg []PolicyConfig, set component.TelemetrySettings) ([]*policy
3840
return nil, err
3941
}
4042
p := &policy{
41-
name: policyCfg.Name,
42-
policyType: policyCfg.Type,
43-
evaluator: eval,
44-
attribute: metric.WithAttributes(attribute.String("policy", policyCfg.Name)),
43+
name: policyCfg.Name,
44+
policyType: policyCfg.Type,
45+
emitSingleSpanForNotSampled: policyCfg.EmitSingleSpanForNotSampled,
46+
evaluator: eval,
47+
attribute: metric.WithAttributes(attribute.String("policy", policyCfg.Name)),
4548
}
4649
pols[i] = p
4750
}

pkg/processor/atlassiansamplingprocessor/processor.go

Lines changed: 27 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,8 @@ type atlassianSamplingProcessor struct {
6363
memRegulator memory.RegulatorI
6464
// memTicker controls how often the memRegulator is called
6565
memTicker *time.Ticker
66+
// regulatorStartTime is the time at which the regulator starts being used to regulate cache sizes
67+
regulatorStartTime time.Time
6668
// shutdownStart is a chan used to signal to the async goroutine to start shutdown.
6769
// The deadline time is passed to the chan, so the async goroutine knows when to time out.
6870
shutdownStart chan time.Time
@@ -96,8 +98,8 @@ func newAtlassianSamplingProcessor(cCfg component.Config, set component.Telemetr
9698
compress: cfg.CompressionEnabled,
9799
}
98100

99-
// Start with 80% of max cache size, the memory regulator will adjust the cache size as needed
100-
initialPrimaryCacheSize := int(0.8 * float64(cfg.PrimaryCacheSize))
101+
// Start with 60% of max cache size, the memory regulator will adjust the cache size as needed
102+
initialPrimaryCacheSize := int(0.6 * float64(cfg.PrimaryCacheSize))
101103

102104
primaryCache, err := cache.NewLRUCache[*tracedata.TraceData](
103105
initialPrimaryCacheSize,
@@ -143,7 +145,9 @@ func newAtlassianSamplingProcessor(cCfg component.Config, set component.Telemetr
143145
if rErr != nil {
144146
return nil, rErr
145147
}
148+
146149
asp.memRegulator = memRegulator
150+
asp.regulatorStartTime = time.Now().Add(cfg.RegulateCacheDelay)
147151
}
148152

149153
pols, err := newPolicies(cfg.PolicyConfig, set)
@@ -238,9 +242,9 @@ func (asp *atlassianSamplingProcessor) consumeChan() {
238242
for i := 0; i < resourceSpans.Len(); i++ {
239243
asp.processTraces(ctx, resourceSpans.At(i))
240244
}
241-
case <-asp.memTicker.C:
245+
case t := <-asp.memTicker.C:
242246
// If ticker signals, call the memory regulator
243-
if asp.memRegulator != nil {
247+
if t.After(asp.regulatorStartTime) && asp.memRegulator != nil {
244248
size := asp.memRegulator.RegulateCacheSize()
245249
asp.telemetry.ProcessorAtlassianSamplingPrimaryCacheSize.Record(ctx, int64(size))
246250
}
@@ -316,7 +320,7 @@ func (asp *atlassianSamplingProcessor) processTraces(ctx context.Context, resour
316320
asp.sendSampledTraceData(ctx, currentTrace)
317321
asp.telemetry.ProcessorAtlassianSamplingTracesSampled.Add(ctx, 1)
318322
case evaluators.NotSampled:
319-
asp.releaseNotSampledTrace(ctx, id)
323+
asp.releaseNotSampledTrace(ctx, id, pol)
320324
default:
321325
if finalDecision == evaluators.LowPriority {
322326
td.Metadata.Priority = priority.Low
@@ -382,7 +386,7 @@ func (asp *atlassianSamplingProcessor) cachedDecision(
382386
}
383387
asp.sampledDecisionCache.Put(id, time.Now())
384388
} else {
385-
asp.releaseNotSampledTrace(ctx, id)
389+
asp.releaseNotSampledTrace(ctx, id, nil)
386390
}
387391
return true
388392
}
@@ -406,10 +410,25 @@ func (asp *atlassianSamplingProcessor) sendSampledTraceData(ctx context.Context,
406410

407411
// releaseNotSampledTrace removes references to traces that have been decided to be not sampled.
408412
// It also caches the non-sampled decision. If the given policy has emitSingleSpanForNotSampled set, a single placeholder span is sent in place of the trace.
409-
func (asp *atlassianSamplingProcessor) releaseNotSampledTrace(ctx context.Context, id pcommon.TraceID) {
413+
func (asp *atlassianSamplingProcessor) releaseNotSampledTrace(ctx context.Context, id pcommon.TraceID, policy *policy) {
414+
// Check if EmitSingleSpanForNotSampled is true in the policy config
415+
if policy != nil && policy.emitSingleSpanForNotSampled {
416+
// Create a placeholder trace with a single span containing the decision policy name
417+
notSampledTrace := ptrace.NewTraces()
418+
rs := notSampledTrace.ResourceSpans().AppendEmpty()
419+
ss := rs.ScopeSpans().AppendEmpty()
420+
span := ss.Spans().AppendEmpty()
421+
span.SetTraceID(id)
422+
span.SetName("TRACE NOT SAMPLED")
423+
span.SetStartTimestamp(pcommon.NewTimestampFromTime(time.Now().Add(-time.Second)))
424+
span.SetEndTimestamp(pcommon.NewTimestampFromTime(time.Now()))
425+
span.Attributes().PutStr("sampling.policy", policy.name)
426+
427+
// Send the placeholder trace
428+
asp.sendSampledTraceData(ctx, notSampledTrace)
429+
}
410430
asp.nonSampledDecisionCache.Put(id, time.Now())
411431
asp.traceData.Delete(id)
412-
asp.telemetry.ProcessorAtlassianSamplingTracesNotSampled.Add(ctx, 1)
413432
}
414433

415434
func (asp *atlassianSamplingProcessor) flushAll(ctx context.Context) error {

0 commit comments

Comments
 (0)