From c3c6475d5047752429e63a8ae272ae4179138c62 Mon Sep 17 00:00:00 2001 From: David Ashpole Date: Thu, 2 Oct 2025 13:50:12 +0000 Subject: [PATCH 1/5] optimize histogram reservoir performance --- CHANGELOG.md | 1 + .../exemplar/fixed_size_reservoir_test.go | 11 +++- sdk/metric/exemplar/histogram_reservoir.go | 7 +-- sdk/metric/exemplar/storage.go | 52 ++++++++++++------- 4 files changed, 45 insertions(+), 26 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 29d5010a21a..06590ddc9a7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -44,6 +44,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm - The default `TranslationStrategy` in `go.opentelemetry.io/exporters/prometheus` is changed from `otlptranslator.NoUTF8EscapingWithSuffixes` to `otlptranslator.UnderscoreEscapingWithSuffixes`. (#7421) - The `ErrorType` function in `go.opentelemetry.io/otel/semconv/v1.37.0` now handles custom error types. If an error implements an `ErrorType() string` method, the return value of that method will be used as the error type. (#7442) +- Improve the concurrent performance of `HistogramReservoir` in `go.opentelemetry.io/otel/sdk/metric/exemplar` by 10x. (#7443) diff --git a/sdk/metric/exemplar/fixed_size_reservoir_test.go b/sdk/metric/exemplar/fixed_size_reservoir_test.go index 914a2238d69..55b012e7f5b 100644 --- a/sdk/metric/exemplar/fixed_size_reservoir_test.go +++ b/sdk/metric/exemplar/fixed_size_reservoir_test.go @@ -45,8 +45,15 @@ func TestNewFixedSizeReservoirSamplingCorrectness(t *testing.T) { } var sum float64 - for _, m := range r.measurements { - sum += m.Value.Float64() + for _, val := range r.measurements { + loaded := val.Load() + if loaded == nil { + continue + } + m := loaded.(*measurement) + if m != nil { + sum += m.Value.Float64() + } } mean := sum / float64(sampleSize) diff --git a/sdk/metric/exemplar/histogram_reservoir.go b/sdk/metric/exemplar/histogram_reservoir.go index 12cf8d36a63..6ca448985a8 100644 --- a/sdk/metric/exemplar/histogram_reservoir.go +++ b/sdk/metric/exemplar/histogram_reservoir.go @@ -69,10 +69,5 @@ func (r *HistogramReservoir) Offer(ctx context.Context, t time.Time, v Value, a panic("unknown value type") } - idx := sort.SearchFloat64s(r.bounds, n) - m := newMeasurement(ctx, t, v, a) - - r.mu.Lock() - defer r.mu.Unlock() - r.store(idx, m) + r.store(sort.SearchFloat64s(r.bounds, n), newMeasurement(ctx, t, v, a)) } diff --git a/sdk/metric/exemplar/storage.go b/sdk/metric/exemplar/storage.go index 760c3c87119..ad9d2b87d3e 100644 --- a/sdk/metric/exemplar/storage.go +++ b/sdk/metric/exemplar/storage.go @@ -6,6 +6,7 @@ package exemplar // import "go.opentelemetry.io/otel/sdk/metric/exemplar" import ( "context" "sync" + "sync/atomic" "time" "go.opentelemetry.io/otel/attribute" @@ -19,15 +20,18 @@ type storage struct { // // This does not use []metricdata.Exemplar because it potentially would // require an allocation for trace and span IDs in the hot path of Offer. - measurements []measurement + measurements []atomic.Value } func newStorage(n int) *storage { - return &storage{measurements: make([]measurement, n)} + return &storage{measurements: make([]atomic.Value, n)} } -func (r *storage) store(idx int, m measurement) { - r.measurements[idx] = m +func (r *storage) store(idx int, m *measurement) { + old := r.measurements[idx].Swap(m) + if old != nil { + mPool.Put(old) + } } // Collect returns all the held exemplars. @@ -38,7 +42,12 @@ func (r *storage) Collect(dest *[]Exemplar) { defer r.mu.Unlock() *dest = reset(*dest, len(r.measurements), len(r.measurements)) var n int - for _, m := range r.measurements { + for _, val := range r.measurements { + loaded := val.Load() + if loaded == nil { + continue + } + m := loaded.(*measurement) if !m.valid { continue } @@ -58,20 +67,26 @@ type measurement struct { // Value is the value of the measurement. Value Value // SpanContext is the SpanContext active when a measurement was made. - SpanContext trace.SpanContext + Ctx context.Context valid bool } +var mPool = sync.Pool{ + New: func() any { + return &measurement{} + }, +} + // newMeasurement returns a new non-empty Measurement. -func newMeasurement(ctx context.Context, ts time.Time, v Value, droppedAttr []attribute.KeyValue) measurement { - return measurement{ - FilteredAttributes: droppedAttr, - Time: ts, - Value: v, - SpanContext: trace.SpanContextFromContext(ctx), - valid: true, - } +func newMeasurement(ctx context.Context, ts time.Time, v Value, droppedAttr []attribute.KeyValue) *measurement { + m := mPool.Get().(*measurement) + m.FilteredAttributes = droppedAttr + m.Time = ts + m.Value = v + m.Ctx = ctx + m.valid = true + return m } // exemplar returns m as an [Exemplar]. @@ -80,15 +95,16 @@ func (m measurement) exemplar(dest *Exemplar) { dest.Time = m.Time dest.Value = m.Value - if m.SpanContext.HasTraceID() { - traceID := m.SpanContext.TraceID() + sc := trace.SpanContextFromContext(m.Ctx) + if sc.HasTraceID() { + traceID := sc.TraceID() dest.TraceID = traceID[:] } else { dest.TraceID = dest.TraceID[:0] } - if m.SpanContext.HasSpanID() { - spanID := m.SpanContext.SpanID() + if sc.HasSpanID() { + spanID := sc.SpanID() dest.SpanID = spanID[:] } else { dest.SpanID = dest.SpanID[:0] From 00d1717bb78faa13289ad8c800e5418e76236805 Mon Sep 17 00:00:00 2001 From: David Ashpole Date: Thu, 2 Oct 2025 14:45:50 +0000 Subject: [PATCH 2/5] factor nextTracker out of fixed size reservoir --- sdk/metric/exemplar/fixed_size_reservoir.go | 107 +++++++++++--------- 1 file changed, 57 insertions(+), 50 deletions(-) diff --git a/sdk/metric/exemplar/fixed_size_reservoir.go b/sdk/metric/exemplar/fixed_size_reservoir.go index 6afb3bed3af..6252514e3d3 100644 --- a/sdk/metric/exemplar/fixed_size_reservoir.go +++ b/sdk/metric/exemplar/fixed_size_reservoir.go @@ -25,7 +25,10 @@ func FixedSizeReservoirProvider(k int) ReservoirProvider { // sample each one. If there are more than k, the Reservoir will then randomly // sample all additional measurement with a decreasing probability. func NewFixedSizeReservoir(k int) *FixedSizeReservoir { - return newFixedSizeReservoir(newStorage(k)) + return &FixedSizeReservoir{ + storage: newStorage(k), + nextTracker: newNextTracker(k), + } } var _ Reservoir = &FixedSizeReservoir{} @@ -37,39 +40,7 @@ var _ Reservoir = &FixedSizeReservoir{} type FixedSizeReservoir struct { reservoir.ConcurrentSafe *storage - - // count is the number of measurement seen. - count int64 - // next is the next count that will store a measurement at a random index - // once the reservoir has been filled. - next int64 - // w is the largest random number in a distribution that is used to compute - // the next next. - w float64 -} - -func newFixedSizeReservoir(s *storage) *FixedSizeReservoir { - r := &FixedSizeReservoir{ - storage: s, - } - r.reset() - return r -} - -// randomFloat64 returns, as a float64, a uniform pseudo-random number in the -// open interval (0.0,1.0). -func (*FixedSizeReservoir) randomFloat64() float64 { - // TODO: Use an algorithm that avoids rejection sampling. For example: - // - // const precision = 1 << 53 // 2^53 - // // Generate an integer in [1, 2^53 - 1] - // v := rand.Uint64() % (precision - 1) + 1 - // return float64(v) / float64(precision) - f := rand.Float64() - for f == 0 { - f = rand.Float64() - } - return f + *nextTracker } // Offer accepts the parameters associated with a measurement. The @@ -138,12 +109,44 @@ func (r *FixedSizeReservoir) Offer(ctx context.Context, t time.Time, n Value, a r.count++ } +// Collect returns all the held exemplars. +// +// The Reservoir state is preserved after this call. +func (r *FixedSizeReservoir) Collect(dest *[]Exemplar) { + r.storage.Collect(dest) + // Call reset here even though it will reset r.count and restart the random + // number series. This will persist any old exemplars as long as no new + // measurements are offered, but it will also prioritize those new + // measurements that are made over the older collection cycle ones. + r.reset() +} + +func newNextTracker(k int) *nextTracker { + nt := &nextTracker{measurementsCap: k} + nt.reset() + return nt +} + +type nextTracker struct { + // count is the number of measurement seen. + count int64 + // next is the next count that will store a measurement at a random index + // once the reservoir has been filled. + next int64 + // w is the largest random number in a distribution that is used to compute + // the next next. + w float64 + // measurementsCap is the number of measurements that can be stored in the + // reservoir. + measurementsCap int +} + // reset resets r to the initial state. -func (r *FixedSizeReservoir) reset() { +func (r *nextTracker) reset() { // This resets the number of exemplars known. r.count = 0 // Random index inserts should only happen after the storage is full. - r.next = int64(cap(r.measurements)) + r.next = int64(r.measurementsCap) // Initial random number in the series used to generate r.next. // @@ -154,14 +157,14 @@ func (r *FixedSizeReservoir) reset() { // This maps the uniform random number in (0,1) to a geometric distribution // over the same interval. The mean of the distribution is inversely // proportional to the storage capacity. - r.w = math.Exp(math.Log(r.randomFloat64()) / float64(cap(r.measurements))) + r.w = math.Exp(math.Log(randomFloat64()) / float64(r.measurementsCap)) r.advance() } // advance updates the count at which the offered measurement will overwrite an // existing exemplar. -func (r *FixedSizeReservoir) advance() { +func (r *nextTracker) advance() { // Calculate the next value in the random number series. // // The current value of r.w is based on the max of a distribution of random @@ -174,7 +177,7 @@ func (r *FixedSizeReservoir) advance() { // therefore the next r.w will be based on the same distribution (i.e. // `max(u_1,u_2,...,u_k)`). Therefore, we can sample the next r.w by // computing the next random number `u` and take r.w as `w * u^(1/k)`. - r.w *= math.Exp(math.Log(r.randomFloat64()) / float64(cap(r.measurements))) + r.w *= math.Exp(math.Log(randomFloat64()) / float64(r.measurementsCap)) // Use the new random number in the series to calculate the count of the // next measurement that will be stored. // @@ -185,17 +188,21 @@ func (r *FixedSizeReservoir) advance() { // // Important to note, the new r.next will always be at least 1 more than // the last r.next. - r.next += int64(math.Log(r.randomFloat64())/math.Log(1-r.w)) + 1 + r.next += int64(math.Log(randomFloat64())/math.Log(1-r.w)) + 1 } -// Collect returns all the held exemplars. -// -// The Reservoir state is preserved after this call. -func (r *FixedSizeReservoir) Collect(dest *[]Exemplar) { - r.storage.Collect(dest) - // Call reset here even though it will reset r.count and restart the random - // number series. This will persist any old exemplars as long as no new - // measurements are offered, but it will also prioritize those new - // measurements that are made over the older collection cycle ones. - r.reset() +// randomFloat64 returns, as a float64, a uniform pseudo-random number in the +// open interval (0.0,1.0). +func randomFloat64() float64 { + // TODO: Use an algorithm that avoids rejection sampling. For example: + // + // const precision = 1 << 53 // 2^53 + // // Generate an integer in [1, 2^53 - 1] + // v := rand.Uint64() % (precision - 1) + 1 + // return float64(v) / float64(precision) + f := rand.Float64() + for f == 0 { + f = rand.Float64() + } + return f } From 310416e79d1a9d635b7585a99b13a846520d79e0 Mon Sep 17 00:00:00 2001 From: David Ashpole Date: Thu, 2 Oct 2025 15:20:56 +0000 Subject: [PATCH 3/5] use atomics for count and next --- sdk/metric/exemplar/fixed_size_reservoir.go | 38 +++++++++++++------ .../exemplar/fixed_size_reservoir_test.go | 16 ++++++++ 2 files changed, 42 insertions(+), 12 deletions(-) diff --git a/sdk/metric/exemplar/fixed_size_reservoir.go b/sdk/metric/exemplar/fixed_size_reservoir.go index 6252514e3d3..84b688647bd 100644 --- a/sdk/metric/exemplar/fixed_size_reservoir.go +++ b/sdk/metric/exemplar/fixed_size_reservoir.go @@ -7,6 +7,7 @@ import ( "context" "math" "math/rand/v2" + "sync/atomic" "time" "go.opentelemetry.io/otel/attribute" @@ -98,15 +99,15 @@ func (r *FixedSizeReservoir) Offer(ctx context.Context, t time.Time, n Value, a r.mu.Lock() defer r.mu.Unlock() - if int(r.count) < cap(r.measurements) { - r.store(int(r.count), newMeasurement(ctx, t, n, a)) - } else if r.count == r.next { + count, next := r.incrementCount() + if int(count) < r.measurementsCap { + r.store(int(count), newMeasurement(ctx, t, n, a)) + } else if count == next { // Overwrite a random existing measurement with the one offered. idx := int(rand.Int64N(int64(cap(r.measurements)))) r.store(idx, newMeasurement(ctx, t, n, a)) r.advance() } - r.count++ } // Collect returns all the held exemplars. @@ -128,11 +129,9 @@ func newNextTracker(k int) *nextTracker { } type nextTracker struct { - // count is the number of measurement seen. - count int64 - // next is the next count that will store a measurement at a random index - // once the reservoir has been filled. - next int64 + // count is the number of measurement seen, and is in the lower 32 bits. + // once the reservoir has been filled, and is in the upper 32 bits. + countAndNext atomic.Uint64 // w is the largest random number in a distribution that is used to compute // the next next. w float64 @@ -144,9 +143,8 @@ type nextTracker struct { // reset resets r to the initial state. func (r *nextTracker) reset() { // This resets the number of exemplars known. - r.count = 0 // Random index inserts should only happen after the storage is full. - r.next = int64(r.measurementsCap) + r.setCountAndNext(0, uint64(r.measurementsCap)) // Initial random number in the series used to generate r.next. // @@ -162,6 +160,22 @@ func (r *nextTracker) reset() { r.advance() } +// returns the count before the increment and next value. +func (r *nextTracker) incrementCount() (uint64, uint64) { + n := r.countAndNext.Add(1) + return n&((1<<32)-1) - 1, n >> 32 +} + +// returns the count before the increment and next value. +func (r *nextTracker) incrementNext(inc uint64) { + r.countAndNext.Add(inc << 32) +} + +// returns the count before the increment and next value. +func (r *nextTracker) setCountAndNext(count uint64, next uint64) { + r.countAndNext.Store(next<<32 + count) +} + // advance updates the count at which the offered measurement will overwrite an // existing exemplar. func (r *nextTracker) advance() { @@ -188,7 +202,7 @@ func (r *nextTracker) advance() { // // Important to note, the new r.next will always be at least 1 more than // the last r.next. - r.next += int64(math.Log(randomFloat64())/math.Log(1-r.w)) + 1 + r.incrementNext(uint64(math.Log(randomFloat64())/math.Log(1-r.w)) + 1) } // randomFloat64 returns, as a float64, a uniform pseudo-random number in the diff --git a/sdk/metric/exemplar/fixed_size_reservoir_test.go b/sdk/metric/exemplar/fixed_size_reservoir_test.go index 55b012e7f5b..90992e54ce6 100644 --- a/sdk/metric/exemplar/fixed_size_reservoir_test.go +++ b/sdk/metric/exemplar/fixed_size_reservoir_test.go @@ -61,3 +61,19 @@ func TestNewFixedSizeReservoirSamplingCorrectness(t *testing.T) { // ensuring no bias in our random sampling algorithm. assert.InDelta(t, 1/mean, intensity, 0.02) // Within 5σ. } + +func TestNextTrackerAtomics(t *testing.T) { + capacity := 10 + nt := newNextTracker(capacity) + nt.setCountAndNext(0, 11) + count, next := nt.incrementCount() + assert.Equal(t, uint64(0), count) + assert.Equal(t, uint64(11), next) + count, secondNext := nt.incrementCount() + assert.Equal(t, uint64(1), count) + assert.Equal(t, next, secondNext) + nt.setCountAndNext(50, 100) + count, next = nt.incrementCount() + assert.Equal(t, uint64(50), count) + assert.Equal(t, uint64(100), next) +} From a894b33d874f761d7b938e9d30eea4b924bb774e Mon Sep 17 00:00:00 2001 From: David Ashpole Date: Thu, 2 Oct 2025 16:11:28 +0000 Subject: [PATCH 4/5] only lock during advance and reset --- CHANGELOG.md | 1 + sdk/metric/exemplar/fixed_size_reservoir.go | 34 +++++++++++++-------- 2 files changed, 22 insertions(+), 13 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 06590ddc9a7..e835c1d9085 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -45,6 +45,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm - The `ErrorType` function in `go.opentelemetry.io/otel/semconv/v1.37.0` now handles custom error types. If an error implements an `ErrorType() string` method, the return value of that method will be used as the error type. (#7442) - Improve the concurrent performance of `HistogramReservoir` in `go.opentelemetry.io/otel/sdk/metric/exemplar` by 10x. (#7443) +- Improve the concurrent performance of `FixedSizeReservoir` in `go.opentelemetry.io/otel/sdk/metric/exemplar` by 3x. (#7447) diff --git a/sdk/metric/exemplar/fixed_size_reservoir.go b/sdk/metric/exemplar/fixed_size_reservoir.go index 84b688647bd..40dd7926c66 100644 --- a/sdk/metric/exemplar/fixed_size_reservoir.go +++ b/sdk/metric/exemplar/fixed_size_reservoir.go @@ -7,6 +7,7 @@ import ( "context" "math" "math/rand/v2" + "sync" "sync/atomic" "time" @@ -26,6 +27,9 @@ func FixedSizeReservoirProvider(k int) ReservoirProvider { // sample each one. If there are more than k, the Reservoir will then randomly // sample all additional measurement with a decreasing probability. func NewFixedSizeReservoir(k int) *FixedSizeReservoir { + if k < 0 { + k = 0 + } return &FixedSizeReservoir{ storage: newStorage(k), nextTracker: newNextTracker(k), @@ -97,15 +101,16 @@ func (r *FixedSizeReservoir) Offer(ctx context.Context, t time.Time, n Value, a // https://github.com/MrAlias/reservoir-sampling for a performance // comparison of reservoir sampling algorithms. - r.mu.Lock() - defer r.mu.Unlock() count, next := r.incrementCount() - if int(count) < r.measurementsCap { - r.store(int(count), newMeasurement(ctx, t, n, a)) + intCount := int(count) // nolint:gosec // count is at most 32 bits in length + if intCount < r.k { + r.store(intCount, newMeasurement(ctx, t, n, a)) } else if count == next { // Overwrite a random existing measurement with the one offered. - idx := int(rand.Int64N(int64(cap(r.measurements)))) + idx := rand.IntN(r.k) r.store(idx, newMeasurement(ctx, t, n, a)) + r.wMu.Lock() + defer r.wMu.Unlock() r.advance() } } @@ -123,7 +128,7 @@ func (r *FixedSizeReservoir) Collect(dest *[]Exemplar) { } func newNextTracker(k int) *nextTracker { - nt := &nextTracker{measurementsCap: k} + nt := &nextTracker{k: k} nt.reset() return nt } @@ -135,16 +140,19 @@ type nextTracker struct { // w is the largest random number in a distribution that is used to compute // the next next. w float64 - // measurementsCap is the number of measurements that can be stored in the - // reservoir. - measurementsCap int + // wMu ensures w is kept consistent with next during advance and reset. + wMu sync.Mutex + // k is the number of measurements that can be stored in the reservoir. + k int } // reset resets r to the initial state. func (r *nextTracker) reset() { + r.wMu.Lock() + defer r.wMu.Unlock() // This resets the number of exemplars known. // Random index inserts should only happen after the storage is full. - r.setCountAndNext(0, uint64(r.measurementsCap)) + r.setCountAndNext(0, uint64(r.k)) // nolint:gosec // we ensure k is 1 or greater. // Initial random number in the series used to generate r.next. // @@ -155,7 +163,7 @@ func (r *nextTracker) reset() { // This maps the uniform random number in (0,1) to a geometric distribution // over the same interval. The mean of the distribution is inversely // proportional to the storage capacity. - r.w = math.Exp(math.Log(randomFloat64()) / float64(r.measurementsCap)) + r.w = math.Exp(math.Log(randomFloat64()) / float64(r.k)) r.advance() } @@ -172,7 +180,7 @@ func (r *nextTracker) incrementNext(inc uint64) { } // returns the count before the increment and next value. -func (r *nextTracker) setCountAndNext(count uint64, next uint64) { +func (r *nextTracker) setCountAndNext(count, next uint64) { r.countAndNext.Store(next<<32 + count) } @@ -191,7 +199,7 @@ func (r *nextTracker) advance() { // therefore the next r.w will be based on the same distribution (i.e. // `max(u_1,u_2,...,u_k)`). Therefore, we can sample the next r.w by // computing the next random number `u` and take r.w as `w * u^(1/k)`. - r.w *= math.Exp(math.Log(randomFloat64()) / float64(r.measurementsCap)) + r.w *= math.Exp(math.Log(randomFloat64()) / float64(r.k)) // Use the new random number in the series to calculate the count of the // next measurement that will be stored. // From 34911fb5476100b951f6506c0f36c1b64b5a9dd7 Mon Sep 17 00:00:00 2001 From: David Ashpole Date: Mon, 6 Oct 2025 15:51:05 +0000 Subject: [PATCH 5/5] histogram reservoir uses a time-weighted algorithm for storing exemplars --- sdk/metric/exemplar/benchmark_test.go | 14 +++++++--- sdk/metric/exemplar/histogram_reservoir.go | 30 +++++++++++++++++++--- 2 files changed, 38 insertions(+), 6 deletions(-) diff --git a/sdk/metric/exemplar/benchmark_test.go b/sdk/metric/exemplar/benchmark_test.go index f00a570f5cf..f1d90c7243d 100644 --- a/sdk/metric/exemplar/benchmark_test.go +++ b/sdk/metric/exemplar/benchmark_test.go @@ -18,9 +18,9 @@ func BenchmarkFixedSizeReservoirOffer(b *testing.B) { i := 0 for pb.Next() { reservoir.Offer(ctx, ts, val, nil) - // Periodically trigger a reset, because the algorithm for fixed-size - // reservoirs records exemplars very infrequently after a large - // number of collect calls. + // Periodically trigger a reset, because the algorithm records + // exemplars very infrequently after a large number of collect + // calls. if i%100 == 99 { reservoir.mu.Lock() reservoir.reset() @@ -44,6 +44,14 @@ func BenchmarkHistogramReservoirOffer(b *testing.B) { i := 0 for pb.Next() { res.Offer(ctx, ts, values[i%len(values)], nil) + // Periodically trigger a reset, because the algorithm records + // exemplars very infrequently after a large number of collect + // calls. + if i%100 == 99 { + for i := range res.trackers { + res.trackers[i].reset() + } + } i++ } }) diff --git a/sdk/metric/exemplar/histogram_reservoir.go b/sdk/metric/exemplar/histogram_reservoir.go index 6ca448985a8..7da3ca908d8 100644 --- a/sdk/metric/exemplar/histogram_reservoir.go +++ b/sdk/metric/exemplar/histogram_reservoir.go @@ -29,8 +29,9 @@ func HistogramReservoirProvider(bounds []float64) ReservoirProvider { // The passed bounds must be sorted before calling this function. func NewHistogramReservoir(bounds []float64) *HistogramReservoir { return &HistogramReservoir{ - bounds: bounds, - storage: newStorage(len(bounds) + 1), + bounds: bounds, + storage: newStorage(len(bounds) + 1), + trackers: make([]nextTracker, len(bounds)+1), } } @@ -43,6 +44,8 @@ type HistogramReservoir struct { reservoir.ConcurrentSafe *storage + trackers []nextTracker + // bounds are bucket bounds in ascending order. bounds []float64 } @@ -68,6 +71,27 @@ func (r *HistogramReservoir) Offer(ctx context.Context, t time.Time, v Value, a default: panic("unknown value type") } + idx := sort.SearchFloat64s(r.bounds, n) + + count, next := r.trackers[idx].incrementCount() + if count == 0 || count == next { + r.store(idx, newMeasurement(ctx, t, v, a)) + r.trackers[idx].wMu.Lock() + defer r.trackers[idx].wMu.Unlock() + r.trackers[idx].advance() + } +} - r.store(sort.SearchFloat64s(r.bounds, n), newMeasurement(ctx, t, v, a)) +// Collect returns all the held exemplars. +// +// The Reservoir state is preserved after this call. +func (r *HistogramReservoir) Collect(dest *[]Exemplar) { + r.storage.Collect(dest) + // Call reset here even though it will reset r.count and restart the random + // number series. This will persist any old exemplars as long as no new + // measurements are offered, but it will also prioritize those new + // measurements that are made over the older collection cycle ones. + for i := range r.trackers { + r.trackers[i].reset() + } }