Skip to content

Commit ae16eba

Browse files
Big one: have file sources own the config and impl DataSource for FileSource
1 parent 815bbce commit ae16eba

Some content is hidden

Large commits have some content hidden by default. Use the search box below to find content that may be hidden.

56 files changed

+1240
-1192
lines changed

datafusion-examples/examples/advanced_parquet_index.rs

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ use arrow::array::{ArrayRef, Int32Array, RecordBatch, StringArray};
5555
use arrow::datatypes::SchemaRef;
5656
use async_trait::async_trait;
5757
use bytes::Bytes;
58+
use datafusion::config::TableParquetOptions;
5859
use datafusion::datasource::memory::DataSourceExec;
5960
use futures::future::BoxFuture;
6061
use futures::FutureExt;
@@ -491,23 +492,22 @@ impl TableProvider for IndexTableProvider {
491492
CachedParquetFileReaderFactory::new(Arc::clone(&self.object_store))
492493
.with_file(indexed_file);
493494

494-
let file_source = Arc::new(
495-
ParquetSource::default()
495+
let file_scan_config = FileScanConfigBuilder::new(object_store_url, schema)
496+
.with_limit(limit)
497+
.with_projection(projection.cloned())
498+
.with_file(partitioned_file)
499+
.build();
500+
501+
let file_source =
502+
ParquetSource::new(TableParquetOptions::default(), file_scan_config.clone())
496503
// provide the predicate so the DataSourceExec can try and prune
497504
// row groups internally
498505
.with_predicate(predicate)
499506
// provide the factory to create parquet reader without re-reading metadata
500-
.with_parquet_file_reader_factory(Arc::new(reader_factory)),
501-
);
502-
let file_scan_config =
503-
FileScanConfigBuilder::new(object_store_url, schema, file_source)
504-
.with_limit(limit)
505-
.with_projection(projection.cloned())
506-
.with_file(partitioned_file)
507-
.build();
507+
.with_parquet_file_reader_factory(Arc::new(reader_factory));
508508

509509
// Finally, put it all together into a DataSourceExec
510-
Ok(DataSourceExec::from_data_source(file_scan_config))
510+
Ok(DataSourceExec::from_data_source(file_source))
511511
}
512512

513513
/// Tell DataFusion to push filters down to the scan method

datafusion-examples/examples/csv_json_opener.rs

Lines changed: 10 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ use datafusion::{
2424
file_format::file_compression_type::FileCompressionType,
2525
listing::PartitionedFile,
2626
object_store::ObjectStoreUrl,
27-
physical_plan::{CsvSource, FileSource, FileStream, JsonOpener, JsonSource},
27+
physical_plan::{CsvSource, FileSource, FileStream, JsonOpener},
2828
},
2929
error::Result,
3030
physical_plan::metrics::ExecutionPlanMetricsSet,
@@ -58,17 +58,17 @@ async fn csv_opener() -> Result<()> {
5858
let scan_config = FileScanConfigBuilder::new(
5959
ObjectStoreUrl::local_filesystem(),
6060
Arc::clone(&schema),
61-
Arc::new(CsvSource::default()),
6261
)
6362
.with_projection(Some(vec![12, 0]))
6463
.with_batch_size(Some(8192))
6564
.with_limit(Some(5))
6665
.with_file(PartitionedFile::new(path.display().to_string(), 10))
6766
.build();
6867

69-
let config = CsvSource::new(true, b',', b'"').with_comment(Some(b'#'));
68+
let source =
69+
CsvSource::new(true, b',', b'"', scan_config.clone()).with_comment(Some(b'#'));
7070

71-
let opener = config.create_file_opener(object_store, &scan_config, 0);
71+
let opener = source.create_file_opener(object_store, 0);
7272

7373
let mut result = vec![];
7474
let mut stream =
@@ -118,15 +118,12 @@ async fn json_opener() -> Result<()> {
118118
Arc::new(object_store),
119119
);
120120

121-
let scan_config = FileScanConfigBuilder::new(
122-
ObjectStoreUrl::local_filesystem(),
123-
schema,
124-
Arc::new(JsonSource::default()),
125-
)
126-
.with_projection(Some(vec![1, 0]))
127-
.with_limit(Some(5))
128-
.with_file(PartitionedFile::new(path.to_string(), 10))
129-
.build();
121+
let scan_config =
122+
FileScanConfigBuilder::new(ObjectStoreUrl::local_filesystem(), schema)
123+
.with_projection(Some(vec![1, 0]))
124+
.with_limit(Some(5))
125+
.with_file(PartitionedFile::new(path.to_string(), 10))
126+
.build();
130127

131128
let mut stream = FileStream::new(
132129
&scan_config,

datafusion-examples/examples/custom_file_format.rs

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -111,9 +111,11 @@ impl FileFormat for TSVFileFormat {
111111
async fn create_physical_plan(
112112
&self,
113113
state: &dyn Session,
114-
conf: FileScanConfig,
114+
source: Arc<dyn FileSource>,
115115
) -> Result<Arc<dyn ExecutionPlan>> {
116-
self.csv_file_format.create_physical_plan(state, conf).await
116+
self.csv_file_format
117+
.create_physical_plan(state, source)
118+
.await
117119
}
118120

119121
async fn create_writer_physical_plan(
@@ -128,8 +130,8 @@ impl FileFormat for TSVFileFormat {
128130
.await
129131
}
130132

131-
fn file_source(&self) -> Arc<dyn FileSource> {
132-
self.csv_file_format.file_source()
133+
fn file_source(&self, config: FileScanConfig) -> Arc<dyn FileSource> {
134+
self.csv_file_format.file_source(config)
133135
}
134136
}
135137

datafusion-examples/examples/default_column_values.rs

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ use datafusion::catalog::{Session, TableProvider};
2929
use datafusion::common::tree_node::{Transformed, TransformedResult, TreeNode};
3030
use datafusion::common::DFSchema;
3131
use datafusion::common::{Result, ScalarValue};
32+
use datafusion::config::TableParquetOptions;
3233
use datafusion::datasource::listing::PartitionedFile;
3334
use datafusion::datasource::physical_plan::{FileScanConfigBuilder, ParquetSource};
3435
use datafusion::execution::context::SessionContext;
@@ -235,10 +236,6 @@ impl TableProvider for DefaultValueTableProvider {
235236
&df_schema,
236237
)?;
237238

238-
let parquet_source = ParquetSource::default()
239-
.with_predicate(filter)
240-
.with_pushdown_filters(true);
241-
242239
let object_store_url = ObjectStoreUrl::parse("memory://")?;
243240
let store = state.runtime_env().object_store(object_store_url)?;
244241

@@ -255,19 +252,22 @@ impl TableProvider for DefaultValueTableProvider {
255252
.map(|file| PartitionedFile::new(file.location.clone(), file.size))
256253
.collect();
257254

258-
let file_scan_config = FileScanConfigBuilder::new(
255+
let config = FileScanConfigBuilder::new(
259256
ObjectStoreUrl::parse("memory://")?,
260257
self.schema.clone(),
261-
Arc::new(parquet_source),
262258
)
263259
.with_projection(projection.cloned())
264260
.with_limit(limit)
265261
.with_file_group(file_group)
266-
.with_expr_adapter(Some(Arc::new(DefaultValuePhysicalExprAdapterFactory) as _));
262+
.with_expr_adapter(Some(Arc::new(DefaultValuePhysicalExprAdapterFactory) as _))
263+
.build();
264+
265+
let parquet_source =
266+
ParquetSource::new(TableParquetOptions::default(), config.clone())
267+
.with_predicate(filter)
268+
.with_pushdown_filters(true);
267269

268-
Ok(Arc::new(DataSourceExec::new(Arc::new(
269-
file_scan_config.build(),
270-
))))
270+
Ok(DataSourceExec::from_data_source(parquet_source))
271271
}
272272
}
273273

datafusion-examples/examples/json_shredding.rs

Lines changed: 16 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ use datafusion::common::tree_node::{
2929
Transformed, TransformedResult, TreeNode, TreeNodeRecursion,
3030
};
3131
use datafusion::common::{assert_contains, DFSchema, Result};
32+
use datafusion::config::TableParquetOptions;
3233
use datafusion::datasource::listing::PartitionedFile;
3334
use datafusion::datasource::physical_plan::{FileScanConfigBuilder, ParquetSource};
3435
use datafusion::execution::context::SessionContext;
@@ -243,10 +244,6 @@ impl TableProvider for ExampleTableProvider {
243244
&df_schema,
244245
)?;
245246

246-
let parquet_source = ParquetSource::default()
247-
.with_predicate(filter)
248-
.with_pushdown_filters(true);
249-
250247
let object_store_url = ObjectStoreUrl::parse("memory://")?;
251248

252249
let store = state.runtime_env().object_store(object_store_url)?;
@@ -264,20 +261,21 @@ impl TableProvider for ExampleTableProvider {
264261
.map(|file| PartitionedFile::new(file.location.clone(), file.size))
265262
.collect();
266263

267-
let file_scan_config = FileScanConfigBuilder::new(
268-
ObjectStoreUrl::parse("memory://")?,
269-
schema,
270-
Arc::new(parquet_source),
271-
)
272-
.with_projection(projection.cloned())
273-
.with_limit(limit)
274-
.with_file_group(file_group)
275-
// if the rewriter needs a reference to the table schema you can bind self.schema() here
276-
.with_expr_adapter(Some(Arc::new(ShreddedJsonRewriterFactory) as _));
277-
278-
Ok(Arc::new(DataSourceExec::new(Arc::new(
279-
file_scan_config.build(),
280-
))))
264+
let config =
265+
FileScanConfigBuilder::new(ObjectStoreUrl::parse("memory://")?, schema)
266+
.with_projection(projection.cloned())
267+
.with_limit(limit)
268+
.with_file_group(file_group)
269+
// if the rewriter needs a reference to the table schema you can bind self.schema() here
270+
.with_expr_adapter(Some(Arc::new(ShreddedJsonRewriterFactory) as _))
271+
.build();
272+
273+
let parquet_source =
274+
ParquetSource::new(TableParquetOptions::default(), config.clone())
275+
.with_predicate(filter)
276+
.with_pushdown_filters(true);
277+
278+
Ok(DataSourceExec::from_data_source(parquet_source))
281279
}
282280
}
283281

datafusion-examples/examples/parquet_embedded_index.rs

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,7 @@ use arrow_schema::{DataType, Field, Schema, SchemaRef};
117117
use async_trait::async_trait;
118118
use datafusion::catalog::{Session, TableProvider};
119119
use datafusion::common::{exec_err, HashMap, HashSet, Result};
120+
use datafusion::config::TableParquetOptions;
120121
use datafusion::datasource::listing::PartitionedFile;
121122
use datafusion::datasource::memory::DataSourceExec;
122123
use datafusion::datasource::physical_plan::{FileScanConfigBuilder, ParquetSource};
@@ -426,8 +427,8 @@ impl TableProvider for DistinctIndexTable {
426427

427428
// Build ParquetSource to actually read the files
428429
let url = ObjectStoreUrl::parse("file://")?;
429-
let source = Arc::new(ParquetSource::default().with_enable_page_index(true));
430-
let mut builder = FileScanConfigBuilder::new(url, self.schema.clone(), source);
430+
431+
let mut builder = FileScanConfigBuilder::new(url, self.schema.clone());
431432
for file in files_to_scan {
432433
let path = self.dir.join(file);
433434
let len = std::fs::metadata(&path)?.len();
@@ -438,7 +439,11 @@ impl TableProvider for DistinctIndexTable {
438439
PartitionedFile::new(path.to_str().unwrap().to_string(), len);
439440
builder = builder.with_file(partitioned_file);
440441
}
441-
Ok(DataSourceExec::from_data_source(builder.build()))
442+
443+
let config = builder.build();
444+
let source = ParquetSource::new(TableParquetOptions::default(), config.clone())
445+
.with_enable_page_index(true);
446+
Ok(DataSourceExec::from_data_source(source))
442447
}
443448

444449
/// Tell DataFusion that we can handle filters on the "category" column

datafusion-examples/examples/parquet_exec_visitor.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ use std::sync::Arc;
1919

2020
use datafusion::datasource::file_format::parquet::ParquetFormat;
2121
use datafusion::datasource::listing::ListingOptions;
22-
use datafusion::datasource::physical_plan::{FileGroup, ParquetSource};
22+
use datafusion::datasource::physical_plan::{FileGroup, FileSource, ParquetSource};
2323
use datafusion::datasource::source::DataSourceExec;
2424
use datafusion::error::DataFusionError;
2525
use datafusion::execution::context::SessionContext;
@@ -98,9 +98,11 @@ impl ExecutionPlanVisitor for ParquetExecVisitor {
9898
fn pre_visit(&mut self, plan: &dyn ExecutionPlan) -> Result<bool, Self::Error> {
9999
// If needed match on a specific `ExecutionPlan` node type
100100
if let Some(data_source_exec) = plan.as_any().downcast_ref::<DataSourceExec>() {
101-
if let Some((file_config, _)) =
101+
if let Some(parquet_source) =
102102
data_source_exec.downcast_to_file_source::<ParquetSource>()
103103
{
104+
let file_config = parquet_source.config();
105+
104106
self.file_groups = Some(file_config.file_groups.clone());
105107

106108
let metrics = match data_source_exec.metrics() {

datafusion-examples/examples/parquet_index.rs

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ use datafusion::common::pruning::PruningStatistics;
2727
use datafusion::common::{
2828
internal_datafusion_err, DFSchema, DataFusionError, Result, ScalarValue,
2929
};
30+
use datafusion::config::TableParquetOptions;
3031
use datafusion::datasource::listing::PartitionedFile;
3132
use datafusion::datasource::memory::DataSourceExec;
3233
use datafusion::datasource::physical_plan::{FileScanConfigBuilder, ParquetSource};
@@ -243,9 +244,9 @@ impl TableProvider for IndexTableProvider {
243244
let files = self.index.get_files(predicate.clone())?;
244245

245246
let object_store_url = ObjectStoreUrl::parse("file://")?;
246-
let source = Arc::new(ParquetSource::default().with_predicate(predicate));
247+
247248
let mut file_scan_config_builder =
248-
FileScanConfigBuilder::new(object_store_url, self.schema(), source)
249+
FileScanConfigBuilder::new(object_store_url, self.schema())
249250
.with_projection(projection.cloned())
250251
.with_limit(limit);
251252

@@ -258,9 +259,12 @@ impl TableProvider for IndexTableProvider {
258259
PartitionedFile::new(canonical_path.display().to_string(), file_size),
259260
);
260261
}
261-
Ok(DataSourceExec::from_data_source(
262-
file_scan_config_builder.build(),
263-
))
262+
263+
let config = file_scan_config_builder.build();
264+
let source = ParquetSource::new(TableParquetOptions::default(), config.clone())
265+
.with_predicate(predicate);
266+
267+
Ok(DataSourceExec::from_data_source(source))
264268
}
265269

266270
/// Tell DataFusion to push filters down to the scan method

datafusion/core/src/datasource/file_format/arrow.rs

Lines changed: 3 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ use datafusion_common::{
4949
use datafusion_common_runtime::{JoinSet, SpawnedTask};
5050
use datafusion_datasource::display::FileGroupDisplay;
5151
use datafusion_datasource::file::FileSource;
52-
use datafusion_datasource::file_scan_config::{FileScanConfig, FileScanConfigBuilder};
52+
use datafusion_datasource::file_scan_config::FileScanConfig;
5353
use datafusion_datasource::sink::{DataSink, DataSinkExec};
5454
use datafusion_datasource::write::ObjectWriterBuilder;
5555
use datafusion_execution::{SendableRecordBatchStream, TaskContext};
@@ -58,7 +58,6 @@ use datafusion_physical_expr_common::sort_expr::LexRequirement;
5858

5959
use async_trait::async_trait;
6060
use bytes::Bytes;
61-
use datafusion_datasource::source::DataSourceExec;
6261
use futures::stream::BoxStream;
6362
use futures::StreamExt;
6463
use object_store::{GetResultPayload, ObjectMeta, ObjectStore};
@@ -173,19 +172,6 @@ impl FileFormat for ArrowFormat {
173172
Ok(Statistics::new_unknown(&table_schema))
174173
}
175174

176-
async fn create_physical_plan(
177-
&self,
178-
_state: &dyn Session,
179-
conf: FileScanConfig,
180-
) -> Result<Arc<dyn ExecutionPlan>> {
181-
let source = Arc::new(ArrowSource::default());
182-
let config = FileScanConfigBuilder::from(conf)
183-
.with_source(source)
184-
.build();
185-
186-
Ok(DataSourceExec::from_data_source(config))
187-
}
188-
189175
async fn create_writer_physical_plan(
190176
&self,
191177
input: Arc<dyn ExecutionPlan>,
@@ -202,8 +188,8 @@ impl FileFormat for ArrowFormat {
202188
Ok(Arc::new(DataSinkExec::new(input, sink, order_requirements)) as _)
203189
}
204190

205-
fn file_source(&self) -> Arc<dyn FileSource> {
206-
Arc::new(ArrowSource::default())
191+
fn file_source(&self, config: FileScanConfig) -> Arc<dyn FileSource> {
192+
Arc::new(ArrowSource::new(config))
207193
}
208194
}
209195

datafusion/core/src/datasource/file_format/mod.rs

Lines changed: 7 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -80,21 +80,17 @@ pub(crate) mod test_util {
8080
}]
8181
.into()];
8282

83-
let exec = format
84-
.create_physical_plan(
85-
state,
86-
FileScanConfigBuilder::new(
87-
ObjectStoreUrl::local_filesystem(),
88-
file_schema,
89-
format.file_source(),
90-
)
83+
let config =
84+
FileScanConfigBuilder::new(ObjectStoreUrl::local_filesystem(), file_schema)
9185
.with_file_groups(file_groups)
9286
.with_file_source_projected_statistics(statistics)
9387
.with_projection(projection)
9488
.with_limit(limit)
95-
.build(),
96-
)
97-
.await?;
89+
.build();
90+
91+
let source = format.file_source(config);
92+
let exec = format.create_physical_plan(state, source).await?;
93+
9894
Ok(exec)
9995
}
10096
}

0 commit comments

Comments
 (0)