diff --git a/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/LoadMapping.java b/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/LoadMapping.java index 29f2f6ed4e927..d00c8bf6c944c 100644 --- a/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/LoadMapping.java +++ b/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/LoadMapping.java @@ -100,21 +100,33 @@ private static void walkMapping(String name, Object value, Map properties = fromEs(content); } boolean docValues = boolSetting(content.get("doc_values"), esDataType.hasDocValues()); + boolean isDimension = boolSetting(content.get("time_series_dimension"), false); + boolean isMetric = content.containsKey("time_series_metric"); + if (isDimension && isMetric) { + throw new IllegalStateException("Field configured as both dimension and metric:" + value); + } + EsField.TimeSeriesFieldType tsType = EsField.TimeSeriesFieldType.NONE; + if (isDimension) { + tsType = EsField.TimeSeriesFieldType.DIMENSION; + } + if (isMetric) { + tsType = EsField.TimeSeriesFieldType.METRIC; + } final EsField field; if (esDataType == TEXT) { - field = new TextEsField(name, properties, docValues, false, EsField.TimeSeriesFieldType.NONE); + field = new TextEsField(name, properties, docValues, false, tsType); } else if (esDataType == KEYWORD) { int length = intSetting(content.get("ignore_above"), Short.MAX_VALUE); boolean normalized = Strings.hasText(textSetting(content.get("normalizer"), null)); - field = new KeywordEsField(name, properties, docValues, length, normalized, false, EsField.TimeSeriesFieldType.NONE); + field = new KeywordEsField(name, properties, docValues, length, normalized, false, tsType); } else if (esDataType == DATETIME) { - field = DateEsField.dateEsField(name, properties, docValues, EsField.TimeSeriesFieldType.NONE); + field = DateEsField.dateEsField(name, properties, docValues, tsType); } else if (esDataType == 
UNSUPPORTED) { String type = content.get("type").toString(); field = new UnsupportedEsField(name, List.of(type), null, properties); propagateUnsupportedType(name, type, properties); } else { - field = new EsField(name, esDataType, properties, docValues, EsField.TimeSeriesFieldType.NONE); + field = new EsField(name, esDataType, properties, docValues, tsType); } mapping.put(name, field); } else { diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/TimeSeriesIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/TimeSeriesIT.java index d67fe8df0ff54..25042c302cece 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/TimeSeriesIT.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/TimeSeriesIT.java @@ -610,4 +610,86 @@ public void testAvgOrSumOverTimeProfile() { } } } + + public void testNullMetricsAreSkipped() { + Settings settings = Settings.builder().put("mode", "time_series").putList("routing_path", List.of("host", "cluster")).build(); + client().admin() + .indices() + .prepareCreate("sparse-hosts") + .setSettings(settings) + .setMapping( + "@timestamp", + "type=date", + "host", + "type=keyword,time_series_dimension=true", + "cluster", + "type=keyword,time_series_dimension=true", + "memory", + "type=long,time_series_metric=gauge", + "request_count", + "type=integer,time_series_metric=counter" + ) + .get(); + List sparseDocs = new ArrayList<>(); + // generate 100 docs; each metric (memory, request_count) is left null in 50 of them + // TODO: this is all copied from populateIndex(), refactor it sensibly. 
+ Map hostToClusters = new HashMap<>(); + for (int i = 0; i < 5; i++) { + hostToClusters.put("p" + i, randomFrom("qa", "prod")); + } + long timestamp = DEFAULT_DATE_TIME_FORMATTER.parseMillis("2024-04-15T00:00:00Z"); + int numDocs = 100; + Map requestCounts = new HashMap<>(); + for (int i = 0; i < numDocs; i++) { + String host = randomFrom(hostToClusters.keySet()); + timestamp += between(1, 10) * 1000L; + int requestCount = requestCounts.compute(host, (k, curr) -> { + if (curr == null) { + return randomIntBetween(0, 10); + } else { + return curr + randomIntBetween(1, 10); + } + }); + int cpu = randomIntBetween(0, 100); + ByteSizeValue memory = ByteSizeValue.ofBytes(randomIntBetween(1024, 1024 * 1024)); + sparseDocs.add(new Doc(host, hostToClusters.get(host), timestamp, requestCount, cpu, memory)); + } + + Randomness.shuffle(sparseDocs); + for (int i = 0; i < numDocs; i++) { + Doc doc = sparseDocs.get(i); + if (i % 2 == 0) { + client().prepareIndex("sparse-hosts") + .setSource("@timestamp", doc.timestamp, "host", doc.host, "cluster", doc.cluster, "request_count", doc.requestCount) + .get(); + } else { + client().prepareIndex("sparse-hosts") + .setSource("@timestamp", doc.timestamp, "host", doc.host, "cluster", doc.cluster, "memory", doc.memory.getBytes()) + .get(); + } + } + client().admin().indices().prepareRefresh("sparse-hosts").get(); + // Control test + try (EsqlQueryResponse resp = run(""" + TS sparse-hosts + | WHERE request_count IS NOT NULL + | STATS sum(rate(request_count)) BY cluster, host + """)) { + assertEquals("Control failed, data loading is broken", 50, resp.documentsFound()); + } + + try (EsqlQueryResponse resp = run(""" + TS sparse-hosts + | STATS sum(max_over_time(memory)) BY cluster, host + """)) { + assertEquals("Did not filter nulls on gauge type", 50, resp.documentsFound()); + } + + try (EsqlQueryResponse resp = run(""" + TS sparse-hosts + | STATS sum(rate(request_count)) BY cluster, host + """)) { + assertEquals("Did not filter nulls on 
counter type", 50, resp.documentsFound()); + } + } } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlQueryResponse.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlQueryResponse.java index 4afb1418b2585..042500c56daf9 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlQueryResponse.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlQueryResponse.java @@ -196,6 +196,10 @@ public Iterator column(int columnIndex) { return ResponseValueUtils.valuesForColumn(columnIndex, columns.get(columnIndex).type(), pages); } + /** + * @return the number of "documents" we got back from Lucene, as input into the compute engine. Note that in this context, a result + * such as the output of LuceneMaxOperator is counted as a single document. + */ public long documentsFound() { return documentsFound; } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/PlannerUtils.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/PlannerUtils.java index ef4f605b22d36..8296eeede62ec 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/PlannerUtils.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/PlannerUtils.java @@ -41,6 +41,7 @@ import org.elasticsearch.xpack.esql.plan.QueryPlan; import org.elasticsearch.xpack.esql.plan.logical.EsRelation; import org.elasticsearch.xpack.esql.plan.logical.Filter; +import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan; import org.elasticsearch.xpack.esql.plan.logical.PipelineBreaker; import org.elasticsearch.xpack.esql.plan.physical.AggregateExec; import org.elasticsearch.xpack.esql.plan.physical.EsSourceExec; @@ -215,7 +216,7 @@ public static PhysicalPlan localPlan( .map(x -> ((LookupJoinExec) x).right()) .collect(Collectors.toSet()); - var localPhysicalPlan = plan.transformUp(FragmentExec.class, f -> { + 
PhysicalPlan localPhysicalPlan = plan.transformUp(FragmentExec.class, f -> { if (lookupJoinExecRightChildren.contains(f)) { // Do not optimize the right child of a lookup join exec // The data node does not have the right stats to perform the optimization because the stats are on the lookup node @@ -223,9 +224,9 @@ public static PhysicalPlan localPlan( return f; } isCoordPlan.set(Boolean.FALSE); - var optimizedFragment = logicalOptimizer.localOptimize(f.fragment()); - var physicalFragment = localMapper.map(optimizedFragment); - var filter = f.esFilter(); + LogicalPlan optimizedFragment = logicalOptimizer.localOptimize(f.fragment()); + PhysicalPlan physicalFragment = localMapper.map(optimizedFragment); + QueryBuilder filter = f.esFilter(); if (filter != null) { physicalFragment = physicalFragment.transformUp( EsSourceExec.class, diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/IndexResolver.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/IndexResolver.java index 5f42d261abaa7..2771afb6ef3ca 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/IndexResolver.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/IndexResolver.java @@ -225,7 +225,7 @@ private static EsField createField( List rest = fcs.subList(1, fcs.size()); DataType type = EsqlDataTypeRegistry.INSTANCE.fromEs(first.type(), first.metricType()); boolean aggregatable = first.isAggregatable(); - EsField.TimeSeriesFieldType timeSeriesFieldType = EsField.TimeSeriesFieldType.UNKNOWN; + EsField.TimeSeriesFieldType timeSeriesFieldType = EsField.TimeSeriesFieldType.fromIndexFieldCapabilities(first); if (rest.isEmpty() == false) { for (IndexFieldCapabilities fc : rest) { if (first.metricType() != fc.metricType()) { diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/IgnoreNullMetricsPhysicalPlannerTests.java 
b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/IgnoreNullMetricsPhysicalPlannerTests.java new file mode 100644 index 0000000000000..50f7d53eaa67c --- /dev/null +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/IgnoreNullMetricsPhysicalPlannerTests.java @@ -0,0 +1,80 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.esql.optimizer; + +import org.elasticsearch.index.query.QueryBuilder; +import org.elasticsearch.xpack.esql.action.EsqlCapabilities; +import org.elasticsearch.xpack.esql.core.tree.NodeUtils; +import org.elasticsearch.xpack.esql.plan.physical.EsQueryExec; +import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan; +import org.elasticsearch.xpack.esql.session.Configuration; + +import static org.elasticsearch.index.query.QueryBuilders.existsQuery; +import static org.elasticsearch.xpack.esql.core.querydsl.query.Query.unscore; +import static org.hamcrest.Matchers.is; + +/** + * Tests for the {@link org.elasticsearch.xpack.esql.optimizer.rules.logical.local.IgnoreNullMetrics} planner rule, to + * verify that the filters are being pushed to Lucene. 
+ */ +public class IgnoreNullMetricsPhysicalPlannerTests extends LocalPhysicalPlanOptimizerTests { + public IgnoreNullMetricsPhysicalPlannerTests(String name, Configuration config) { + super(name, config); + } + + /** + * This tests that we get the same end result plan with an explicit isNotNull and the implicit one added by the rule + */ + public void testSamePhysicalPlans() { + assumeTrue("requires metrics command", EsqlCapabilities.Cap.METRICS_COMMAND.isEnabled()); + String testQuery = """ + TS k8s + | STATS max(rate(network.total_bytes_in)) BY Bucket(@timestamp, 1 hour) + | LIMIT 10 + """; + PhysicalPlan actualPlan = plannerOptimizerTimeSeries.plan(testQuery); + + String controlQuery = """ + TS k8s + | WHERE network.total_bytes_in IS NOT NULL + | STATS max(rate(network.total_bytes_in)) BY Bucket(@timestamp, 1 hour) + | LIMIT 10 + """; + PhysicalPlan expectedPlan = plannerOptimizerTimeSeries.plan(controlQuery); + + assertEquals(NodeUtils.diffString(expectedPlan, actualPlan), expectedPlan, actualPlan); + } + + public void testPushdownOfSimpleCounterQuery() { + assumeTrue("requires metrics command", EsqlCapabilities.Cap.METRICS_COMMAND.isEnabled()); + String query = """ + TS k8s + | STATS max(rate(network.total_bytes_in)) BY Bucket(@timestamp, 1 hour) + | LIMIT 10 + """; + PhysicalPlan actualPlan = plannerOptimizerTimeSeries.plan(query); + EsQueryExec queryExec = (EsQueryExec) actualPlan.collect(node -> node instanceof EsQueryExec).get(0); + + QueryBuilder expected = unscore(existsQuery("network.total_bytes_in")); + assertThat(queryExec.query().toString(), is(expected.toString())); + } + + public void testPushdownOfSimpleGagueQuery() { + assumeTrue("requires metrics command", EsqlCapabilities.Cap.METRICS_COMMAND.isEnabled()); + String query = """ + TS k8s + | STATS max(max_over_time(network.eth0.tx)) BY Bucket(@timestamp, 1 hour) + | LIMIT 10 + """; + PhysicalPlan actualPlan = plannerOptimizerTimeSeries.plan(query); + EsQueryExec queryExec = (EsQueryExec) 
actualPlan.collect(node -> node instanceof EsQueryExec).get(0); + + QueryBuilder expected = unscore(existsQuery("network.eth0.tx")); + assertThat(queryExec.query().toString(), is(expected.toString())); + } +} diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/LocalPhysicalPlanOptimizerTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/LocalPhysicalPlanOptimizerTests.java index eedd9aea3cbc1..affe9c26cc67b 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/LocalPhysicalPlanOptimizerTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/LocalPhysicalPlanOptimizerTests.java @@ -174,6 +174,7 @@ public class LocalPhysicalPlanOptimizerTests extends MapperServiceTestCase { protected TestPlannerOptimizer plannerOptimizer; private TestPlannerOptimizer plannerOptimizerDateDateNanosUnionTypes; private Analyzer timeSeriesAnalyzer; + protected TestPlannerOptimizer plannerOptimizerTimeSeries; private final Configuration config; private final SearchStats IS_SV_STATS = new TestSearchStats() { @Override @@ -240,6 +241,7 @@ public void init() { ), TEST_VERIFIER ); + plannerOptimizerTimeSeries = new TestPlannerOptimizer(config, timeSeriesAnalyzer); } private Analyzer makeAnalyzer(String mappingFileName, EnrichResolution enrichResolution) { diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/TestPlannerOptimizer.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/TestPlannerOptimizer.java index 4bd6fa6737041..0d1b4dbe73c93 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/TestPlannerOptimizer.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/TestPlannerOptimizer.java @@ -11,6 +11,7 @@ import org.elasticsearch.xpack.esql.analysis.Analyzer; import org.elasticsearch.xpack.esql.core.expression.FoldContext; import 
org.elasticsearch.xpack.esql.parser.EsqlParser; +import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan; import org.elasticsearch.xpack.esql.plan.physical.EstimatesRowSize; import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan; import org.elasticsearch.xpack.esql.planner.PlannerUtils; @@ -88,9 +89,9 @@ private PhysicalPlan optimizedPlan(PhysicalPlan plan, SearchStats searchStats, E } private PhysicalPlan physicalPlan(String query, Analyzer analyzer) { - var logical = logicalOptimizer.optimize(analyzer.analyze(parser.createStatement(query, EsqlTestUtils.TEST_CFG))); + LogicalPlan logical = logicalOptimizer.optimize(analyzer.analyze(parser.createStatement(query, EsqlTestUtils.TEST_CFG))); // System.out.println("Logical\n" + logical); - var physical = mapper.map(logical); + PhysicalPlan physical = mapper.map(logical); return physical; } } diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/local/IgnoreNullMetricsTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/local/IgnoreNullMetricsTests.java index 7b60246d8fc7a..d40cfaaf11bbb 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/local/IgnoreNullMetricsTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/local/IgnoreNullMetricsTests.java @@ -119,14 +119,15 @@ public void testSimple() { assumeTrue("requires metrics command", EsqlCapabilities.Cap.METRICS_COMMAND.isEnabled()); LogicalPlan actual = localPlan(""" TS test - | STATS max(max_over_time(metric_1)) + | STATS max(max_over_time(metric_1)) BY BUCKET(@timestamp, 1 min) | LIMIT 10 """); Limit limit = as(actual, Limit.class); Aggregate agg = as(limit.child(), Aggregate.class); // The optimizer expands the STATS out into two STATS steps Aggregate tsAgg = as(agg.child(), Aggregate.class); - Filter filter = as(tsAgg.child(), Filter.class); + Eval bucketEval = 
as(tsAgg.child(), Eval.class); + Filter filter = as(bucketEval.child(), Filter.class); IsNotNull condition = as(filter.condition(), IsNotNull.class); FieldAttribute attribute = as(condition.field(), FieldAttribute.class); assertEquals("metric_1", attribute.fieldName().string());