Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 30 additions & 15 deletions datafusion/datasource/src/memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,10 @@ pub struct MemorySourceConfig {
projected_schema: SchemaRef,
/// Optional projection
projection: Option<Vec<usize>>,
/// Sort information: one or more equivalent orderings
sort_information: Vec<LexOrdering>,
/// Sort information of `partitions`: one or more equivalent orderings
underlying_sort_information: Vec<LexOrdering>,
/// Sort information after the optional projection is applied
projected_sort_information: Option<Vec<LexOrdering>>,
/// if partition sizes should be displayed
show_sizes: bool,
/// The maximum number of records to read from this plan. If `None`,
Expand Down Expand Up @@ -100,7 +102,7 @@ impl DataSource for MemorySourceConfig {
self.partitions.iter().map(|b| b.len()).collect();

let output_ordering = self
.sort_information
.sort_information()
.first()
.map(|output_ordering| format!(", output_ordering={output_ordering}"))
.unwrap_or_default();
Expand Down Expand Up @@ -186,7 +188,7 @@ impl DataSource for MemorySourceConfig {
fn eq_properties(&self) -> EquivalenceProperties {
EquivalenceProperties::new_with_orderings(
Arc::clone(&self.projected_schema),
self.sort_information.clone(),
self.sort_information().to_vec(),
)
}

Expand Down Expand Up @@ -225,12 +227,17 @@ impl DataSource for MemorySourceConfig {
self.projection().as_ref().unwrap_or(&all_projections),
);

MemorySourceConfig::try_new_exec(
MemorySourceConfig::try_new(
self.partitions(),
self.original_schema(),
Some(new_projections),
)
.map(|e| e as _)
.and_then(|memory_source| {
let s = memory_source.try_with_sort_information(
self.underlying_sort_information.clone(),
)?;
Ok(Arc::new(DataSourceExec::new(Arc::new(s))) as _)
})
})
.transpose()
}
Expand All @@ -245,12 +252,14 @@ impl MemorySourceConfig {
projection: Option<Vec<usize>>,
) -> Result<Self> {
let projected_schema = project_schema(&schema, projection.as_ref())?;
let projected_sort_information = projection.as_ref().map(|_| vec![]);
Ok(Self {
partitions: partitions.to_vec(),
schema,
projected_schema,
projection,
sort_information: vec![],
underlying_sort_information: vec![],
projected_sort_information,
show_sizes: true,
fetch: None,
})
Expand Down Expand Up @@ -350,7 +359,8 @@ impl MemorySourceConfig {
schema: Arc::clone(&schema),
projected_schema: Arc::clone(&schema),
projection: None,
sort_information: vec![],
underlying_sort_information: vec![],
projected_sort_information: None,
show_sizes: true,
fetch: None,
};
Expand Down Expand Up @@ -384,9 +394,13 @@ impl MemorySourceConfig {
self.show_sizes
}

/// Ref to sort information
/// Ref to output sort information, after projection is applied
pub fn sort_information(&self) -> &[LexOrdering] {
&self.sort_information
if let Some(si) = &self.projected_sort_information {
si
} else {
&self.underlying_sort_information
}
}

/// A memory table can be ordered by multiple expressions simultaneously.
Expand All @@ -410,7 +424,7 @@ impl MemorySourceConfig {
/// also applied to the given `sort_information`.
pub fn try_with_sort_information(
mut self,
mut sort_information: Vec<LexOrdering>,
sort_information: Vec<LexOrdering>,
) -> Result<Self> {
// All sort expressions must refer to the original schema
let fields = self.schema.fields();
Expand Down Expand Up @@ -442,15 +456,16 @@ impl MemorySourceConfig {
ProjectionMapping::try_new(proj_exprs, &base_schema)?;
let base_eqp = EquivalenceProperties::new_with_orderings(
Arc::clone(&base_schema),
sort_information,
sort_information.clone(),
);
let proj_eqp =
base_eqp.project(&projection_mapping, Arc::clone(&self.projected_schema));
let oeq_class: OrderingEquivalenceClass = proj_eqp.into();
sort_information = oeq_class.into();
self.projected_sort_information = Some(oeq_class.into());
} else {
self.projected_sort_information = None;
}

self.sort_information = sort_information;
self.underlying_sort_information = sort_information;
Ok(self)
}

Expand Down