-
Notifications
You must be signed in to change notification settings - Fork 1.7k
Add comprehensive acknowledgment support for Kafka share consumers #4087
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Add comprehensive acknowledgment support for Kafka share consumers #4087
Conversation
Implement explicit and implicit acknowledgment modes for share consumer containers, enabling fine-grained control over record processing outcomes with ACCEPT, RELEASE, and REJECT acknowledgment types. - Add ShareAcknowledgment interface with ACCEPT/RELEASE/REJECT support - Add ShareAcknowledgmentException for acknowledgment failures - Implement ShareAcknowledgmentMode enum (IMPLICIT/EXPLICIT) in ContainerProperties - Add poll-level acknowledgment constraints in explicit mode - Add ShareConsumerAwareMessageListener for ShareConsumer access - Add AcknowledgingShareConsumerAwareMessageListener for explicit acknowledgment - Add ShareRecordMessagingMessageListenerAdapter for KafkaListener integration - Use non-polymorphic onShareRecord method names to avoid regression issues with existing listener infrastructure and maintain clear API separation - Enhanced ShareKafkaMessageListenerContainer with acknowledgment tracking - Automatic error handling with REJECT acknowledgment on exceptions - Poll blocking in explicit mode until all records acknowledged - Support for mixed acknowledgment patterns within single poll - Auto-detection of ShareKafkaListenerContainerFactory for share consumer endpoints - Validation preventing batch listeners with share consumers - Factory-level and container-level acknowledgment mode configuration - Message converter extensions for ShareAcknowledgment parameter injection - Comprehensive integration tests covering all acknowledgment scenarios - Constraint tests validating poll-level acknowledgment requirements - Unit tests for container behavior and listener dispatching - Updated documentation with acknowledgment examples - Implicit: Records auto-acknowledged as ACCEPT on success, REJECT on error - Explicit: Application must manually acknowledge each record before next poll - Explicit mode blocks subsequent polls until all records acknowledged - Prevents message loss and ensures proper acknowledgment ordering - Concurrent acknowledgment attempts properly handled with IllegalStateException - Processing exceptions trigger automatic REJECT acknowledgment - Acknowledgment failures reset state and throw ShareAcknowledgmentException - Container continues processing after individual record failures - ShareAcknowledgment parameter injection follows existing Spring Kafka patterns - Non-polymorphic listener method names (onShareRecord vs onMessage) prevent potential conflicts with existing listener infrastructure and ensure clear separation between regular and share consumer listener contracts - Factory and container level configuration options provide flexibility This implementation provides acknowledgment semantics for Kafka share groups while maintaining backward compatibility with existing implicit acknowledgment behavior. Signed-off-by: Soby Chacko <[email protected]>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The review so far.
Might be the case that some of it is void according to your new vision, but still might be helpful.
Thanks
Share containers support two acknowledgment modes: | ||
[[share-implicit-acknowledgment]] | ||
|
||
==== Implicit Acknowledgment (Default) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think the blank line above has to be before the section id.
Kinda such an id has to be stuck to the section title.
Now sure, though, if we need such a fine-grained indexing with ids...
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); | ||
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); | ||
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); | ||
props.put("share.acknowledgement.mode", "explicit"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is something new.
I mean an approach to configure such a an option on the target ContainerProperties
.
We never followed this pattern before and just tried to stay in the configs
with only Apache Kafka Client properties as much as possible.
So, why now introducing this extra level of complexity and paradox of choice?
Or... Just make it as a setter on this DefaultShareConsumerFactory
to be type-safe!
|
||
`ACCEPT`: Record processed successfully, mark as completed | ||
`RELEASE`: Temporary failure, make record available for redelivery | ||
`REJECT`: Permanent failure, do not retry |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Don't we need to add dash in the beginning of every entry to make them as a list in the rendered doc?
void release(); // Convenience method for RELEASE | ||
void reject(); // Convenience method for REJECT | ||
boolean isAcknowledged(); | ||
AcknowledgeType getAcknowledgmentType(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not sure in the getter.
And what is useful for end-user with an isAcknowledged()
?
If we still need such an accessor, why not make it internal at the place where we implement this ShareAcknowledgment
and make end-user API as straightforward as possible?
I also think that API like acknowledge(AcknowledgeType type)
is not needed since we have all those other choices.
In other words: let's avoid paradox of choice: or one, or another!
But again: this is my preference. If you have some argument to keep this, I'm OK.
acknowledgment.acknowledge(); // ACCEPT | ||
} catch (RetryableException e) { | ||
acknowledgment.release(); // Will be redelivered | ||
} catch (Exception e) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we have a catch
on a new like even in docs?
Just let's don't encourage for bad habit copy/pasting samples from our docs!
/** | ||
* A message listener for share consumer containers that supports explicit acknowledgment. | ||
* <p> | ||
* This interface provides both access to the ShareConsumer and explicit acknowledgment |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Any chances to make Javadocs more reader-friendly?
I mean use of {@link}
, {@code}
etc.
* @param <V> the value type | ||
* @author Soby Chacko | ||
* @since 4.0 | ||
* @see ShareAcknowledgment |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I told before that my preference is it divide sections in Javadocs, so then it would give more respect to @author
making it more visible.
But that's my preference: If you are not agreed, just let me know and I won't make such a comment anymore.
* When explicit acknowledgment mode is used, the acknowledgment parameter will be non-null | ||
* and must be used to acknowledge the record. When implicit acknowledgment mode is used, | ||
* the acknowledgment parameter will be null. | ||
* |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
And here is no: The style in Spring projects is to no blank lines in the method Javadoc.
/** | ||
* Application must explicitly acknowledge all records before next poll. | ||
*/ | ||
EXPLICIT("explicit"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What is interesting if you would use lower case for enum names, then you would not need extra string property on it.
On the other hand, if we get rid off of property-based configuration, we may not need even think about lower case, since the type-safe setter would always accepts only this enum.
* @param data the data to be processed. | ||
* @param consumer the share consumer. | ||
*/ | ||
void onShareRecord(ConsumerRecord<K, V> data, ShareConsumer<?, ?> consumer); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wonder why do we need this extra abstraction if AcknowledgingShareConsumerAwareMessageListener
can deal with nullable ack argument?
Implement explicit and implicit acknowledgment modes for share consumer containers, enabling fine-grained control over record processing outcomes with ACCEPT, RELEASE, and REJECT acknowledgment types.
Add ShareAcknowledgment interface with ACCEPT/RELEASE/REJECT support
Add ShareAcknowledgmentException for acknowledgment failures
Implement ShareAcknowledgmentMode enum (IMPLICIT/EXPLICIT) in ContainerProperties
Add poll-level acknowledgment constraints in explicit mode
Add ShareConsumerAwareMessageListener for ShareConsumer access
Add AcknowledgingShareConsumerAwareMessageListener for explicit acknowledgment
Add ShareRecordMessagingMessageListenerAdapter for KafkaListener integration
Use non-polymorphic onShareRecord method names to avoid regression issues with existing listener infrastructure and maintain clear API separation
Enhanced ShareKafkaMessageListenerContainer with acknowledgment tracking
Automatic error handling with REJECT acknowledgment on exceptions
Poll blocking in explicit mode until all records acknowledged
Support for mixed acknowledgment patterns within single poll
Auto-detection of ShareKafkaListenerContainerFactory for share consumer endpoints
Validation preventing batch listeners with share consumers
Factory-level and container-level acknowledgment mode configuration
Message converter extensions for ShareAcknowledgment parameter injection
Comprehensive integration tests covering all acknowledgment scenarios
Constraint tests validating poll-level acknowledgment requirements
Unit tests for container behavior and listener dispatching
Updated documentation with acknowledgment examples
Implicit: Records auto-acknowledged as ACCEPT on success, REJECT on error
Explicit: Application must manually acknowledge each record before next poll
Explicit mode blocks subsequent polls until all records acknowledged
Prevents message loss and ensures proper acknowledgment ordering
Concurrent acknowledgment attempts properly handled with IllegalStateException
Processing exceptions trigger automatic REJECT acknowledgment
Acknowledgment failures reset state and throw ShareAcknowledgmentException
Container continues processing after individual record failures
ShareAcknowledgment parameter injection follows existing Spring Kafka patterns
Non-polymorphic listener method names (onShareRecord vs onMessage) prevent potential conflicts with existing listener infrastructure and ensure clear separation between regular and share consumer listener contracts
Factory and container level configuration options provide flexibility
This implementation provides acknowledgment semantics for Kafka share groups while maintaining backward compatibility with existing implicit acknowledgment behavior.