Skip to content

Commit 178aedf

Browse files
committed
Add comprehensive acknowledgment support for Kafka share consumers
Implement explicit and implicit acknowledgment modes for share consumer containers, enabling fine-grained control over record processing outcomes with ACCEPT, RELEASE, and REJECT acknowledgment types. - Add ShareAcknowledgment interface with ACCEPT/RELEASE/REJECT support - Add ShareAcknowledgmentException for acknowledgment failures - Implement ShareAcknowledgmentMode enum (IMPLICIT/EXPLICIT) in ContainerProperties - Add poll-level acknowledgment constraints in explicit mode - Add ShareConsumerAwareMessageListener for ShareConsumer access - Add AcknowledgingShareConsumerAwareMessageListener for explicit acknowledgment - Add ShareRecordMessagingMessageListenerAdapter for KafkaListener integration - Use non-polymorphic onShareRecord method names to avoid regression issues with existing listener infrastructure and maintain clear API separation - Enhanced ShareKafkaMessageListenerContainer with acknowledgment tracking - Automatic error handling with REJECT acknowledgment on exceptions - Poll blocking in explicit mode until all records acknowledged - Support for mixed acknowledgment patterns within single poll - Auto-detection of ShareKafkaListenerContainerFactory for share consumer endpoints - Validation preventing batch listeners with share consumers - Factory-level and container-level acknowledgment mode configuration - Message converter extensions for ShareAcknowledgment parameter injection - Comprehensive integration tests covering all acknowledgment scenarios - Constraint tests validating poll-level acknowledgment requirements - Unit tests for container behavior and listener dispatching - Updated documentation with acknowledgment examples - Implicit: Records auto-acknowledged as ACCEPT on success, REJECT on error - Explicit: Application must manually acknowledge each record before next poll - Explicit mode blocks subsequent polls until all records acknowledged - Prevents message loss and ensures proper acknowledgment ordering - Concurrent acknowledgment attempts properly handled with IllegalStateException - Processing exceptions trigger automatic REJECT acknowledgment - Acknowledgment failures reset state and throw ShareAcknowledgmentException - Container continues processing after individual record failures - ShareAcknowledgment parameter injection follows existing Spring Kafka patterns - Non-polymorphic listener method names (onShareRecord vs onMessage) prevent potential conflicts with existing listener infrastructure and ensure clear separation between regular and share consumer listener contracts - Factory and container level configuration options provide flexibility This implementation provides acknowledgment semantics for Kafka share groups while maintaining backward compatibility with existing implicit acknowledgment behavior. Signed-off-by: Soby Chacko <[email protected]>
1 parent 2704f85 commit 178aedf

21 files changed

+2764
-101
lines changed

spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/kafka-queues.adoc

Lines changed: 258 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -201,25 +201,6 @@ public class ShareMessageListener {
201201
}
202202
----
203203

204-
[[share-group-configuration]]
205-
== Share Group Configuration
206-
207-
Share groups require specific broker configuration to function properly.
208-
For testing with embedded Kafka, use:
209-
210-
[source,java]
211-
----
212-
@EmbeddedKafka(
213-
topics = {"my-queue-topic"},
214-
brokerProperties = {
215-
"unstable.api.versions.enable=true",
216-
"group.coordinator.rebalance.protocols=classic,share",
217-
"share.coordinator.state.topic.replication.factor=1",
218-
"share.coordinator.state.topic.min.isr=1"
219-
}
220-
)
221-
----
222-
223204
[[share-group-offset-reset]]
224205
=== Share Group Offset Reset
225206

@@ -248,8 +229,257 @@ private void configureShareGroup(String bootstrapServers, String groupId) throws
248229
[[share-record-acknowledgment]]
249230
== Record Acknowledgment
250231

251-
Currently, share consumers automatically acknowledge records with `AcknowledgeType.ACCEPT` after successful processing.
252-
More sophisticated acknowledgment patterns will be added in future versions.
232+
Share consumers support two acknowledgment modes that control how records are acknowledged after processing.
233+
234+
[[share-acknowledgment-modes]]
235+
=== Acknowledgment Modes
236+
237+
Share containers support two acknowledgment modes:
238+
[[share-implicit-acknowledgment]]
239+
240+
==== Implicit Acknowledgment (Default)
241+
In implicit mode, records are automatically acknowledged based on processing outcome:
242+
243+
Successful processing: Records are acknowledged as `ACCEPT`
244+
Processing errors: Records are acknowledged as `REJECT`
245+
246+
[source,java]
247+
----
248+
@Bean
249+
public ShareKafkaListenerContainerFactory<String, String> shareKafkaListenerContainerFactory(
250+
ShareConsumerFactory<String, String> shareConsumerFactory) {
251+
ShareKafkaListenerContainerFactory<String, String> factory =
252+
new ShareKafkaListenerContainerFactory<>(shareConsumerFactory);
253+
// Implicit mode is the default
254+
factory.getContainerProperties().setShareAcknowledgmentMode(
255+
ContainerProperties.ShareAcknowledgmentMode.IMPLICIT);
256+
257+
return factory;
258+
}
259+
----
260+
261+
[[share-explicit-acknowledgment]]
262+
==== Explicit Acknowledgment
263+
264+
In explicit mode, the application must manually acknowledge each record using the provided ShareAcknowledgment:
265+
266+
[source,java]
267+
----
268+
@Bean
269+
public ShareConsumerFactory<String, String> explicitShareConsumerFactory() {
270+
Map<String, Object> props = new HashMap<>();
271+
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
272+
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
273+
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
274+
props.put("share.acknowledgement.mode", "explicit");
275+
return new DefaultShareConsumerFactory<>(props);
276+
}
277+
278+
@Bean
279+
public ShareKafkaListenerContainerFactory<String, String> explicitShareKafkaListenerContainerFactory(
280+
ShareConsumerFactory<String, String> explicitShareConsumerFactory) {
281+
return new ShareKafkaListenerContainerFactory<>(explicitShareConsumerFactory);
282+
}
283+
----
284+
285+
[[share-acknowledgment-types]]
286+
=== Acknowledgment Types
287+
288+
Share consumers support three acknowledgment types:
289+
290+
`ACCEPT`: Record processed successfully, mark as completed
291+
`RELEASE`: Temporary failure, make record available for redelivery
292+
`REJECT`: Permanent failure, do not retry
293+
294+
[[share-acknowledgment-api]]
295+
=== ShareAcknowledgment API
296+
297+
The `ShareAcknowledgment` interface provides methods for explicit acknowledgment:
298+
299+
[source,java]
300+
----
301+
public interface ShareAcknowledgment {
302+
void acknowledge(AcknowledgeType type);
303+
void acknowledge(); // Convenience method for ACCEPT
304+
void release(); // Convenience method for RELEASE
305+
void reject(); // Convenience method for REJECT
306+
boolean isAcknowledged();
307+
AcknowledgeType getAcknowledgmentType();
308+
}
309+
----
310+
311+
[[share-listener-interfaces]]
312+
=== Listener Interfaces
313+
314+
Share consumers support specialized listener interfaces for different use cases:
315+
316+
[[share-basic-listener]]
317+
==== Basic Message Listener
318+
319+
Use the standard MessageListener for simple cases:
320+
[source,java]
321+
----
322+
@KafkaListener(topics = "my-topic", containerFactory = "shareKafkaListenerContainerFactory")
323+
public void listen(ConsumerRecord<String, String> record) {
324+
System.out.println("Received: " + record.value());
325+
// Automatically acknowledged in implicit mode
326+
}
327+
----
328+
329+
[[share-consumer-aware-listener]]
330+
==== ShareConsumerAwareMessageListener
331+
332+
Access the ShareConsumer instance for advanced operations:
333+
334+
[source,java]
335+
----
336+
@KafkaListener(topics = "my-topic", containerFactory = "shareKafkaListenerContainerFactory")
337+
public void listen(ConsumerRecord<String, String> record, ShareConsumer<?, ?> consumer) {
338+
System.out.println("Received: " + record.value());
339+
// Access consumer metrics, etc.
340+
}
341+
----
342+
343+
[[share-acknowledging-listener]]
344+
==== AcknowledgingShareConsumerAwareMessageListener
345+
346+
Use explicit acknowledgment with full consumer access:
347+
348+
[source,java]
349+
----
350+
@Component
351+
public class ExplicitAckListener {
352+
@KafkaListener(
353+
topics = "my-topic",
354+
containerFactory = "explicitShareKafkaListenerContainerFactory"
355+
)
356+
public void listen(ConsumerRecord<String, String> record,
357+
ShareAcknowledgment acknowledgment,
358+
ShareConsumer<?, ?> consumer) {
359+
360+
try {
361+
processRecord(record);
362+
acknowledgment.acknowledge(); // ACCEPT
363+
} catch (RetryableException e) {
364+
acknowledgment.release(); // Will be redelivered
365+
} catch (Exception e) {
366+
acknowledgment.reject(); // Permanent failure
367+
}
368+
}
369+
370+
private void processRecord(ConsumerRecord<String, String> record) {
371+
// Business logic here
372+
}
373+
}
374+
----
375+
376+
[[share-acknowledgment-constraints]]
377+
=== Acknowledgment Constraints
378+
379+
In explicit acknowledgment mode, the container enforces important constraints:
380+
381+
Poll Blocking: Subsequent polls are blocked until all records from the previous poll are acknowledged.
382+
One-time Acknowledgment: Each record can only be acknowledged once.
383+
Error Handling: If processing throws an exception, the record is automatically acknowledged as `REJECT`.
384+
385+
[WARNING]
386+
In explicit mode, failing to acknowledge records will block further message processing.
387+
Always ensure records are acknowledged in all code paths.
388+
389+
[[share-acknowledgment-examples]]
390+
=== Acknowledgment Examples
391+
392+
[[share-mixed-acknowledgment-example]]
393+
==== Mixed Acknowledgment Patterns
394+
395+
[source,java]
396+
----
397+
@KafkaListener(topics = "order-processing", containerFactory = "explicitShareKafkaListenerContainerFactory")
398+
public void processOrder(ConsumerRecord<String, String> record, ShareAcknowledgment acknowledgment) {
399+
String orderId = record.key();
400+
String orderData = record.value();
401+
try {
402+
if (isValidOrder(orderData)) {
403+
if (processOrder(orderData)) {
404+
acknowledgment.acknowledge(); // Success - ACCEPT
405+
}
406+
else {
407+
acknowledgment.release(); // Temporary failure - retry later
408+
}
409+
}
410+
else {
411+
acknowledgment.reject(); // Invalid order - don't retry
412+
}
413+
}
414+
catch (Exception e) {
415+
// Exception automatically triggers REJECT
416+
throw e;
417+
}
418+
}
419+
----
420+
421+
[[share-conditional-acknowledgment-example]]
422+
==== Conditional Acknowledgment
423+
424+
[source,java]
425+
----
426+
@KafkaListener(topics = "data-validation", containerFactory = "explicitShareKafkaListenerContainerFactory")
427+
public void validateData(ConsumerRecord<String, String> record, ShareAcknowledgment acknowledgment) {
428+
ValidationResult result = validator.validate(record.value());
429+
switch (result.getStatus()) {
430+
case VALID:
431+
acknowledgment.acknowledge(AcknowledgeType.ACCEPT);
432+
break;
433+
case INVALID_RETRYABLE:
434+
acknowledgment.acknowledge(AcknowledgeType.RELEASE);
435+
break;
436+
case INVALID_PERMANENT:
437+
acknowledgment.acknowledge(AcknowledgeType.REJECT);
438+
break;
439+
}
440+
}
441+
----
442+
443+
[[share-acknowledgment-configuration]]
444+
=== Acknowledgment Mode Configuration
445+
446+
You can configure the acknowledgment mode at both the consumer factory and container levels:
447+
448+
[[share-factory-level-configuration]]
449+
==== Factory Level Configuration
450+
451+
[source,java]
452+
----
453+
@Bean
454+
public ShareConsumerFactory<String, String> explicitAckShareConsumerFactory() {
455+
Map<String, Object> props = new HashMap<>();
456+
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
457+
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
458+
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
459+
// Configure explicit acknowledgment at the factory level
460+
props.put("share.acknowledgement.mode", "explicit");
461+
return new DefaultShareConsumerFactory<>(props);
462+
}
463+
----
464+
465+
[[share-container-level-configuration]]
466+
==== Container Level Configuration
467+
468+
[source,java]
469+
----
470+
@Bean
471+
public ShareKafkaListenerContainerFactory<String, String> customShareKafkaListenerContainerFactory(
472+
ShareConsumerFactory<String, String> shareConsumerFactory) {
473+
ShareKafkaListenerContainerFactory<String, String> factory =
474+
new ShareKafkaListenerContainerFactory<>(shareConsumerFactory);
475+
476+
// Configure acknowledgment mode at container level
477+
factory.getContainerProperties().setShareAcknowledgmentMode(
478+
ContainerProperties.ShareAcknowledgmentMode.EXPLICIT);
479+
480+
return factory;
481+
}
482+
----
253483

254484
[[share-differences-from-regular-consumers]]
255485
== Differences from Regular Consumers
@@ -259,16 +489,19 @@ Share consumers differ from regular consumers in several key ways:
259489
1. **No Partition Assignment**: Share consumers cannot be assigned specific partitions
260490
2. **No Topic Patterns**: Share consumers do not support subscribing to topic patterns
261491
3. **Cooperative Consumption**: Multiple consumers in the same share group can consume from the same partitions simultaneously
262-
4. **Automatic Acknowledgment**: Records are automatically acknowledged after processing
492+
4. **Record-Level Acknowledgment**: Supports explicit acknowledgment with `ACCEPT`, `RELEASE`, and `REJECT` types
263493
5. **Different Group Management**: Share groups use different coordinator protocols
494+
6. **No Batch Processing**: Share consumers process records individually, not in batches
264495

265496
[[share-limitations-and-considerations]]
266497
== Limitations and Considerations
267498

268499
[[share-current-limitations]]
269500
=== Current Limitations
270501

271-
* **Early Access**: This feature is in early access and may change in future versions
272-
* **Limited Acknowledgment Options**: Only automatic `ACCEPT` acknowledgment is currently supported
273-
* **No Message Converters**: Message converters are not yet supported for share consumers
502+
* **In preview**: This feature is in preview mode and may change in future versions
274503
* **Single-Threaded**: Share consumer containers currently run in single-threaded mode
504+
* **No Message Converters**: Message converters are not yet supported for share consumers
505+
* **No Batch Listeners**: Batch processing is not supported with share consumers
506+
* **Poll Constraints**: In explicit acknowledgment mode, unacknowledged records block subsequent polls
507+

spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListenerAnnotationBeanPostProcessor.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,7 @@
8989
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
9090
import org.springframework.kafka.config.MethodKafkaListenerEndpoint;
9191
import org.springframework.kafka.config.MultiMethodKafkaListenerEndpoint;
92+
import org.springframework.kafka.config.ShareKafkaListenerContainerFactory;
9293
import org.springframework.kafka.listener.ContainerGroupSequencer;
9394
import org.springframework.kafka.listener.KafkaConsumerBackoffManager;
9495
import org.springframework.kafka.listener.KafkaListenerErrorHandler;
@@ -651,6 +652,10 @@ protected void processListener(MethodKafkaListenerEndpoint<?, ?> endpoint, Kafka
651652
KafkaListenerContainerFactory<?> listenerContainerFactory = resolveContainerFactory(kafkaListener,
652653
containerFactory, beanName);
653654

655+
if (listenerContainerFactory instanceof ShareKafkaListenerContainerFactory<?, ?>) {
656+
endpoint.setShareConsumer(Boolean.TRUE);
657+
}
658+
654659
this.registrar.registerEndpoint(endpoint, listenerContainerFactory);
655660
}
656661

@@ -685,6 +690,7 @@ private void processKafkaListenerAnnotation(MethodKafkaListenerEndpoint<?, ?> en
685690
if (StringUtils.hasText(kafkaListener.batch())) {
686691
endpoint.setBatchListener(Boolean.parseBoolean(kafkaListener.batch()));
687692
}
693+
688694
endpoint.setBeanFactory(this.beanFactory);
689695
resolveErrorHandler(endpoint, kafkaListener);
690696
resolveContentTypeConverter(endpoint, kafkaListener);

spring-kafka/src/main/java/org/springframework/kafka/config/AbstractKafkaListenerEndpoint.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,8 @@ public abstract class AbstractKafkaListenerEndpoint<K, V>
9898

9999
private @Nullable Boolean batchListener;
100100

101+
private @Nullable Boolean shareConsumer;
102+
101103
private @Nullable KafkaTemplate<?, ?> replyTemplate;
102104

103105
private @Nullable String clientIdPrefix;
@@ -291,6 +293,14 @@ public void setBatchListener(boolean batchListener) {
291293
this.batchListener = batchListener;
292294
}
293295

296+
public void setShareConsumer(Boolean shareConsumer) {
297+
this.shareConsumer = shareConsumer;
298+
}
299+
300+
public @Nullable Boolean getShareConsumer() {
301+
return this.shareConsumer;
302+
}
303+
294304
/**
295305
* Set the {@link KafkaTemplate} to use to send replies.
296306
* @param replyTemplate the template.

spring-kafka/src/main/java/org/springframework/kafka/config/MethodKafkaListenerEndpoint.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import org.springframework.kafka.listener.adapter.HandlerAdapter;
3737
import org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter;
3838
import org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter;
39+
import org.springframework.kafka.listener.adapter.ShareRecordMessagingMessageListenerAdapter;
3940
import org.springframework.kafka.support.JavaUtils;
4041
import org.springframework.kafka.support.converter.BatchMessageConverter;
4142
import org.springframework.kafka.support.converter.MessageConverter;
@@ -210,7 +211,15 @@ protected MessagingMessageListenerAdapter<K, V> createMessageListenerInstance(
210211
@Nullable MessageConverter messageConverter) {
211212

212213
MessagingMessageListenerAdapter<K, V> listener;
213-
if (isBatchListener()) {
214+
if (getShareConsumer() != null && getShareConsumer()) {
215+
ShareRecordMessagingMessageListenerAdapter<K, V> messageListener = new ShareRecordMessagingMessageListenerAdapter<>(
216+
this.bean, this.method, this.errorHandler);
217+
if (messageConverter instanceof RecordMessageConverter recordMessageConverter) {
218+
messageListener.setMessageConverter(recordMessageConverter);
219+
}
220+
listener = messageListener;
221+
}
222+
else if (isBatchListener()) {
214223
BatchMessagingMessageListenerAdapter<K, V> messageListener = new BatchMessagingMessageListenerAdapter<>(
215224
this.bean, this.method, this.errorHandler);
216225
BatchToRecordAdapter<K, V> batchToRecordAdapter = getBatchToRecordAdapter();

0 commit comments

Comments
 (0)