Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -201,25 +201,6 @@ public class ShareMessageListener {
}
----

[[share-group-configuration]]
== Share Group Configuration

Share groups require specific broker configuration to function properly.
For testing with embedded Kafka, use:

[source,java]
----
@EmbeddedKafka(
topics = {"my-queue-topic"},
brokerProperties = {
"unstable.api.versions.enable=true",
"group.coordinator.rebalance.protocols=classic,share",
"share.coordinator.state.topic.replication.factor=1",
"share.coordinator.state.topic.min.isr=1"
}
)
----

[[share-group-offset-reset]]
=== Share Group Offset Reset

Expand Down Expand Up @@ -248,8 +229,257 @@ private void configureShareGroup(String bootstrapServers, String groupId) throws
[[share-record-acknowledgment]]
== Record Acknowledgment

Currently, share consumers automatically acknowledge records with `AcknowledgeType.ACCEPT` after successful processing.
More sophisticated acknowledgment patterns will be added in future versions.
Share consumers support two acknowledgment modes that control how records are acknowledged after processing.

[[share-acknowledgment-modes]]
=== Acknowledgment Modes

Share containers support two acknowledgment modes:
[[share-implicit-acknowledgment]]

==== Implicit Acknowledgment (Default)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the blank line above has to be before the section id.
Kinda such an id has to be stuck to the section title.
Now sure, though, if we need such a fine-grained indexing with ids...

In implicit mode, records are automatically acknowledged based on processing outcome:

Successful processing: Records are acknowledged as `ACCEPT`
Processing errors: Records are acknowledged as `REJECT`

[source,java]
----
@Bean
public ShareKafkaListenerContainerFactory<String, String> shareKafkaListenerContainerFactory(
ShareConsumerFactory<String, String> shareConsumerFactory) {
ShareKafkaListenerContainerFactory<String, String> factory =
new ShareKafkaListenerContainerFactory<>(shareConsumerFactory);
// Implicit mode is the default
factory.getContainerProperties().setShareAcknowledgmentMode(
ContainerProperties.ShareAcknowledgmentMode.IMPLICIT);

return factory;
}
----

[[share-explicit-acknowledgment]]
==== Explicit Acknowledgment

In explicit mode, the application must manually acknowledge each record using the provided ShareAcknowledgment:

[source,java]
----
@Bean
public ShareConsumerFactory<String, String> explicitShareConsumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put("share.acknowledgement.mode", "explicit");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is something new.
I mean an approach to configure such a an option on the target ContainerProperties.
We never followed this pattern before and just tried to stay in the configs with only Apache Kafka Client properties as much as possible.
So, why now introducing this extra level of complexity and paradox of choice?

Or... Just make it as a setter on this DefaultShareConsumerFactory to be type-safe!

return new DefaultShareConsumerFactory<>(props);
}

@Bean
public ShareKafkaListenerContainerFactory<String, String> explicitShareKafkaListenerContainerFactory(
ShareConsumerFactory<String, String> explicitShareConsumerFactory) {
return new ShareKafkaListenerContainerFactory<>(explicitShareConsumerFactory);
}
----

[[share-acknowledgment-types]]
=== Acknowledgment Types

Share consumers support three acknowledgment types:

`ACCEPT`: Record processed successfully, mark as completed
`RELEASE`: Temporary failure, make record available for redelivery
`REJECT`: Permanent failure, do not retry
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't we need to add dash in the beginning of every entry to make them as a list in the rendered doc?


[[share-acknowledgment-api]]
=== ShareAcknowledgment API

The `ShareAcknowledgment` interface provides methods for explicit acknowledgment:

[source,java]
----
public interface ShareAcknowledgment {
void acknowledge(AcknowledgeType type);
void acknowledge(); // Convenience method for ACCEPT
void release(); // Convenience method for RELEASE
void reject(); // Convenience method for REJECT
boolean isAcknowledged();
AcknowledgeType getAcknowledgmentType();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure in the getter.
And what is useful for end-user with an isAcknowledged()?
If we still need such an accessor, why not make it internal at the place where we implement this ShareAcknowledgment
and make end-user API as straightforward as possible?
I also think that API like acknowledge(AcknowledgeType type) is not needed since we have all those other choices.
In other words: let's avoid paradox of choice: or one, or another!
But again: this is my preference. If you have some argument to keep this, I'm OK.

}
----

[[share-listener-interfaces]]
=== Listener Interfaces

Share consumers support specialized listener interfaces for different use cases:

[[share-basic-listener]]
==== Basic Message Listener

Use the standard MessageListener for simple cases:
[source,java]
----
@KafkaListener(topics = "my-topic", containerFactory = "shareKafkaListenerContainerFactory")
public void listen(ConsumerRecord<String, String> record) {
System.out.println("Received: " + record.value());
// Automatically acknowledged in implicit mode
}
----

[[share-consumer-aware-listener]]
==== ShareConsumerAwareMessageListener

Access the ShareConsumer instance for advanced operations:

[source,java]
----
@KafkaListener(topics = "my-topic", containerFactory = "shareKafkaListenerContainerFactory")
public void listen(ConsumerRecord<String, String> record, ShareConsumer<?, ?> consumer) {
System.out.println("Received: " + record.value());
// Access consumer metrics, etc.
}
----

[[share-acknowledging-listener]]
==== AcknowledgingShareConsumerAwareMessageListener

Use explicit acknowledgment with full consumer access:

[source,java]
----
@Component
public class ExplicitAckListener {
@KafkaListener(
topics = "my-topic",
containerFactory = "explicitShareKafkaListenerContainerFactory"
)
public void listen(ConsumerRecord<String, String> record,
ShareAcknowledgment acknowledgment,
ShareConsumer<?, ?> consumer) {

try {
processRecord(record);
acknowledgment.acknowledge(); // ACCEPT
} catch (RetryableException e) {
acknowledgment.release(); // Will be redelivered
} catch (Exception e) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we have a catch on a new like even in docs?
Just let's don't encourage for bad habit copy/pasting samples from our docs!

acknowledgment.reject(); // Permanent failure
}
}

private void processRecord(ConsumerRecord<String, String> record) {
// Business logic here
}
}
----

[[share-acknowledgment-constraints]]
=== Acknowledgment Constraints

In explicit acknowledgment mode, the container enforces important constraints:

Poll Blocking: Subsequent polls are blocked until all records from the previous poll are acknowledged.
One-time Acknowledgment: Each record can only be acknowledged once.
Error Handling: If processing throws an exception, the record is automatically acknowledged as `REJECT`.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just curious that this is really a proper style these days.
Reading now The Three-Body Problem, and indeed they capitalize after colon.
Let me know, and I'll follow same pattern from now on! 😄


[WARNING]
In explicit mode, failing to acknowledge records will block further message processing.
Always ensure records are acknowledged in all code paths.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we have any trigger to notify target application that ack was missed and we are blocking?
I mean this might be a typical misuse and respective reports like "Spring Kafka does not consume new records any more" 🤷


[[share-acknowledgment-examples]]
=== Acknowledgment Examples

[[share-mixed-acknowledgment-example]]
==== Mixed Acknowledgment Patterns

[source,java]
----
@KafkaListener(topics = "order-processing", containerFactory = "explicitShareKafkaListenerContainerFactory")
public void processOrder(ConsumerRecord<String, String> record, ShareAcknowledgment acknowledgment) {
String orderId = record.key();
String orderData = record.value();
try {
if (isValidOrder(orderData)) {
if (processOrder(orderData)) {
acknowledgment.acknowledge(); // Success - ACCEPT
}
else {
acknowledgment.release(); // Temporary failure - retry later
}
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Something off with indents in this code snippet.
Perhaps not everything is indented with 4-spaces instead of tabs.

else {
acknowledgment.reject(); // Invalid order - don't retry
}
}
catch (Exception e) {
// Exception automatically triggers REJECT
throw e;
}
}
----

[[share-conditional-acknowledgment-example]]
==== Conditional Acknowledgment

[source,java]
----
@KafkaListener(topics = "data-validation", containerFactory = "explicitShareKafkaListenerContainerFactory")
public void validateData(ConsumerRecord<String, String> record, ShareAcknowledgment acknowledgment) {
ValidationResult result = validator.validate(record.value());
switch (result.getStatus()) {
case VALID:
acknowledgment.acknowledge(AcknowledgeType.ACCEPT);
break;
case INVALID_RETRYABLE:
acknowledgment.acknowledge(AcknowledgeType.RELEASE);
break;
case INVALID_PERMANENT:
acknowledgment.acknowledge(AcknowledgeType.REJECT);
break;
}
}
----

[[share-acknowledgment-configuration]]
=== Acknowledgment Mode Configuration

You can configure the acknowledgment mode at both the consumer factory and container levels:

[[share-factory-level-configuration]]
==== Factory Level Configuration

[source,java]
----
@Bean
public ShareConsumerFactory<String, String> explicitAckShareConsumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
// Configure explicit acknowledgment at the factory level
props.put("share.acknowledgement.mode", "explicit");
return new DefaultShareConsumerFactory<>(props);
}
----

[[share-container-level-configuration]]
==== Container Level Configuration

[source,java]
----
@Bean
public ShareKafkaListenerContainerFactory<String, String> customShareKafkaListenerContainerFactory(
ShareConsumerFactory<String, String> shareConsumerFactory) {
ShareKafkaListenerContainerFactory<String, String> factory =
new ShareKafkaListenerContainerFactory<>(shareConsumerFactory);

// Configure acknowledgment mode at container level
factory.getContainerProperties().setShareAcknowledgmentMode(
ContainerProperties.ShareAcknowledgmentMode.EXPLICIT);

return factory;
}
----

[[share-differences-from-regular-consumers]]
== Differences from Regular Consumers
Expand All @@ -259,16 +489,19 @@ Share consumers differ from regular consumers in several key ways:
1. **No Partition Assignment**: Share consumers cannot be assigned specific partitions
2. **No Topic Patterns**: Share consumers do not support subscribing to topic patterns
3. **Cooperative Consumption**: Multiple consumers in the same share group can consume from the same partitions simultaneously
4. **Automatic Acknowledgment**: Records are automatically acknowledged after processing
4. **Record-Level Acknowledgment**: Supports explicit acknowledgment with `ACCEPT`, `RELEASE`, and `REJECT` types
5. **Different Group Management**: Share groups use different coordinator protocols
6. **No Batch Processing**: Share consumers process records individually, not in batches

[[share-limitations-and-considerations]]
== Limitations and Considerations

[[share-current-limitations]]
=== Current Limitations

* **Early Access**: This feature is in early access and may change in future versions
* **Limited Acknowledgment Options**: Only automatic `ACCEPT` acknowledgment is currently supported
* **No Message Converters**: Message converters are not yet supported for share consumers
* **In preview**: This feature is in preview mode and may change in future versions
* **Single-Threaded**: Share consumer containers currently run in single-threaded mode
* **No Message Converters**: Message converters are not yet supported for share consumers
* **No Batch Listeners**: Batch processing is not supported with share consumers
* **Poll Constraints**: In explicit acknowledgment mode, unacknowledged records block subsequent polls

Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.kafka.config.MethodKafkaListenerEndpoint;
import org.springframework.kafka.config.MultiMethodKafkaListenerEndpoint;
import org.springframework.kafka.config.ShareKafkaListenerContainerFactory;
import org.springframework.kafka.listener.ContainerGroupSequencer;
import org.springframework.kafka.listener.KafkaConsumerBackoffManager;
import org.springframework.kafka.listener.KafkaListenerErrorHandler;
Expand Down Expand Up @@ -651,6 +652,10 @@ protected void processListener(MethodKafkaListenerEndpoint<?, ?> endpoint, Kafka
KafkaListenerContainerFactory<?> listenerContainerFactory = resolveContainerFactory(kafkaListener,
containerFactory, beanName);

if (listenerContainerFactory instanceof ShareKafkaListenerContainerFactory<?, ?>) {
endpoint.setShareConsumer(Boolean.TRUE);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why just simple boolean is not enough?

}

this.registrar.registerEndpoint(endpoint, listenerContainerFactory);
}

Expand Down Expand Up @@ -685,6 +690,7 @@ private void processKafkaListenerAnnotation(MethodKafkaListenerEndpoint<?, ?> en
if (StringUtils.hasText(kafkaListener.batch())) {
endpoint.setBatchListener(Boolean.parseBoolean(kafkaListener.batch()));
}

endpoint.setBeanFactory(this.beanFactory);
resolveErrorHandler(endpoint, kafkaListener);
resolveContentTypeConverter(endpoint, kafkaListener);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,8 @@ public abstract class AbstractKafkaListenerEndpoint<K, V>

private @Nullable Boolean batchListener;

private @Nullable Boolean shareConsumer;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess if we go with simple boolean, then even these two spaces after @Nullable would go away 😄 .

Not sure, though, why batchListener is not a primitive...


private @Nullable KafkaTemplate<?, ?> replyTemplate;

private @Nullable String clientIdPrefix;
Expand Down Expand Up @@ -291,6 +293,14 @@ public void setBatchListener(boolean batchListener) {
this.batchListener = batchListener;
}

public void setShareConsumer(Boolean shareConsumer) {
this.shareConsumer = shareConsumer;
}

public @Nullable Boolean getShareConsumer() {
return this.shareConsumer;
}

/**
* Set the {@link KafkaTemplate} to use to send replies.
* @param replyTemplate the template.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.springframework.kafka.listener.adapter.HandlerAdapter;
import org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter;
import org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter;
import org.springframework.kafka.listener.adapter.ShareRecordMessagingMessageListenerAdapter;
import org.springframework.kafka.support.JavaUtils;
import org.springframework.kafka.support.converter.BatchMessageConverter;
import org.springframework.kafka.support.converter.MessageConverter;
Expand Down Expand Up @@ -210,7 +211,15 @@ protected MessagingMessageListenerAdapter<K, V> createMessageListenerInstance(
@Nullable MessageConverter messageConverter) {

MessagingMessageListenerAdapter<K, V> listener;
if (isBatchListener()) {
if (getShareConsumer() != null && getShareConsumer()) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So, weird.
I guess with a primitive this would not be that convoluted.
Either way something like Boolean.TRUE.equals(getShareConsumer()) looks much better and does exactly the same, but better because of only one method call.

ShareRecordMessagingMessageListenerAdapter<K, V> messageListener = new ShareRecordMessagingMessageListenerAdapter<>(
this.bean, this.method, this.errorHandler);
if (messageConverter instanceof RecordMessageConverter recordMessageConverter) {
messageListener.setMessageConverter(recordMessageConverter);
}
listener = messageListener;
}
else if (isBatchListener()) {
BatchMessagingMessageListenerAdapter<K, V> messageListener = new BatchMessagingMessageListenerAdapter<>(
this.bean, this.method, this.errorHandler);
BatchToRecordAdapter<K, V> batchToRecordAdapter = getBatchToRecordAdapter();
Expand Down
Loading