Skip to content

Conversation

friendlymatthew
Copy link
Contributor

Which issue does this PR close?

Rationale for this change

This PR simplifies the relationship between FileSources <--> FileScanConfig <--> DataSource.

Currently, FileScanConfig is a struct used to group common parameters shared across different file sources. However, the existing design also makes FileScanConfig impl DataSource. This means to construct a data source execution plan, you must derive it from a configuration struct.

This PR removes that indirection. Instead, each FileSource struct holds a FileScanConfig field, and all types impl FileSource also impl DataSource.

This redesign proves to remove a lot of redundant code. For instance, AvroSource previously duplicated fields from FileScanConfig, which required additional boilerplate to manually get/set values:

Screenshot 2025-08-19 at 3 00 26 PM

--

We still maintain an abstraction bounday between FileSource and DataSources. The DataSource impl remains generic over any T: FileSource

Are there any user-facing changes?

Yes -- and they are substantial.

Note: the current diff does not yet include deprecation strategies for existing methods to keep the review process clearer

@github-actions github-actions bot added core Core DataFusion crate substrait Changes to the substrait crate proto Related to proto crate datasource Changes to the datasource crate labels Aug 19, 2025
@friendlymatthew friendlymatthew force-pushed the friendlymatthew/have-file-source-impl-data-source branch 6 times, most recently from 93b9263 to 152548d Compare August 20, 2025 14:51
@github-actions github-actions bot removed the substrait Changes to the substrait crate label Aug 20, 2025
@friendlymatthew friendlymatthew force-pushed the friendlymatthew/have-file-source-impl-data-source branch 4 times, most recently from b138003 to d6ff4cc Compare August 20, 2025 19:35
@github-actions github-actions bot added documentation Improvements or additions to documentation substrait Changes to the substrait crate labels Aug 21, 2025
@friendlymatthew friendlymatthew force-pushed the friendlymatthew/have-file-source-impl-data-source branch from ae16eba to 3177ca6 Compare August 21, 2025 18:42
@friendlymatthew
Copy link
Contributor Author

friendlymatthew commented Aug 22, 2025

I was able to identify the root cause behind several test failures related to roundtripping physical plans in datafusion/proto

impacted test cases

cases::roundtrip_physical_plan::roundtrip_empty_projection
cases::roundtrip_physical_plan::roundtrip_parquet_select_projection
cases::roundtrip_physical_plan::roundtrip_parquet_select_projection_predicate
cases::roundtrip_physical_plan::roundtrip_parquet_select_star
cases::roundtrip_physical_plan::roundtrip_parquet_select_star_predicate
cases::roundtrip_physical_plan::test_round_trip_date_part_display
cases::roundtrip_physical_plan::test_round_trip_groups_display
cases::roundtrip_physical_plan::test_round_trip_human_display
cases::roundtrip_physical_plan::test_round_trip_tpch_queries

tl;dr, we expect FileScanConfig to be lossless when converting to/from protobuf. However, FileSource is not lossless. Now that FileSource implements DataSource, roundtripping physical plans loses some information, which is expected-- but the current assert_eq fails because of it.

On main, FileScanConfig implements DataSource directly, and when we roundtrip physical plans, we compare the Display output of FileScanConfig. Although FileScanConfig holds a file source struct, we display minimal info (e.g, source type: "parquet"). However in this PR, the ownership model changes: now all FileSource structs own a FileScanConfig, and they implement DataSource directly. Because we #derive(Debug) for FileSource types, the debug output includes fields that are not preserved through serialization.

For example, in ParquetSource, the parquet_file_reader_factory field is not retained after deserialization. Only the TableParquetOptions are serialized and restored; everything else is set to None.

Thoughts on fixes

I see 2 possible directions for resolving the test failures:

  1. Loosen the assertion when verifying DataSourceExec
    Instead of comparing the entire debug output, we could fall back to comparing the Display output of the underlying FileScanConfig, as is done on main. This would ignore non-serialized fields

  2. Encode extra metadata into TableParquetOptions
    We could explicitly encode some of the additional fields (e.g. presence of a custom reader factory) into the serialized metadata. But this feels futile-- since these fields aren't actually restored on deserialization, tracking their prior existence is extra overhead

I'm leaning towards 1 since it reflects the current lossless boundary and avoids overcomplicating serialization. But I'm curious if people have any opinions

cc @adriangb @berkaysynnada @mbrubeck @xudong963 @comphead @blaginin @alamb

@adriangb
Copy link
Contributor

I agree that there was always losses in serialization / deserialization e.g. loss of custom SchemaAdapterFactory. As long as there aren't more losses now than before we should adapt the assertion to roughly match this expectation.

@friendlymatthew friendlymatthew force-pushed the friendlymatthew/have-file-source-impl-data-source branch 4 times, most recently from ccf7308 to a152b7d Compare August 22, 2025 20:10
@friendlymatthew friendlymatthew changed the title Have T: impl FileSource implement DataSource and own FileScanConfig Change ownership model between FileScanConfig and FileSources Aug 22, 2025
@friendlymatthew friendlymatthew changed the title Change ownership model between FileScanConfig and FileSources Redesign ownership model between FileScanConfig and FileSources Aug 22, 2025
@friendlymatthew friendlymatthew force-pushed the friendlymatthew/have-file-source-impl-data-source branch from 513b7aa to 66eac9f Compare August 22, 2025 20:38
@xudong963 xudong963 self-requested a review August 23, 2025 14:30
@friendlymatthew friendlymatthew force-pushed the friendlymatthew/have-file-source-impl-data-source branch 2 times, most recently from fa40cc8 to 5cfd81d Compare August 25, 2025 14:41
@friendlymatthew friendlymatthew force-pushed the friendlymatthew/have-file-source-impl-data-source branch from 5cfd81d to c7ea40a Compare August 25, 2025 14:43
@friendlymatthew friendlymatthew marked this pull request as ready for review August 25, 2025 15:10
@friendlymatthew friendlymatthew force-pushed the friendlymatthew/have-file-source-impl-data-source branch 2 times, most recently from b417b26 to 1bbd3ed Compare August 25, 2025 15:17
Copy link
Contributor Author

@friendlymatthew friendlymatthew left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi, this PR is ready for review-- It's quite sizable and touches 55+ files.

I've split up the PR into 2 commits:

  • Move all fields shared across FileSource into FileScanConfig
  • Have FileSource own FileScanConfig and directly impl DataSource

I would recommend starting from datafusion/datasource/src/file_scan_config.rs to see which fields were added to FileScanConfig. Then, all concrete types that impl FileSource like ParquetSource.

datafusion/datasource/src/file.rs is the main change, as it implements DataSource for all T: FileSource

Comment on lines -494 to -510
let file_source = Arc::new(
ParquetSource::default()
let file_scan_config = FileScanConfigBuilder::new(object_store_url, schema)
.with_limit(limit)
.with_projection(projection.cloned())
.with_file(partitioned_file)
.build();

let file_source =
ParquetSource::new(TableParquetOptions::default(), file_scan_config.clone())
// provide the predicate so the DataSourceExec can try and prune
// row groups internally
.with_predicate(predicate)
// provide the factory to create parquet reader without re-reading metadata
.with_parquet_file_reader_factory(Arc::new(reader_factory)),
);
let file_scan_config =
FileScanConfigBuilder::new(object_store_url, schema, file_source)
.with_limit(limit)
.with_projection(projection.cloned())
.with_file(partitioned_file)
.build();
.with_parquet_file_reader_factory(Arc::new(reader_factory));

// Finally, put it all together into a DataSourceExec
Ok(DataSourceExec::from_data_source(file_scan_config))
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You'll see this pattern a lot in this PR.

The old flow goes something like:

  1. Define the file source
  2. Define the file scan config and move the file source inside
  3. Derive a data source plan from the file scan config

Inside this flow, there's a circular dependency (call file source from config, create file opener from file source but also pass in the config).

The new flow goes something like:

  1. Define the config
  2. Define the file source which now owns the config
  3. Derive a data source plan from the file source

@friendlymatthew friendlymatthew force-pushed the friendlymatthew/have-file-source-impl-data-source branch from 1bbd3ed to 0cc2125 Compare August 25, 2025 20:06
Copy link
Contributor

@adriangb adriangb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just small comments for now. will review more tomorrow

Comment on lines -494 to +507
let file_source = Arc::new(
ParquetSource::default()
let file_scan_config = FileScanConfigBuilder::new(object_store_url, schema)
.with_limit(limit)
.with_projection(projection.cloned())
.with_file(partitioned_file)
.build();

let file_source =
ParquetSource::new(TableParquetOptions::default(), file_scan_config.clone())
// provide the predicate so the DataSourceExec can try and prune
// row groups internally
.with_predicate(predicate)
// provide the factory to create parquet reader without re-reading metadata
.with_parquet_file_reader_factory(Arc::new(reader_factory)),
);
let file_scan_config =
FileScanConfigBuilder::new(object_store_url, schema, file_source)
.with_limit(limit)
.with_projection(projection.cloned())
.with_file(partitioned_file)
.build();
.with_parquet_file_reader_factory(Arc::new(reader_factory));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is just moving code around to initialize the FileScanConfig before the ParquetSource ✅

Comment on lines -70 to -72
.with_schema(schema)
.with_batch_size(8192)
.with_projection(&scan_config);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It makes sense that the schema, batch size and projection are not properties inherent to CSVs and thus should be part of FileScanConfig. In fact they are currently duplicated!

@xudong963
Copy link
Member

I'll start review in recent two days.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
core Core DataFusion crate datasource Changes to the datasource crate documentation Improvements or additions to documentation proto Related to proto crate substrait Changes to the substrait crate
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Unify how various FileSources are applying projections? Clean up APIs around FileScanConfigBuilder, FileScanConfig and FileSource
3 participants