Skip to content

Commit c715bb7

Browse files
truepelegaryrussell
authored andcommitted
GH-2542: FallbackBatchErrorHandler Improvement
Resolves #2542 Fix retry logic causing consumer leaving the group **Short problem description**: consumer leaves the group when combined processing+backoff time is higher than `max.poll.interval.ms`. **Root cause**: The retry logic in `ErrorHandlingUtils` does not call `poll()` (on the paused consumer) before re-trying the listener runnable, it calls `poll()` only before backing off. So if backoffInterval + duration of the following retried execution is longer than `max.poll.interval` - consumer leaves the group. **Solution**: Amend `ErrorHandlingUtils` to have an extra call to `consumer.poll` right before retrying the listener runnable. cleanup fix import style fix import style cleanup
1 parent 8cab408 commit c715bb7

File tree

2 files changed

+10
-6
lines changed

2 files changed

+10
-6
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/ErrorHandlingUtils.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2021-2022 the original author or authors.
2+
* Copyright 2021-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -37,6 +37,7 @@
3737
* Utilities for error handling.
3838
*
3939
* @author Gary Russell
40+
* @author Andrii Pelesh
4041
* @since 2.8
4142
*
4243
*/
@@ -96,6 +97,7 @@ public static void retryBatch(Exception thrownException, ConsumerRecords<?, ?> r
9697
if (!container.isRunning()) {
9798
throw new KafkaException("Container stopped during retries");
9899
}
100+
consumer.poll(Duration.ZERO);
99101
try {
100102
invokeListener.run();
101103
return;

spring-kafka/src/test/java/org/springframework/kafka/listener/FallbackBatchErrorHandlerTests.java

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2020-2022 the original author or authors.
2+
* Copyright 2020-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -82,7 +82,7 @@ void recover() {
8282
assertThat(this.invoked).isEqualTo(3);
8383
assertThat(recovered).hasSize(2);
8484
verify(consumer).pause(any());
85-
verify(consumer, times(3)).poll(any());
85+
verify(consumer, times(2 * this.invoked)).poll(any());
8686
verify(consumer).resume(any());
8787
verify(consumer, times(2)).assignment();
8888
verifyNoMoreInteractions(consumer);
@@ -108,7 +108,7 @@ void successOnRetry() {
108108
assertThat(this.invoked).isEqualTo(1);
109109
assertThat(recovered).hasSize(0);
110110
verify(consumer).pause(any());
111-
verify(consumer).poll(any());
111+
verify(consumer, times(2)).poll(any());
112112
verify(consumer).resume(any());
113113
verify(consumer, times(2)).assignment();
114114
verifyNoMoreInteractions(consumer);
@@ -139,7 +139,7 @@ void recoveryFails() {
139139
assertThat(this.invoked).isEqualTo(3);
140140
assertThat(recovered).hasSize(1);
141141
verify(consumer).pause(any());
142-
verify(consumer, times(3)).poll(any());
142+
verify(consumer, times(2 * this.invoked)).poll(any());
143143
verify(consumer).resume(any());
144144
verify(consumer, times(2)).assignment();
145145
verify(consumer).seek(new TopicPartition("foo", 0), 0L);
@@ -208,9 +208,11 @@ void rePauseOnRebalance() {
208208
inOrder.verify(container).publishConsumerPausedEvent(map.keySet(), "For batch retry");
209209
inOrder.verify(consumer).poll(any());
210210
inOrder.verify(consumer).pause(any());
211+
inOrder.verify(consumer).poll(any());
212+
inOrder.verify(consumer).pause(any());
211213
inOrder.verify(consumer).resume(any());
212214
inOrder.verify(container).publishConsumerResumedEvent(map.keySet());
213-
verify(consumer, times(3)).assignment();
215+
verify(consumer, times(4)).assignment();
214216
verifyNoMoreInteractions(consumer);
215217
assertThat(pubPauseCalled.get()).isTrue();
216218
}

0 commit comments

Comments
 (0)