
KAFKA-19632: Handle overlap batch on partition re-assignment #20395


Open · wants to merge 2 commits into trunk

Conversation

apoorvmittal10 (Contributor) commented Aug 21, 2025

The PR fixes a batch alignment issue when partitions are re-assigned. During
the initial read of state, the batches can be broken arbitrarily. Say the
start offset is 10 and the cache contains a [15-18] batch during
initialization. When a fetch happens at offset 10 and the fetched batch
contains 10 records, i.e. [10-19], then correct batches will be created if
maxFetchRecords is greater than 10. But if maxFetchRecords is less than 10,
the last offset of the batch is determined, which will be 19. Hence the
acquire method will incorrectly create a batch of [10-19] while [15-18]
already exists. The check below is required to resolve the issue:

if (isInitialReadGapOffsetWindowActive() && lastAcquiredOffset > lastOffset) {
    lastAcquiredOffset = lastOffset;
}
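
For illustration, the scenario above can be sketched as a small standalone Java example. The offsets are assumptions taken from the description (start offset 10, cached batch [15-18], fetched batch [10-19], maxFetchRecords below the fetched record count), isInitialReadGapOffsetWindowActive() is a stand-in for the real SharePartition method, and lastOffset is read here as the end of the gap in front of the cached batch; this is a sketch of the check, not the actual broker code:

// Minimal standalone sketch of the capping check; not the actual SharePartition code.
public class OverlapBatchCapSketch {

    // Stand-in for SharePartition#isInitialReadGapOffsetWindowActive(); assumed to
    // report that the cache was restored with gaps after partition re-assignment.
    static boolean isInitialReadGapOffsetWindowActive() {
        return true;
    }

    public static void main(String[] args) {
        long lastOffset = 14;       // assumed end of the gap [10-14] in front of cached batch [15-18]
        long batchLastOffset = 19;  // last offset of the fetched batch [10-19]

        // Because maxFetchRecords is less than the 10 fetched records, the acquire
        // path falls back to the fetched batch's last offset, i.e. it would acquire
        // [10-19] and overlap the already cached [15-18].
        long lastAcquiredOffset = batchLastOffset;

        // The fix from the PR: cap the acquired range while the initial read gap
        // window is active.
        if (isInitialReadGapOffsetWindowActive() && lastAcquiredOffset > lastOffset) {
            lastAcquiredOffset = lastOffset;
        }

        System.out.println("Acquired batch: [10-" + lastAcquiredOffset + "]"); // prints [10-14]
    }
}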

While testing other cases, further issues were found with updating the gap
offset, with acquiring records prior to the share partition's end offset, and
with determining the next fetch offset for compacted topics. All these issues
can arise mainly during the initial read window after partition re-assignment.

github-actions bot added the triage (PRs from the community), core (Kafka Broker) and KIP-932 (Queues for Kafka) labels on Aug 21, 2025
apoorvmittal10 added the ci-approved label and removed the triage (PRs from the community) label on Aug 21, 2025
AndrewJSchofield (Member) left a comment


Thanks for the PR. I've done an initial review and the testing looks comprehensive. I want to take another pass on the SharePartition code.


// Create a single batch record that covers the entire range from 10 to 30 of initial read gap.
// The records in the batch are from 10 to 49.
MemoryRecords records = memoryRecords(40, 10);

nit: One thing that makes this a bit harder than necessary to review is the inconsistency in the conventions about the offset ranges. For example, this could read memoryRecords(10,49) which would align with the firstOffset, lastOffset convention used in the persister. Not something that needs to be fixed on this PR, but potentially something to refactor later on.
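
A possible shape for that later refactor, sketched as a hypothetical delegating helper (the existing count-based memoryRecords(numRecords, startOffset) helper in the test class is assumed, not defined here):

// Hypothetical helper for illustration only; it delegates to the existing
// memoryRecords(numRecords, startOffset) test helper and exposes the
// firstOffset/lastOffset convention used by the persister.
private MemoryRecords memoryRecordsFromTo(long firstOffset, long lastOffset) {
    return memoryRecords((int) (lastOffset - firstOffset + 1), firstOffset);
}

With such a helper, the excerpt above would read memoryRecordsFromTo(10, 49), matching the firstOffset, lastOffset convention the reviewer mentions.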
