Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These changes are necessary to enable running planner tests that load dimension and metric information from our json schema files. My earlier tests for this feature hard coded a test schema rather than load from json, so I didn't need this before, but we'll want it for many tests going forward.

Original file line number Diff line number Diff line change
Expand Up @@ -100,21 +100,33 @@ private static void walkMapping(String name, Object value, Map<String, EsField>
properties = fromEs(content);
}
boolean docValues = boolSetting(content.get("doc_values"), esDataType.hasDocValues());
boolean isDimension = boolSetting(content.get("time_series_dimension"), false);
boolean isMetric = content.containsKey("time_series_metric");
if (isDimension && isMetric) {
throw new IllegalArgumentException("Field configured as both dimension and metric:" + value);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should never happen here, probably IllegalStateException?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't have a strong opinion, but I chose IllegalArgumentException because this method already throws that on line 133 for other invalid mapping states.

}
EsField.TimeSeriesFieldType tsType = EsField.TimeSeriesFieldType.NONE;
if (isDimension) {
tsType = EsField.TimeSeriesFieldType.DIMENSION;
}
if (isMetric) {
tsType = EsField.TimeSeriesFieldType.METRIC;
}
final EsField field;
if (esDataType == TEXT) {
field = new TextEsField(name, properties, docValues, false, EsField.TimeSeriesFieldType.NONE);
field = new TextEsField(name, properties, docValues, false, tsType);
} else if (esDataType == KEYWORD) {
int length = intSetting(content.get("ignore_above"), Short.MAX_VALUE);
boolean normalized = Strings.hasText(textSetting(content.get("normalizer"), null));
field = new KeywordEsField(name, properties, docValues, length, normalized, false, EsField.TimeSeriesFieldType.NONE);
field = new KeywordEsField(name, properties, docValues, length, normalized, false, tsType);
} else if (esDataType == DATETIME) {
field = DateEsField.dateEsField(name, properties, docValues, EsField.TimeSeriesFieldType.NONE);
field = DateEsField.dateEsField(name, properties, docValues, tsType);
} else if (esDataType == UNSUPPORTED) {
String type = content.get("type").toString();
field = new UnsupportedEsField(name, List.of(type), null, properties);
propagateUnsupportedType(name, type, properties);
} else {
field = new EsField(name, esDataType, properties, docValues, EsField.TimeSeriesFieldType.NONE);
field = new EsField(name, esDataType, properties, docValues, tsType);
}
mapping.put(name, field);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.elasticsearch.xpack.esql.plan.QueryPlan;
import org.elasticsearch.xpack.esql.plan.logical.EsRelation;
import org.elasticsearch.xpack.esql.plan.logical.Filter;
import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;
import org.elasticsearch.xpack.esql.plan.logical.PipelineBreaker;
import org.elasticsearch.xpack.esql.plan.physical.AggregateExec;
import org.elasticsearch.xpack.esql.plan.physical.EsSourceExec;
Expand Down Expand Up @@ -215,17 +216,17 @@ public static PhysicalPlan localPlan(
.map(x -> ((LookupJoinExec) x).right())
.collect(Collectors.toSet());

var localPhysicalPlan = plan.transformUp(FragmentExec.class, f -> {
PhysicalPlan localPhysicalPlan = plan.transformUp(FragmentExec.class, f -> {
if (lookupJoinExecRightChildren.contains(f)) {
// Do not optimize the right child of a lookup join exec
// The data node does not have the right stats to perform the optimization because the stats are on the lookup node
// Also we only ship logical plans across the network, so the plan needs to remain logical
return f;
}
isCoordPlan.set(Boolean.FALSE);
var optimizedFragment = logicalOptimizer.localOptimize(f.fragment());
var physicalFragment = localMapper.map(optimizedFragment);
var filter = f.esFilter();
LogicalPlan optimizedFragment = logicalOptimizer.localOptimize(f.fragment());
PhysicalPlan physicalFragment = localMapper.map(optimizedFragment);
QueryBuilder filter = f.esFilter();
if (filter != null) {
physicalFragment = physicalFragment.transformUp(
EsSourceExec.class,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.esql.optimizer;

import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.xpack.esql.action.EsqlCapabilities;
import org.elasticsearch.xpack.esql.core.tree.NodeUtils;
import org.elasticsearch.xpack.esql.plan.physical.EsQueryExec;
import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan;
import org.elasticsearch.xpack.esql.session.Configuration;

import static org.elasticsearch.index.query.QueryBuilders.existsQuery;
import static org.elasticsearch.xpack.esql.core.querydsl.query.Query.unscore;
import static org.hamcrest.Matchers.is;

/**
* Tests for the {@link org.elasticsearch.xpack.esql.optimizer.rules.logical.local.IgnoreNullMetrics} planner rule, to
* verify that the filters are being pushed to Lucene.
*/
public class IgnoreNullMetricsPhysicalPlannerTests extends LocalPhysicalPlanOptimizerTests {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Inheriting from LocalPhysicalPlanOptimizerTests seems wrong, but there's no base class between that and MapperTestCase, and this is the strategy ReplaceRoundToWithQueryAndTagsTests uses. This does mean that all the (many) tests in LocalPhysicalPlanOptimizerTests will be re-run in this test class. I recommend refactoring the vaious plannerOptimizer objects into an abstract base class that all three of these classes can inherit their scaffolding from, but such a refactoring is out of scope for this PR, and I will defer to the people who frequently look at this code.

public IgnoreNullMetricsPhysicalPlannerTests(String name, Configuration config) {
super(name, config);
}

/**
* This tests that we get the same end result plan with an explicit isNotNull and the implicit one added by the rule
*/
public void testSamePhysicalPlans() {
assumeTrue("requires metrics command", EsqlCapabilities.Cap.METRICS_COMMAND.isEnabled());
String testQuery = """
TS k8s
| STATS max(rate(network.total_bytes_in)) BY Bucket(@timestamp, 1 hour)
| LIMIT 10
""";
PhysicalPlan actualPlan = plannerOptimizerTimeSeries.plan(testQuery);

String controlQuery = """
TS k8s
| WHERE network.total_bytes_in IS NOT NULL
| STATS max(rate(network.total_bytes_in)) BY Bucket(@timestamp, 1 hour)
| LIMIT 10
""";
PhysicalPlan expectedPlan = plannerOptimizerTimeSeries.plan(controlQuery);

assertEquals(NodeUtils.diffString(expectedPlan, actualPlan), expectedPlan, actualPlan);
}

public void testPushdownOfSimpleQuery() {
assumeTrue("requires metrics command", EsqlCapabilities.Cap.METRICS_COMMAND.isEnabled());
String query = """
TS k8s
| STATS max(rate(network.total_bytes_in)) BY Bucket(@timestamp, 1 hour)
| LIMIT 10
""";
PhysicalPlan actualPlan = plannerOptimizerTimeSeries.plan(query);
EsQueryExec queryExec = (EsQueryExec) actualPlan.collect(node -> node instanceof EsQueryExec).get(0);

QueryBuilder expected = unscore(existsQuery("network.total_bytes_in"));
assertThat(queryExec.query().toString(), is(expected.toString()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,7 @@ public class LocalPhysicalPlanOptimizerTests extends MapperServiceTestCase {
protected TestPlannerOptimizer plannerOptimizer;
private TestPlannerOptimizer plannerOptimizerDateDateNanosUnionTypes;
private Analyzer timeSeriesAnalyzer;
protected TestPlannerOptimizer plannerOptimizerTimeSeries;
private final Configuration config;
private final SearchStats IS_SV_STATS = new TestSearchStats() {
@Override
Expand Down Expand Up @@ -240,6 +241,7 @@ public void init() {
),
TEST_VERIFIER
);
plannerOptimizerTimeSeries = new TestPlannerOptimizer(config, timeSeriesAnalyzer);
}

private Analyzer makeAnalyzer(String mappingFileName, EnrichResolution enrichResolution) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import org.elasticsearch.xpack.esql.analysis.Analyzer;
import org.elasticsearch.xpack.esql.core.expression.FoldContext;
import org.elasticsearch.xpack.esql.parser.EsqlParser;
import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;
import org.elasticsearch.xpack.esql.plan.physical.EstimatesRowSize;
import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan;
import org.elasticsearch.xpack.esql.planner.PlannerUtils;
Expand Down Expand Up @@ -87,10 +88,14 @@ private PhysicalPlan optimizedPlan(PhysicalPlan plan, SearchStats searchStats, E
return l;
}

public LogicalPlan logicalPlan(String query) {
return logicalOptimizer.optimize(analyzer.analyze(parser.createStatement(query, EsqlTestUtils.TEST_CFG)));
}

private PhysicalPlan physicalPlan(String query, Analyzer analyzer) {
var logical = logicalOptimizer.optimize(analyzer.analyze(parser.createStatement(query, EsqlTestUtils.TEST_CFG)));
LogicalPlan logical = logicalOptimizer.optimize(analyzer.analyze(parser.createStatement(query, EsqlTestUtils.TEST_CFG)));
// System.out.println("Logical\n" + logical);
var physical = mapper.map(logical);
PhysicalPlan physical = mapper.map(logical);
return physical;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -119,14 +119,15 @@ public void testSimple() {
assumeTrue("requires metrics command", EsqlCapabilities.Cap.METRICS_COMMAND.isEnabled());
LogicalPlan actual = localPlan("""
TS test
| STATS max(max_over_time(metric_1))
| STATS max(max_over_time(metric_1)) BY BUCKET(@timestamp, 1 min)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Makes sure we're testing this rule and not InferNonNullAggConstraint

| LIMIT 10
""");
Limit limit = as(actual, Limit.class);
Aggregate agg = as(limit.child(), Aggregate.class);
// The optimizer expands the STATS out into two STATS steps
Aggregate tsAgg = as(agg.child(), Aggregate.class);
Filter filter = as(tsAgg.child(), Filter.class);
Eval bucketEval = as(tsAgg.child(), Eval.class);
Filter filter = as(bucketEval.child(), Filter.class);
IsNotNull condition = as(filter.condition(), IsNotNull.class);
FieldAttribute attribute = as(condition.field(), FieldAttribute.class);
assertEquals("metric_1", attribute.fieldName().string());
Expand Down