Skip to content

Commit 67df837

Browse files
committed
add concurrent safe test for histogram reservoir
1 parent 46403cc commit 67df837

File tree

3 files changed

+86
-0
lines changed

3 files changed

+86
-0
lines changed

sdk/metric/exemplar/fixed_size_reservoir_test.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,3 +58,14 @@ func TestNewFixedSizeReservoirSamplingCorrectness(t *testing.T) {
5858
// ensuring no bias in our random sampling algorithm.
5959
assert.InDelta(t, 1/mean, intensity, 0.02) // Within 5σ.
6060
}
61+
62+
func TestFixedSizeReservoirConcurrentSafe(t *testing.T) {
63+
t.Run("Int64", reservoirConcurrentSafeTest[int64](func(n int) (ReservoirProvider, int) {
64+
return FixedSizeReservoirProvider(n), n
65+
}))
66+
67+
t.Run("Float64", reservoirConcurrentSafeTest[float64](func(n int) (ReservoirProvider, int) {
68+
return FixedSizeReservoirProvider(n), n
69+
}))
70+
71+
}

sdk/metric/exemplar/histogram_reservoir_test.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,3 +15,15 @@ func TestHist(t *testing.T) {
1515
return HistogramReservoirProvider(bounds), len(bounds)
1616
}))
1717
}
18+
19+
func TestHistogramReservoirConcurrentSafe(t *testing.T) {
20+
bounds := []float64{0, 100}
21+
t.Run("Int64", reservoirConcurrentSafeTest[int64](func(int) (ReservoirProvider, int) {
22+
return HistogramReservoirProvider(bounds), len(bounds)
23+
}))
24+
25+
t.Run("Float64", reservoirConcurrentSafeTest[float64](func(int) (ReservoirProvider, int) {
26+
return HistogramReservoirProvider(bounds), len(bounds)
27+
}))
28+
29+
}

sdk/metric/exemplar/reservoir_test.go

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@
44
package exemplar
55

66
import (
7+
"context"
8+
"sync"
79
"testing"
810
"time"
911

@@ -144,3 +146,64 @@ func ReservoirTest[N int64 | float64](f factory) func(*testing.T) {
144146
})
145147
}
146148
}
149+
150+
func reservoirConcurrentSafeTest[N int64 | float64](f factory) func(*testing.T) {
151+
return func(t *testing.T) {
152+
t.Helper()
153+
rp, n := f(1)
154+
if n < 1 {
155+
t.Skip("skipping, reservoir capacity less than 1:", n)
156+
}
157+
r := rp(*attribute.EmptySet())
158+
159+
var wg sync.WaitGroup
160+
161+
for i := range 5 {
162+
wg.Add(1)
163+
go func() {
164+
ctx, ts, val, attrs := generateOfferInputs[N](t, i+1)
165+
r.Offer(ctx, ts, val, attrs)
166+
wg.Done()
167+
}()
168+
}
169+
for range 2 {
170+
wg.Add(1)
171+
var dest []Exemplar
172+
r.Collect(&dest)
173+
for _, e := range dest {
174+
validateExemplar[N](t, e)
175+
}
176+
wg.Done()
177+
}
178+
wg.Wait()
179+
}
180+
}
181+
182+
func generateOfferInputs[N int64 | float64](t *testing.T, i int) (context.Context, time.Time, Value, []attribute.KeyValue) {
183+
sc := trace.NewSpanContext(trace.SpanContextConfig{
184+
TraceID: trace.TraceID([16]byte{byte(i)}),
185+
SpanID: trace.SpanID([8]byte{byte(i)}),
186+
TraceFlags: trace.FlagsSampled,
187+
})
188+
ctx := trace.ContextWithSpanContext(t.Context(), sc)
189+
ts := time.Unix(int64(i), int64(i))
190+
val := NewValue(N(i))
191+
attrs := []attribute.KeyValue{attribute.Int("i", i)}
192+
return ctx, ts, val, attrs
193+
}
194+
195+
func validateExemplar[N int64 | float64](t *testing.T, e Exemplar) {
196+
i := 0
197+
switch e.Value.Type() {
198+
case Int64ValueType:
199+
i = int(e.Value.Int64())
200+
case Float64ValueType:
201+
i = int(e.Value.Float64())
202+
}
203+
ctx, ts, _, attrs := generateOfferInputs[N](t, i)
204+
sc := trace.SpanContextFromContext(ctx)
205+
assert.Equal(t, sc.TraceID(), e.TraceID)
206+
assert.Equal(t, sc.SpanID(), e.SpanID)
207+
assert.Equal(t, ts, e.Time)
208+
assert.Equal(t, attrs, e.FilteredAttributes)
209+
}

0 commit comments

Comments
 (0)