Skip to content

Commit d41b564

Browse files
KAFKA-19632: Handle overlap batch on partition re-assignment
1 parent eeb6a0d commit d41b564

File tree

3 files changed

+742
-12
lines changed

3 files changed

+742
-12
lines changed

core/src/main/java/kafka/server/share/SharePartition.java

Lines changed: 73 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -572,7 +572,16 @@ public long nextFetchOffset() {
572572
nextFetchOffset = gapStartOffset;
573573
break;
574574
}
575-
gapStartOffset = entry.getValue().lastOffset() + 1;
575+
// If the gapStartOffset is already past the last offset of the in-flight batch,
576+
// then do not consider this batch for finding the next fetch offset. For example,
577+
// consider during initialization, the initialReadGapOffset is set to 5 and the
578+
// first cached batch is 15-18. First read will happen at offset 5 and say the data
579+
// fetched is [5-6], now next fetch offset should be 7. This works fine but say
580+
// subsequent read returns batch 8-11, and the gapStartOffset will be 12. Without
581+
// the max check, the next fetch offset returned will be 7 which is incorrect.
582+
// The natural gaps for which no data is available shall be considered hence
583+
// take the max of the gapStartOffset and the last offset of the in-flight batch.
584+
gapStartOffset = Math.max(entry.getValue().lastOffset() + 1, gapStartOffset);
576585
}
577586

578587
// Check if the state is maintained per offset or batch. If the offsetState
@@ -699,16 +708,33 @@ public ShareAcquiredRecords acquire(
699708

700709
// Find the floor batch record for the request batch. The request batch could be
701710
// for a subset of the in-flight batch i.e. cached batch of offset 10-14 and request batch
702-
// of 12-13. Hence, floor entry is fetched to find the sub-map.
711+
// of 12-13. Hence, floor entry is fetched to find the sub-map. Secondly, when the share
712+
// partition is initialized with persisted state, the start offset might be moved to a later
713+
// offset. In such case, the first batch base offset might be less than the start offset.
703714
Map.Entry<Long, InFlightBatch> floorEntry = cachedState.floorEntry(baseOffset);
704-
// We might find a batch with floor entry but not necessarily that batch has an overlap,
705-
// if the request batch base offset is ahead of last offset from floor entry i.e. cached
706-
// batch of 10-14 and request batch of 15-18, though floor entry is found but no overlap.
707-
// Such scenario will be handled in the next step when considering the subMap. However,
708-
// if the floor entry is found and the request batch base offset is within the floor entry
709-
// then adjust the base offset to the floor entry so that acquire method can still work on
710-
// previously cached batch boundaries.
711-
if (floorEntry != null && floorEntry.getValue().lastOffset() >= baseOffset) {
715+
if (floorEntry == null) {
716+
// The initialize method check that there couldn't be any batches prior to the start offset.
717+
// And once share partition starts fetching records, it will always fetch records, at least,
718+
// from the start offset, but there could be cases where the batch base offset is prior
719+
// to the start offset. This can happen when the share partition is initialized with
720+
// partial persisted state and moved start offset i.e. start offset is not the batch's
721+
// first offset. In such case, we need to adjust the base offset to the start offset.
722+
// It's safe to adjust the base offset to the start offset when there isn't any floor
723+
// i.e. no cached batches available prior to the request batch base offset. Hence,
724+
// check for the floor entry and adjust the base offset accordingly.
725+
if (baseOffset < startOffset) {
726+
log.info("Adjusting base offset for the fetch as it's prior to start offset: {}-{}"
727+
+ "from {} to {}", groupId, topicIdPartition, baseOffset, startOffset);
728+
baseOffset = startOffset;
729+
}
730+
} else if (floorEntry.getValue().lastOffset() >= baseOffset) {
731+
// We might find a batch with floor entry but not necessarily that batch has an overlap,
732+
// if the request batch base offset is ahead of last offset from floor entry i.e. cached
733+
// batch of 10-14 and request batch of 15-18, though floor entry is found but no overlap.
734+
// Such scenario will be handled in the next step when considering the subMap. However,
735+
// if the floor entry is found and the request batch base offset is within the floor entry
736+
// then adjust the base offset to the floor entry so that acquire method can still work on
737+
// previously cached batch boundaries.
712738
baseOffset = floorEntry.getKey();
713739
}
714740
// Validate if the fetch records are already part of existing batches and if available.
@@ -755,7 +781,8 @@ public ShareAcquiredRecords acquire(
755781
result.addAll(shareAcquiredRecords.acquiredRecords());
756782
acquiredCount += shareAcquiredRecords.count();
757783
}
758-
// Set nextBatchStartOffset as the last offset of the current in-flight batch + 1
784+
// Set nextBatchStartOffset as the last offset of the current in-flight batch + 1.
785+
// Hence, after the loop iteration the next gap can be considered.
759786
maybeGapStartOffset = inFlightBatch.lastOffset() + 1;
760787
// If the acquired count is equal to the max fetch records then break the loop.
761788
if (acquiredCount >= maxRecordsToAcquire) {
@@ -1057,10 +1084,24 @@ private Optional<Throwable> releaseAcquiredRecordsForCompleteBatch(String member
10571084
/**
10581085
* Updates the cached state, start and end offsets of the share partition as per the new log
10591086
* start offset. The method is called when the log start offset is moved for the share partition.
1087+
* <p>
1088+
* This method only archives the available records in the cached state that are before the new log
1089+
* start offset. It does not persist the archived state batches to the persister, rather it
1090+
* updates the cached state and offsets to reflect the new log start offset. The state in persister
1091+
* will be updated lazily during the acknowledge/release records API calls or acquisition lock timeout.
1092+
* <p>
1093+
* The AVAILABLE state records can either have ongoing state transition or not. Hence, the archive
1094+
* records method will update the state of the records to ARCHIVED and set the terminal state flag
1095+
* hence if the transition is rolled back then the state will not be AVAILABLE again. However,
1096+
* the ACQUIRED state records will not be archived as they are still in-flight and acknowledge
1097+
* method also do not allow the state update for any offsets post the log start offset, hence those
1098+
* records will only be archived once acquisition lock timeout occurs.
10601099
*
10611100
* @param logStartOffset The new log start offset.
10621101
*/
10631102
void updateCacheAndOffsets(long logStartOffset) {
1103+
log.debug("Updating cached states for share partition: {}-{} with new log start offset: {}",
1104+
groupId, topicIdPartition, logStartOffset);
10641105
lock.writeLock().lock();
10651106
try {
10661107
if (logStartOffset <= startOffset) {
@@ -1432,7 +1473,11 @@ private void maybeUpdateReadGapFetchOffset(long offset) {
14321473
lock.writeLock().lock();
14331474
try {
14341475
if (initialReadGapOffset != null) {
1435-
if (initialReadGapOffset.endOffset() == endOffset) {
1476+
// When last cached batch for initial read gap window is acquired, then endOffset is
1477+
// same as the initialReadGapOffset's endOffset, but the gap offset to update is
1478+
// endOffset + 1. Hence, do not update the gap start offset if the request offset
1479+
// is ahead of the endOffset.
1480+
if (initialReadGapOffset.endOffset() == endOffset && offset <= initialReadGapOffset.endOffset()) {
14361481
initialReadGapOffset.gapStartOffset(offset);
14371482
} else {
14381483
// The initial read gap offset is not valid anymore as the end offset has moved
@@ -1445,6 +1490,15 @@ private void maybeUpdateReadGapFetchOffset(long offset) {
14451490
}
14461491
}
14471492

1493+
/**
1494+
* The method calculates the last offset and maximum records to acquire. The adjustment is needed
1495+
* to ensure that the records acquired do not exceed the maximum in-flight messages limit.
1496+
*
1497+
* @param fetchOffset The offset from which the records are fetched.
1498+
* @param maxFetchRecords The maximum number of records to acquire.
1499+
* @param lastOffset The last offset to acquire records to, which is the last offset of the fetched batch.
1500+
* @return LastOffsetAndMaxRecords object, containing the last offset to acquire and the maximum records to acquire.
1501+
*/
14481502
private LastOffsetAndMaxRecords lastOffsetAndMaxRecordsToAcquire(long fetchOffset, int maxFetchRecords, long lastOffset) {
14491503
// There can always be records fetched exceeding the max in-flight messages limit. Hence,
14501504
// we need to check if the share partition has reached the max in-flight messages limit
@@ -1512,6 +1566,13 @@ private ShareAcquiredRecords acquireNewBatchRecords(
15121566
// which falls under the max messages limit. As the max fetch records is the soft
15131567
// limit, the last offset can be higher than the max messages.
15141568
lastAcquiredOffset = lastOffsetFromBatchWithRequestOffset(batches, firstAcquiredOffset + maxFetchRecords - 1);
1569+
// If the initial read gap offset window is active then it's not guaranteed that the
1570+
// batches align on batch boundaries. Hence, reset to last offset itself if the batch's
1571+
// last offset is greater than the last offset for acquisition, else there could be
1572+
// a situation where the batch overlaps with the initial read gap offset window batch.
1573+
if (isInitialReadGapOffsetWindowActive() && lastAcquiredOffset > lastOffset) {
1574+
lastAcquiredOffset = lastOffset;
1575+
}
15151576
}
15161577

15171578
// Create batches of acquired records.

0 commit comments

Comments
 (0)