diff --git a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/RateDoubleGroupingAggregatorFunction.java b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/RateDoubleGroupingAggregatorFunction.java index 298317d0d7fd2..7d70e948bc72b 100644 --- a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/RateDoubleGroupingAggregatorFunction.java +++ b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/RateDoubleGroupingAggregatorFunction.java @@ -36,6 +36,13 @@ public final class RateDoubleGroupingAggregatorFunction implements GroupingAggregatorFunction { public static final class FunctionSupplier implements AggregatorFunctionSupplier { + // Overriding constructor to support isRateOverTime flag + private final boolean isRateOverTime; + + public FunctionSupplier(boolean isRateOverTime) { + this.isRateOverTime = isRateOverTime; + } + @Override public List nonGroupingIntermediateStateDesc() { throw new UnsupportedOperationException("non-grouping aggregator is not supported"); @@ -53,7 +60,7 @@ public AggregatorFunction aggregator(DriverContext driverContext, List @Override public RateDoubleGroupingAggregatorFunction groupingAggregator(DriverContext driverContext, List channels) { - return new RateDoubleGroupingAggregatorFunction(channels, driverContext); + return new RateDoubleGroupingAggregatorFunction(channels, driverContext, isRateOverTime); } @Override @@ -74,11 +81,13 @@ public String describe() { private final DriverContext driverContext; private final BigArrays bigArrays; private ObjectArray reducedStates; + private final boolean isRateOverTime; - public RateDoubleGroupingAggregatorFunction(List channels, DriverContext driverContext) { + public RateDoubleGroupingAggregatorFunction(List channels, DriverContext driverContext, boolean isRateOverTime) { this.channels = channels; this.driverContext = driverContext; this.bigArrays = driverContext.bigArrays(); + this.isRateOverTime = isRateOverTime; ObjectArray buffers = driverContext.bigArrays().newObjectArray(256); try { this.reducedStates = driverContext.bigArrays().newObjectArray(256); @@ -550,7 +559,7 @@ public final void evaluateFinal(Block[] blocks, int offset, IntVector selected, } final double rate; if (evalContext instanceof TimeSeriesGroupingAggregatorEvaluationContext tsContext) { - rate = extrapolateRate(state, tsContext.rangeStartInMillis(group), tsContext.rangeEndInMillis(group)); + rate = extrapolateRate(state, tsContext.rangeStartInMillis(group), tsContext.rangeEndInMillis(group), isRateOverTime); } else { rate = computeRateWithoutExtrapolate(state); } @@ -666,7 +675,7 @@ private static double computeRateWithoutExtrapolate(ReducedState state) { * We still extrapolate the rate in this case, but not all the way to the boundary, only by half of the average duration between * samples (which is our guess for where the series actually starts or ends). */ - private static double extrapolateRate(ReducedState state, long rangeStart, long rangeEnd) { + private static double extrapolateRate(ReducedState state, long rangeStart, long rangeEnd, boolean isRateOverTime) { final int len = state.timestamps.length; assert len >= 2 : "rate requires at least two samples; got " + len; final long firstTS = state.timestamps[state.timestamps.length - 1]; @@ -696,7 +705,11 @@ private static double extrapolateRate(ReducedState state, long rangeStart, long } lastValue = lastValue + endGap * slope; } - return (lastValue - firstValue) * 1000.0 / (rangeEnd - rangeStart); + if (isRateOverTime) { + return (lastValue - firstValue) * 1000.0 / (rangeEnd - rangeStart); + } else { + return lastValue - firstValue; + } } // TODO: copied from old rate - simplify this or explain why we need it? diff --git a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/RateIntGroupingAggregatorFunction.java b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/RateIntGroupingAggregatorFunction.java index ccd355b101ce3..38161a2ba7bd1 100644 --- a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/RateIntGroupingAggregatorFunction.java +++ b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/RateIntGroupingAggregatorFunction.java @@ -36,6 +36,13 @@ public final class RateIntGroupingAggregatorFunction implements GroupingAggregatorFunction { public static final class FunctionSupplier implements AggregatorFunctionSupplier { + // Overriding constructor to support isRateOverTime flag + private final boolean isRateOverTime; + + public FunctionSupplier(boolean isRateOverTime) { + this.isRateOverTime = isRateOverTime; + } + @Override public List nonGroupingIntermediateStateDesc() { throw new UnsupportedOperationException("non-grouping aggregator is not supported"); @@ -53,7 +60,7 @@ public AggregatorFunction aggregator(DriverContext driverContext, List @Override public RateIntGroupingAggregatorFunction groupingAggregator(DriverContext driverContext, List channels) { - return new RateIntGroupingAggregatorFunction(channels, driverContext); + return new RateIntGroupingAggregatorFunction(channels, driverContext, isRateOverTime); } @Override @@ -74,11 +81,13 @@ public String describe() { private final DriverContext driverContext; private final BigArrays bigArrays; private ObjectArray reducedStates; + private final boolean isRateOverTime; - public RateIntGroupingAggregatorFunction(List channels, DriverContext driverContext) { + public RateIntGroupingAggregatorFunction(List channels, DriverContext driverContext, boolean isRateOverTime) { this.channels = channels; this.driverContext = driverContext; this.bigArrays = driverContext.bigArrays(); + this.isRateOverTime = isRateOverTime; ObjectArray buffers = driverContext.bigArrays().newObjectArray(256); try { this.reducedStates = driverContext.bigArrays().newObjectArray(256); @@ -550,7 +559,7 @@ public final void evaluateFinal(Block[] blocks, int offset, IntVector selected, } final double rate; if (evalContext instanceof TimeSeriesGroupingAggregatorEvaluationContext tsContext) { - rate = extrapolateRate(state, tsContext.rangeStartInMillis(group), tsContext.rangeEndInMillis(group)); + rate = extrapolateRate(state, tsContext.rangeStartInMillis(group), tsContext.rangeEndInMillis(group), isRateOverTime); } else { rate = computeRateWithoutExtrapolate(state); } @@ -666,7 +675,7 @@ private static double computeRateWithoutExtrapolate(ReducedState state) { * We still extrapolate the rate in this case, but not all the way to the boundary, only by half of the average duration between * samples (which is our guess for where the series actually starts or ends). */ - private static double extrapolateRate(ReducedState state, long rangeStart, long rangeEnd) { + private static double extrapolateRate(ReducedState state, long rangeStart, long rangeEnd, boolean isRateOverTime) { final int len = state.timestamps.length; assert len >= 2 : "rate requires at least two samples; got " + len; final long firstTS = state.timestamps[state.timestamps.length - 1]; @@ -696,7 +705,11 @@ private static double extrapolateRate(ReducedState state, long rangeStart, long } lastValue = lastValue + endGap * slope; } - return (lastValue - firstValue) * 1000.0 / (rangeEnd - rangeStart); + if (isRateOverTime) { + return (lastValue - firstValue) * 1000.0 / (rangeEnd - rangeStart); + } else { + return lastValue - firstValue; + } } // TODO: copied from old rate - simplify this or explain why we need it? diff --git a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/RateLongGroupingAggregatorFunction.java b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/RateLongGroupingAggregatorFunction.java index d14d5c75534c4..acf72ae471afe 100644 --- a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/RateLongGroupingAggregatorFunction.java +++ b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/RateLongGroupingAggregatorFunction.java @@ -36,6 +36,13 @@ public final class RateLongGroupingAggregatorFunction implements GroupingAggregatorFunction { public static final class FunctionSupplier implements AggregatorFunctionSupplier { + // Overriding constructor to support isRateOverTime flag + private final boolean isRateOverTime; + + public FunctionSupplier(boolean isRateOverTime) { + this.isRateOverTime = isRateOverTime; + } + @Override public List nonGroupingIntermediateStateDesc() { throw new UnsupportedOperationException("non-grouping aggregator is not supported"); @@ -53,7 +60,7 @@ public AggregatorFunction aggregator(DriverContext driverContext, List @Override public RateLongGroupingAggregatorFunction groupingAggregator(DriverContext driverContext, List channels) { - return new RateLongGroupingAggregatorFunction(channels, driverContext); + return new RateLongGroupingAggregatorFunction(channels, driverContext, isRateOverTime); } @Override @@ -74,11 +81,13 @@ public String describe() { private final DriverContext driverContext; private final BigArrays bigArrays; private ObjectArray reducedStates; + private final boolean isRateOverTime; - public RateLongGroupingAggregatorFunction(List channels, DriverContext driverContext) { + public RateLongGroupingAggregatorFunction(List channels, DriverContext driverContext, boolean isRateOverTime) { this.channels = channels; this.driverContext = driverContext; this.bigArrays = driverContext.bigArrays(); + this.isRateOverTime = isRateOverTime; ObjectArray buffers = driverContext.bigArrays().newObjectArray(256); try { this.reducedStates = driverContext.bigArrays().newObjectArray(256); @@ -550,7 +559,7 @@ public final void evaluateFinal(Block[] blocks, int offset, IntVector selected, } final double rate; if (evalContext instanceof TimeSeriesGroupingAggregatorEvaluationContext tsContext) { - rate = extrapolateRate(state, tsContext.rangeStartInMillis(group), tsContext.rangeEndInMillis(group)); + rate = extrapolateRate(state, tsContext.rangeStartInMillis(group), tsContext.rangeEndInMillis(group), isRateOverTime); } else { rate = computeRateWithoutExtrapolate(state); } @@ -666,7 +675,7 @@ private static double computeRateWithoutExtrapolate(ReducedState state) { * We still extrapolate the rate in this case, but not all the way to the boundary, only by half of the average duration between * samples (which is our guess for where the series actually starts or ends). */ - private static double extrapolateRate(ReducedState state, long rangeStart, long rangeEnd) { + private static double extrapolateRate(ReducedState state, long rangeStart, long rangeEnd, boolean isRateOverTime) { final int len = state.timestamps.length; assert len >= 2 : "rate requires at least two samples; got " + len; final long firstTS = state.timestamps[state.timestamps.length - 1]; @@ -696,7 +705,11 @@ private static double extrapolateRate(ReducedState state, long rangeStart, long } lastValue = lastValue + endGap * slope; } - return (lastValue - firstValue) * 1000.0 / (rangeEnd - rangeStart); + if (isRateOverTime) { + return (lastValue - firstValue) * 1000.0 / (rangeEnd - rangeStart); + } else { + return lastValue - firstValue; + } } // TODO: copied from old rate - simplify this or explain why we need it? diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/X-RateGroupingAggregatorFunction.java.st b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/X-RateGroupingAggregatorFunction.java.st index 0a9e50b7758a9..2cba6a241aec3 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/X-RateGroupingAggregatorFunction.java.st +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/X-RateGroupingAggregatorFunction.java.st @@ -36,6 +36,12 @@ import java.util.List; public final class Rate$Type$GroupingAggregatorFunction implements GroupingAggregatorFunction { public static final class FunctionSupplier implements AggregatorFunctionSupplier { + // Overriding constructor to support isRateOverTime flag + private final boolean isRateOverTime; + public FunctionSupplier(boolean isRateOverTime) { + this.isRateOverTime = isRateOverTime; + } + @Override public List nonGroupingIntermediateStateDesc() { throw new UnsupportedOperationException("non-grouping aggregator is not supported"); @@ -53,7 +59,7 @@ public final class Rate$Type$GroupingAggregatorFunction implements GroupingAggre @Override public Rate$Type$GroupingAggregatorFunction groupingAggregator(DriverContext driverContext, List channels) { - return new Rate$Type$GroupingAggregatorFunction(channels, driverContext); + return new Rate$Type$GroupingAggregatorFunction(channels, driverContext, isRateOverTime); } @Override @@ -74,11 +80,12 @@ public final class Rate$Type$GroupingAggregatorFunction implements GroupingAggre private final DriverContext driverContext; private final BigArrays bigArrays; private ObjectArray reducedStates; - - public Rate$Type$GroupingAggregatorFunction(List channels, DriverContext driverContext) { + private final boolean isRateOverTime; + public Rate$Type$GroupingAggregatorFunction(List channels, DriverContext driverContext, boolean isRateOverTime) { this.channels = channels; this.driverContext = driverContext; this.bigArrays = driverContext.bigArrays(); + this.isRateOverTime = isRateOverTime; ObjectArray buffers = driverContext.bigArrays().newObjectArray(256); try { this.reducedStates = driverContext.bigArrays().newObjectArray(256); @@ -550,7 +557,7 @@ public final class Rate$Type$GroupingAggregatorFunction implements GroupingAggre } final double rate; if (evalContext instanceof TimeSeriesGroupingAggregatorEvaluationContext tsContext) { - rate = extrapolateRate(state, tsContext.rangeStartInMillis(group), tsContext.rangeEndInMillis(group)); + rate = extrapolateRate(state, tsContext.rangeStartInMillis(group), tsContext.rangeEndInMillis(group), isRateOverTime); } else { rate = computeRateWithoutExtrapolate(state); } @@ -666,7 +673,7 @@ public final class Rate$Type$GroupingAggregatorFunction implements GroupingAggre * We still extrapolate the rate in this case, but not all the way to the boundary, only by half of the average duration between * samples (which is our guess for where the series actually starts or ends). */ - private static double extrapolateRate(ReducedState state, long rangeStart, long rangeEnd) { + private static double extrapolateRate(ReducedState state, long rangeStart, long rangeEnd, boolean isRateOverTime) { final int len = state.timestamps.length; assert len >= 2 : "rate requires at least two samples; got " + len; final long firstTS = state.timestamps[state.timestamps.length - 1]; @@ -696,7 +703,11 @@ public final class Rate$Type$GroupingAggregatorFunction implements GroupingAggre } lastValue = lastValue + endGap * slope; } - return (lastValue - firstValue) * 1000.0 / (rangeEnd - rangeStart); + if (isRateOverTime) { + return (lastValue - firstValue) * 1000.0 / (rangeEnd - rangeStart); + } else { + return lastValue - firstValue; + } } // TODO: copied from old rate - simplify this or explain why we need it? diff --git a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/k8s-timeseries-increase.csv-spec b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/k8s-timeseries-increase.csv-spec new file mode 100644 index 0000000000000..c31307d66e43e --- /dev/null +++ b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/k8s-timeseries-increase.csv-spec @@ -0,0 +1,225 @@ +rate_of_long_no_grouping +required_capability: metrics_command +TS k8s +| STATS rate_bytes_in=avg(rate(network.total_bytes_in)) BY time_bucket = bucket(@timestamp,1minute) +| SORT rate_bytes_in DESC, time_bucket DESC | LIMIT 10; + +rate_bytes_in:double | time_bucket:datetime +null | 2024-05-10T00:01:00.000Z +28.80297619047619 | 2024-05-10T00:15:00.000Z +25.422222222222224 | 2024-05-10T00:04:00.000Z +24.28946078431372 | 2024-05-10T00:19:00.000Z +24.05555555555555 | 2024-05-10T00:10:00.000Z +22.546794871794873 | 2024-05-10T00:13:00.000Z +21.039819662685943 | 2024-05-10T00:17:00.000Z +20.420667304320105 | 2024-05-10T00:18:00.000Z +19.745833333333334 | 2024-05-10T00:00:00.000Z +19.4 | 2024-05-10T00:14:00.000Z +; + +rate_of_long_grouping +required_capability: metrics_command +TS k8s +| STATS rate_bytes_in=avg(rate(network.total_bytes_in)) BY cluster, time_bucket = bucket(@timestamp,5minute) +| SORT rate_bytes_in DESC, time_bucket, cluster | LIMIT 10; + +rate_bytes_in:double | cluster:keyword | time_bucket:datetime +15.025267167998313 | qa | 2024-05-10T00:15:00.000Z +13.638384356589611 | qa | 2024-05-10T00:05:00.000Z +11.761724575728252 | prod | 2024-05-10T00:15:00.000Z +7.453275209904956 | qa | 2024-05-10T00:10:00.000Z +7.307225056633641 | staging | 2024-05-10T00:05:00.000Z +7.203958127639015 | prod | 2024-05-10T00:10:00.000Z +6.34494062999877 | staging | 2024-05-10T00:10:00.000Z +5.700488689624205 | prod | 2024-05-10T00:20:00.000Z +5.4539153439153445 | prod | 2024-05-10T00:00:00.000Z +5.241187469367376 | staging | 2024-05-10T00:00:00.000Z + +; + +rate_of_double_no_grouping +required_capability: metrics_command +TS k8s +| STATS rate_cost=sum(rate(network.total_cost)) BY time_bucket = bucket(@timestamp,1minute) +| SORT rate_cost DESC, time_bucket | LIMIT 10; + +rate_cost:double | time_bucket:datetime +null | 2024-05-10T00:01:00.000Z +1.7945690883190881 | 2024-05-10T00:09:00.000Z +1.0657479136112855 | 2024-05-10T00:17:00.000Z +0.7229258685181391 | 2024-05-10T00:18:00.000Z +0.696875 | 2024-05-10T00:13:00.000Z +0.6763888888888887 | 2024-05-10T00:08:00.000Z +0.6425595238095237 | 2024-05-10T00:06:00.000Z +0.5818946678321678 | 2024-05-10T00:20:00.000Z +0.5125 | 2024-05-10T00:11:00.000Z +0.500297619047619 | 2024-05-10T00:15:00.000Z + +; + +rate_with_filtering +required_capability: metrics_command +TS k8s | WHERE pod == "one" +| STATS rate_bytes_in = sum(rate(network.total_bytes_in)) BY cluster, time_bucket = bucket(@timestamp, 10minute) +| SORT time_bucket, cluster | LIMIT 10; + +rate_bytes_in:double | cluster:keyword | time_bucket:datetime +4.0314581958195825 | prod | 2024-05-10T00:00:00.000Z +9.955833333333333 | qa | 2024-05-10T00:00:00.000Z +4.242445473251029 | staging | 2024-05-10T00:00:00.000Z +11.188380281690138 | prod | 2024-05-10T00:10:00.000Z +12.222592592592592 | qa | 2024-05-10T00:10:00.000Z +3.050371490280777 | staging | 2024-05-10T00:10:00.000Z +2.210158359293873 | prod | 2024-05-10T00:20:00.000Z +0.8955555555555565 | qa | 2024-05-10T00:20:00.000Z +0.595 | staging | 2024-05-10T00:20:00.000Z +; + +rate_with_inline_filtering +required_capability: metrics_command +TS k8s +| STATS rate_bytes_in = sum(rate(network.total_bytes_in)) WHERE pod == "one" BY cluster, time_bucket = bucket(@timestamp, 10minute) +| SORT time_bucket, cluster | LIMIT 10; + +rate_bytes_in:double | cluster:keyword | time_bucket:datetime +4.0314581958195825 | prod | 2024-05-10T00:00:00.000Z +9.955833333333333 | qa | 2024-05-10T00:00:00.000Z +4.242445473251029 | staging | 2024-05-10T00:00:00.000Z +11.188380281690138 | prod | 2024-05-10T00:10:00.000Z +12.222592592592592 | qa | 2024-05-10T00:10:00.000Z +3.050371490280777 | staging | 2024-05-10T00:10:00.000Z +2.210158359293873 | prod | 2024-05-10T00:20:00.000Z +0.8955555555555565 | qa | 2024-05-10T00:20:00.000Z +0.595 | staging | 2024-05-10T00:20:00.000Z +; + +eval_on_rate +required_capability: metrics_command +TS k8s +| STATS rate_bytes = avg(rate(network.total_bytes_in)) BY cluster, time_bucket = bucket(@timestamp, 10minute) +| EVAL rate_kb = rate_bytes / 1024.0 +| LIMIT 10 | SORT time_bucket, cluster ; + +rate_bytes:double | cluster:keyword | time_bucket:datetime | rate_kb:double +4.489054151267086 | prod | 2024-05-10T00:00:00.000Z | 0.004383841944596764 +9.933630806480615 | qa | 2024-05-10T00:00:00.000Z | 0.009700811334453725 +6.2165285757677635 | staging | 2024-05-10T00:00:00.000Z | 0.0060708286872732066 +9.435761829894686 | prod | 2024-05-10T00:10:00.000Z | 0.009214611162006529 +11.21976172297114 | qa | 2024-05-10T00:10:00.000Z | 0.010956798557589005 +5.861581261551023 | staging | 2024-05-10T00:10:00.000Z | 0.005724200450733421 +2.8502443448121024 | prod | 2024-05-10T00:20:00.000Z | 0.0027834417429805688 +1.4602777777777782 | qa | 2024-05-10T00:20:00.000Z | 0.0014260525173611116 +1.1196474788041055 | staging | 2024-05-10T00:20:00.000Z | 0.0010934057410196343 + + +; + +rate_of_aggregate_metric +required_capability: metrics_command +TS k8s-downsampled +| STATS sum_bytes = sum(rate(network.total_bytes_in)), + max_bytes = max(rate(network.total_bytes_in)), + min_bytes = min(rate(network.total_bytes_in)), + avg_bytes = avg(rate(network.total_bytes_in)) BY time_bucket = bucket(@timestamp, 30minute) +| SORT time_bucket | LIMIT 10; + +sum_bytes:double | max_bytes:double | min_bytes:double | avg_bytes:double | time_bucket:datetime +0.7751851851851852 | 0.17694444444444443 | 0.025925925925925936 | 0.08613168724279835 | 2024-05-09T23:30:00.000Z + +; + +rate_of_expression +required_capability: metrics_command +TS k8s +| STATS rate_bytes_in=avg(rate(network.total_bytes_in) + 10) BY time_bucket = bucket(@timestamp,1minute) +| SORT rate_bytes_in DESC, time_bucket DESC | LIMIT 10; + +rate_bytes_in:double | time_bucket:datetime +null | 2024-05-10T00:01:00.000Z +38.80297619047619 | 2024-05-10T00:15:00.000Z +35.422222222222224 | 2024-05-10T00:04:00.000Z +34.28946078431372 | 2024-05-10T00:19:00.000Z +34.05555555555555 | 2024-05-10T00:10:00.000Z +32.54679487179487 | 2024-05-10T00:13:00.000Z +31.039819662685943 | 2024-05-10T00:17:00.000Z +30.4206673043201 | 2024-05-10T00:18:00.000Z +29.745833333333334 | 2024-05-10T00:00:00.000Z +29.4 | 2024-05-10T00:14:00.000Z +; + +rate_combined_avg +required_capability: metrics_command +TS k8s +| STATS avg_rate_bytes = avg(rate(network.total_bytes_in)), avg_rate_cost = avg(rate(network.total_cost)) BY cluster, time_bucket = bucket(@timestamp, 10minute) +| EVAL ratio = avg_rate_bytes / avg_rate_cost +| SORT time_bucket, cluster | LIMIT 10; + +avg_rate_bytes:double | avg_rate_cost:double | cluster:keyword | time_bucket:datetime | ratio:double +4.489054151267086 | 0.08727848310006986 | prod | 2024-05-10T00:00:00.000Z | 51.43368665241494 +9.933630806480615 | 0.1480786773651426 | qa | 2024-05-10T00:00:00.000Z | 67.08346524453067 +6.2165285757677635 | 0.08125606293033183 | staging | 2024-05-10T00:00:00.000Z | 76.50541204657867 +9.435761829894686 | 0.09789990135274192 | prod | 2024-05-10T00:10:00.000Z | 96.38172970059296 +11.21976172297114 | 0.13555566451825743 | qa | 2024-05-10T00:10:00.000Z | 82.76866749054221 +5.861581261551023 | 0.06909345577029162 | staging | 2024-05-10T00:10:00.000Z | 84.8355491298403 +2.8502443448121024 | 0.03328125 | prod | 2024-05-10T00:20:00.000Z | 85.6411446328519 +1.4602777777777782 | 0.03146701388888888 | qa | 2024-05-10T00:20:00.000Z | 46.4066206896552 +1.1196474788041055 | 0.026953703703703702 | staging | 2024-05-10T00:20:00.000Z | 41.53965225381086 +; + +rate_combined_sum +required_capability: metrics_command +TS k8s +| STATS sum_rate_bytes = sum(rate(network.total_bytes_in)), sum_rate_cost = sum(rate(network.total_cost)) BY cluster, time_bucket = bucket(@timestamp, 10minute) +| EVAL ratio = sum_rate_bytes / sum_rate_cost +| SORT time_bucket, cluster | LIMIT 10; + +sum_rate_bytes:double | sum_rate_cost:double | cluster:keyword | time_bucket:datetime | ratio:double +13.46716245380126 | 0.2618354493002096 | prod | 2024-05-10T00:00:00.000Z | 51.43368665241495 +29.800892419441844 | 0.44423603209542784 | qa | 2024-05-10T00:00:00.000Z | 67.08346524453067 +18.64958572730329 | 0.2437681887909955 | staging | 2024-05-10T00:00:00.000Z | 76.50541204657867 +28.307285489684055 | 0.29369970405822576 | prod | 2024-05-10T00:10:00.000Z | 96.38172970059294 +33.65928516891342 | 0.4066669935547723 | qa | 2024-05-10T00:10:00.000Z | 82.76866749054221 +17.584743784653067 | 0.20728036731087485 | staging | 2024-05-10T00:10:00.000Z | 84.8355491298403 +5.700488689624205 | 0.0665625 | prod | 2024-05-10T00:20:00.000Z | 85.6411446328519 +2.9205555555555565 | 0.06293402777777776 | qa | 2024-05-10T00:20:00.000Z | 46.4066206896552 +3.358942436412317 | 0.0808611111111111 | staging | 2024-05-10T00:20:00.000Z | 41.53965225381086 +; + +rate_of_ratio +required_capability: metrics_command +TS k8s +| STATS rate_of_ratio = sum(rate(network.total_cost) / rate(network.total_bytes_in)) BY cluster, time_bucket = bucket(@timestamp, 10minute) +| SORT time_bucket, cluster | LIMIT 10; + +rate_of_ratio:double | cluster:keyword | time_bucket:datetime +0.05729672905133547 | prod | 2024-05-10T00:00:00.000Z +0.04435305398953446 | qa | 2024-05-10T00:00:00.000Z +0.041268397742417876 | staging | 2024-05-10T00:00:00.000Z +0.030827104158756384 | prod | 2024-05-10T00:10:00.000Z +0.03694112891727294 | qa | 2024-05-10T00:10:00.000Z +0.03371556294613671 | staging | 2024-05-10T00:10:00.000Z +0.02425073340627254 | prod | 2024-05-10T00:20:00.000Z +0.05616345770915657 | qa | 2024-05-10T00:20:00.000Z +0.06756155922294359 | staging | 2024-05-10T00:20:00.000Z +; + +rate_of_long_grouping_1min_nulls +required_capability: metrics_command +TS k8s +| STATS rate_bytes_in=avg(rate(network.total_bytes_in)) BY cluster, time_bucket = bucket(@timestamp,2minute) +| SORT rate_bytes_in NULLS FIRST, time_bucket, cluster | LIMIT 10; + + rate_bytes_in:double | cluster:keyword | time_bucket:datetime + null | qa | 2024-05-10T00:00:00.000Z + null | staging | 2024-05-10T00:00:00.000Z + null | prod | 2024-05-10T00:06:00.000Z + null | prod | 2024-05-10T00:10:00.000Z + null | staging | 2024-05-10T00:10:00.000Z + null | qa | 2024-05-10T00:22:00.000Z + 0.0371323529411787 | staging | 2024-05-10T00:22:00.000Z + 0.42291666666666666 | qa | 2024-05-10T00:12:00.000Z + 2.27097222222222 | qa | 2024-05-10T00:20:00.000Z + 4.084868421052632 | staging | 2024-05-10T00:14:00.000Z + +; + diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/RandomizedTimeSeriesIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/RandomizedTimeSeriesIT.java index e7913c90c99ff..3c1844becc440 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/RandomizedTimeSeriesIT.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/RandomizedTimeSeriesIT.java @@ -56,8 +56,8 @@ @SuppressWarnings("unchecked") @ESIntegTestCase.ClusterScope(maxNumDataNodes = 1) public class RandomizedTimeSeriesIT extends AbstractEsqlIntegTestCase { - private static final Long NUM_DOCS = 2000L; - private static final Long TIME_RANGE_SECONDS = 3600L; + private static final Long NUM_DOCS = 200L; + private static final Long TIME_RANGE_SECONDS = 360L; private static final String DATASTREAM_NAME = "tsit_ds"; private static final Integer SECONDS_IN_WINDOW = 60; private static final List> WINDOW_OPTIONS = List.of( @@ -74,6 +74,7 @@ public class RandomizedTimeSeriesIT extends AbstractEsqlIntegTestCase { private static final List> DELTA_AGG_OPTIONS = List.of( Tuple.tuple("rate", DeltaAgg.RATE), Tuple.tuple("irate", DeltaAgg.IRATE), + Tuple.tuple("increase", DeltaAgg.INCREASE), Tuple.tuple("idelta", DeltaAgg.IDELTA) ); private static final Map DELTA_AGG_METRIC_MAP = Map.of( @@ -82,7 +83,9 @@ public class RandomizedTimeSeriesIT extends AbstractEsqlIntegTestCase { DeltaAgg.IRATE, "counterl_hdd.bytes.read", DeltaAgg.IDELTA, - "gaugel_hdd.bytes.used" + "gaugel_hdd.bytes.used", + DeltaAgg.INCREASE, + "counterl_hdd.bytes.read" ); private List documents; @@ -273,6 +276,7 @@ enum DeltaAgg { RATE, IRATE, IDELTA, + INCREASE } // A record that holds min, max, avg, count and sum of rates calculated from a timeseries. @@ -308,9 +312,9 @@ static RateStats calculateDeltaAggregation( return new RateRange(idelta * 0.999, idelta * 1.001); // Add 0.1% tolerance } } - assert deltaAgg == DeltaAgg.RATE; + assert deltaAgg == DeltaAgg.RATE || deltaAgg == DeltaAgg.INCREASE; Double lastValue = null; - Double counterGrowth = 0.0; + double counterGrowth = 0.0; for (Tuple> point : timeseries) { var currentValue = point.v2().v2(); if (currentValue == null) { @@ -328,10 +332,18 @@ static RateStats calculateDeltaAggregation( } lastValue = currentValue; // Update last value for next iteration } - return new RateRange( + var res = new RateRange( counterGrowth / secondsInWindow * 0.99, // Add 1% tolerance to the lower bound 1000.0 * counterGrowth / (lastTs.toEpochMilli() - firstTs.toEpochMilli()) * 1.01 // Add 1% tolerance to the upper bound ); + if (deltaAgg.equals(DeltaAgg.INCREASE)) { + return new RateRange( + res.lower * secondsInWindow, // INCREASE is RATE multiplied by the window size + res.upper * secondsInWindow + ); + } else { + return res; + } }).filter(Objects::nonNull).toList(); if (allRates.isEmpty()) { return new RateStats(0L, null, null, null, null); @@ -399,14 +411,16 @@ void checkWithin(Double actual, RateRange expected) { assertThat(actual, allOf(lessThanOrEqualTo(expected.upper), not(lessThan(expected.lower)))); } - void assertNoFailedWindows(List failedWindows, List> rows) { + void assertNoFailedWindows(List failedWindows, List> rows, String agg) { if (failedWindows.isEmpty() == false) { var pctFailures = (double) failedWindows.size() / rows.size() * 100; var failureDetails = String.join("\n", failedWindows); if (failureDetails.length() > 2000) { failureDetails = failureDetails.substring(0, 2000) + "\n... (truncated)"; } - throw new AssertionError("Failed " + failedWindows.size() + " windows(" + pctFailures + "%):\n" + failureDetails); + throw new AssertionError( + "Failed. Agg: " + agg + " | Failed windows: " + failedWindows.size() + " windows(" + pctFailures + "%):\n" + failureDetails + ); } } @@ -455,7 +469,7 @@ public void testRateGroupBySubset() { failedWindows.add("Failed for row:\n" + row + "\nWanted: " + rateAgg + "\nException: " + e.getMessage()); } } - assertNoFailedWindows(failedWindows, rows); + assertNoFailedWindows(failedWindows, rows, deltaAgg.v2().name()); } } @@ -492,7 +506,7 @@ public void testRateGroupByNothing() { failedWindows.add("Failed for row:\n" + row + "\nWanted: " + rateAgg + "\nException: " + e.getMessage()); } } - assertNoFailedWindows(failedWindows, rows); + assertNoFailedWindows(failedWindows, rows, "RATE"); } } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/EsqlFunctionRegistry.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/EsqlFunctionRegistry.java index 13065dd91854a..2579e5e710302 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/EsqlFunctionRegistry.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/EsqlFunctionRegistry.java @@ -30,6 +30,7 @@ import org.elasticsearch.xpack.esql.expression.function.aggregate.First; import org.elasticsearch.xpack.esql.expression.function.aggregate.FirstOverTime; import org.elasticsearch.xpack.esql.expression.function.aggregate.Idelta; +import org.elasticsearch.xpack.esql.expression.function.aggregate.Increase; import org.elasticsearch.xpack.esql.expression.function.aggregate.Irate; import org.elasticsearch.xpack.esql.expression.function.aggregate.Last; import org.elasticsearch.xpack.esql.expression.function.aggregate.LastOverTime; @@ -516,6 +517,7 @@ private static FunctionDefinition[][] snapshotFunctions() { def(Rate.class, uni(Rate::new), "rate"), def(Irate.class, uni(Irate::new), "irate"), def(Idelta.class, uni(Idelta::new), "idelta"), + def(Increase.class, uni(Increase::new), "increase"), def(MaxOverTime.class, uni(MaxOverTime::new), "max_over_time"), def(MinOverTime.class, uni(MinOverTime::new), "min_over_time"), def(SumOverTime.class, uni(SumOverTime::new), "sum_over_time"), diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/AggregateWritables.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/AggregateWritables.java index 5d65453fd7c08..28ea968c7f64d 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/AggregateWritables.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/AggregateWritables.java @@ -27,6 +27,7 @@ public static List getNamedWriteables() { Rate.ENTRY, Irate.ENTRY, Idelta.ENTRY, + Increase.ENTRY, Sample.ENTRY, SpatialCentroid.ENTRY, SpatialExtent.ENTRY, diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/Increase.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/Increase.java new file mode 100644 index 0000000000000..8cd499d57953c --- /dev/null +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/Increase.java @@ -0,0 +1,138 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.esql.expression.function.aggregate; + +import org.elasticsearch.common.io.stream.NamedWriteableRegistry; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.compute.aggregation.AggregatorFunctionSupplier; +import org.elasticsearch.compute.aggregation.RateDoubleGroupingAggregatorFunction; +import org.elasticsearch.compute.aggregation.RateIntGroupingAggregatorFunction; +import org.elasticsearch.compute.aggregation.RateLongGroupingAggregatorFunction; +import org.elasticsearch.xpack.esql.EsqlIllegalArgumentException; +import org.elasticsearch.xpack.esql.core.expression.Expression; +import org.elasticsearch.xpack.esql.core.expression.Literal; +import org.elasticsearch.xpack.esql.core.expression.UnresolvedAttribute; +import org.elasticsearch.xpack.esql.core.tree.NodeInfo; +import org.elasticsearch.xpack.esql.core.tree.Source; +import org.elasticsearch.xpack.esql.core.type.DataType; +import org.elasticsearch.xpack.esql.expression.function.FunctionAppliesTo; +import org.elasticsearch.xpack.esql.expression.function.FunctionAppliesToLifecycle; +import org.elasticsearch.xpack.esql.expression.function.FunctionInfo; +import org.elasticsearch.xpack.esql.expression.function.FunctionType; +import org.elasticsearch.xpack.esql.expression.function.OptionalArgument; +import org.elasticsearch.xpack.esql.expression.function.Param; +import org.elasticsearch.xpack.esql.io.stream.PlanStreamInput; +import org.elasticsearch.xpack.esql.planner.ToAggregator; + +import java.io.IOException; +import java.util.List; + +import static org.elasticsearch.xpack.esql.core.expression.TypeResolutions.ParamOrdinal.FIRST; +import static org.elasticsearch.xpack.esql.core.expression.TypeResolutions.isType; + +public class Increase extends TimeSeriesAggregateFunction implements OptionalArgument, ToAggregator { + public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry(Expression.class, "Increase", Increase::new); + + private final Expression timestamp; + + @FunctionInfo( + type = FunctionType.TIME_SERIES_AGGREGATE, + returnType = { "double" }, + description = "The absolute increase of a counter field in a time window.", + appliesTo = { @FunctionAppliesTo(lifeCycle = FunctionAppliesToLifecycle.UNAVAILABLE) }, + note = "Available with the [TS](/reference/query-languages/esql/commands/source-commands.md#esql-ts) command in snapshot builds" + ) + public Increase( + Source source, + @Param(name = "field", type = { "counter_long", "counter_integer", "counter_double" }) Expression field + ) { + this(source, field, new UnresolvedAttribute(source, "@timestamp")); + } + + public Increase( + Source source, + @Param(name = "field", type = { "counter_long", "counter_integer", "counter_double" }) Expression field, + Expression timestamp + ) { + this(source, field, Literal.TRUE, timestamp); + } + + // compatibility constructor used when reading from the stream + private Increase(Source source, Expression field, Expression filter, List children) { + this(source, field, filter, children.getFirst()); + } + + private Increase(Source source, Expression field, Expression filter, Expression timestamp) { + super(source, field, filter, List.of(timestamp)); + this.timestamp = timestamp; + } + + public Increase(StreamInput in) throws IOException { + this( + Source.readFrom((PlanStreamInput) in), + in.readNamedWriteable(Expression.class), + in.readNamedWriteable(Expression.class), + in.readNamedWriteableCollectionAsList(Expression.class) + ); + } + + @Override + public String getWriteableName() { + return ENTRY.name; + } + + @Override + protected NodeInfo info() { + return NodeInfo.create(this, Increase::new, field(), timestamp); + } + + @Override + public Increase replaceChildren(List newChildren) { + if (newChildren.size() != 3) { + assert false : "expected 3 children for field, filter, @timestamp; got " + newChildren; + throw new IllegalArgumentException("expected 3 children for field, filter, @timestamp; got " + newChildren); + } + return new Increase(source(), newChildren.get(0), newChildren.get(1), newChildren.get(2)); + } + + @Override + public Increase withFilter(Expression filter) { + return new Increase(source(), field(), filter, timestamp); + } + + @Override + public DataType dataType() { + return DataType.DOUBLE; + } + + @Override + protected TypeResolution resolveType() { + return isType(field(), dt -> DataType.isCounter(dt), sourceText(), FIRST, "counter_long", "counter_integer", "counter_double"); + } + + @Override + public AggregatorFunctionSupplier supplier() { + final DataType type = field().dataType(); + return switch (type) { + case COUNTER_LONG -> new RateLongGroupingAggregatorFunction.FunctionSupplier(false); + case COUNTER_INTEGER -> new RateIntGroupingAggregatorFunction.FunctionSupplier(false); + case COUNTER_DOUBLE -> new RateDoubleGroupingAggregatorFunction.FunctionSupplier(false); + default -> throw EsqlIllegalArgumentException.illegalDataType(type); + }; + } + + @Override + public Increase perTimeSeriesAggregation() { + return this; + } + + @Override + public String toString() { + return "increase(" + field() + ")"; + } +} diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/Rate.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/Rate.java index b87cb8d40f34d..667ff1c42ebf6 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/Rate.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/Rate.java @@ -118,9 +118,9 @@ protected TypeResolution resolveType() { public AggregatorFunctionSupplier supplier() { final DataType type = field().dataType(); return switch (type) { - case COUNTER_LONG -> new RateLongGroupingAggregatorFunction.FunctionSupplier(); - case COUNTER_INTEGER -> new RateIntGroupingAggregatorFunction.FunctionSupplier(); - case COUNTER_DOUBLE -> new RateDoubleGroupingAggregatorFunction.FunctionSupplier(); + case COUNTER_LONG -> new RateLongGroupingAggregatorFunction.FunctionSupplier(true); + case COUNTER_INTEGER -> new RateIntGroupingAggregatorFunction.FunctionSupplier(true); + case COUNTER_DOUBLE -> new RateDoubleGroupingAggregatorFunction.FunctionSupplier(true); default -> throw EsqlIllegalArgumentException.illegalDataType(type); }; }