Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,27 +17,27 @@ public enum HistogramAggregationParam {
new DoubleExplicitBucketHistogramAggregator(
ExplicitBucketHistogramUtils.createBoundaryArray(
ExplicitBucketHistogramUtils.DEFAULT_HISTOGRAM_BUCKET_BOUNDARIES),
ExemplarReservoir::doubleNoSamples,
ExemplarReservoir::noSamples,
IMMUTABLE_DATA)),
EXPLICIT_SINGLE_BUCKET(
new DoubleExplicitBucketHistogramAggregator(
ExplicitBucketHistogramUtils.createBoundaryArray(Collections.emptyList()),
ExemplarReservoir::doubleNoSamples,
ExemplarReservoir::noSamples,
IMMUTABLE_DATA)),
EXPONENTIAL_SMALL_CIRCULAR_BUFFER(
new DoubleBase2ExponentialHistogramAggregator(
ExemplarReservoir::doubleNoSamples, 20, 0, IMMUTABLE_DATA)),
ExemplarReservoir::noSamples, 20, 0, IMMUTABLE_DATA)),
EXPONENTIAL_CIRCULAR_BUFFER(
new DoubleBase2ExponentialHistogramAggregator(
ExemplarReservoir::doubleNoSamples, 160, 0, IMMUTABLE_DATA));
ExemplarReservoir::noSamples, 160, 0, IMMUTABLE_DATA));

private final Aggregator<?, ?> aggregator;
private final Aggregator<?> aggregator;

HistogramAggregationParam(Aggregator<?, ?> aggregator) {
HistogramAggregationParam(Aggregator<?> aggregator) {
this.aggregator = aggregator;
}

public Aggregator<?, ?> getAggregator() {
public Aggregator<?> getAggregator() {
return this.aggregator;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ public class HistogramBenchmark {
public static class ThreadState {
@Param HistogramValueGenerator valueGen;
@Param HistogramAggregationParam aggregation;
private AggregatorHandle<?, ?> aggregatorHandle;
private AggregatorHandle<?> aggregatorHandle;
private DoubleSupplier valueSupplier;

@Setup(Level.Trial)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public class HistogramScaleBenchmark {
public static class ThreadState {
@Param HistogramValueGenerator valueGen;
@Param HistogramAggregationParam aggregation;
private AggregatorHandle<?, ?> aggregatorHandle;
private AggregatorHandle<?> aggregatorHandle;
private DoubleSupplier valueSupplier;

@Setup(Level.Invocation)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -304,7 +304,7 @@ WriteableMetricStorage registerSynchronousMetricStorage(InstrumentDescriptor ins
/** Register new asynchronous storage associated with a given instrument. */
SdkObservableMeasurement registerObservableMeasurement(
InstrumentDescriptor instrumentDescriptor) {
List<AsynchronousMetricStorage<?, ?>> registeredStorages = new ArrayList<>();
List<AsynchronousMetricStorage<?>> registeredStorages = new ArrayList<>();
for (Map.Entry<RegisteredReader, MetricStorageRegistry> entry :
readerStorageRegistries.entrySet()) {
RegisteredReader reader = entry.getKey();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
import io.opentelemetry.sdk.metrics.internal.descriptor.InstrumentDescriptor;

abstract class AbstractSumAggregator<T extends PointData, U extends ExemplarData>
implements Aggregator<T, U> {
implements Aggregator<T> {
private final boolean isMonotonic;

AbstractSumAggregator(InstrumentDescriptor instrumentDescriptor) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@

import io.opentelemetry.sdk.common.InstrumentationScopeInfo;
import io.opentelemetry.sdk.metrics.data.AggregationTemporality;
import io.opentelemetry.sdk.metrics.data.DoubleExemplarData;
import io.opentelemetry.sdk.metrics.data.ExemplarData;
import io.opentelemetry.sdk.metrics.data.MetricData;
import io.opentelemetry.sdk.metrics.data.MetricDataType;
import io.opentelemetry.sdk.metrics.data.PointData;
Expand All @@ -25,9 +23,9 @@
* at any time.
*/
@Immutable
public interface Aggregator<T extends PointData, U extends ExemplarData> {
public interface Aggregator<T extends PointData> {
/** Returns the drop aggregator, an aggregator that drops measurements. */
static Aggregator<?, DoubleExemplarData> drop() {
static Aggregator<?> drop() {
return DropAggregator.INSTANCE;
}

Expand All @@ -37,7 +35,7 @@ static Aggregator<?, DoubleExemplarData> drop() {
*
* @return a new {@link AggregatorHandle}.
*/
AggregatorHandle<T, U> createHandle();
AggregatorHandle<T> createHandle();

/**
* Returns a new DELTA point by computing the difference between two cumulative points.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
package io.opentelemetry.sdk.metrics.internal.aggregator;

import io.opentelemetry.sdk.common.export.MemoryMode;
import io.opentelemetry.sdk.metrics.data.ExemplarData;
import io.opentelemetry.sdk.metrics.data.PointData;
import io.opentelemetry.sdk.metrics.internal.descriptor.InstrumentDescriptor;
import io.opentelemetry.sdk.metrics.internal.exemplar.ExemplarFilter;
Expand All @@ -31,7 +30,7 @@ public interface AggregatorFactory {
* @return a new {@link Aggregator}. {@link Aggregator#drop()} indicates no measurements should be
* recorded.
*/
<T extends PointData, U extends ExemplarData> Aggregator<T, U> createAggregator(
<T extends PointData> Aggregator<T> createAggregator(
InstrumentDescriptor instrumentDescriptor,
ExemplarFilter exemplarFilter,
MemoryMode memoryMode);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@

import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.context.Context;
import io.opentelemetry.sdk.metrics.data.ExemplarData;
import io.opentelemetry.sdk.metrics.data.DoubleExemplarData;
import io.opentelemetry.sdk.metrics.data.LongExemplarData;
import io.opentelemetry.sdk.metrics.data.PointData;
import io.opentelemetry.sdk.metrics.internal.exemplar.ExemplarReservoir;
import java.util.List;
Expand All @@ -24,14 +25,16 @@
* at any time.
*/
@ThreadSafe
public abstract class AggregatorHandle<T extends PointData, U extends ExemplarData> {
public abstract class AggregatorHandle<T extends PointData> {

// A reservoir of sampled exemplars for this time period.
private final ExemplarReservoir<U> exemplarReservoir;
private final ExemplarReservoir exemplarReservoir;
private volatile boolean valuesRecorded = false;
private final boolean isDoubleType;

protected AggregatorHandle(ExemplarReservoir<U> exemplarReservoir) {
protected AggregatorHandle(ExemplarReservoir exemplarReservoir) {
this.exemplarReservoir = exemplarReservoir;
this.isDoubleType = isDoubleType();
}

/**
Expand All @@ -44,21 +47,54 @@ public final T aggregateThenMaybeReset(
valuesRecorded = false;
}

return doAggregateThenMaybeReset(
if (isDoubleType) {
return doAggregateThenMaybeResetDoubles(
startEpochNanos,
epochNanos,
attributes,
exemplarReservoir.collectAndResetDoubles(attributes),
reset);
}
return doAggregateThenMaybeResetLongs(
startEpochNanos,
epochNanos,
attributes,
exemplarReservoir.collectAndReset(attributes),
exemplarReservoir.collectAndResetLongs(attributes),
reset);
}

/**
* Indicates whether this {@link AggregatorHandle} supports double or long values.
*
* <p>If it supports doubles, it MUST implement {@link #doAggregateThenMaybeResetDoubles(long,
* long, Attributes, List, boolean)} and {@link #doRecordDouble(double)}.
*
* <p>If it supports long, it MUST implement {@link #doAggregateThenMaybeResetLongs(long, long,
* Attributes, List, boolean)} and {@link #doRecordLong(long)}.
*
* @return true if it supports doubles, false if it supports longs.
*/
protected abstract boolean isDoubleType();

/** Implementation of the {@link #aggregateThenMaybeReset(long, long, Attributes, boolean)} . */
protected T doAggregateThenMaybeResetDoubles(
long startEpochNanos,
long epochNanos,
Attributes attributes,
List<DoubleExemplarData> exemplars,
boolean reset) {
throw new UnsupportedOperationException("This aggregator does not support double values.");
}

/** Implementation of the {@link #aggregateThenMaybeReset(long, long, Attributes, boolean)} . */
protected abstract T doAggregateThenMaybeReset(
protected T doAggregateThenMaybeResetLongs(
long startEpochNanos,
long epochNanos,
Attributes attributes,
List<U> exemplars,
boolean reset);
List<LongExemplarData> exemplars,
boolean reset) {
throw new UnsupportedOperationException("This aggregator does not support long values.");
}

public final void recordLong(long value, Attributes attributes, Context context) {
exemplarReservoir.offerLongMeasurement(value, attributes, context);
Expand All @@ -82,8 +118,7 @@ public final void recordLong(long value) {
* values.
*/
protected void doRecordLong(long value) {
throw new UnsupportedOperationException(
"This aggregator does not support recording long values.");
throw new UnsupportedOperationException("This aggregator does not support long values.");
}

public final void recordDouble(double value, Attributes attributes, Context context) {
Expand All @@ -108,8 +143,7 @@ public final void recordDouble(double value) {
* double values.
*/
protected void doRecordDouble(double value) {
throw new UnsupportedOperationException(
"This aggregator does not support recording double values.");
throw new UnsupportedOperationException("This aggregator does not support double values.");
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,9 @@
* at any time.
*/
public final class DoubleBase2ExponentialHistogramAggregator
implements Aggregator<ExponentialHistogramPointData, DoubleExemplarData> {
implements Aggregator<ExponentialHistogramPointData> {

private final Supplier<ExemplarReservoir<DoubleExemplarData>> reservoirSupplier;
private final Supplier<ExemplarReservoir> reservoirSupplier;
private final int maxBuckets;
private final int maxScale;
private final MemoryMode memoryMode;
Expand All @@ -48,7 +48,7 @@ public final class DoubleBase2ExponentialHistogramAggregator
* @param reservoirSupplier Supplier of exemplar reservoirs per-stream.
*/
public DoubleBase2ExponentialHistogramAggregator(
Supplier<ExemplarReservoir<DoubleExemplarData>> reservoirSupplier,
Supplier<ExemplarReservoir> reservoirSupplier,
int maxBuckets,
int maxScale,
MemoryMode memoryMode) {
Expand All @@ -59,7 +59,7 @@ public DoubleBase2ExponentialHistogramAggregator(
}

@Override
public AggregatorHandle<ExponentialHistogramPointData, DoubleExemplarData> createHandle() {
public AggregatorHandle<ExponentialHistogramPointData> createHandle() {
return new Handle(reservoirSupplier.get(), maxBuckets, maxScale, memoryMode);
}

Expand All @@ -79,8 +79,7 @@ public MetricData toMetricData(
ImmutableExponentialHistogramData.create(temporality, points));
}

static final class Handle
extends AggregatorHandle<ExponentialHistogramPointData, DoubleExemplarData> {
static final class Handle extends AggregatorHandle<ExponentialHistogramPointData> {
private final int maxBuckets;
private final int maxScale;
@Nullable private DoubleBase2ExponentialHistogramBuckets positiveBuckets;
Expand All @@ -96,11 +95,7 @@ static final class Handle
// Used only when MemoryMode = REUSABLE_DATA
@Nullable private final MutableExponentialHistogramPointData reusablePoint;

Handle(
ExemplarReservoir<DoubleExemplarData> reservoir,
int maxBuckets,
int maxScale,
MemoryMode memoryMode) {
Handle(ExemplarReservoir reservoir, int maxBuckets, int maxScale, MemoryMode memoryMode) {
super(reservoir);
this.maxBuckets = maxBuckets;
this.maxScale = maxScale;
Expand All @@ -118,7 +113,7 @@ static final class Handle
}

@Override
protected synchronized ExponentialHistogramPointData doAggregateThenMaybeReset(
protected synchronized ExponentialHistogramPointData doAggregateThenMaybeResetDoubles(
long startEpochNanos,
long epochNanos,
Attributes attributes,
Expand Down Expand Up @@ -261,6 +256,11 @@ protected synchronized void doRecordDouble(double value) {
}
}

@Override
protected boolean isDoubleType() {
return true;
}

@Override
protected void doRecordLong(long value) {
doRecordDouble((double) value);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,14 +36,14 @@
* at any time.
*/
public final class DoubleExplicitBucketHistogramAggregator
implements Aggregator<HistogramPointData, DoubleExemplarData> {
implements Aggregator<HistogramPointData> {
private final double[] boundaries;
private final MemoryMode memoryMode;

// a cache for converting to MetricData
private final List<Double> boundaryList;

private final Supplier<ExemplarReservoir<DoubleExemplarData>> reservoirSupplier;
private final Supplier<ExemplarReservoir> reservoirSupplier;

/**
* Constructs an explicit bucket histogram aggregator.
Expand All @@ -53,9 +53,7 @@ public final class DoubleExplicitBucketHistogramAggregator
* @param memoryMode The {@link MemoryMode} to use in this aggregator.
*/
public DoubleExplicitBucketHistogramAggregator(
double[] boundaries,
Supplier<ExemplarReservoir<DoubleExemplarData>> reservoirSupplier,
MemoryMode memoryMode) {
double[] boundaries, Supplier<ExemplarReservoir> reservoirSupplier, MemoryMode memoryMode) {
this.boundaries = boundaries;
this.memoryMode = memoryMode;

Expand All @@ -68,7 +66,7 @@ public DoubleExplicitBucketHistogramAggregator(
}

@Override
public AggregatorHandle<HistogramPointData, DoubleExemplarData> createHandle() {
public AggregatorHandle<HistogramPointData> createHandle() {
return new Handle(this.boundaryList, this.boundaries, reservoirSupplier.get(), memoryMode);
}

Expand All @@ -88,7 +86,7 @@ public MetricData toMetricData(
ImmutableHistogramData.create(temporality, pointData));
}

static final class Handle extends AggregatorHandle<HistogramPointData, DoubleExemplarData> {
static final class Handle extends AggregatorHandle<HistogramPointData> {
// read-only
private final List<Double> boundaryList;
// read-only
Expand Down Expand Up @@ -117,7 +115,7 @@ static final class Handle extends AggregatorHandle<HistogramPointData, DoubleExe
Handle(
List<Double> boundaryList,
double[] boundaries,
ExemplarReservoir<DoubleExemplarData> reservoir,
ExemplarReservoir reservoir,
MemoryMode memoryMode) {
super(reservoir);
this.boundaryList = boundaryList;
Expand All @@ -133,7 +131,12 @@ static final class Handle extends AggregatorHandle<HistogramPointData, DoubleExe
}

@Override
protected HistogramPointData doAggregateThenMaybeReset(
protected boolean isDoubleType() {
return true;
}

@Override
protected HistogramPointData doAggregateThenMaybeResetDoubles(
long startEpochNanos,
long epochNanos,
Attributes attributes,
Expand Down
Loading
Loading