-
Notifications
You must be signed in to change notification settings - Fork 1.6k
Redesign ownership model between FileScanConfig
and FileSource
s
#17242
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Redesign ownership model between FileScanConfig
and FileSource
s
#17242
Conversation
93b9263
to
152548d
Compare
b138003
to
d6ff4cc
Compare
ae16eba
to
3177ca6
Compare
I was able to identify the root cause behind several test failures related to roundtripping physical plans in impacted test cases
tl;dr, we expect On For example, in Thoughts on fixesI see 2 possible directions for resolving the test failures:
I'm leaning towards 1 since it reflects the current lossless boundary and avoids overcomplicating serialization. But I'm curious if people have any opinions cc @adriangb @berkaysynnada @mbrubeck @xudong963 @comphead @blaginin @alamb |
I agree that there was always losses in serialization / deserialization e.g. loss of custom |
ccf7308
to
a152b7d
Compare
FileScanConfig
and FileSource
s
FileScanConfig
and FileSource
sFileScanConfig
and FileSource
s
513b7aa
to
66eac9f
Compare
fa40cc8
to
5cfd81d
Compare
5cfd81d
to
c7ea40a
Compare
b417b26
to
1bbd3ed
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hi, this PR is ready for review-- It's quite sizable and touches 55+ files.
I've split up the PR into 2 commits:
- Move all fields shared across
FileSource
intoFileScanConfig
- Have
FileSource
ownFileScanConfig
and directly implDataSource
I would recommend starting from datafusion/datasource/src/file_scan_config.rs
to see which fields were added to FileScanConfig
. Then, all concrete types that impl FileSource
like ParquetSource.
datafusion/datasource/src/file.rs
is the main change, as it implements DataSource
for all T: FileSource
let file_source = Arc::new( | ||
ParquetSource::default() | ||
let file_scan_config = FileScanConfigBuilder::new(object_store_url, schema) | ||
.with_limit(limit) | ||
.with_projection(projection.cloned()) | ||
.with_file(partitioned_file) | ||
.build(); | ||
|
||
let file_source = | ||
ParquetSource::new(TableParquetOptions::default(), file_scan_config.clone()) | ||
// provide the predicate so the DataSourceExec can try and prune | ||
// row groups internally | ||
.with_predicate(predicate) | ||
// provide the factory to create parquet reader without re-reading metadata | ||
.with_parquet_file_reader_factory(Arc::new(reader_factory)), | ||
); | ||
let file_scan_config = | ||
FileScanConfigBuilder::new(object_store_url, schema, file_source) | ||
.with_limit(limit) | ||
.with_projection(projection.cloned()) | ||
.with_file(partitioned_file) | ||
.build(); | ||
.with_parquet_file_reader_factory(Arc::new(reader_factory)); | ||
|
||
// Finally, put it all together into a DataSourceExec | ||
Ok(DataSourceExec::from_data_source(file_scan_config)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You'll see this pattern a lot in this PR.
The old flow goes something like:
- Define the file source
- Define the file scan config and move the file source inside
- Derive a data source plan from the file scan config
Inside this flow, there's a circular dependency (call file source from config, create file opener from file source but also pass in the config).
The new flow goes something like:
- Define the config
- Define the file source which now owns the config
- Derive a data source plan from the file source
1bbd3ed
to
0cc2125
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
just small comments for now. will review more tomorrow
let file_source = Arc::new( | ||
ParquetSource::default() | ||
let file_scan_config = FileScanConfigBuilder::new(object_store_url, schema) | ||
.with_limit(limit) | ||
.with_projection(projection.cloned()) | ||
.with_file(partitioned_file) | ||
.build(); | ||
|
||
let file_source = | ||
ParquetSource::new(TableParquetOptions::default(), file_scan_config.clone()) | ||
// provide the predicate so the DataSourceExec can try and prune | ||
// row groups internally | ||
.with_predicate(predicate) | ||
// provide the factory to create parquet reader without re-reading metadata | ||
.with_parquet_file_reader_factory(Arc::new(reader_factory)), | ||
); | ||
let file_scan_config = | ||
FileScanConfigBuilder::new(object_store_url, schema, file_source) | ||
.with_limit(limit) | ||
.with_projection(projection.cloned()) | ||
.with_file(partitioned_file) | ||
.build(); | ||
.with_parquet_file_reader_factory(Arc::new(reader_factory)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is just moving code around to initialize the FileScanConfig before the ParquetSource ✅
.with_schema(schema) | ||
.with_batch_size(8192) | ||
.with_projection(&scan_config); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It makes sense that the schema, batch size and projection are not properties inherent to CSVs and thus should be part of FileScanConfig. In fact they are currently duplicated!
I'll start review in recent two days. |
Which issue does this PR close?
FileSource
s are applying projections? #17095FileScanConfigBuilder
,FileScanConfig
andFileSource
#15952Rationale for this change
This PR simplifies the relationship between
FileSource
s <-->FileScanConfig
<-->DataSource
.Currently,
FileScanConfig
is a struct used to group common parameters shared across different file sources. However, the existing design also makesFileScanConfig
implDataSource
. This means to construct a data source execution plan, you must derive it from a configuration struct.This PR removes that indirection. Instead, each
FileSource
struct holds aFileScanConfig
field, and all types implFileSource
also implDataSource
.This redesign proves to remove a lot of redundant code. For instance,
AvroSource
previously duplicated fields fromFileScanConfig
, which required additional boilerplate to manually get/set values:--
We still maintain an abstraction bounday between
FileSource
andDataSource
s. TheDataSource
impl remains generic over anyT: FileSource
Are there any user-facing changes?
Yes -- and they are substantial.
Note: the current diff does not yet include deprecation strategies for existing methods to keep the review process clearer