Skip to content

Conversation

@CTTY
Copy link
Contributor

@CTTY CTTY commented Oct 20, 2025

Which issue does this PR close?

What changes are included in this PR?

Are these changes tested?

/// - A partitioned table is provided without a partition splitter
pub fn new(
writer: W,
partition_splitter: Option<RecordBatchPartitionSplitter>,
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It may make more sense if we just construct partition_splitter within new?

Looking at Splitter::new, it takes a input_schema: ArrowSchemaRef, but in reality we can get the input_schema directly from the RecordBatch. If we initialize the partition splitter lazily then we don't need to ask users to build a partition_splitter themselves

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe it's a good idea to change RecordBatchPartitionSplitter to PartitionSplitter<I = RecordBatch>? This way the TaskWriter implementation can also be generic. Haven't explored too much on that front tho. What I have in mind rn:

trait PartitionSplitter<I = DefaultInput> {
     fn split(&self, input: I) -> Result<Vec<(PartitionKey, I)>>
}

impl<I: PositionalDeletes> PartitionSplitter<I> for PositionalDeletePartitionSplitter

cc @ZENOTME

/// 2. Split the input record batch into multiple record batches based on the partitioned record batch.
// # TODO
// Remove this after partition writer supported.
#[allow(dead_code)]
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove this


// # TODO
// Remove this after partition writer supported.
#[allow(dead_code)]
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove this

@CTTY CTTY force-pushed the ctty/task-writer branch from e875e8e to f4b72ef Compare October 23, 2025 03:05
Copy link
Contributor

@liurenjie1024 liurenjie1024 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we are on the right track. I left some comments, and we need to split them into smaller prs.

schema: SchemaRef,
partition_spec: PartitionSpecRef,
projector: RecordBatchProjector,
projector: Option<RecordBatchProjector>,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This change is somehow ugly. The splitter could be split into two parts:

  1. Calculate partition value.
  2. Split record batch according to partition value.

We could abstract out the process of calculating partition value.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we can reuse the PartititonValueCalculator here.

Are you suggesting that we should have "calculate" and "split" two functions within the splitter? I think that way it would be hard for people to tell when should they call calculate before calling split.

Maybe this would look better?

impl RecordBatchPartitionSplitter {
 pub fn new(
        iceberg_schema: SchemaRef,
        partition_spec: PartitionSpecRef,
       // if some, then calculate the partition value, otherwise use `_partition`
        calculator: Option<PartitionValueCalculator>, 
    )
}
 

Comment on lines +85 to +87
Fanout(FanoutWriter<B>),
/// Writer for partitioned tables with sorted data (maintains single active writer)
Clustered(ClusteredWriter<B>),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could simplify this as

Partitioned {
   splitter: RecordBatchSplitter,
   partitioned_writer: Arc<dyn PartitionedWriter>
}

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Add TaskWriter

2 participants