-
Notifications
You must be signed in to change notification settings - Fork 1.7k
Add comprehensive acknowledgment support for Kafka share consumers #4087
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -201,25 +201,6 @@ public class ShareMessageListener { | |
} | ||
---- | ||
|
||
[[share-group-configuration]] | ||
== Share Group Configuration | ||
|
||
Share groups require specific broker configuration to function properly. | ||
For testing with embedded Kafka, use: | ||
|
||
[source,java] | ||
---- | ||
@EmbeddedKafka( | ||
topics = {"my-queue-topic"}, | ||
brokerProperties = { | ||
"unstable.api.versions.enable=true", | ||
"group.coordinator.rebalance.protocols=classic,share", | ||
"share.coordinator.state.topic.replication.factor=1", | ||
"share.coordinator.state.topic.min.isr=1" | ||
} | ||
) | ||
---- | ||
|
||
[[share-group-offset-reset]] | ||
=== Share Group Offset Reset | ||
|
||
|
@@ -248,8 +229,257 @@ private void configureShareGroup(String bootstrapServers, String groupId) throws | |
[[share-record-acknowledgment]] | ||
== Record Acknowledgment | ||
|
||
Currently, share consumers automatically acknowledge records with `AcknowledgeType.ACCEPT` after successful processing. | ||
More sophisticated acknowledgment patterns will be added in future versions. | ||
Share consumers support two acknowledgment modes that control how records are acknowledged after processing. | ||
|
||
[[share-acknowledgment-modes]] | ||
=== Acknowledgment Modes | ||
|
||
Share containers support two acknowledgment modes: | ||
[[share-implicit-acknowledgment]] | ||
|
||
==== Implicit Acknowledgment (Default) | ||
In implicit mode, records are automatically acknowledged based on processing outcome: | ||
|
||
Successful processing: Records are acknowledged as `ACCEPT` | ||
Processing errors: Records are acknowledged as `REJECT` | ||
|
||
[source,java] | ||
---- | ||
@Bean | ||
public ShareKafkaListenerContainerFactory<String, String> shareKafkaListenerContainerFactory( | ||
ShareConsumerFactory<String, String> shareConsumerFactory) { | ||
ShareKafkaListenerContainerFactory<String, String> factory = | ||
new ShareKafkaListenerContainerFactory<>(shareConsumerFactory); | ||
// Implicit mode is the default | ||
factory.getContainerProperties().setShareAcknowledgmentMode( | ||
ContainerProperties.ShareAcknowledgmentMode.IMPLICIT); | ||
|
||
return factory; | ||
} | ||
---- | ||
|
||
[[share-explicit-acknowledgment]] | ||
==== Explicit Acknowledgment | ||
|
||
In explicit mode, the application must manually acknowledge each record using the provided ShareAcknowledgment: | ||
|
||
[source,java] | ||
---- | ||
@Bean | ||
public ShareConsumerFactory<String, String> explicitShareConsumerFactory() { | ||
Map<String, Object> props = new HashMap<>(); | ||
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); | ||
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); | ||
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); | ||
props.put("share.acknowledgement.mode", "explicit"); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is something new. Or... Just make it as a setter on this |
||
return new DefaultShareConsumerFactory<>(props); | ||
} | ||
|
||
@Bean | ||
public ShareKafkaListenerContainerFactory<String, String> explicitShareKafkaListenerContainerFactory( | ||
ShareConsumerFactory<String, String> explicitShareConsumerFactory) { | ||
return new ShareKafkaListenerContainerFactory<>(explicitShareConsumerFactory); | ||
} | ||
---- | ||
|
||
[[share-acknowledgment-types]] | ||
=== Acknowledgment Types | ||
|
||
Share consumers support three acknowledgment types: | ||
|
||
`ACCEPT`: Record processed successfully, mark as completed | ||
`RELEASE`: Temporary failure, make record available for redelivery | ||
`REJECT`: Permanent failure, do not retry | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Don't we need to add dash in the beginning of every entry to make them as a list in the rendered doc? |
||
|
||
[[share-acknowledgment-api]] | ||
=== ShareAcknowledgment API | ||
|
||
The `ShareAcknowledgment` interface provides methods for explicit acknowledgment: | ||
|
||
[source,java] | ||
---- | ||
public interface ShareAcknowledgment { | ||
void acknowledge(AcknowledgeType type); | ||
void acknowledge(); // Convenience method for ACCEPT | ||
void release(); // Convenience method for RELEASE | ||
void reject(); // Convenience method for REJECT | ||
boolean isAcknowledged(); | ||
AcknowledgeType getAcknowledgmentType(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm not sure in the getter. |
||
} | ||
---- | ||
|
||
[[share-listener-interfaces]] | ||
=== Listener Interfaces | ||
|
||
Share consumers support specialized listener interfaces for different use cases: | ||
|
||
[[share-basic-listener]] | ||
==== Basic Message Listener | ||
|
||
Use the standard MessageListener for simple cases: | ||
[source,java] | ||
---- | ||
@KafkaListener(topics = "my-topic", containerFactory = "shareKafkaListenerContainerFactory") | ||
public void listen(ConsumerRecord<String, String> record) { | ||
System.out.println("Received: " + record.value()); | ||
// Automatically acknowledged in implicit mode | ||
} | ||
---- | ||
|
||
[[share-consumer-aware-listener]] | ||
==== ShareConsumerAwareMessageListener | ||
|
||
Access the ShareConsumer instance for advanced operations: | ||
|
||
[source,java] | ||
---- | ||
@KafkaListener(topics = "my-topic", containerFactory = "shareKafkaListenerContainerFactory") | ||
public void listen(ConsumerRecord<String, String> record, ShareConsumer<?, ?> consumer) { | ||
System.out.println("Received: " + record.value()); | ||
// Access consumer metrics, etc. | ||
} | ||
---- | ||
|
||
[[share-acknowledging-listener]] | ||
==== AcknowledgingShareConsumerAwareMessageListener | ||
|
||
Use explicit acknowledgment with full consumer access: | ||
|
||
[source,java] | ||
---- | ||
@Component | ||
public class ExplicitAckListener { | ||
@KafkaListener( | ||
topics = "my-topic", | ||
containerFactory = "explicitShareKafkaListenerContainerFactory" | ||
) | ||
public void listen(ConsumerRecord<String, String> record, | ||
ShareAcknowledgment acknowledgment, | ||
ShareConsumer<?, ?> consumer) { | ||
|
||
try { | ||
processRecord(record); | ||
acknowledgment.acknowledge(); // ACCEPT | ||
} catch (RetryableException e) { | ||
acknowledgment.release(); // Will be redelivered | ||
} catch (Exception e) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can we have a |
||
acknowledgment.reject(); // Permanent failure | ||
} | ||
} | ||
|
||
private void processRecord(ConsumerRecord<String, String> record) { | ||
// Business logic here | ||
} | ||
} | ||
---- | ||
|
||
[[share-acknowledgment-constraints]] | ||
=== Acknowledgment Constraints | ||
|
||
In explicit acknowledgment mode, the container enforces important constraints: | ||
|
||
Poll Blocking: Subsequent polls are blocked until all records from the previous poll are acknowledged. | ||
One-time Acknowledgment: Each record can only be acknowledged once. | ||
Error Handling: If processing throws an exception, the record is automatically acknowledged as `REJECT`. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Just curious that this is really a proper style these days. |
||
|
||
[WARNING] | ||
In explicit mode, failing to acknowledge records will block further message processing. | ||
Always ensure records are acknowledged in all code paths. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Do we have any trigger to notify target application that ack was missed and we are blocking? |
||
|
||
[[share-acknowledgment-examples]] | ||
=== Acknowledgment Examples | ||
|
||
[[share-mixed-acknowledgment-example]] | ||
==== Mixed Acknowledgment Patterns | ||
|
||
[source,java] | ||
---- | ||
@KafkaListener(topics = "order-processing", containerFactory = "explicitShareKafkaListenerContainerFactory") | ||
public void processOrder(ConsumerRecord<String, String> record, ShareAcknowledgment acknowledgment) { | ||
String orderId = record.key(); | ||
String orderData = record.value(); | ||
try { | ||
if (isValidOrder(orderData)) { | ||
if (processOrder(orderData)) { | ||
acknowledgment.acknowledge(); // Success - ACCEPT | ||
} | ||
else { | ||
acknowledgment.release(); // Temporary failure - retry later | ||
} | ||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Something off with indents in this code snippet. |
||
else { | ||
acknowledgment.reject(); // Invalid order - don't retry | ||
} | ||
} | ||
catch (Exception e) { | ||
// Exception automatically triggers REJECT | ||
throw e; | ||
} | ||
} | ||
---- | ||
|
||
[[share-conditional-acknowledgment-example]] | ||
==== Conditional Acknowledgment | ||
|
||
[source,java] | ||
---- | ||
@KafkaListener(topics = "data-validation", containerFactory = "explicitShareKafkaListenerContainerFactory") | ||
public void validateData(ConsumerRecord<String, String> record, ShareAcknowledgment acknowledgment) { | ||
ValidationResult result = validator.validate(record.value()); | ||
switch (result.getStatus()) { | ||
case VALID: | ||
acknowledgment.acknowledge(AcknowledgeType.ACCEPT); | ||
break; | ||
case INVALID_RETRYABLE: | ||
acknowledgment.acknowledge(AcknowledgeType.RELEASE); | ||
break; | ||
case INVALID_PERMANENT: | ||
acknowledgment.acknowledge(AcknowledgeType.REJECT); | ||
break; | ||
} | ||
} | ||
---- | ||
|
||
[[share-acknowledgment-configuration]] | ||
=== Acknowledgment Mode Configuration | ||
|
||
You can configure the acknowledgment mode at both the consumer factory and container levels: | ||
|
||
[[share-factory-level-configuration]] | ||
==== Factory Level Configuration | ||
|
||
[source,java] | ||
---- | ||
@Bean | ||
public ShareConsumerFactory<String, String> explicitAckShareConsumerFactory() { | ||
Map<String, Object> props = new HashMap<>(); | ||
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); | ||
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); | ||
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); | ||
// Configure explicit acknowledgment at the factory level | ||
props.put("share.acknowledgement.mode", "explicit"); | ||
return new DefaultShareConsumerFactory<>(props); | ||
} | ||
---- | ||
|
||
[[share-container-level-configuration]] | ||
==== Container Level Configuration | ||
|
||
[source,java] | ||
---- | ||
@Bean | ||
public ShareKafkaListenerContainerFactory<String, String> customShareKafkaListenerContainerFactory( | ||
ShareConsumerFactory<String, String> shareConsumerFactory) { | ||
ShareKafkaListenerContainerFactory<String, String> factory = | ||
new ShareKafkaListenerContainerFactory<>(shareConsumerFactory); | ||
|
||
// Configure acknowledgment mode at container level | ||
factory.getContainerProperties().setShareAcknowledgmentMode( | ||
ContainerProperties.ShareAcknowledgmentMode.EXPLICIT); | ||
|
||
return factory; | ||
} | ||
---- | ||
|
||
[[share-differences-from-regular-consumers]] | ||
== Differences from Regular Consumers | ||
|
@@ -259,16 +489,19 @@ Share consumers differ from regular consumers in several key ways: | |
1. **No Partition Assignment**: Share consumers cannot be assigned specific partitions | ||
2. **No Topic Patterns**: Share consumers do not support subscribing to topic patterns | ||
3. **Cooperative Consumption**: Multiple consumers in the same share group can consume from the same partitions simultaneously | ||
4. **Automatic Acknowledgment**: Records are automatically acknowledged after processing | ||
4. **Record-Level Acknowledgment**: Supports explicit acknowledgment with `ACCEPT`, `RELEASE`, and `REJECT` types | ||
5. **Different Group Management**: Share groups use different coordinator protocols | ||
6. **No Batch Processing**: Share consumers process records individually, not in batches | ||
|
||
[[share-limitations-and-considerations]] | ||
== Limitations and Considerations | ||
|
||
[[share-current-limitations]] | ||
=== Current Limitations | ||
|
||
* **Early Access**: This feature is in early access and may change in future versions | ||
* **Limited Acknowledgment Options**: Only automatic `ACCEPT` acknowledgment is currently supported | ||
* **No Message Converters**: Message converters are not yet supported for share consumers | ||
* **In preview**: This feature is in preview mode and may change in future versions | ||
* **Single-Threaded**: Share consumer containers currently run in single-threaded mode | ||
* **No Message Converters**: Message converters are not yet supported for share consumers | ||
* **No Batch Listeners**: Batch processing is not supported with share consumers | ||
* **Poll Constraints**: In explicit acknowledgment mode, unacknowledged records block subsequent polls | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -89,6 +89,7 @@ | |
import org.springframework.kafka.config.KafkaListenerEndpointRegistry; | ||
import org.springframework.kafka.config.MethodKafkaListenerEndpoint; | ||
import org.springframework.kafka.config.MultiMethodKafkaListenerEndpoint; | ||
import org.springframework.kafka.config.ShareKafkaListenerContainerFactory; | ||
import org.springframework.kafka.listener.ContainerGroupSequencer; | ||
import org.springframework.kafka.listener.KafkaConsumerBackoffManager; | ||
import org.springframework.kafka.listener.KafkaListenerErrorHandler; | ||
|
@@ -651,6 +652,10 @@ protected void processListener(MethodKafkaListenerEndpoint<?, ?> endpoint, Kafka | |
KafkaListenerContainerFactory<?> listenerContainerFactory = resolveContainerFactory(kafkaListener, | ||
containerFactory, beanName); | ||
|
||
if (listenerContainerFactory instanceof ShareKafkaListenerContainerFactory<?, ?>) { | ||
endpoint.setShareConsumer(Boolean.TRUE); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why just simple |
||
} | ||
|
||
this.registrar.registerEndpoint(endpoint, listenerContainerFactory); | ||
} | ||
|
||
|
@@ -685,6 +690,7 @@ private void processKafkaListenerAnnotation(MethodKafkaListenerEndpoint<?, ?> en | |
if (StringUtils.hasText(kafkaListener.batch())) { | ||
endpoint.setBatchListener(Boolean.parseBoolean(kafkaListener.batch())); | ||
} | ||
|
||
endpoint.setBeanFactory(this.beanFactory); | ||
resolveErrorHandler(endpoint, kafkaListener); | ||
resolveContentTypeConverter(endpoint, kafkaListener); | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -98,6 +98,8 @@ public abstract class AbstractKafkaListenerEndpoint<K, V> | |
|
||
private @Nullable Boolean batchListener; | ||
|
||
private @Nullable Boolean shareConsumer; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I guess if we go with simple Not sure, though, why |
||
|
||
private @Nullable KafkaTemplate<?, ?> replyTemplate; | ||
|
||
private @Nullable String clientIdPrefix; | ||
|
@@ -291,6 +293,14 @@ public void setBatchListener(boolean batchListener) { | |
this.batchListener = batchListener; | ||
} | ||
|
||
public void setShareConsumer(Boolean shareConsumer) { | ||
this.shareConsumer = shareConsumer; | ||
} | ||
|
||
public @Nullable Boolean getShareConsumer() { | ||
return this.shareConsumer; | ||
} | ||
|
||
/** | ||
* Set the {@link KafkaTemplate} to use to send replies. | ||
* @param replyTemplate the template. | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -36,6 +36,7 @@ | |
import org.springframework.kafka.listener.adapter.HandlerAdapter; | ||
import org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter; | ||
import org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter; | ||
import org.springframework.kafka.listener.adapter.ShareRecordMessagingMessageListenerAdapter; | ||
import org.springframework.kafka.support.JavaUtils; | ||
import org.springframework.kafka.support.converter.BatchMessageConverter; | ||
import org.springframework.kafka.support.converter.MessageConverter; | ||
|
@@ -210,7 +211,15 @@ protected MessagingMessageListenerAdapter<K, V> createMessageListenerInstance( | |
@Nullable MessageConverter messageConverter) { | ||
|
||
MessagingMessageListenerAdapter<K, V> listener; | ||
if (isBatchListener()) { | ||
if (getShareConsumer() != null && getShareConsumer()) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. So, weird. |
||
ShareRecordMessagingMessageListenerAdapter<K, V> messageListener = new ShareRecordMessagingMessageListenerAdapter<>( | ||
this.bean, this.method, this.errorHandler); | ||
if (messageConverter instanceof RecordMessageConverter recordMessageConverter) { | ||
messageListener.setMessageConverter(recordMessageConverter); | ||
} | ||
listener = messageListener; | ||
} | ||
else if (isBatchListener()) { | ||
BatchMessagingMessageListenerAdapter<K, V> messageListener = new BatchMessagingMessageListenerAdapter<>( | ||
this.bean, this.method, this.errorHandler); | ||
BatchToRecordAdapter<K, V> batchToRecordAdapter = getBatchToRecordAdapter(); | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think the blank line above has to be before the section id.
Kinda such an id has to be stuck to the section title.
Now sure, though, if we need such a fine-grained indexing with ids...