KAFKA-19632: Handle overlap batch on partition re-assignment #20395

Open · wants to merge 2 commits into trunk
85 changes: 73 additions & 12 deletions core/src/main/java/kafka/server/share/SharePartition.java
@@ -572,7 +572,16 @@ public long nextFetchOffset() {
nextFetchOffset = gapStartOffset;
break;
}
// If the gapStartOffset is already past the last offset of the in-flight batch,
// then do not consider this batch for finding the next fetch offset. For example,
// say during initialization the initialReadGapOffset is set to 5 and the first
// cached batch is 15-18. The first read happens at offset 5 and, say, fetches
// [5-6], so the next fetch offset should be 7. This works fine, but say a
// subsequent read returns batch 8-11, which moves the gapStartOffset to 12.
// Without the max check, iterating over batch 5-6 would reset the next fetch
// offset to 7, which is incorrect: 7 is a natural gap for which no data is
// available and it has already been read past. Hence take the max of the
// gapStartOffset and the in-flight batch's last offset + 1.
gapStartOffset = Math.max(entry.getValue().lastOffset() + 1, gapStartOffset);
}

// Check if the state is maintained per offset or batch. If the offsetState
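As a minimal, self-contained sketch of the max check above (this models the in-flight cache as a plain TreeMap of base offset to last offset; the names and the simplified loop are illustrative, not the PR's code):

```java
import java.util.Map;
import java.util.TreeMap;

public class NextFetchOffsetSketch {
    // Cached in-flight batches, keyed by base offset; value is the last offset.
    static long nextFetchOffset(TreeMap<Long, Long> cachedBatches, long gapStartOffset) {
        for (Map.Entry<Long, Long> entry : cachedBatches.entrySet()) {
            if (entry.getKey() > gapStartOffset) {
                // A gap exists before this batch; fetch from the gap start.
                return gapStartOffset;
            }
            // The max check: never move the gap start backwards, as a later
            // fetch may already have advanced it past this batch.
            gapStartOffset = Math.max(entry.getValue() + 1, gapStartOffset);
        }
        return gapStartOffset;
    }

    public static void main(String[] args) {
        TreeMap<Long, Long> cached = new TreeMap<>();
        cached.put(5L, 6L);   // first fetch returned [5-6]
        cached.put(8L, 11L);  // next fetch skipped empty offset 7, returned [8-11]
        cached.put(15L, 18L); // batch known from persisted state at initialization
        // The gap start has advanced to 12; without the max check, iterating
        // over batch 5-6 would reset the next fetch offset to the stale 7.
        System.out.println(nextFetchOffset(cached, 12L)); // prints 12
    }
}
```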
@@ -699,16 +708,33 @@ public ShareAcquiredRecords acquire(

// Find the floor batch record for the request batch. The request batch could be
// for a subset of an in-flight batch, i.e. a cached batch of offsets 10-14 and a
// request batch of 12-13. Hence, the floor entry is fetched to find the sub-map.
// Secondly, when the share partition is initialized with persisted state, the start
// offset might have moved to a later offset. In such a case, the first batch's base
// offset might be less than the start offset.
Map.Entry<Long, InFlightBatch> floorEntry = cachedState.floorEntry(baseOffset);
if (floorEntry == null) {
// The initialize method checks that there cannot be any batches prior to the start
// offset, and once the share partition starts fetching records it always fetches
// from at least the start offset. Still, the batch base offset can be prior to the
// start offset when the share partition is initialized with partial persisted state
// and a moved start offset, i.e. the start offset is not the batch's first offset.
// In such a case, adjust the base offset to the start offset. This is safe when
// there is no floor entry, i.e. no cached batches exist prior to the request batch
// base offset. Hence, check for the floor entry and adjust the base offset accordingly.
if (baseOffset < startOffset) {
log.info("Adjusting base offset for the fetch as it's prior to start offset: {}-{}"
+ "from {} to {}", groupId, topicIdPartition, baseOffset, startOffset);
baseOffset = startOffset;
}
} else if (floorEntry.getValue().lastOffset() >= baseOffset) {
// A floor entry may exist without an overlap: if the request batch base offset is
// ahead of the floor entry's last offset, i.e. a cached batch of 10-14 and a request
// batch of 15-18, the floor entry is found but there is no overlap. Such a scenario
// is handled in the next step when considering the subMap. However, if the floor
// entry is found and the request batch base offset falls within the floor entry,
// then adjust the base offset to the floor entry's key so that the acquire method
// can still work on previously cached batch boundaries.
baseOffset = floorEntry.getKey();
}
// Validate if the fetch records are already part of existing batches and if available.
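The adjustment order can be sketched as follows; this is a simplification with plain longs standing in for the real InFlightBatch entries, not the actual implementation:

```java
import java.util.Map;
import java.util.TreeMap;

public class BaseOffsetAdjustmentSketch {
    // Cached in-flight batches: base offset -> last offset.
    static long adjustBaseOffset(TreeMap<Long, Long> cachedState, long startOffset, long baseOffset) {
        Map.Entry<Long, Long> floorEntry = cachedState.floorEntry(baseOffset);
        if (floorEntry == null) {
            // No cached batch at or before the request batch; clamp a fetch that
            // starts before the start offset (partial persisted state case).
            return Math.max(baseOffset, startOffset);
        } else if (floorEntry.getValue() >= baseOffset) {
            // Request batch overlaps the floor batch (e.g. cached 10-14,
            // request 12-13): realign to the cached batch boundary.
            return floorEntry.getKey();
        }
        // Floor batch ends before the request batch (e.g. cached 10-14,
        // request 15-18): no overlap, keep the base offset.
        return baseOffset;
    }

    public static void main(String[] args) {
        TreeMap<Long, Long> cached = new TreeMap<>();
        cached.put(10L, 14L);
        System.out.println(adjustBaseOffset(cached, 10L, 12L));        // 10: overlap
        System.out.println(adjustBaseOffset(cached, 10L, 15L));        // 15: no overlap
        System.out.println(adjustBaseOffset(new TreeMap<>(), 8L, 5L)); // 8: clamped
    }
}
```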
@@ -755,7 +781,8 @@ public ShareAcquiredRecords acquire(
result.addAll(shareAcquiredRecords.acquiredRecords());
acquiredCount += shareAcquiredRecords.count();
}
// Set nextBatchStartOffset as the last offset of the current in-flight batch + 1,
// so that the next gap, if any, is considered in the following loop iteration.
maybeGapStartOffset = inFlightBatch.lastOffset() + 1;
// If the acquired count is equal to the max fetch records then break the loop.
if (acquiredCount >= maxRecordsToAcquire) {
@@ -1057,10 +1084,24 @@ private Optional<Throwable> releaseAcquiredRecordsForCompleteBatch(String member
/**
* Updates the cached state, start and end offsets of the share partition as per the new log
* start offset. The method is called when the log start offset is moved for the share partition.
* <p>
* This method only archives the available records in the cached state that are before the new log
* start offset. It does not persist the archived state batches to the persister; rather, it
* updates the cached state and offsets to reflect the new log start offset. The state in the
* persister is updated lazily during the acknowledge/release records API calls or on acquisition
* lock timeout.
* <p>
* The AVAILABLE state records may or may not have an ongoing state transition. Hence, the archive
* records method updates the state of the records to ARCHIVED and sets the terminal state flag,
* so that if the transition is rolled back, the state cannot become AVAILABLE again. However,
* the ACQUIRED state records are not archived as they are still in-flight, and the acknowledge
* method does not allow state updates for any offsets past the log start offset; hence those
* records are only archived once the acquisition lock timeout occurs.
*
* @param logStartOffset The new log start offset.
*/
void updateCacheAndOffsets(long logStartOffset) {
log.debug("Updating cached states for share partition: {}-{} with new log start offset: {}",
groupId, topicIdPartition, logStartOffset);
lock.writeLock().lock();
try {
if (logStartOffset <= startOffset) {
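A toy illustration of the archiving rule described in the javadoc above (the enum and method here are stand-ins, not the actual record state machinery):

```java
public class ArchiveOnLsoMoveSketch {
    enum RecordState { AVAILABLE, ACQUIRED, ARCHIVED }

    // AVAILABLE records before the new log start offset are archived in the
    // cache with the terminal flag, so a rolled-back transition cannot revive
    // them; ACQUIRED records stay in-flight and are archived later, on
    // acquisition lock timeout.
    static RecordState onLogStartOffsetMove(RecordState current) {
        return current == RecordState.AVAILABLE ? RecordState.ARCHIVED : current;
    }

    public static void main(String[] args) {
        System.out.println(onLogStartOffsetMove(RecordState.AVAILABLE)); // ARCHIVED
        System.out.println(onLogStartOffsetMove(RecordState.ACQUIRED));  // ACQUIRED
    }
}
```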
@@ -1432,7 +1473,11 @@ private void maybeUpdateReadGapFetchOffset(long offset) {
lock.writeLock().lock();
try {
if (initialReadGapOffset != null) {
// When the last cached batch in the initial read gap window is acquired, the
// endOffset is the same as the initialReadGapOffset's endOffset, but the gap
// offset to update would be endOffset + 1. Hence, do not update the gap start
// offset if the request offset is ahead of the endOffset.
if (initialReadGapOffset.endOffset() == endOffset && offset <= initialReadGapOffset.endOffset()) {
initialReadGapOffset.gapStartOffset(offset);
} else {
// The initial read gap offset is not valid anymore as the end offset has moved
@@ -1445,6 +1490,15 @@ private void maybeUpdateReadGapFetchOffset(long offset) {
}
}
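A condensed sketch of the guard, with stand-in types for the read gap window (illustrative names, not the PR's code):

```java
public class ReadGapGuardSketch {
    // Minimal stand-in for the initial read gap window.
    static class GapWindow {
        long gapStartOffset;
        final long endOffset;
        GapWindow(long gapStartOffset, long endOffset) {
            this.gapStartOffset = gapStartOffset;
            this.endOffset = endOffset;
        }
    }

    // Only update the gap start while the request offset is still inside the
    // window: once the window's last cached batch is acquired, the offset to
    // record would be endOffset + 1, which must be ignored.
    static void maybeUpdateGapStart(GapWindow window, long partitionEndOffset, long offset) {
        if (window.endOffset == partitionEndOffset && offset <= window.endOffset) {
            window.gapStartOffset = offset;
        }
    }

    public static void main(String[] args) {
        GapWindow window = new GapWindow(5, 18);
        maybeUpdateGapStart(window, 18, 12); // inside the window: updates
        maybeUpdateGapStart(window, 18, 19); // endOffset + 1: ignored
        System.out.println(window.gapStartOffset); // prints 12
    }
}
```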

/**
* The method calculates the last offset and maximum records to acquire. The adjustment is needed
* to ensure that the records acquired do not exceed the maximum in-flight messages limit.
*
* @param fetchOffset The offset from which the records are fetched.
* @param maxFetchRecords The maximum number of records to acquire.
* @param lastOffset The last offset to acquire records to, which is the last offset of the fetched batch.
* @return LastOffsetAndMaxRecords object, containing the last offset to acquire and the maximum records to acquire.
*/
private LastOffsetAndMaxRecords lastOffsetAndMaxRecordsToAcquire(long fetchOffset, int maxFetchRecords, long lastOffset) {
// There can always be records fetched exceeding the max in-flight messages limit. Hence,
// we need to check if the share partition has reached the max in-flight messages limit
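The method body is collapsed in this view, but assuming the adjustment works roughly like a capacity check, a hypothetical simplification of the record-count side looks like this (the helper below is illustrative, not the actual implementation):

```java
public class MaxInFlightSketch {
    // Cap how many records may be acquired so the total in-flight count stays
    // within the partition's max in-flight messages limit.
    static int maxRecordsToAcquire(int maxFetchRecords, int inFlightCount, int maxInFlightMessages) {
        return Math.min(maxFetchRecords, Math.max(0, maxInFlightMessages - inFlightCount));
    }

    public static void main(String[] args) {
        // 500 records requested, 180 already in flight, limit 200: acquire at most 20.
        System.out.println(maxRecordsToAcquire(500, 180, 200)); // prints 20
    }
}
```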
@@ -1512,6 +1566,13 @@ private ShareAcquiredRecords acquireNewBatchRecords(
// which falls under the max messages limit. As the max fetch records is the soft
// limit, the last offset can be higher than the max messages.
lastAcquiredOffset = lastOffsetFromBatchWithRequestOffset(batches, firstAcquiredOffset + maxFetchRecords - 1);
// If the initial read gap offset window is active then it's not guaranteed that the
// fetched batches align with the cached batch boundaries. Hence, reset to the requested
// last offset itself if the batch's last offset is greater than the last offset for
// acquisition; otherwise the acquired batch could overlap a batch within the initial
// read gap offset window.
if (isInitialReadGapOffsetWindowActive() && lastAcquiredOffset > lastOffset) {
lastAcquiredOffset = lastOffset;
}
}

// Create batches of acquired records.
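The clamp in isolation, as a toy sketch (the helper name and parameters are illustrative):

```java
public class AcquireClampSketch {
    // When the initial read gap window is active, fetched batches need not align
    // with cached batch boundaries, so cap the acquisition at the requested
    // lastOffset to avoid overlapping a batch inside the gap window.
    static long clampLastAcquiredOffset(long batchAlignedLastOffset, long lastOffset,
                                        boolean readGapWindowActive) {
        if (readGapWindowActive && batchAlignedLastOffset > lastOffset) {
            return lastOffset;
        }
        return batchAlignedLastOffset;
    }

    public static void main(String[] args) {
        // Batch alignment would acquire up to 18, but the request ends at 15:
        // clamp to 15 so the next gap-window batch is not overlapped.
        System.out.println(clampLastAcquiredOffset(18L, 15L, true));  // 15
        System.out.println(clampLastAcquiredOffset(18L, 15L, false)); // 18
    }
}
```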