Skip to content

Commit 2c82611

Browse files
committed
lock around each measurement in exemplar reservoir storage
1 parent 9dea78c commit 2c82611

File tree

7 files changed

+117
-35
lines changed

7 files changed

+117
-35
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
5252
- The `ErrorType` function in `go.opentelemetry.io/otel/semconv/v1.37.0` now handles custom error types.
5353
If an error implements an `ErrorType() string` method, the return value of that method will be used as the error type. (#7442)
5454
- Improve performance of concurrent measurements in `go.opentelemetry.io/otel/sdk/metric`. (#7427)
55+
- Improve the concurrent performance of `HistogramReservoir` in `go.opentelemetry.io/otel/sdk/metric/exemplar` by 4x. (#7443)
5556

5657
<!-- Released section -->
5758
<!-- Don't change this section unless doing release -->

sdk/metric/exemplar/fixed_size_reservoir.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -130,11 +130,11 @@ func (r *FixedSizeReservoir) Offer(ctx context.Context, t time.Time, n Value, a
130130
r.mu.Lock()
131131
defer r.mu.Unlock()
132132
if int(r.count) < cap(r.measurements) {
133-
r.store(int(r.count), newMeasurement(ctx, t, n, a))
133+
r.store(ctx, int(r.count), t, n, a)
134134
} else if r.count == r.next {
135135
// Overwrite a random existing measurement with the one offered.
136136
idx := int(rand.Int64N(int64(cap(r.measurements))))
137-
r.store(idx, newMeasurement(ctx, t, n, a))
137+
r.store(ctx, idx, t, n, a)
138138
r.advance()
139139
}
140140
r.count++

sdk/metric/exemplar/fixed_size_reservoir_test.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,3 +54,14 @@ func TestNewFixedSizeReservoirSamplingCorrectness(t *testing.T) {
5454
// ensuring no bias in our random sampling algorithm.
5555
assert.InDelta(t, 1/mean, intensity, 0.02) // Within 5σ.
5656
}
57+
58+
func TestFixedSizeReservoirConcurrentSafe(t *testing.T) {
59+
t.Run("Int64", reservoirConcurrentSafeTest[int64](func(n int) (ReservoirProvider, int) {
60+
return FixedSizeReservoirProvider(n), n
61+
}))
62+
63+
t.Run("Float64", reservoirConcurrentSafeTest[float64](func(n int) (ReservoirProvider, int) {
64+
return FixedSizeReservoirProvider(n), n
65+
}))
66+
67+
}

sdk/metric/exemplar/histogram_reservoir.go

Lines changed: 1 addition & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@ import (
77
"context"
88
"slices"
99
"sort"
10-
"sync"
1110
"time"
1211

1312
"go.opentelemetry.io/otel/attribute"
@@ -43,7 +42,6 @@ var _ Reservoir = &HistogramReservoir{}
4342
type HistogramReservoir struct {
4443
reservoir.ConcurrentSafe
4544
*storage
46-
mu sync.Mutex
4745

4846
// bounds are bucket bounds in ascending order.
4947
bounds []float64
@@ -72,18 +70,13 @@ func (r *HistogramReservoir) Offer(ctx context.Context, t time.Time, v Value, a
7270
}
7371

7472
idx := sort.SearchFloat64s(r.bounds, n)
75-
m := newMeasurement(ctx, t, v, a)
7673

77-
r.mu.Lock()
78-
defer r.mu.Unlock()
79-
r.store(idx, m)
74+
r.store(ctx, idx, t, v, a)
8075
}
8176

8277
// Collect returns all the held exemplars.
8378
//
8479
// The Reservoir state is preserved after this call.
8580
func (r *HistogramReservoir) Collect(dest *[]Exemplar) {
86-
r.mu.Lock()
87-
defer r.mu.Unlock()
8881
r.storage.Collect(dest)
8982
}

sdk/metric/exemplar/histogram_reservoir_test.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,3 +15,15 @@ func TestHist(t *testing.T) {
1515
return HistogramReservoirProvider(bounds), len(bounds)
1616
}))
1717
}
18+
19+
func TestHistogramReservoirConcurrentSafe(t *testing.T) {
20+
bounds := []float64{0, 100}
21+
t.Run("Int64", reservoirConcurrentSafeTest[int64](func(int) (ReservoirProvider, int) {
22+
return HistogramReservoirProvider(bounds), len(bounds)
23+
}))
24+
25+
t.Run("Float64", reservoirConcurrentSafeTest[float64](func(int) (ReservoirProvider, int) {
26+
return HistogramReservoirProvider(bounds), len(bounds)
27+
}))
28+
29+
}

sdk/metric/exemplar/reservoir_test.go

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@
44
package exemplar
55

66
import (
7+
"context"
8+
"sync"
79
"testing"
810
"time"
911

@@ -144,3 +146,63 @@ func ReservoirTest[N int64 | float64](f factory) func(*testing.T) {
144146
})
145147
}
146148
}
149+
150+
func reservoirConcurrentSafeTest[N int64 | float64](f factory) func(*testing.T) {
151+
return func(t *testing.T) {
152+
t.Helper()
153+
rp, n := f(1)
154+
if n < 1 {
155+
t.Skip("skipping, reservoir capacity less than 1:", n)
156+
}
157+
r := rp(*attribute.EmptySet())
158+
159+
var wg sync.WaitGroup
160+
161+
// Call Offer concurrently with another Offer, and with Collect.
162+
for i := range 2 {
163+
wg.Add(1)
164+
go func() {
165+
ctx, ts, val, attrs := generateOfferInputs[N](t, i+1)
166+
r.Offer(ctx, ts, val, attrs)
167+
wg.Done()
168+
}()
169+
}
170+
var dest []Exemplar
171+
r.Collect(&dest)
172+
for _, e := range dest {
173+
validateExemplar[N](t, e)
174+
}
175+
wg.Wait()
176+
}
177+
}
178+
179+
func generateOfferInputs[N int64 | float64](t *testing.T, i int) (context.Context, time.Time, Value, []attribute.KeyValue) {
180+
sc := trace.NewSpanContext(trace.SpanContextConfig{
181+
TraceID: trace.TraceID([16]byte{byte(i)}),
182+
SpanID: trace.SpanID([8]byte{byte(i)}),
183+
TraceFlags: trace.FlagsSampled,
184+
})
185+
ctx := trace.ContextWithSpanContext(t.Context(), sc)
186+
ts := time.Unix(int64(i), int64(i))
187+
val := NewValue(N(i))
188+
attrs := []attribute.KeyValue{attribute.Int("i", i)}
189+
return ctx, ts, val, attrs
190+
}
191+
192+
func validateExemplar[N int64 | float64](t *testing.T, e Exemplar) {
193+
i := 0
194+
switch e.Value.Type() {
195+
case Int64ValueType:
196+
i = int(e.Value.Int64())
197+
case Float64ValueType:
198+
i = int(e.Value.Float64())
199+
}
200+
ctx, ts, _, attrs := generateOfferInputs[N](t, i)
201+
sc := trace.SpanContextFromContext(ctx)
202+
tID := sc.TraceID()
203+
sID := sc.SpanID()
204+
assert.Equal(t, tID[:], e.TraceID)
205+
assert.Equal(t, sID[:], e.SpanID)
206+
assert.Equal(t, ts, e.Time)
207+
assert.Equal(t, attrs, e.FilteredAttributes)
208+
}

sdk/metric/exemplar/storage.go

Lines changed: 28 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ package exemplar // import "go.opentelemetry.io/otel/sdk/metric/exemplar"
55

66
import (
77
"context"
8+
"sync"
89
"time"
910

1011
"go.opentelemetry.io/otel/attribute"
@@ -24,8 +25,14 @@ func newStorage(n int) *storage {
2425
return &storage{measurements: make([]measurement, n)}
2526
}
2627

27-
func (r *storage) store(idx int, m measurement) {
28-
r.measurements[idx] = m
28+
func (r *storage) store(ctx context.Context, idx int, ts time.Time, v Value, droppedAttr []attribute.KeyValue) {
29+
r.measurements[idx].mux.Lock()
30+
defer r.measurements[idx].mux.Unlock()
31+
r.measurements[idx].FilteredAttributes = droppedAttr
32+
r.measurements[idx].Time = ts
33+
r.measurements[idx].Value = v
34+
r.measurements[idx].Ctx = ctx
35+
r.measurements[idx].valid = true
2936
}
3037

3138
// Collect returns all the held exemplars.
@@ -34,61 +41,57 @@ func (r *storage) store(idx int, m measurement) {
3441
func (r *storage) Collect(dest *[]Exemplar) {
3542
*dest = reset(*dest, len(r.measurements), len(r.measurements))
3643
var n int
37-
for _, m := range r.measurements {
38-
if !m.valid {
39-
continue
44+
for i := range r.measurements {
45+
if r.measurements[i].exemplar(&(*dest)[n]) {
46+
n++
4047
}
41-
42-
m.exemplar(&(*dest)[n])
43-
n++
4448
}
4549
*dest = (*dest)[:n]
4650
}
4751

4852
// measurement is a measurement made by a telemetry system.
4953
type measurement struct {
54+
mux sync.Mutex
5055
// FilteredAttributes are the attributes dropped during the measurement.
5156
FilteredAttributes []attribute.KeyValue
5257
// Time is the time when the measurement was made.
5358
Time time.Time
5459
// Value is the value of the measurement.
5560
Value Value
56-
// SpanContext is the SpanContext active when a measurement was made.
57-
SpanContext trace.SpanContext
61+
// Ctx is the context active when a measurement was made.
62+
Ctx context.Context
5863

5964
valid bool
6065
}
6166

62-
// newMeasurement returns a new non-empty Measurement.
63-
func newMeasurement(ctx context.Context, ts time.Time, v Value, droppedAttr []attribute.KeyValue) measurement {
64-
return measurement{
65-
FilteredAttributes: droppedAttr,
66-
Time: ts,
67-
Value: v,
68-
SpanContext: trace.SpanContextFromContext(ctx),
69-
valid: true,
67+
// exemplar returns m as an [Exemplar].
68+
// returns true if it populated the exemplar.
69+
func (m *measurement) exemplar(dest *Exemplar) bool {
70+
m.mux.Lock()
71+
defer m.mux.Unlock()
72+
if !m.valid {
73+
return false
7074
}
71-
}
7275

73-
// exemplar returns m as an [Exemplar].
74-
func (m measurement) exemplar(dest *Exemplar) {
7576
dest.FilteredAttributes = m.FilteredAttributes
7677
dest.Time = m.Time
7778
dest.Value = m.Value
7879

79-
if m.SpanContext.HasTraceID() {
80-
traceID := m.SpanContext.TraceID()
80+
sc := trace.SpanContextFromContext(m.Ctx)
81+
if sc.HasTraceID() {
82+
traceID := sc.TraceID()
8183
dest.TraceID = traceID[:]
8284
} else {
8385
dest.TraceID = dest.TraceID[:0]
8486
}
8587

86-
if m.SpanContext.HasSpanID() {
87-
spanID := m.SpanContext.SpanID()
88+
if sc.HasSpanID() {
89+
spanID := sc.SpanID()
8890
dest.SpanID = spanID[:]
8991
} else {
9092
dest.SpanID = dest.SpanID[:0]
9193
}
94+
return true
9295
}
9396

9497
func reset[T any](s []T, length, capacity int) []T {

0 commit comments

Comments
 (0)