Member Author:
These changes are necessary to enable running planner tests that load dimension and metric information from our JSON schema files. My earlier tests for this feature hard-coded a test schema rather than loading it from JSON, so I didn't need this before, but we'll want it for many tests going forward.

@@ -100,21 +100,33 @@ private static void walkMapping(String name, Object value, Map<String, EsField>
properties = fromEs(content);
}
boolean docValues = boolSetting(content.get("doc_values"), esDataType.hasDocValues());
boolean isDimension = boolSetting(content.get("time_series_dimension"), false);
boolean isMetric = content.containsKey("time_series_metric");
if (isDimension && isMetric) {
throw new IllegalStateException("Field configured as both dimension and metric: " + value);
}
EsField.TimeSeriesFieldType tsType = EsField.TimeSeriesFieldType.NONE;
if (isDimension) {
tsType = EsField.TimeSeriesFieldType.DIMENSION;
}
if (isMetric) {
tsType = EsField.TimeSeriesFieldType.METRIC;
}
final EsField field;
if (esDataType == TEXT) {
field = new TextEsField(name, properties, docValues, false, EsField.TimeSeriesFieldType.NONE);
field = new TextEsField(name, properties, docValues, false, tsType);
} else if (esDataType == KEYWORD) {
int length = intSetting(content.get("ignore_above"), Short.MAX_VALUE);
boolean normalized = Strings.hasText(textSetting(content.get("normalizer"), null));
field = new KeywordEsField(name, properties, docValues, length, normalized, false, EsField.TimeSeriesFieldType.NONE);
field = new KeywordEsField(name, properties, docValues, length, normalized, false, tsType);
} else if (esDataType == DATETIME) {
field = DateEsField.dateEsField(name, properties, docValues, EsField.TimeSeriesFieldType.NONE);
field = DateEsField.dateEsField(name, properties, docValues, tsType);
} else if (esDataType == UNSUPPORTED) {
String type = content.get("type").toString();
field = new UnsupportedEsField(name, List.of(type), null, properties);
propagateUnsupportedType(name, type, properties);
} else {
field = new EsField(name, esDataType, properties, docValues, EsField.TimeSeriesFieldType.NONE);
field = new EsField(name, esDataType, properties, docValues, tsType);
}
mapping.put(name, field);
} else {
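For context, a hypothetical illustration of the per-field mapping content this code inspects and the TimeSeriesFieldType each case would now resolve to (field names and values are made up for illustration, not taken from the PR):

// Hypothetical mapping entries, shaped like the `content` map walkMapping receives per field.
Map<String, Object> host = Map.of("type", "keyword", "time_series_dimension", true);
Map<String, Object> memory = Map.of("type", "long", "time_series_metric", "gauge");
Map<String, Object> cpu = Map.of("type", "integer");
// With the change above, these would resolve to:
//   host   -> EsField.TimeSeriesFieldType.DIMENSION
//   memory -> EsField.TimeSeriesFieldType.METRIC
//   cpu    -> EsField.TimeSeriesFieldType.NONE (neither key present)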
@@ -610,4 +610,86 @@ public void testAvgOrSumOverTimeProfile() {
}
}
}

public void testNullMetricsAreSkipped() {
Settings settings = Settings.builder().put("mode", "time_series").putList("routing_path", List.of("host", "cluster")).build();
client().admin()
.indices()
.prepareCreate("sparse-hosts")
.setSettings(settings)
.setMapping(
"@timestamp",
"type=date",
"host",
"type=keyword,time_series_dimension=true",
"cluster",
"type=keyword,time_series_dimension=true",
"memory",
"type=long,time_series_metric=gauge",
"request_count",
"type=integer,time_series_metric=counter"
)
.get();
List<Doc> sparseDocs = new ArrayList<>();
// generate 100 docs, 50 will have a null metric
// TODO: this is all copied from populateIndex(), refactor it sensibly.
Member Author:
I really don't want to refactor this test today, and copying a few dozen lines of test code isn't that big a sin. I'm going to leave this TODO for someone to take on a slow Friday.

Map<String, String> hostToClusters = new HashMap<>();
for (int i = 0; i < 5; i++) {
hostToClusters.put("p" + i, randomFrom("qa", "prod"));
}
long timestamp = DEFAULT_DATE_TIME_FORMATTER.parseMillis("2024-04-15T00:00:00Z");
int numDocs = 100;
Map<String, Integer> requestCounts = new HashMap<>();
for (int i = 0; i < numDocs; i++) {
String host = randomFrom(hostToClusters.keySet());
timestamp += between(1, 10) * 1000L;
int requestCount = requestCounts.compute(host, (k, curr) -> {
if (curr == null) {
return randomIntBetween(0, 10);
} else {
return curr + randomIntBetween(1, 10);
}
});
int cpu = randomIntBetween(0, 100);
ByteSizeValue memory = ByteSizeValue.ofBytes(randomIntBetween(1024, 1024 * 1024));
sparseDocs.add(new Doc(host, hostToClusters.get(host), timestamp, requestCount, cpu, memory));
}

Randomness.shuffle(sparseDocs);
for (int i = 0; i < numDocs; i++) {
Doc doc = sparseDocs.get(i);
if (i % 2 == 0) {
client().prepareIndex("sparse-hosts")
.setSource("@timestamp", doc.timestamp, "host", doc.host, "cluster", doc.cluster, "request_count", doc.requestCount)
.get();
} else {
client().prepareIndex("sparse-hosts")
.setSource("@timestamp", doc.timestamp, "host", doc.host, "cluster", doc.cluster, "memory", doc.memory.getBytes())
.get();
}
}
client().admin().indices().prepareRefresh("sparse-hosts").get();
// Control test
try (EsqlQueryResponse resp = run("""
TS sparse-hosts
| WHERE request_count IS NOT NULL
| STATS sum(rate(request_count)) BY cluster, host
""")) {
assertEquals("Control failed, data loading is broken", 50, resp.documentsFound());
}

Member:
leftover?

Member Author:
probably worth keeping, I just forgot to uncomment it after running the debugger.

try (EsqlQueryResponse resp = run("""
TS sparse-hosts
| STATS sum(max_over_time(memory)) BY cluster, host
""")) {
assertEquals("Did not filter nulls on gauge type", 50, resp.documentsFound());
}

try (EsqlQueryResponse resp = run("""
TS sparse-hosts
| STATS sum(rate(request_count)) BY cluster, host
""")) {
assertEquals("Did not filter nulls on counter type", 50, resp.documentsFound());
}
}
}
@@ -196,6 +196,10 @@ public Iterator<Object> column(int columnIndex) {
return ResponseValueUtils.valuesForColumn(columnIndex, columns.get(columnIndex).type(), pages);
}

/**
* @return the number of "documents" we got back from Lucene, as input into the compute engine. Note that in this context, we think
* of things like the result of LuceneMaxOperator as single documents.
*/
public long documentsFound() {
return documentsFound;
}
@@ -41,6 +41,7 @@
import org.elasticsearch.xpack.esql.plan.QueryPlan;
import org.elasticsearch.xpack.esql.plan.logical.EsRelation;
import org.elasticsearch.xpack.esql.plan.logical.Filter;
import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;
import org.elasticsearch.xpack.esql.plan.logical.PipelineBreaker;
import org.elasticsearch.xpack.esql.plan.physical.AggregateExec;
import org.elasticsearch.xpack.esql.plan.physical.EsSourceExec;
@@ -215,17 +216,17 @@ public static PhysicalPlan localPlan(
.map(x -> ((LookupJoinExec) x).right())
.collect(Collectors.toSet());

var localPhysicalPlan = plan.transformUp(FragmentExec.class, f -> {
PhysicalPlan localPhysicalPlan = plan.transformUp(FragmentExec.class, f -> {
if (lookupJoinExecRightChildren.contains(f)) {
// Do not optimize the right child of a lookup join exec
// The data node does not have the right stats to perform the optimization because the stats are on the lookup node
// Also we only ship logical plans across the network, so the plan needs to remain logical
return f;
}
isCoordPlan.set(Boolean.FALSE);
var optimizedFragment = logicalOptimizer.localOptimize(f.fragment());
var physicalFragment = localMapper.map(optimizedFragment);
var filter = f.esFilter();
LogicalPlan optimizedFragment = logicalOptimizer.localOptimize(f.fragment());
PhysicalPlan physicalFragment = localMapper.map(optimizedFragment);
QueryBuilder filter = f.esFilter();
if (filter != null) {
physicalFragment = physicalFragment.transformUp(
EsSourceExec.class,
@@ -225,7 +225,7 @@ private static EsField createField(
List<IndexFieldCapabilities> rest = fcs.subList(1, fcs.size());
DataType type = EsqlDataTypeRegistry.INSTANCE.fromEs(first.type(), first.metricType());
boolean aggregatable = first.isAggregatable();
EsField.TimeSeriesFieldType timeSeriesFieldType = EsField.TimeSeriesFieldType.UNKNOWN;
EsField.TimeSeriesFieldType timeSeriesFieldType = EsField.TimeSeriesFieldType.fromIndexFieldCapabilities(first);
Member Author:
This is the entire bug fix. Everything else in this PR is tests and test scaffolding.

if (rest.isEmpty() == false) {
for (IndexFieldCapabilities fc : rest) {
if (first.metricType() != fc.metricType()) {
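For context on the one-line fix above: the change stops hard-coding UNKNOWN and instead derives the time-series role from the field-caps entry. A minimal sketch of what fromIndexFieldCapabilities is assumed to do (the real implementation is elsewhere in this PR and not shown in this diff; the accessor names are assumptions):

// Hedged sketch only — assumes a static factory on the TimeSeriesFieldType enum and that
// IndexFieldCapabilities exposes isDimension() and metricType().
static EsField.TimeSeriesFieldType fromIndexFieldCapabilities(IndexFieldCapabilities caps) {
    if (caps.isDimension()) {
        return EsField.TimeSeriesFieldType.DIMENSION;  // mapped with time_series_dimension=true
    }
    if (caps.metricType() != null) {
        return EsField.TimeSeriesFieldType.METRIC;     // mapped with a time_series_metric (gauge/counter)
    }
    return EsField.TimeSeriesFieldType.NONE;           // neither dimension nor metric
}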
@@ -0,0 +1,80 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.esql.optimizer;

import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.xpack.esql.action.EsqlCapabilities;
import org.elasticsearch.xpack.esql.core.tree.NodeUtils;
import org.elasticsearch.xpack.esql.plan.physical.EsQueryExec;
import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan;
import org.elasticsearch.xpack.esql.session.Configuration;

import static org.elasticsearch.index.query.QueryBuilders.existsQuery;
import static org.elasticsearch.xpack.esql.core.querydsl.query.Query.unscore;
import static org.hamcrest.Matchers.is;

/**
* Tests for the {@link org.elasticsearch.xpack.esql.optimizer.rules.logical.local.IgnoreNullMetrics} planner rule, to
* verify that the filters are being pushed to Lucene.
*/
public class IgnoreNullMetricsPhysicalPlannerTests extends LocalPhysicalPlanOptimizerTests {
Member Author:
Inheriting from LocalPhysicalPlanOptimizerTests seems wrong, but there's no base class between that and MapperTestCase, and this is the strategy ReplaceRoundToWithQueryAndTagsTests uses. This does mean that all the (many) tests in LocalPhysicalPlanOptimizerTests will be re-run in this test class. I recommend refactoring the various plannerOptimizer objects into an abstract base class that all three of these classes can inherit their scaffolding from, but such a refactoring is out of scope for this PR, and I will defer to the people who frequently look at this code.

public IgnoreNullMetricsPhysicalPlannerTests(String name, Configuration config) {
super(name, config);
}

/**
* This tests that we get the same end result plan with an explicit isNotNull and the implicit one added by the rule
*/
public void testSamePhysicalPlans() {
assumeTrue("requires metrics command", EsqlCapabilities.Cap.METRICS_COMMAND.isEnabled());
String testQuery = """
TS k8s
| STATS max(rate(network.total_bytes_in)) BY Bucket(@timestamp, 1 hour)
| LIMIT 10
""";
PhysicalPlan actualPlan = plannerOptimizerTimeSeries.plan(testQuery);

String controlQuery = """
TS k8s
| WHERE network.total_bytes_in IS NOT NULL
| STATS max(rate(network.total_bytes_in)) BY Bucket(@timestamp, 1 hour)
| LIMIT 10
""";
PhysicalPlan expectedPlan = plannerOptimizerTimeSeries.plan(controlQuery);

assertEquals(NodeUtils.diffString(expectedPlan, actualPlan), expectedPlan, actualPlan);
}

public void testPushdownOfSimpleCounterQuery() {
assumeTrue("requires metrics command", EsqlCapabilities.Cap.METRICS_COMMAND.isEnabled());
String query = """
TS k8s
| STATS max(rate(network.total_bytes_in)) BY Bucket(@timestamp, 1 hour)
| LIMIT 10
""";
PhysicalPlan actualPlan = plannerOptimizerTimeSeries.plan(query);
EsQueryExec queryExec = (EsQueryExec) actualPlan.collect(node -> node instanceof EsQueryExec).get(0);

QueryBuilder expected = unscore(existsQuery("network.total_bytes_in"));
assertThat(queryExec.query().toString(), is(expected.toString()));
}

public void testPushdownOfSimpleGaugeQuery() {
assumeTrue("requires metrics command", EsqlCapabilities.Cap.METRICS_COMMAND.isEnabled());
String query = """
TS k8s
| STATS max(max_over_time(network.eth0.tx)) BY Bucket(@timestamp, 1 hour)
| LIMIT 10
""";
PhysicalPlan actualPlan = plannerOptimizerTimeSeries.plan(query);
EsQueryExec queryExec = (EsQueryExec) actualPlan.collect(node -> node instanceof EsQueryExec).get(0);

QueryBuilder expected = unscore(existsQuery("network.eth0.tx"));
assertThat(queryExec.query().toString(), is(expected.toString()));
}
}
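The review comment above suggests pulling the plannerOptimizer scaffolding into a shared abstract base class. A rough sketch of that suggestion, with hypothetical names (not part of this PR):

// Hypothetical shape of the suggested refactor; class and method names are illustrative only.
public abstract class AbstractPlannerOptimizerTestCase extends MapperServiceTestCase {
    protected TestPlannerOptimizer plannerOptimizer;
    protected TestPlannerOptimizer plannerOptimizerTimeSeries;

    @Before
    public void initPlannerOptimizers() {
        // build the analyzers and TestPlannerOptimizer instances here,
        // as LocalPhysicalPlanOptimizerTests#init() does today
    }
}
// LocalPhysicalPlanOptimizerTests, ReplaceRoundToWithQueryAndTagsTests and
// IgnoreNullMetricsPhysicalPlannerTests would then extend this base instead of one another.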
@@ -174,6 +174,7 @@ public class LocalPhysicalPlanOptimizerTests extends MapperServiceTestCase {
protected TestPlannerOptimizer plannerOptimizer;
private TestPlannerOptimizer plannerOptimizerDateDateNanosUnionTypes;
private Analyzer timeSeriesAnalyzer;
protected TestPlannerOptimizer plannerOptimizerTimeSeries;
private final Configuration config;
private final SearchStats IS_SV_STATS = new TestSearchStats() {
@Override
@@ -240,6 +241,7 @@ public void init() {
),
TEST_VERIFIER
);
plannerOptimizerTimeSeries = new TestPlannerOptimizer(config, timeSeriesAnalyzer);
}

private Analyzer makeAnalyzer(String mappingFileName, EnrichResolution enrichResolution) {
@@ -11,6 +11,7 @@
import org.elasticsearch.xpack.esql.analysis.Analyzer;
import org.elasticsearch.xpack.esql.core.expression.FoldContext;
import org.elasticsearch.xpack.esql.parser.EsqlParser;
import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;
import org.elasticsearch.xpack.esql.plan.physical.EstimatesRowSize;
import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan;
import org.elasticsearch.xpack.esql.planner.PlannerUtils;
@@ -88,9 +89,9 @@ private PhysicalPlan optimizedPlan(PhysicalPlan plan, SearchStats searchStats, E
}

private PhysicalPlan physicalPlan(String query, Analyzer analyzer) {
var logical = logicalOptimizer.optimize(analyzer.analyze(parser.createStatement(query, EsqlTestUtils.TEST_CFG)));
LogicalPlan logical = logicalOptimizer.optimize(analyzer.analyze(parser.createStatement(query, EsqlTestUtils.TEST_CFG)));
// System.out.println("Logical\n" + logical);
var physical = mapper.map(logical);
PhysicalPlan physical = mapper.map(logical);
return physical;
}
}
@@ -119,14 +119,15 @@ public void testSimple() {
assumeTrue("requires metrics command", EsqlCapabilities.Cap.METRICS_COMMAND.isEnabled());
LogicalPlan actual = localPlan("""
TS test
| STATS max(max_over_time(metric_1))
| STATS max(max_over_time(metric_1)) BY BUCKET(@timestamp, 1 min)
| LIMIT 10
""");

Member Author:
Makes sure we're testing this rule and not InferNonNullAggConstraint

Limit limit = as(actual, Limit.class);
Aggregate agg = as(limit.child(), Aggregate.class);
// The optimizer expands the STATS out into two STATS steps
Aggregate tsAgg = as(agg.child(), Aggregate.class);
Filter filter = as(tsAgg.child(), Filter.class);
Eval bucketEval = as(tsAgg.child(), Eval.class);
Filter filter = as(bucketEval.child(), Filter.class);
IsNotNull condition = as(filter.condition(), IsNotNull.class);
FieldAttribute attribute = as(condition.field(), FieldAttribute.class);
assertEquals("metric_1", attribute.fieldName().string());