Skip to content

Commit cb571d9

Browse files
committed
Fix: ListingTableFactory hive column detection
- Fixes an issue in the ListingTableFactory where hive columns are not detected and incorporated into the table schema when an explicit schema has not been set by the user - Fixes an issue where subdirectories that do not follow Hive formatting (e.g. key=value) could be erroneously interpreted as contributing to the table schema
1 parent d750ade commit cb571d9

File tree

3 files changed

+93
-11
lines changed

3 files changed

+93
-11
lines changed

datafusion/core/src/datasource/listing/table.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -802,6 +802,9 @@ impl ListingOptions {
802802
.rev()
803803
.skip(1) // get parents only; skip the file itself
804804
.rev()
805+
// Partitions are expected to follow the format "column_name=value", so we
806+
// should ignore any path part that cannot be parsed into the expected format
807+
.filter(|s| s.contains('='))
805808
.map(|s| s.split('=').take(1).collect())
806809
.collect_vec()
807810
})

datafusion/core/src/datasource/listing_table_factory.rs

Lines changed: 62 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -64,15 +64,25 @@ impl TableProviderFactory for ListingTableFactory {
6464
.create(session_state, &cmd.options)?;
6565

6666
let file_extension = get_extension(cmd.location.as_str());
67+
let mut table_path = ListingTableUrl::parse(&cmd.location)?;
68+
let mut options = ListingOptions::new(file_format)
69+
.with_session_config_options(session_state.config())
70+
.with_file_extension(file_extension);
6771

6872
let (provided_schema, table_partition_cols) = if cmd.schema.fields().is_empty() {
73+
let part_cols = match cmd.table_partition_cols.is_empty() {
74+
true => options
75+
.infer_partitions(session_state, &table_path)
76+
.await?
77+
.into_iter(),
78+
false => cmd.table_partition_cols.clone().into_iter(),
79+
};
6980
(
7081
None,
71-
cmd.table_partition_cols
72-
.iter()
73-
.map(|x| {
82+
part_cols
83+
.map(|p| {
7484
(
75-
x.clone(),
85+
p,
7686
DataType::Dictionary(
7787
Box::new(DataType::UInt16),
7888
Box::new(DataType::Utf8),
@@ -108,13 +118,7 @@ impl TableProviderFactory for ListingTableFactory {
108118
(Some(schema), table_partition_cols)
109119
};
110120

111-
let mut table_path = ListingTableUrl::parse(&cmd.location)?;
112-
113-
let options = ListingOptions::new(file_format)
114-
.with_file_extension(&file_extension)
115-
.with_session_config_options(session_state.config())
116-
.with_table_partition_cols(table_partition_cols);
117-
121+
options = options.with_table_partition_cols(table_partition_cols);
118122
options
119123
.validate_partitions(session_state, &table_path)
120124
.await?;
@@ -189,6 +193,8 @@ fn get_extension(path: &str) -> String {
189193
mod tests {
190194
use glob::Pattern;
191195
use std::collections::HashMap;
196+
use std::fs;
197+
use std::path::PathBuf;
192198

193199
use super::*;
194200
use crate::{
@@ -375,4 +381,49 @@ mod tests {
375381
Pattern::new("*.csv").unwrap()
376382
);
377383
}
384+
385+
#[tokio::test]
386+
async fn test_create_with_hive_partitions() {
387+
let dir = tempfile::tempdir().unwrap();
388+
let mut path = PathBuf::from(dir.path());
389+
path.extend(["key1=value1", "key2=value2"]);
390+
fs::create_dir_all(&path).unwrap();
391+
path.push("data.parquet");
392+
fs::File::create_new(&path).unwrap();
393+
394+
let factory = ListingTableFactory::new();
395+
let context = SessionContext::new();
396+
let state = context.state();
397+
let name = TableReference::bare("foo");
398+
399+
let cmd = CreateExternalTable {
400+
name,
401+
location: dir.path().to_str().unwrap().to_string(),
402+
file_type: "parquet".to_string(),
403+
schema: Arc::new(DFSchema::empty()),
404+
table_partition_cols: vec![],
405+
if_not_exists: false,
406+
temporary: false,
407+
definition: None,
408+
order_exprs: vec![],
409+
unbounded: false,
410+
options: HashMap::new(),
411+
constraints: Constraints::default(),
412+
column_defaults: HashMap::new(),
413+
};
414+
let table_provider = factory.create(&state, &cmd).await.unwrap();
415+
let listing_table = table_provider
416+
.as_any()
417+
.downcast_ref::<ListingTable>()
418+
.unwrap();
419+
420+
let listing_options = listing_table.options();
421+
let dtype =
422+
DataType::Dictionary(Box::new(DataType::UInt16), Box::new(DataType::Utf8));
423+
let expected_cols = vec![
424+
(String::from("key1"), dtype.clone()),
425+
(String::from("key2"), dtype.clone()),
426+
];
427+
assert_eq!(expected_cols, listing_options.table_partition_cols);
428+
}
378429
}

datafusion/sqllogictest/test_files/insert_to_external.slt

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -175,6 +175,34 @@ select * from partitioned_insert_test order by a,b,c
175175
1 20 200
176176
2 20 200
177177

178+
statement count 0
179+
CREATE EXTERNAL TABLE
180+
partitioned_insert_test_readback
181+
STORED AS csv
182+
LOCATION 'test_files/scratch/insert_to_external/insert_to_partitioned/';
183+
184+
query TTT
185+
describe partitioned_insert_test_readback;
186+
----
187+
c Int64 YES
188+
a Dictionary(UInt16, Utf8) NO
189+
b Dictionary(UInt16, Utf8) NO
190+
191+
query ITT
192+
select * from partitioned_insert_test_readback order by a,b,c;
193+
----
194+
1 10 100
195+
1 10 200
196+
1 20 100
197+
2 20 100
198+
1 20 200
199+
2 20 200
200+
201+
query I
202+
select count(*) from partitioned_insert_test_readback where b=100;
203+
----
204+
3
205+
178206
statement ok
179207
CREATE EXTERNAL TABLE
180208
partitioned_insert_test_verify(c bigint)

0 commit comments

Comments
 (0)