-
Notifications
You must be signed in to change notification settings - Fork 337
feat(io): UnpartitionedWriter + TaskWriter #1769
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
| /// - A partitioned table is provided without a partition splitter | ||
| pub fn new( | ||
| writer: W, | ||
| partition_splitter: Option<RecordBatchPartitionSplitter>, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It may make more sense if we just construct partition_splitter within new?
Looking at Splitter::new, it takes a input_schema: ArrowSchemaRef, but in reality we can get the input_schema directly from the RecordBatch. If we initialize the partition splitter lazily then we don't need to ask users to build a partition_splitter themselves
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
maybe it's a good idea to change RecordBatchPartitionSplitter to PartitionSplitter<I = RecordBatch>? This way the TaskWriter implementation can also be generic. Haven't explored too much on that front tho. What I have in mind rn:
trait PartitionSplitter<I = DefaultInput> {
fn split(&self, input: I) -> Result<Vec<(PartitionKey, I)>>
}
impl<I: PositionalDeletes> PartitionSplitter<I> for PositionalDeletePartitionSplitter
cc @ZENOTME
| /// 2. Split the input record batch into multiple record batches based on the partitioned record batch. | ||
| // # TODO | ||
| // Remove this after partition writer supported. | ||
| #[allow(dead_code)] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Remove this
|
|
||
| // # TODO | ||
| // Remove this after partition writer supported. | ||
| #[allow(dead_code)] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Remove this
e875e8e to
f4b72ef
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we are on the right track. I left some comments, and we need to split them into smaller prs.
| schema: SchemaRef, | ||
| partition_spec: PartitionSpecRef, | ||
| projector: RecordBatchProjector, | ||
| projector: Option<RecordBatchProjector>, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This change is somehow ugly. The splitter could be split into two parts:
- Calculate partition value.
- Split record batch according to partition value.
We could abstract out the process of calculating partition value.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we can reuse the PartititonValueCalculator here.
Are you suggesting that we should have "calculate" and "split" two functions within the splitter? I think that way it would be hard for people to tell when should they call calculate before calling split.
Maybe this would look better?
impl RecordBatchPartitionSplitter {
pub fn new(
iceberg_schema: SchemaRef,
partition_spec: PartitionSpecRef,
// if some, then calculate the partition value, otherwise use `_partition`
calculator: Option<PartitionValueCalculator>,
)
}
| Fanout(FanoutWriter<B>), | ||
| /// Writer for partitioned tables with sorted data (maintains single active writer) | ||
| Clustered(ClusteredWriter<B>), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We could simplify this as
Partitioned {
splitter: RecordBatchSplitter,
partitioned_writer: Arc<dyn PartitionedWriter>
}
Which issue does this PR close?
What changes are included in this PR?
Are these changes tested?