Skip to content

Commit 198888f

Browse files
committed
Fix ambigious column names in substrate conversion as a result of literals having the same names
1 parent 18c5a6c commit 198888f

File tree

6 files changed

+339
-1
lines changed

6 files changed

+339
-1
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

datafusion/substrait/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ prost = { workspace = true }
4242
substrait = { version = "0.58", features = ["serde"] }
4343
url = { workspace = true }
4444
tokio = { workspace = true, features = ["fs"] }
45+
uuid = { version = "1.17.0", features = ["v4"] }
4546

4647
[dev-dependencies]
4748
datafusion = { workspace = true, features = ["nested_expressions"] }

datafusion/substrait/src/logical_plan/consumer/expr/literal.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,13 +45,17 @@ use substrait::proto::expression::literal::{
4545
LiteralType,
4646
};
4747
use substrait::proto::expression::Literal;
48+
use uuid::Uuid;
4849

4950
pub async fn from_literal(
5051
consumer: &impl SubstraitConsumer,
5152
expr: &Literal,
5253
) -> datafusion::common::Result<Expr> {
5354
let scalar_value = from_substrait_literal_without_names(consumer, expr)?;
54-
Ok(Expr::Literal(scalar_value, None))
55+
// Since substrait removes aliases, we need to assign literals with a UUID alias to avoid
56+
// ambiguous names when the same literal is used before and after a join.
57+
let name = Uuid::new_v4().to_string();
58+
Ok(Expr::Literal(scalar_value, None).alias(name))
5559
}
5660

5761
pub(crate) fn from_substrait_literal_without_names(
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
{"run_id":"1755898767-732982000","line":161,"new":{"module_name":"substrait_integration__cases__logical_plans__tests","snapshot_name":"null_literal_before_and_after_joins","metadata":{"source":"datafusion/substrait/tests/cases/logical_plans.rs","assertion_line":161,"expression":"plan"},"snapshot":"Projection: left.A, left.0a73c546-e641-4dc5-95b0-b5dd96f06c8f AS C, right.D, Utf8(NULL) AS 353fecc4-bfd5-479a-93eb-5904549fc4e8 AS E\n Left Join: left.A = right.A\n SubqueryAlias: left\n Union\n Projection: A.A, Utf8(NULL) AS 0a73c546-e641-4dc5-95b0-b5dd96f06c8f\n TableScan: A\n Projection: B.A, CAST(B.C AS Utf8)\n TableScan: B\n SubqueryAlias: right\n TableScan: C"},"old":{"module_name":"substrait_integration__cases__logical_plans__tests","metadata":{},"snapshot":"Projection: row_number() ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS EXPR$0, row_number() PARTITION BY [DATA.A] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS EXPR$1\n WindowAggr: windowExpr=[[row_number() ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]\n WindowAggr: windowExpr=[[row_number() PARTITION BY [DATA.A] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]\n TableScan: DATA"}}
2+
{"run_id":"1755898908-401749000","line":161,"new":{"module_name":"substrait_integration__cases__logical_plans__tests","snapshot_name":"null_literal_before_and_after_joins","metadata":{"source":"datafusion/substrait/tests/cases/logical_plans.rs","assertion_line":161,"expression":"plan"},"snapshot":"Projection: left.A, left.b9ab1bf0-47e8-474e-9050-c169d0df6778 AS C, right.D, Utf8(NULL) AS c7ef1d90-7f46-498a-bfab-f8a622fbb700 AS E\n Left Join: left.A = right.A\n SubqueryAlias: left\n Union\n Projection: A.A, Utf8(NULL) AS b9ab1bf0-47e8-474e-9050-c169d0df6778\n TableScan: A\n Projection: B.A, CAST(B.C AS Utf8)\n TableScan: B\n SubqueryAlias: right\n TableScan: C"},"old":{"module_name":"substrait_integration__cases__logical_plans__tests","metadata":{},"snapshot":"Projection: left.A, left.0a73c546-e641-4dc5-95b0-b5dd96f06c8f AS C, right.D, Utf8(NULL) AS 353fecc4-bfd5-479a-93eb-5904549fc4e8 AS E\n Left Join: left.A = right.A\n SubqueryAlias: left\n Union\n Projection: A.A, Utf8(NULL) AS 0a73c546-e641-4dc5-95b0-b5dd96f06c8f\n TableScan: A\n Projection: B.A, CAST(B.C AS Utf8)\n TableScan: B\n SubqueryAlias: right\n TableScan: C"}}
3+
{"run_id":"1755899076-200504000","line":161,"new":null,"old":null}
4+
{"run_id":"1755899504-611535000","line":166,"new":null,"old":null}

datafusion/substrait/tests/cases/logical_plans.rs

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -144,6 +144,47 @@ mod tests {
144144
Ok(())
145145
}
146146

147+
#[tokio::test]
148+
async fn null_literal_before_and_after_joins() -> Result<()> {
149+
// Confirms that literals used before and after a join but for different columns
150+
// are correctly handled.
151+
152+
// File generated with substrait-java's Isthmus:
153+
// ./isthmus-cli/build/graal/isthmus --create "create table A (a int); create table B (a int, c int); create table C (a int, d int)" "select t.*, C.d, CAST(NULL AS VARCHAR) as e from (select a, CAST(NULL AS VARCHAR) as c from A UNION ALL select a, c from B) t LEFT JOIN C ON t.a = C.a"
154+
let proto_plan =
155+
read_json("tests/testdata/test_plans/null_literals.substrait.json");
156+
let ctx = add_plan_schemas_to_ctx(SessionContext::new(), &proto_plan)?;
157+
let plan = from_substrait_plan(&ctx.state(), &proto_plan).await?;
158+
159+
let mut settings = insta::Settings::clone_current();
160+
settings.add_filter(
161+
r"[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}",
162+
"[UUID]",
163+
);
164+
settings.bind(|| {
165+
assert_snapshot!(
166+
plan,
167+
@r#"
168+
Projection: left.A, left.[UUID] AS C, right.D, Utf8(NULL) AS [UUID] AS E
169+
Left Join: left.A = right.A
170+
SubqueryAlias: left
171+
Union
172+
Projection: A.A, Utf8(NULL) AS [UUID]
173+
TableScan: A
174+
Projection: B.A, CAST(B.C AS Utf8)
175+
TableScan: B
176+
SubqueryAlias: right
177+
TableScan: C
178+
"#
179+
);
180+
});
181+
182+
// Trigger execution to ensure plan validity
183+
DataFrame::new(ctx.state(), plan).show().await?;
184+
185+
Ok(())
186+
}
187+
147188
#[tokio::test]
148189
async fn non_nullable_lists() -> Result<()> {
149190
// DataFusion's Substrait consumer treats all lists as nullable, even if the Substrait plan specifies them as non-nullable.
Lines changed: 287 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,287 @@
1+
{
2+
"extensionUris": [{
3+
"extensionUriAnchor": 1,
4+
"uri": "/functions_comparison.yaml"
5+
}],
6+
"extensions": [{
7+
"extensionFunction": {
8+
"extensionUriReference": 1,
9+
"functionAnchor": 1,
10+
"name": "equal:any_any"
11+
}
12+
}],
13+
"relations": [{
14+
"root": {
15+
"input": {
16+
"project": {
17+
"common": {
18+
"emit": {
19+
"outputMapping": [4, 5, 6, 7]
20+
}
21+
},
22+
"input": {
23+
"join": {
24+
"common": {
25+
"direct": {
26+
}
27+
},
28+
"left": {
29+
"set": {
30+
"common": {
31+
"direct": {
32+
}
33+
},
34+
"inputs": [{
35+
"project": {
36+
"common": {
37+
"emit": {
38+
"outputMapping": [1, 2]
39+
}
40+
},
41+
"input": {
42+
"read": {
43+
"common": {
44+
"direct": {
45+
}
46+
},
47+
"baseSchema": {
48+
"names": ["A"],
49+
"struct": {
50+
"types": [{
51+
"i32": {
52+
"typeVariationReference": 0,
53+
"nullability": "NULLABILITY_NULLABLE"
54+
}
55+
}],
56+
"typeVariationReference": 0,
57+
"nullability": "NULLABILITY_REQUIRED"
58+
}
59+
},
60+
"namedTable": {
61+
"names": ["A"]
62+
}
63+
}
64+
},
65+
"expressions": [{
66+
"selection": {
67+
"directReference": {
68+
"structField": {
69+
"field": 0
70+
}
71+
},
72+
"rootReference": {
73+
}
74+
}
75+
}, {
76+
"literal": {
77+
"null": {
78+
"string": {
79+
"typeVariationReference": 0,
80+
"nullability": "NULLABILITY_NULLABLE"
81+
}
82+
},
83+
"nullable": false,
84+
"typeVariationReference": 0
85+
}
86+
}]
87+
}
88+
}, {
89+
"project": {
90+
"common": {
91+
"emit": {
92+
"outputMapping": [2, 3]
93+
}
94+
},
95+
"input": {
96+
"read": {
97+
"common": {
98+
"direct": {
99+
}
100+
},
101+
"baseSchema": {
102+
"names": ["A", "C"],
103+
"struct": {
104+
"types": [{
105+
"i32": {
106+
"typeVariationReference": 0,
107+
"nullability": "NULLABILITY_NULLABLE"
108+
}
109+
}, {
110+
"i32": {
111+
"typeVariationReference": 0,
112+
"nullability": "NULLABILITY_NULLABLE"
113+
}
114+
}],
115+
"typeVariationReference": 0,
116+
"nullability": "NULLABILITY_REQUIRED"
117+
}
118+
},
119+
"namedTable": {
120+
"names": ["B"]
121+
}
122+
}
123+
},
124+
"expressions": [{
125+
"selection": {
126+
"directReference": {
127+
"structField": {
128+
"field": 0
129+
}
130+
},
131+
"rootReference": {
132+
}
133+
}
134+
}, {
135+
"cast": {
136+
"type": {
137+
"string": {
138+
"typeVariationReference": 0,
139+
"nullability": "NULLABILITY_NULLABLE"
140+
}
141+
},
142+
"input": {
143+
"selection": {
144+
"directReference": {
145+
"structField": {
146+
"field": 1
147+
}
148+
},
149+
"rootReference": {
150+
}
151+
}
152+
},
153+
"failureBehavior": "FAILURE_BEHAVIOR_THROW_EXCEPTION"
154+
}
155+
}]
156+
}
157+
}],
158+
"op": "SET_OP_UNION_ALL"
159+
}
160+
},
161+
"right": {
162+
"read": {
163+
"common": {
164+
"direct": {
165+
}
166+
},
167+
"baseSchema": {
168+
"names": ["A", "D"],
169+
"struct": {
170+
"types": [{
171+
"i32": {
172+
"typeVariationReference": 0,
173+
"nullability": "NULLABILITY_NULLABLE"
174+
}
175+
}, {
176+
"i32": {
177+
"typeVariationReference": 0,
178+
"nullability": "NULLABILITY_NULLABLE"
179+
}
180+
}],
181+
"typeVariationReference": 0,
182+
"nullability": "NULLABILITY_REQUIRED"
183+
}
184+
},
185+
"namedTable": {
186+
"names": ["C"]
187+
}
188+
}
189+
},
190+
"expression": {
191+
"scalarFunction": {
192+
"functionReference": 1,
193+
"args": [],
194+
"outputType": {
195+
"bool": {
196+
"typeVariationReference": 0,
197+
"nullability": "NULLABILITY_NULLABLE"
198+
}
199+
},
200+
"arguments": [{
201+
"value": {
202+
"selection": {
203+
"directReference": {
204+
"structField": {
205+
"field": 0
206+
}
207+
},
208+
"rootReference": {
209+
}
210+
}
211+
}
212+
}, {
213+
"value": {
214+
"selection": {
215+
"directReference": {
216+
"structField": {
217+
"field": 2
218+
}
219+
},
220+
"rootReference": {
221+
}
222+
}
223+
}
224+
}],
225+
"options": []
226+
}
227+
},
228+
"type": "JOIN_TYPE_LEFT"
229+
}
230+
},
231+
"expressions": [{
232+
"selection": {
233+
"directReference": {
234+
"structField": {
235+
"field": 0
236+
}
237+
},
238+
"rootReference": {
239+
}
240+
}
241+
}, {
242+
"selection": {
243+
"directReference": {
244+
"structField": {
245+
"field": 1
246+
}
247+
},
248+
"rootReference": {
249+
}
250+
}
251+
}, {
252+
"selection": {
253+
"directReference": {
254+
"structField": {
255+
"field": 3
256+
}
257+
},
258+
"rootReference": {
259+
}
260+
}
261+
}, {
262+
"literal": {
263+
"null": {
264+
"string": {
265+
"typeVariationReference": 0,
266+
"nullability": "NULLABILITY_NULLABLE"
267+
}
268+
},
269+
"nullable": false,
270+
"typeVariationReference": 0
271+
}
272+
}]
273+
}
274+
},
275+
"names": ["A", "C", "D", "E"]
276+
}
277+
}],
278+
"expectedTypeUrls": [],
279+
"version": {
280+
"majorNumber": 0,
281+
"minorNumber": 74,
282+
"patchNumber": 0,
283+
"gitHash": "",
284+
"producer": "isthmus"
285+
},
286+
"parameterBindings": []
287+
}

0 commit comments

Comments
 (0)