Skip to content

Conversation

sobychacko
Copy link
Contributor

Implement explicit and implicit acknowledgment modes for share consumer containers, enabling fine-grained control over record processing outcomes with ACCEPT, RELEASE, and REJECT acknowledgment types.

  • Add ShareAcknowledgment interface with ACCEPT/RELEASE/REJECT support

  • Add ShareAcknowledgmentException for acknowledgment failures

  • Implement ShareAcknowledgmentMode enum (IMPLICIT/EXPLICIT) in ContainerProperties

  • Add poll-level acknowledgment constraints in explicit mode

  • Add ShareConsumerAwareMessageListener for ShareConsumer access

  • Add AcknowledgingShareConsumerAwareMessageListener for explicit acknowledgment

  • Add ShareRecordMessagingMessageListenerAdapter for KafkaListener integration

  • Use non-polymorphic onShareRecord method names to avoid regression issues with existing listener infrastructure and maintain clear API separation

  • Enhanced ShareKafkaMessageListenerContainer with acknowledgment tracking

  • Automatic error handling with REJECT acknowledgment on exceptions

  • Poll blocking in explicit mode until all records acknowledged

  • Support for mixed acknowledgment patterns within single poll

  • Auto-detection of ShareKafkaListenerContainerFactory for share consumer endpoints

  • Validation preventing batch listeners with share consumers

  • Factory-level and container-level acknowledgment mode configuration

  • Message converter extensions for ShareAcknowledgment parameter injection

  • Comprehensive integration tests covering all acknowledgment scenarios

  • Constraint tests validating poll-level acknowledgment requirements

  • Unit tests for container behavior and listener dispatching

  • Updated documentation with acknowledgment examples

  • Implicit: Records auto-acknowledged as ACCEPT on success, REJECT on error

  • Explicit: Application must manually acknowledge each record before next poll

  • Explicit mode blocks subsequent polls until all records acknowledged

  • Prevents message loss and ensures proper acknowledgment ordering

  • Concurrent acknowledgment attempts properly handled with IllegalStateException

  • Processing exceptions trigger automatic REJECT acknowledgment

  • Acknowledgment failures reset state and throw ShareAcknowledgmentException

  • Container continues processing after individual record failures

  • ShareAcknowledgment parameter injection follows existing Spring Kafka patterns

  • Non-polymorphic listener method names (onShareRecord vs onMessage) prevent potential conflicts with existing listener infrastructure and ensure clear separation between regular and share consumer listener contracts

  • Factory and container level configuration options provide flexibility

This implementation provides acknowledgment semantics for Kafka share groups while maintaining backward compatibility with existing implicit acknowledgment behavior.

Implement explicit and implicit acknowledgment modes for share consumer
containers, enabling fine-grained control over record processing outcomes
with ACCEPT, RELEASE, and REJECT acknowledgment types.

- Add ShareAcknowledgment interface with ACCEPT/RELEASE/REJECT support
- Add ShareAcknowledgmentException for acknowledgment failures
- Implement ShareAcknowledgmentMode enum (IMPLICIT/EXPLICIT) in ContainerProperties
- Add poll-level acknowledgment constraints in explicit mode

- Add ShareConsumerAwareMessageListener for ShareConsumer access
- Add AcknowledgingShareConsumerAwareMessageListener for explicit acknowledgment
- Add ShareRecordMessagingMessageListenerAdapter for KafkaListener integration
- Use non-polymorphic onShareRecord method names to avoid regression issues
  with existing listener infrastructure and maintain clear API separation

- Enhanced ShareKafkaMessageListenerContainer with acknowledgment tracking
- Automatic error handling with REJECT acknowledgment on exceptions
- Poll blocking in explicit mode until all records acknowledged
- Support for mixed acknowledgment patterns within single poll

- Auto-detection of ShareKafkaListenerContainerFactory for share consumer endpoints
- Validation preventing batch listeners with share consumers
- Factory-level and container-level acknowledgment mode configuration
- Message converter extensions for ShareAcknowledgment parameter injection

- Comprehensive integration tests covering all acknowledgment scenarios
- Constraint tests validating poll-level acknowledgment requirements
- Unit tests for container behavior and listener dispatching
- Updated documentation with acknowledgment examples

- Implicit: Records auto-acknowledged as ACCEPT on success, REJECT on error
- Explicit: Application must manually acknowledge each record before next poll

- Explicit mode blocks subsequent polls until all records acknowledged
- Prevents message loss and ensures proper acknowledgment ordering
- Concurrent acknowledgment attempts properly handled with IllegalStateException

- Processing exceptions trigger automatic REJECT acknowledgment
- Acknowledgment failures reset state and throw ShareAcknowledgmentException
- Container continues processing after individual record failures

- ShareAcknowledgment parameter injection follows existing Spring Kafka patterns
- Non-polymorphic listener method names (onShareRecord vs onMessage) prevent
  potential conflicts with existing listener infrastructure and ensure clear
  separation between regular and share consumer listener contracts
- Factory and container level configuration options provide flexibility

This implementation provides acknowledgment semantics for Kafka share groups
while maintaining backward compatibility with existing implicit acknowledgment behavior.

Signed-off-by: Soby Chacko <[email protected]>
@artembilan artembilan added this to the 4.0.0-RC1 milestone Sep 18, 2025
Copy link
Member

@artembilan artembilan left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The review so far.
Might be the case that some of it is void according to your new vision, but still might be helpful.

Thanks

Share containers support two acknowledgment modes:
[[share-implicit-acknowledgment]]

==== Implicit Acknowledgment (Default)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the blank line above has to be before the section id.
Kinda such an id has to be stuck to the section title.
Now sure, though, if we need such a fine-grained indexing with ids...

props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put("share.acknowledgement.mode", "explicit");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is something new.
I mean an approach to configure such a an option on the target ContainerProperties.
We never followed this pattern before and just tried to stay in the configs with only Apache Kafka Client properties as much as possible.
So, why now introducing this extra level of complexity and paradox of choice?

Or... Just make it as a setter on this DefaultShareConsumerFactory to be type-safe!


`ACCEPT`: Record processed successfully, mark as completed
`RELEASE`: Temporary failure, make record available for redelivery
`REJECT`: Permanent failure, do not retry
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't we need to add dash in the beginning of every entry to make them as a list in the rendered doc?

void release(); // Convenience method for RELEASE
void reject(); // Convenience method for REJECT
boolean isAcknowledged();
AcknowledgeType getAcknowledgmentType();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure in the getter.
And what is useful for end-user with an isAcknowledged()?
If we still need such an accessor, why not make it internal at the place where we implement this ShareAcknowledgment
and make end-user API as straightforward as possible?
I also think that API like acknowledge(AcknowledgeType type) is not needed since we have all those other choices.
In other words: let's avoid paradox of choice: or one, or another!
But again: this is my preference. If you have some argument to keep this, I'm OK.

acknowledgment.acknowledge(); // ACCEPT
} catch (RetryableException e) {
acknowledgment.release(); // Will be redelivered
} catch (Exception e) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we have a catch on a new like even in docs?
Just let's don't encourage for bad habit copy/pasting samples from our docs!

/**
* A message listener for share consumer containers that supports explicit acknowledgment.
* <p>
* This interface provides both access to the ShareConsumer and explicit acknowledgment
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any chances to make Javadocs more reader-friendly?
I mean use of {@link}, {@code} etc.

* @param <V> the value type
* @author Soby Chacko
* @since 4.0
* @see ShareAcknowledgment
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I told before that my preference is it divide sections in Javadocs, so then it would give more respect to @author making it more visible.
But that's my preference: If you are not agreed, just let me know and I won't make such a comment anymore.

* When explicit acknowledgment mode is used, the acknowledgment parameter will be non-null
* and must be used to acknowledge the record. When implicit acknowledgment mode is used,
* the acknowledgment parameter will be null.
*
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And here is no: The style in Spring projects is to no blank lines in the method Javadoc.

/**
* Application must explicitly acknowledge all records before next poll.
*/
EXPLICIT("explicit");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is interesting if you would use lower case for enum names, then you would not need extra string property on it.
On the other hand, if we get rid off of property-based configuration, we may not need even think about lower case, since the type-safe setter would always accepts only this enum.

* @param data the data to be processed.
* @param consumer the share consumer.
*/
void onShareRecord(ConsumerRecord<K, V> data, ShareConsumer<?, ?> consumer);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder why do we need this extra abstraction if AcknowledgingShareConsumerAwareMessageListener can deal with nullable ack argument?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants