From 178aedf972b848b28bf1ffc179e86f0ee8a262d5 Mon Sep 17 00:00:00 2001 From: Soby Chacko Date: Fri, 12 Sep 2025 18:58:26 -0400 Subject: [PATCH] Add comprehensive acknowledgment support for Kafka share consumers Implement explicit and implicit acknowledgment modes for share consumer containers, enabling fine-grained control over record processing outcomes with ACCEPT, RELEASE, and REJECT acknowledgment types. - Add ShareAcknowledgment interface with ACCEPT/RELEASE/REJECT support - Add ShareAcknowledgmentException for acknowledgment failures - Implement ShareAcknowledgmentMode enum (IMPLICIT/EXPLICIT) in ContainerProperties - Add poll-level acknowledgment constraints in explicit mode - Add ShareConsumerAwareMessageListener for ShareConsumer access - Add AcknowledgingShareConsumerAwareMessageListener for explicit acknowledgment - Add ShareRecordMessagingMessageListenerAdapter for KafkaListener integration - Use non-polymorphic onShareRecord method names to avoid regression issues with existing listener infrastructure and maintain clear API separation - Enhanced ShareKafkaMessageListenerContainer with acknowledgment tracking - Automatic error handling with REJECT acknowledgment on exceptions - Poll blocking in explicit mode until all records acknowledged - Support for mixed acknowledgment patterns within single poll - Auto-detection of ShareKafkaListenerContainerFactory for share consumer endpoints - Validation preventing batch listeners with share consumers - Factory-level and container-level acknowledgment mode configuration - Message converter extensions for ShareAcknowledgment parameter injection - Comprehensive integration tests covering all acknowledgment scenarios - Constraint tests validating poll-level acknowledgment requirements - Unit tests for container behavior and listener dispatching - Updated documentation with acknowledgment examples - Implicit: Records auto-acknowledged as ACCEPT on success, REJECT on error - Explicit: Application must manually acknowledge each record before next poll - Explicit mode blocks subsequent polls until all records acknowledged - Prevents message loss and ensures proper acknowledgment ordering - Concurrent acknowledgment attempts properly handled with IllegalStateException - Processing exceptions trigger automatic REJECT acknowledgment - Acknowledgment failures reset state and throw ShareAcknowledgmentException - Container continues processing after individual record failures - ShareAcknowledgment parameter injection follows existing Spring Kafka patterns - Non-polymorphic listener method names (onShareRecord vs onMessage) prevent potential conflicts with existing listener infrastructure and ensure clear separation between regular and share consumer listener contracts - Factory and container level configuration options provide flexibility This implementation provides acknowledgment semantics for Kafka share groups while maintaining backward compatibility with existing implicit acknowledgment behavior. Signed-off-by: Soby Chacko --- .../ROOT/pages/kafka/kafka-queues.adoc | 283 ++++++++- ...kaListenerAnnotationBeanPostProcessor.java | 6 + .../config/AbstractKafkaListenerEndpoint.java | 10 + .../config/MethodKafkaListenerEndpoint.java | 11 +- .../ShareKafkaListenerContainerFactory.java | 31 +- ...gingShareConsumerAwareMessageListener.java | 59 ++ .../kafka/listener/ContainerProperties.java | 59 ++ .../ShareConsumerAwareMessageListener.java | 51 ++ .../ShareKafkaMessageListenerContainer.java | 233 +++++++- .../MessagingMessageListenerAdapter.java | 115 ++++ ...RecordMessagingMessageListenerAdapter.java | 92 +++ .../kafka/support/ShareAcknowledgment.java | 114 ++++ .../support/ShareAcknowledgmentException.java | 57 ++ .../support/converter/MessageConverter.java | 20 + .../converter/MessagingMessageConverter.java | 31 + .../converter/RecordMessageConverter.java | 6 + .../EnableKafkaIntegrationTests.java | 7 + .../ShareKafkaListenerIntegrationTests.java | 384 +++++++++++- ...ssageListenerContainerConstraintTests.java | 331 +++++++++++ ...sageListenerContainerIntegrationTests.java | 562 +++++++++++++++++- ...afkaMessageListenerContainerUnitTests.java | 403 +++++++++++++ 21 files changed, 2764 insertions(+), 101 deletions(-) create mode 100644 spring-kafka/src/main/java/org/springframework/kafka/listener/AcknowledgingShareConsumerAwareMessageListener.java create mode 100644 spring-kafka/src/main/java/org/springframework/kafka/listener/ShareConsumerAwareMessageListener.java create mode 100644 spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/ShareRecordMessagingMessageListenerAdapter.java create mode 100644 spring-kafka/src/main/java/org/springframework/kafka/support/ShareAcknowledgment.java create mode 100644 spring-kafka/src/main/java/org/springframework/kafka/support/ShareAcknowledgmentException.java create mode 100644 spring-kafka/src/test/java/org/springframework/kafka/listener/ShareKafkaMessageListenerContainerConstraintTests.java create mode 100644 spring-kafka/src/test/java/org/springframework/kafka/listener/ShareKafkaMessageListenerContainerUnitTests.java diff --git a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/kafka-queues.adoc b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/kafka-queues.adoc index 020ac1a0bf..3022896cb7 100644 --- a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/kafka-queues.adoc +++ b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/kafka-queues.adoc @@ -201,25 +201,6 @@ public class ShareMessageListener { } ---- -[[share-group-configuration]] -== Share Group Configuration - -Share groups require specific broker configuration to function properly. -For testing with embedded Kafka, use: - -[source,java] ----- -@EmbeddedKafka( - topics = {"my-queue-topic"}, - brokerProperties = { - "unstable.api.versions.enable=true", - "group.coordinator.rebalance.protocols=classic,share", - "share.coordinator.state.topic.replication.factor=1", - "share.coordinator.state.topic.min.isr=1" - } -) ----- - [[share-group-offset-reset]] === Share Group Offset Reset @@ -248,8 +229,257 @@ private void configureShareGroup(String bootstrapServers, String groupId) throws [[share-record-acknowledgment]] == Record Acknowledgment -Currently, share consumers automatically acknowledge records with `AcknowledgeType.ACCEPT` after successful processing. -More sophisticated acknowledgment patterns will be added in future versions. +Share consumers support two acknowledgment modes that control how records are acknowledged after processing. + +[[share-acknowledgment-modes]] +=== Acknowledgment Modes + +Share containers support two acknowledgment modes: +[[share-implicit-acknowledgment]] + +==== Implicit Acknowledgment (Default) +In implicit mode, records are automatically acknowledged based on processing outcome: + +Successful processing: Records are acknowledged as `ACCEPT` +Processing errors: Records are acknowledged as `REJECT` + +[source,java] +---- +@Bean +public ShareKafkaListenerContainerFactory shareKafkaListenerContainerFactory( + ShareConsumerFactory shareConsumerFactory) { + ShareKafkaListenerContainerFactory factory = + new ShareKafkaListenerContainerFactory<>(shareConsumerFactory); + // Implicit mode is the default + factory.getContainerProperties().setShareAcknowledgmentMode( + ContainerProperties.ShareAcknowledgmentMode.IMPLICIT); + + return factory; +} +---- + +[[share-explicit-acknowledgment]] +==== Explicit Acknowledgment + +In explicit mode, the application must manually acknowledge each record using the provided ShareAcknowledgment: + +[source,java] +---- +@Bean +public ShareConsumerFactory explicitShareConsumerFactory() { + Map props = new HashMap<>(); + props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); + props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + props.put("share.acknowledgement.mode", "explicit"); + return new DefaultShareConsumerFactory<>(props); +} + +@Bean +public ShareKafkaListenerContainerFactory explicitShareKafkaListenerContainerFactory( + ShareConsumerFactory explicitShareConsumerFactory) { + return new ShareKafkaListenerContainerFactory<>(explicitShareConsumerFactory); +} +---- + +[[share-acknowledgment-types]] +=== Acknowledgment Types + +Share consumers support three acknowledgment types: + +`ACCEPT`: Record processed successfully, mark as completed +`RELEASE`: Temporary failure, make record available for redelivery +`REJECT`: Permanent failure, do not retry + +[[share-acknowledgment-api]] +=== ShareAcknowledgment API + +The `ShareAcknowledgment` interface provides methods for explicit acknowledgment: + +[source,java] +---- +public interface ShareAcknowledgment { + void acknowledge(AcknowledgeType type); + void acknowledge(); // Convenience method for ACCEPT + void release(); // Convenience method for RELEASE + void reject(); // Convenience method for REJECT + boolean isAcknowledged(); + AcknowledgeType getAcknowledgmentType(); +} +---- + +[[share-listener-interfaces]] +=== Listener Interfaces + +Share consumers support specialized listener interfaces for different use cases: + +[[share-basic-listener]] +==== Basic Message Listener + +Use the standard MessageListener for simple cases: +[source,java] +---- +@KafkaListener(topics = "my-topic", containerFactory = "shareKafkaListenerContainerFactory") +public void listen(ConsumerRecord record) { + System.out.println("Received: " + record.value()); + // Automatically acknowledged in implicit mode +} +---- + +[[share-consumer-aware-listener]] +==== ShareConsumerAwareMessageListener + +Access the ShareConsumer instance for advanced operations: + +[source,java] +---- +@KafkaListener(topics = "my-topic", containerFactory = "shareKafkaListenerContainerFactory") +public void listen(ConsumerRecord record, ShareConsumer consumer) { + System.out.println("Received: " + record.value()); + // Access consumer metrics, etc. +} +---- + +[[share-acknowledging-listener]] +==== AcknowledgingShareConsumerAwareMessageListener + +Use explicit acknowledgment with full consumer access: + +[source,java] +---- +@Component +public class ExplicitAckListener { +@KafkaListener( + topics = "my-topic", + containerFactory = "explicitShareKafkaListenerContainerFactory" +) +public void listen(ConsumerRecord record, + ShareAcknowledgment acknowledgment, + ShareConsumer consumer) { + + try { + processRecord(record); + acknowledgment.acknowledge(); // ACCEPT + } catch (RetryableException e) { + acknowledgment.release(); // Will be redelivered + } catch (Exception e) { + acknowledgment.reject(); // Permanent failure + } +} + + private void processRecord(ConsumerRecord record) { + // Business logic here + } +} +---- + +[[share-acknowledgment-constraints]] +=== Acknowledgment Constraints + +In explicit acknowledgment mode, the container enforces important constraints: + +Poll Blocking: Subsequent polls are blocked until all records from the previous poll are acknowledged. +One-time Acknowledgment: Each record can only be acknowledged once. +Error Handling: If processing throws an exception, the record is automatically acknowledged as `REJECT`. + +[WARNING] +In explicit mode, failing to acknowledge records will block further message processing. +Always ensure records are acknowledged in all code paths. + +[[share-acknowledgment-examples]] +=== Acknowledgment Examples + +[[share-mixed-acknowledgment-example]] +==== Mixed Acknowledgment Patterns + +[source,java] +---- +@KafkaListener(topics = "order-processing", containerFactory = "explicitShareKafkaListenerContainerFactory") + public void processOrder(ConsumerRecord record, ShareAcknowledgment acknowledgment) { + String orderId = record.key(); + String orderData = record.value(); + try { + if (isValidOrder(orderData)) { + if (processOrder(orderData)) { + acknowledgment.acknowledge(); // Success - ACCEPT + } + else { + acknowledgment.release(); // Temporary failure - retry later + } + } + else { + acknowledgment.reject(); // Invalid order - don't retry + } + } + catch (Exception e) { + // Exception automatically triggers REJECT + throw e; + } +} +---- + +[[share-conditional-acknowledgment-example]] +==== Conditional Acknowledgment + +[source,java] +---- +@KafkaListener(topics = "data-validation", containerFactory = "explicitShareKafkaListenerContainerFactory") +public void validateData(ConsumerRecord record, ShareAcknowledgment acknowledgment) { + ValidationResult result = validator.validate(record.value()); + switch (result.getStatus()) { + case VALID: + acknowledgment.acknowledge(AcknowledgeType.ACCEPT); + break; + case INVALID_RETRYABLE: + acknowledgment.acknowledge(AcknowledgeType.RELEASE); + break; + case INVALID_PERMANENT: + acknowledgment.acknowledge(AcknowledgeType.REJECT); + break; + } +} +---- + +[[share-acknowledgment-configuration]] +=== Acknowledgment Mode Configuration + +You can configure the acknowledgment mode at both the consumer factory and container levels: + +[[share-factory-level-configuration]] +==== Factory Level Configuration + +[source,java] +---- +@Bean +public ShareConsumerFactory explicitAckShareConsumerFactory() { + Map props = new HashMap<>(); + props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); + props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + // Configure explicit acknowledgment at the factory level + props.put("share.acknowledgement.mode", "explicit"); + return new DefaultShareConsumerFactory<>(props); +} +---- + +[[share-container-level-configuration]] +==== Container Level Configuration + +[source,java] +---- +@Bean +public ShareKafkaListenerContainerFactory customShareKafkaListenerContainerFactory( + ShareConsumerFactory shareConsumerFactory) { + ShareKafkaListenerContainerFactory factory = + new ShareKafkaListenerContainerFactory<>(shareConsumerFactory); + + // Configure acknowledgment mode at container level + factory.getContainerProperties().setShareAcknowledgmentMode( + ContainerProperties.ShareAcknowledgmentMode.EXPLICIT); + + return factory; +} +---- [[share-differences-from-regular-consumers]] == Differences from Regular Consumers @@ -259,8 +489,9 @@ Share consumers differ from regular consumers in several key ways: 1. **No Partition Assignment**: Share consumers cannot be assigned specific partitions 2. **No Topic Patterns**: Share consumers do not support subscribing to topic patterns 3. **Cooperative Consumption**: Multiple consumers in the same share group can consume from the same partitions simultaneously -4. **Automatic Acknowledgment**: Records are automatically acknowledged after processing +4. **Record-Level Acknowledgment**: Supports explicit acknowledgment with `ACCEPT`, `RELEASE`, and `REJECT` types 5. **Different Group Management**: Share groups use different coordinator protocols +6. **No Batch Processing**: Share consumers process records individually, not in batches [[share-limitations-and-considerations]] == Limitations and Considerations @@ -268,7 +499,9 @@ Share consumers differ from regular consumers in several key ways: [[share-current-limitations]] === Current Limitations -* **Early Access**: This feature is in early access and may change in future versions -* **Limited Acknowledgment Options**: Only automatic `ACCEPT` acknowledgment is currently supported -* **No Message Converters**: Message converters are not yet supported for share consumers +* **In preview**: This feature is in preview mode and may change in future versions * **Single-Threaded**: Share consumer containers currently run in single-threaded mode +* **No Message Converters**: Message converters are not yet supported for share consumers +* **No Batch Listeners**: Batch processing is not supported with share consumers +* **Poll Constraints**: In explicit acknowledgment mode, unacknowledged records block subsequent polls + diff --git a/spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListenerAnnotationBeanPostProcessor.java b/spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListenerAnnotationBeanPostProcessor.java index 6a1095a326..13b6391ed4 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListenerAnnotationBeanPostProcessor.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListenerAnnotationBeanPostProcessor.java @@ -89,6 +89,7 @@ import org.springframework.kafka.config.KafkaListenerEndpointRegistry; import org.springframework.kafka.config.MethodKafkaListenerEndpoint; import org.springframework.kafka.config.MultiMethodKafkaListenerEndpoint; +import org.springframework.kafka.config.ShareKafkaListenerContainerFactory; import org.springframework.kafka.listener.ContainerGroupSequencer; import org.springframework.kafka.listener.KafkaConsumerBackoffManager; import org.springframework.kafka.listener.KafkaListenerErrorHandler; @@ -651,6 +652,10 @@ protected void processListener(MethodKafkaListenerEndpoint endpoint, Kafka KafkaListenerContainerFactory listenerContainerFactory = resolveContainerFactory(kafkaListener, containerFactory, beanName); + if (listenerContainerFactory instanceof ShareKafkaListenerContainerFactory) { + endpoint.setShareConsumer(Boolean.TRUE); + } + this.registrar.registerEndpoint(endpoint, listenerContainerFactory); } @@ -685,6 +690,7 @@ private void processKafkaListenerAnnotation(MethodKafkaListenerEndpoint en if (StringUtils.hasText(kafkaListener.batch())) { endpoint.setBatchListener(Boolean.parseBoolean(kafkaListener.batch())); } + endpoint.setBeanFactory(this.beanFactory); resolveErrorHandler(endpoint, kafkaListener); resolveContentTypeConverter(endpoint, kafkaListener); diff --git a/spring-kafka/src/main/java/org/springframework/kafka/config/AbstractKafkaListenerEndpoint.java b/spring-kafka/src/main/java/org/springframework/kafka/config/AbstractKafkaListenerEndpoint.java index 834b18de4b..f78586fe9d 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/config/AbstractKafkaListenerEndpoint.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/config/AbstractKafkaListenerEndpoint.java @@ -98,6 +98,8 @@ public abstract class AbstractKafkaListenerEndpoint private @Nullable Boolean batchListener; + private @Nullable Boolean shareConsumer; + private @Nullable KafkaTemplate replyTemplate; private @Nullable String clientIdPrefix; @@ -291,6 +293,14 @@ public void setBatchListener(boolean batchListener) { this.batchListener = batchListener; } + public void setShareConsumer(Boolean shareConsumer) { + this.shareConsumer = shareConsumer; + } + + public @Nullable Boolean getShareConsumer() { + return this.shareConsumer; + } + /** * Set the {@link KafkaTemplate} to use to send replies. * @param replyTemplate the template. diff --git a/spring-kafka/src/main/java/org/springframework/kafka/config/MethodKafkaListenerEndpoint.java b/spring-kafka/src/main/java/org/springframework/kafka/config/MethodKafkaListenerEndpoint.java index 3f08435831..ca00471285 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/config/MethodKafkaListenerEndpoint.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/config/MethodKafkaListenerEndpoint.java @@ -36,6 +36,7 @@ import org.springframework.kafka.listener.adapter.HandlerAdapter; import org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter; import org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter; +import org.springframework.kafka.listener.adapter.ShareRecordMessagingMessageListenerAdapter; import org.springframework.kafka.support.JavaUtils; import org.springframework.kafka.support.converter.BatchMessageConverter; import org.springframework.kafka.support.converter.MessageConverter; @@ -210,7 +211,15 @@ protected MessagingMessageListenerAdapter createMessageListenerInstance( @Nullable MessageConverter messageConverter) { MessagingMessageListenerAdapter listener; - if (isBatchListener()) { + if (getShareConsumer() != null && getShareConsumer()) { + ShareRecordMessagingMessageListenerAdapter messageListener = new ShareRecordMessagingMessageListenerAdapter<>( + this.bean, this.method, this.errorHandler); + if (messageConverter instanceof RecordMessageConverter recordMessageConverter) { + messageListener.setMessageConverter(recordMessageConverter); + } + listener = messageListener; + } + else if (isBatchListener()) { BatchMessagingMessageListenerAdapter messageListener = new BatchMessagingMessageListenerAdapter<>( this.bean, this.method, this.errorHandler); BatchToRecordAdapter batchToRecordAdapter = getBatchToRecordAdapter(); diff --git a/spring-kafka/src/main/java/org/springframework/kafka/config/ShareKafkaListenerContainerFactory.java b/spring-kafka/src/main/java/org/springframework/kafka/config/ShareKafkaListenerContainerFactory.java index fdfa68d6bb..f154d7e72e 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/config/ShareKafkaListenerContainerFactory.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/config/ShareKafkaListenerContainerFactory.java @@ -125,6 +125,15 @@ private void configureEndpoint(AbstractKafkaListenerEndpoint endpoint) { protected void initializeContainer(ShareKafkaMessageListenerContainer instance, KafkaListenerEndpoint endpoint) { ContainerProperties properties = instance.getContainerProperties(); Boolean effectiveAutoStartup = endpoint.getAutoStartup() != null ? endpoint.getAutoStartup() : this.autoStartup; + + // Validate share group configuration + validateShareConfiguration(endpoint); + + Object o = this.shareConsumerFactory.getConfigurationProperties().get("share.acknowledgement.mode"); + String explicitAck = null; + if (o != null) { + explicitAck = (String) o; + } JavaUtils.INSTANCE .acceptIfNotNull(effectiveAutoStartup, instance::setAutoStartup) .acceptIfNotNull(this.phase, instance::setPhase) @@ -132,7 +141,27 @@ protected void initializeContainer(ShareKafkaMessageListenerContainer inst .acceptIfNotNull(this.applicationEventPublisher, instance::setApplicationEventPublisher) .acceptIfNotNull(endpoint.getGroupId(), properties::setGroupId) .acceptIfNotNull(endpoint.getClientIdPrefix(), properties::setClientId) - .acceptIfNotNull(endpoint.getConsumerProperties(), properties::setKafkaConsumerProperties); + .acceptIfCondition(explicitAck != null && explicitAck.equals("explicit"), + ContainerProperties.ShareAcknowledgmentMode.EXPLICIT, + properties::setShareAcknowledgmentMode); + } + + private void validateShareConfiguration(KafkaListenerEndpoint endpoint) { + // Validate that batch listeners aren't used with share consumers + if (endpoint.getBatchListener() != null && endpoint.getBatchListener()) { + throw new IllegalArgumentException( + "Batch listeners are not supported with share consumers. " + + "Share groups operate at the record level."); + } + + // Validate acknowledgment mode consistency + Object ackMode = this.shareConsumerFactory.getConfigurationProperties() + .get("share.acknowledgement.mode"); + if (ackMode != null && !Arrays.asList("implicit", "explicit").contains(ackMode)) { + throw new IllegalArgumentException( + "Invalid share.acknowledgement.mode: " + ackMode + + ". Must be 'implicit' or 'explicit'"); + } } @Override diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/AcknowledgingShareConsumerAwareMessageListener.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/AcknowledgingShareConsumerAwareMessageListener.java new file mode 100644 index 0000000000..8455712dc1 --- /dev/null +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/AcknowledgingShareConsumerAwareMessageListener.java @@ -0,0 +1,59 @@ +/* + * Copyright 2025-present the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.kafka.listener; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ShareConsumer; +import org.jspecify.annotations.Nullable; + +import org.springframework.kafka.support.ShareAcknowledgment; + +/** + * A message listener for share consumer containers that supports explicit acknowledgment. + *

+ * This interface provides both access to the ShareConsumer and explicit acknowledgment + * capabilities. When used with explicit acknowledgment mode, the acknowledgment parameter + * will be non-null and must be used to acknowledge each record. In implicit mode, + * the acknowledgment parameter will be null and records are auto-acknowledged. + * + * @param the key type + * @param the value type + * @author Soby Chacko + * @since 4.0 + * @see ShareAcknowledgment + * @see ContainerProperties.ShareAcknowledgmentMode + */ +@FunctionalInterface +public interface AcknowledgingShareConsumerAwareMessageListener extends GenericMessageListener> { + + /** + * Invoked with data from kafka, an acknowledgment, and provides access to the consumer. + * When explicit acknowledgment mode is used, the acknowledgment parameter will be non-null + * and must be used to acknowledge the record. When implicit acknowledgment mode is used, + * the acknowledgment parameter will be null. + * + * @param data the data to be processed. + * @param acknowledgment the acknowledgment (nullable in implicit mode). + * @param consumer the consumer. + */ + void onShareRecord(ConsumerRecord data, @Nullable ShareAcknowledgment acknowledgment, ShareConsumer consumer); + + @Override + default void onMessage(ConsumerRecord data) { + throw new UnsupportedOperationException("Container should never call this"); + } +} diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/ContainerProperties.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/ContainerProperties.java index 201ab2298a..65f4110bc7 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/ContainerProperties.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/ContainerProperties.java @@ -35,6 +35,7 @@ import org.springframework.aop.framework.ProxyFactory; import org.springframework.aop.support.AopUtils; import org.springframework.core.task.AsyncTaskExecutor; +import org.springframework.kafka.support.ShareAcknowledgment; import org.springframework.kafka.support.TopicPartitionOffset; import org.springframework.kafka.support.micrometer.KafkaListenerObservationConvention; import org.springframework.kafka.transaction.KafkaAwareTransactionManager; @@ -160,6 +161,31 @@ public enum EOSMode { } + /** + * Acknowledgment mode for share consumer containers. + */ + public enum ShareAcknowledgmentMode { + /** + * Records are automatically acknowledged as ACCEPT on next poll, commitSync, or commitAsync. + */ + IMPLICIT("implicit"), + + /** + * Application must explicitly acknowledge all records before next poll. + */ + EXPLICIT("explicit"); + + private final String mode; + + ShareAcknowledgmentMode(String mode) { + this.mode = mode; + } + + public String getMode() { + return this.mode; + } + } + /** * The default {@link #setShutdownTimeout(long) shutDownTimeout} (ms). */ @@ -313,6 +339,8 @@ public enum EOSMode { private boolean recordObservationsInBatch; + private ShareAcknowledgmentMode shareAcknowledgmentMode = ShareAcknowledgmentMode.IMPLICIT; + /** * Create properties for a container that will subscribe to the specified topics. * @param topics the topics. @@ -1115,6 +1143,37 @@ public void setRecordObservationsInBatch(boolean recordObservationsInBatch) { this.recordObservationsInBatch = recordObservationsInBatch; } + /** + * Set the acknowledgment mode for share consumer containers. + *

+ * This setting only applies to share consumer containers and is ignored + * by regular consumer containers. The acknowledgment mode determines + * how records are acknowledged: + *

    + *
  • {@link ShareAcknowledgmentMode#IMPLICIT} - Records are automatically + * acknowledged as ACCEPT when the next poll occurs or when commitSync/commitAsync + * is called
  • + *
  • {@link ShareAcknowledgmentMode#EXPLICIT} - Application must explicitly + * acknowledge each record using the provided {@link ShareAcknowledgment}
  • + *
+ * + * @param shareAcknowledgmentMode the acknowledgment mode + * @since 4.0 + * @see ShareAcknowledgment + */ + public void setShareAcknowledgmentMode(ShareAcknowledgmentMode shareAcknowledgmentMode) { + this.shareAcknowledgmentMode = shareAcknowledgmentMode; + } + + /** + * Get the acknowledgment mode for share consumer containers. + * + * @return the acknowledgment mode + */ + public ShareAcknowledgmentMode getShareAcknowledgmentMode() { + return this.shareAcknowledgmentMode; + } + @Override public String toString() { StringBuilder sb = new StringBuilder("ContainerProperties ["); diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/ShareConsumerAwareMessageListener.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/ShareConsumerAwareMessageListener.java new file mode 100644 index 0000000000..281286c309 --- /dev/null +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/ShareConsumerAwareMessageListener.java @@ -0,0 +1,51 @@ +/* + * Copyright 2025-present the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.kafka.listener; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ShareConsumer; + +/** + * A message listener for share consumer containers that provides access to the ShareConsumer. + *

+ * This interface extends the basic message listener to provide access to the underlying + * ShareConsumer instance, enabling advanced use cases such as consumer metrics access + * or manual acknowledgment operations. + * + * @param the key type + * @param the value type + * @author Soby Chacko + * @since 4.0 + * @see ShareConsumer + * @see AcknowledgingShareConsumerAwareMessageListener + */ +@FunctionalInterface +public interface ShareConsumerAwareMessageListener extends GenericMessageListener> { + + /** + * Invoked with data from kafka and provides access to the share consumer. + * + * @param data the data to be processed. + * @param consumer the share consumer. + */ + void onShareRecord(ConsumerRecord data, ShareConsumer consumer); + + @Override + default void onMessage(ConsumerRecord data) { + throw new UnsupportedOperationException("Container should never call this"); + } +} diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/ShareKafkaMessageListenerContainer.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/ShareKafkaMessageListenerContainer.java index f94c78c099..6167143dd4 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/ShareKafkaMessageListenerContainer.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/ShareKafkaMessageListenerContainer.java @@ -20,10 +20,13 @@ import java.util.Collections; import java.util.Map; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicReference; import org.apache.kafka.clients.consumer.AcknowledgeType; import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.ShareConsumer; import org.apache.kafka.common.Metric; import org.apache.kafka.common.MetricName; @@ -36,20 +39,41 @@ import org.springframework.kafka.core.ShareConsumerFactory; import org.springframework.kafka.event.ConsumerStartedEvent; import org.springframework.kafka.event.ConsumerStartingEvent; +import org.springframework.kafka.support.ShareAcknowledgment; +import org.springframework.kafka.support.ShareAcknowledgmentException; import org.springframework.util.Assert; /** - * {@code ShareKafkaMessageListenerContainer} is a message listener container for Kafka's share consumer model. + * Single-threaded share consumer container using the Java {@link ShareConsumer}. *

- * This container manages a single-threaded consumer loop using a {@link org.springframework.kafka.core.ShareConsumerFactory}. - * It is designed for use cases where Kafka's cooperative sharing protocol is desired, and provides a simple polling loop - * with per-record dispatch and acknowledgement. + * This container provides support for Kafka share groups, enabling cooperative + * consumption where multiple consumers can process records from the same partitions. + * Unlike traditional consumer groups with exclusive partition assignment, share groups + * allow load balancing at the record level. + *

+ * Key features: + *

    + *
  • Explicit and implicit acknowledgment modes
  • + *
  • Automatic error handling with REJECT acknowledgments
  • + *
  • Poll-level acknowledgment constraints in explicit mode
  • + *
  • Integration with Spring's {@code @KafkaListener} annotation
  • + *
+ *

+ * Acknowledgment Modes: + *

    + *
  • Implicit: Records are automatically acknowledged as ACCEPT + * after successful processing or REJECT on errors
  • + *
  • Explicit: Application must manually acknowledge each record; + * subsequent polls are blocked until all records from the previous poll are acknowledged
  • + *
* * @param the key type * @param the value type - * * @author Soby Chacko * @since 4.0 + * @see ShareConsumer + * @see ShareAcknowledgment + * @see ContainerProperties.ShareAcknowledgmentMode */ public class ShareKafkaMessageListenerContainer extends AbstractShareKafkaMessageListenerContainer { @@ -165,6 +189,11 @@ private class ShareListenerConsumer implements Runnable { private final @Nullable String clientId; + // Acknowledgment tracking for explicit mode + private final Map, ShareConsumerAcknowledgment> pendingAcknowledgments = new ConcurrentHashMap<>(); + + private final boolean isExplicitMode; + ShareListenerConsumer(GenericMessageListener listener) { this.consumer = ShareKafkaMessageListenerContainer.this.shareConsumerFactory.createShareConsumer( ShareKafkaMessageListenerContainer.this.getGroupId(), @@ -173,6 +202,18 @@ private class ShareListenerConsumer implements Runnable { this.genericListener = listener; this.clientId = ShareKafkaMessageListenerContainer.this.getClientId(); ContainerProperties containerProperties = getContainerProperties(); + + // Configure acknowledgment mode + this.isExplicitMode = containerProperties.getShareAcknowledgmentMode() == + ContainerProperties.ShareAcknowledgmentMode.EXPLICIT; + + // Configure consumer properties based on acknowledgment mode + if (this.isExplicitMode) { + // Apply explicit mode configuration to consumer + // Note: This should ideally be done during consumer creation in the factory + this.logger.info(() -> "Share consumer configured for explicit acknowledgment mode"); + } + this.consumer.subscribe(Arrays.asList(containerProperties.getTopics())); } @@ -188,22 +229,18 @@ public void run() { Throwable exitThrowable = null; while (isRunning()) { try { + // Check acknowledgment constraints before polling + if (this.isExplicitMode && !this.pendingAcknowledgments.isEmpty()) { + // In explicit mode, all records from previous poll must be acknowledged + this.logger.warn(() -> "Skipping poll - " + this.pendingAcknowledgments.size() + + " records from previous poll still need acknowledgment"); + Thread.sleep(100); // Brief pause to avoid tight loop + continue; + } + var records = this.consumer.poll(java.time.Duration.ofMillis(POLL_TIMEOUT)); if (records != null && records.count() > 0) { - for (var record : records) { - if (this.genericListener instanceof AcknowledgingConsumerAwareMessageListener ackListener) { - ackListener.onMessage(record, null, null); - } - else { - GenericMessageListener> listener = - (GenericMessageListener>) this.genericListener; - listener.onMessage(record); - } - // Temporarily auto-acknowledge and commit. - // We will refactor it later on to support more production-like scenarios. - this.consumer.acknowledge(record, AcknowledgeType.ACCEPT); - this.consumer.commitSync(); - } + processRecords(records); } } catch (Error e) { @@ -226,6 +263,98 @@ public void run() { wrapUp(); } + private void processRecords(ConsumerRecords records) { + for (var record : records) { + ShareConsumerAcknowledgment acknowledgment = null; + + try { + if (this.isExplicitMode) { + // Create acknowledgment using inner class + acknowledgment = new ShareConsumerAcknowledgment(record); + this.pendingAcknowledgments.put(record, acknowledgment); + } + + // Dispatch to listener + if (this.genericListener instanceof AcknowledgingShareConsumerAwareMessageListener ackListener) { + @SuppressWarnings("unchecked") + AcknowledgingShareConsumerAwareMessageListener typedAckListener = + (AcknowledgingShareConsumerAwareMessageListener) ackListener; + typedAckListener.onShareRecord(record, acknowledgment, this.consumer); // Changed method name + } + else if (this.genericListener instanceof ShareConsumerAwareMessageListener consumerAwareListener) { + @SuppressWarnings("unchecked") + ShareConsumerAwareMessageListener typedConsumerAwareListener = + (ShareConsumerAwareMessageListener) consumerAwareListener; + typedConsumerAwareListener.onShareRecord(record, this.consumer); // Changed method name + } + else { + // Basic listener remains the same + @SuppressWarnings("unchecked") + GenericMessageListener> listener = + (GenericMessageListener>) this.genericListener; + listener.onMessage(record); + } + + // Handle acknowledgment based on mode + if (!this.isExplicitMode) { + // In implicit mode, auto-acknowledge as ACCEPT + this.consumer.acknowledge(record, AcknowledgeType.ACCEPT); + } + } + catch (Exception e) { + handleProcessingError(record, acknowledgment, e); + } + } + // Commit acknowledgments + commitAcknowledgments(); + } + + private void handleProcessingError(ConsumerRecord record, + @Nullable ShareConsumerAcknowledgment acknowledgment, Exception e) { + this.logger.error(e, "Error processing record: " + record); + + if (this.isExplicitMode && acknowledgment != null) { + // Remove from pending and auto-reject on error + this.pendingAcknowledgments.remove(record); + try { + acknowledgment.reject(); + } + catch (Exception ackEx) { + this.logger.error(ackEx, "Failed to reject record after processing error"); + } + } + else { + // In implicit mode, auto-reject on error + try { + this.consumer.acknowledge(record, AcknowledgeType.REJECT); + } + catch (Exception ackEx) { + this.logger.error(ackEx, "Failed to reject record after processing error"); + } + } + } + + private void commitAcknowledgments() { + try { + this.consumer.commitSync(); + } + catch (Exception e) { + this.logger.error(e, "Failed to commit acknowledgments"); + } + } + + /** + * Called by ShareConsumerAcknowledgment when a record is acknowledged in explicit mode. + * + * @param record the record that was acknowledged + */ + void onRecordAcknowledged(ConsumerRecord record) { + if (this.isExplicitMode) { + this.pendingAcknowledgments.remove(record); + this.logger.debug(() -> "Record acknowledged, " + this.pendingAcknowledgments.size() + " still pending"); + } + } + protected void initialize() { publishConsumerStartingEvent(); publishConsumerStartedEvent(); @@ -244,6 +373,72 @@ public String toString() { + "]"; } + /** + * Inner acknowledgment class that integrates directly with the container. + */ + private class ShareConsumerAcknowledgment implements ShareAcknowledgment { + + private final ConsumerRecord record; + + private final AtomicReference acknowledgmentType = new AtomicReference<>(); + + ShareConsumerAcknowledgment(ConsumerRecord record) { + this.record = record; + } + + @Override + public void acknowledge(AcknowledgeType type) { + Assert.notNull(type, "AcknowledgeType cannot be null"); + + if (!this.acknowledgmentType.compareAndSet(null, type)) { + throw new IllegalStateException( + String.format("Record at offset %d has already been acknowledged with type %s", + this.record.offset(), this.acknowledgmentType.get())); + } + + try { + // Direct access to container's consumer + ShareKafkaMessageListenerContainer.this.listenerConsumer.consumer.acknowledge(this.record, type); + + // Direct notification to container + ShareKafkaMessageListenerContainer.this.listenerConsumer.onRecordAcknowledged(this.record); + + } + catch (Exception e) { + // Reset state if acknowledgment failed + this.acknowledgmentType.set(null); + throw new ShareAcknowledgmentException( + "Failed to acknowledge record at offset " + this.record.offset(), e); + } + } + + @Override + public boolean isAcknowledged() { + return this.acknowledgmentType.get() != null; + } + + @Override + @Nullable + public AcknowledgeType getAcknowledgmentType() { + return this.acknowledgmentType.get(); + } + + ConsumerRecord getRecord() { + return this.record; + } + + @Override + public String toString() { + return "ShareConsumerAcknowledgment{" + + "topic=" + this.record.topic() + + ", partition=" + this.record.partition() + + ", offset=" + this.record.offset() + + ", acknowledged=" + isAcknowledged() + + ", type=" + getAcknowledgmentType() + + '}'; + } + } + } } diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/MessagingMessageListenerAdapter.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/MessagingMessageListenerAdapter.java index 9e95c6a0af..ecb68ff431 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/MessagingMessageListenerAdapter.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/MessagingMessageListenerAdapter.java @@ -35,9 +35,11 @@ import io.micrometer.observation.Observation; import io.micrometer.observation.ObservationRegistry; import org.apache.commons.logging.LogFactory; +import org.apache.kafka.clients.consumer.AcknowledgeType; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.ShareConsumer; import org.apache.kafka.common.TopicPartition; import org.jspecify.annotations.NonNull; import org.jspecify.annotations.Nullable; @@ -61,6 +63,7 @@ import org.springframework.kafka.support.KafkaHeaders; import org.springframework.kafka.support.KafkaNull; import org.springframework.kafka.support.KafkaUtils; +import org.springframework.kafka.support.ShareAcknowledgment; import org.springframework.kafka.support.converter.MessagingMessageConverter; import org.springframework.kafka.support.converter.RecordMessageConverter; import org.springframework.messaging.Message; @@ -413,6 +416,12 @@ protected Message toMessagingMessage(ConsumerRecord cRecord, @Nullable return getMessageConverter().toMessage(cRecord, acknowledgment, consumer, getType()); } + protected Message toMessagingMessage(ConsumerRecord cRecord, @Nullable ShareAcknowledgment acknowledgment, + @Nullable ShareConsumer consumer) { + + return getMessageConverter().toShareMessage(cRecord, acknowledgment, consumer, getType()); + } + protected void invoke(Object records, @Nullable Acknowledgment acknowledgment, @Nullable Consumer consumer, final Message message) { @@ -442,6 +451,33 @@ protected void invoke(Object records, @Nullable Acknowledgment acknowledgment, @ } } + protected void invoke(Object records, @Nullable ShareAcknowledgment acknowledgment, @Nullable ShareConsumer consumer, + final Message message) { + + Throwable listenerError = null; + Object result = null; + Observation currentObservation = getCurrentObservation(); + try { + result = invokeHandler(records, acknowledgment, message, consumer); + //TODO: How it should handle results with queues? We will tackle it later after some careful evaluation. + } + catch (ListenerExecutionFailedException e) { + listenerError = e; + currentObservation.error(e.getCause() != null ? e.getCause() : e); + throw e; // throw the error back to the container so that it is handled there for share consumer. + } + catch (Error e) { + listenerError = e; + currentObservation.error(e); + throw e; + } + finally { + if (listenerError != null || result == null) { + currentObservation.stop(); + } + } + } + private Observation getCurrentObservation() { Observation currentObservation = this.observationRegistry.getCurrentObservation(); return currentObservation == null ? Observation.NOOP : currentObservation; @@ -493,6 +529,49 @@ else if (this.hasMetadataParameter) { } } + /** + * Invoke the handler, wrapping any exception to a {@link ListenerExecutionFailedException} + * with a dedicated error message. + * @param data the data to process during invocation. + * @param acknowledgment the acknowledgment to use if any. + * @param message the message to process. + * @param consumer the consumer. + * @return the result of invocation. + */ + @Nullable + protected final Object invokeHandler(Object data, @Nullable ShareAcknowledgment acknowledgment, Message message, + @Nullable ShareConsumer consumer) { + + ShareAcknowledgment ack = acknowledgment; + Assert.notNull(this.handlerMethod, "the 'handlerMethod' must not be null"); + try { + if (data instanceof List && !this.isConsumerRecordList) { + return this.handlerMethod.invoke(message, ack, consumer); + } + else if (this.hasMetadataParameter) { + return this.handlerMethod.invoke(message, data, ack, consumer, + AdapterUtils.buildConsumerRecordMetadata(data)); + } + else { + return this.handlerMethod.invoke(message, data, ack, consumer); + } + } + catch (MessageConversionException ex) { + throw checkAckArg(ack, message, new MessageConversionException("Cannot handle message", ex)); + } + catch (MethodArgumentNotValidException ex) { + throw checkAckArg(ack, message, ex); + } + catch (MessagingException ex) { + throw new ListenerExecutionFailedException(createMessagingErrorMessage("Listener method could not " + + "be invoked with the incoming message", message.getPayload()), ex); + } + catch (Exception ex) { + throw new ListenerExecutionFailedException("Listener method '" + + this.handlerMethod.getMethodAsString(message.getPayload()) + "' threw exception", ex); + } + } + private RuntimeException checkAckArg(@Nullable Acknowledgment acknowledgment, Message message, Exception ex) { if (this.hasAckParameter && acknowledgment == null) { return new ListenerExecutionFailedException("invokeHandler Failed", @@ -503,6 +582,16 @@ private RuntimeException checkAckArg(@Nullable Acknowledgment acknowledgment, Me "be invoked with the incoming message", message.getPayload()), ex); } + private RuntimeException checkAckArg(@Nullable ShareAcknowledgment acknowledgment, Message message, Exception ex) { + if (this.hasAckParameter && acknowledgment == null) { + return new ListenerExecutionFailedException("invokeHandler Failed", + new IllegalStateException("No ShareAcknowledgment available as an argument, " + + "the listener container must have an explicit acknowledgement mode to populate the Acknowledgment.")); + } + return new ListenerExecutionFailedException(createMessagingErrorMessage("Listener method could not " + + "be invoked with the incoming message", message.getPayload()), ex); + } + /** * Handle the given result object returned from the listener method, sending a * response message to the SendTo topic. @@ -847,6 +936,9 @@ protected Type determineInferredType(@Nullable Method method) { // NOSONAR compl Type parameterType = methodParameter.getGenericParameterType(); boolean isNotConvertible = parameterIsType(parameterType, ConsumerRecord.class); boolean isAck = parameterIsType(parameterType, Acknowledgment.class); + if (!isAck) { + isAck = parameterIsType(parameterType, ShareAcknowledgment.class); + } this.hasAckParameter |= isAck; if (isAck) { this.noOpAck |= methodParameter.getParameterAnnotation(NonNull.class) != null; @@ -961,4 +1053,27 @@ public void acknowledge() { } + static class NoOpShareAck implements ShareAcknowledgment { + + @Override + public void acknowledge(AcknowledgeType type) { + + } + + @Override + public void acknowledge() { + } + + @Override + public boolean isAcknowledged() { + return false; + } + + @Override + public @Nullable AcknowledgeType getAcknowledgmentType() { + return null; + } + + } + } diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/ShareRecordMessagingMessageListenerAdapter.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/ShareRecordMessagingMessageListenerAdapter.java new file mode 100644 index 0000000000..d097f600a7 --- /dev/null +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/ShareRecordMessagingMessageListenerAdapter.java @@ -0,0 +1,92 @@ +/* + * Copyright 2002-present the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.kafka.listener.adapter; + +import java.lang.reflect.Method; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ShareConsumer; +import org.jspecify.annotations.Nullable; + +import org.springframework.kafka.listener.AcknowledgingConsumerAwareMessageListener; +import org.springframework.kafka.listener.AcknowledgingShareConsumerAwareMessageListener; +import org.springframework.kafka.listener.KafkaListenerErrorHandler; +import org.springframework.kafka.support.ShareAcknowledgment; +import org.springframework.kafka.support.converter.JacksonProjectingMessageConverter; +import org.springframework.kafka.support.converter.ProjectingMessageConverter; +import org.springframework.kafka.support.converter.RecordMessageConverter; +import org.springframework.messaging.Message; + +/** + * A {@link org.springframework.kafka.listener.MessageListener MessageListener} + * adapter that invokes a configurable {@link HandlerAdapter}; used when the factory is + * configured for the listener to receive individual messages from share groups. + * + *

Wraps the incoming Kafka Message to Spring's {@link Message} abstraction. + * + *

The original {@link ConsumerRecord} and + * the {@link ShareAcknowledgment} are provided as additional arguments so that these can + * be injected as method arguments if necessary. + * + * @param the key type. + * @param the value type. + * @author Soby Chacko + * @since 4.0 + */ +public class ShareRecordMessagingMessageListenerAdapter extends MessagingMessageListenerAdapter + implements AcknowledgingShareConsumerAwareMessageListener { + + public ShareRecordMessagingMessageListenerAdapter(@Nullable Object bean, @Nullable Method method) { + this(bean, method, null); + } + + public ShareRecordMessagingMessageListenerAdapter(@Nullable Object bean, @Nullable Method method, + @Nullable KafkaListenerErrorHandler errorHandler) { + super(bean, method, errorHandler); + } + + /** + * Kafka {@link AcknowledgingConsumerAwareMessageListener} entry point. + *

Delegate the message to the target listener method, + * with appropriate conversion of the message argument. + * @param record the incoming Kafka {@link ConsumerRecord}. + * @param acknowledgment the acknowledgment. + * @param consumer the consumer. + */ + @Override + @SuppressWarnings("removal") + public void onShareRecord(ConsumerRecord record, @Nullable ShareAcknowledgment acknowledgment, + @Nullable ShareConsumer consumer) { + + Message message; + if (isConversionNeeded()) { + message = toMessagingMessage(record, acknowledgment, consumer); + } + else { + message = NULL_MESSAGE; + } + if (logger.isDebugEnabled()) { + RecordMessageConverter messageConverter = getMessageConverter(); + if (!(messageConverter instanceof JacksonProjectingMessageConverter + || messageConverter instanceof ProjectingMessageConverter)) { + this.logger.debug("Processing [" + message + "]"); + } + } + invoke(record, acknowledgment, consumer, message); + } + +} diff --git a/spring-kafka/src/main/java/org/springframework/kafka/support/ShareAcknowledgment.java b/spring-kafka/src/main/java/org/springframework/kafka/support/ShareAcknowledgment.java new file mode 100644 index 0000000000..9622758c24 --- /dev/null +++ b/spring-kafka/src/main/java/org/springframework/kafka/support/ShareAcknowledgment.java @@ -0,0 +1,114 @@ +/* + * Copyright 2025-present the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.kafka.support; + +import org.apache.kafka.clients.consumer.AcknowledgeType; +import org.jspecify.annotations.Nullable; + +/** + * A handle for acknowledging the delivery of a record when using share groups. + *

+ * Share groups enable cooperative consumption where multiple consumers can process + * records from the same partitions. Each record must be explicitly acknowledged + * to indicate the result of processing. + *

+ * Acknowledgment types: + *

    + *
  • {@link AcknowledgeType#ACCEPT} - Record processed successfully
  • + *
  • {@link AcknowledgeType#RELEASE} - Temporary failure, make available for retry
  • + *
  • {@link AcknowledgeType#REJECT} - Permanent failure, do not retry
  • + *
+ *

+ * This interface is only applicable when using explicit acknowledgment mode + * ({@code share.acknowledgement.mode=explicit}). In implicit mode, records are + * automatically acknowledged as {@link AcknowledgeType#ACCEPT}. + *

+ * Note: Acknowledgment is separate from commit operations. After acknowledging + * records, use {@code commitSync()} or {@code commitAsync()} to persist the + * acknowledgments to the broker. + * + * @author Soby Chacko + * @since 4.0 + * @see AcknowledgeType + */ +public interface ShareAcknowledgment { + + /** + * Acknowledge the delivery of the record with the specified type. + *

+ * The acknowledgment will be committed when: + *

    + *
  • The next {@code poll()} is called (batched with fetch)
  • + *
  • {@code commitSync()} or {@code commitAsync()} is explicitly called
  • + *
  • The consumer is closed
  • + *
+ * + * @param type the acknowledgment type indicating the result of processing + * @throws IllegalStateException if the record has already been acknowledged + * @throws IllegalArgumentException if the acknowledgment type is null + */ + void acknowledge(AcknowledgeType type); + + /** + * Acknowledge the record as successfully processed. + *

+ * This is equivalent to {@code acknowledge(AcknowledgeType.ACCEPT)}. + * The record will be marked as completed and will not be redelivered. + * + * @throws IllegalStateException if the record has already been acknowledged + * @since 4.0 + */ + default void acknowledge() { + acknowledge(AcknowledgeType.ACCEPT); + } + + /** + * Release the record for redelivery due to a transient failure. + *

+ * This is a convenience method equivalent to {@code acknowledge(AcknowledgeType.RELEASE)}. + * The record will be made available for another delivery attempt. + */ + default void release() { + acknowledge(AcknowledgeType.RELEASE); + } + + /** + * Reject the record due to a permanent failure. + *

+ * This is a convenience method equivalent to {@code acknowledge(AcknowledgeType.REJECT)}. + * The record will not be delivered again and will be archived. + */ + default void reject() { + acknowledge(AcknowledgeType.REJECT); + } + + /** + * Check if this record has already been acknowledged. + * + * @return true if the record has been acknowledged, false otherwise + */ + boolean isAcknowledged(); + + /** + * Get the acknowledgment type that was used to acknowledge this record. + * + * @return the acknowledgment type, or null if not yet acknowledged + */ + @Nullable + AcknowledgeType getAcknowledgmentType(); + +} diff --git a/spring-kafka/src/main/java/org/springframework/kafka/support/ShareAcknowledgmentException.java b/spring-kafka/src/main/java/org/springframework/kafka/support/ShareAcknowledgmentException.java new file mode 100644 index 0000000000..72a4850b1c --- /dev/null +++ b/spring-kafka/src/main/java/org/springframework/kafka/support/ShareAcknowledgmentException.java @@ -0,0 +1,57 @@ +/* + * Copyright 2025-present the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.kafka.support; + +import org.springframework.kafka.KafkaException; + +/** + * Exception thrown when share group record acknowledgment fails. + *

+ * This exception indicates that an attempt to acknowledge a record + * in a share group encountered an error. Common causes include: + *

    + *
  • Network issues communicating with the broker
  • + *
  • Record state conflicts (e.g., record already acknowledged by another consumer)
  • + *
  • Share session errors
  • + *
  • Broker-side errors processing the acknowledgment
  • + *
+ * + * @author Soby Chacko + * @since 4.0 + */ +public class ShareAcknowledgmentException extends KafkaException { + + /** + * Construct an instance with the provided message. + * + * @param message the message + */ + public ShareAcknowledgmentException(String message) { + super(message); + } + + /** + * Construct an instance with the provided message and cause. + * + * @param message the message + * @param cause the cause + */ + public ShareAcknowledgmentException(String message, Throwable cause) { + super(message, cause); + } + +} diff --git a/spring-kafka/src/main/java/org/springframework/kafka/support/converter/MessageConverter.java b/spring-kafka/src/main/java/org/springframework/kafka/support/converter/MessageConverter.java index 381062cd2c..1640168867 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/support/converter/MessageConverter.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/support/converter/MessageConverter.java @@ -19,12 +19,14 @@ import java.util.Map; import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ShareConsumer; import org.jspecify.annotations.Nullable; import org.springframework.kafka.support.Acknowledgment; import org.springframework.kafka.support.JavaUtils; import org.springframework.kafka.support.KafkaHeaders; import org.springframework.kafka.support.KafkaUtils; +import org.springframework.kafka.support.ShareAcknowledgment; /** * A top level interface for message converters. @@ -74,4 +76,22 @@ default void commonHeaders(@Nullable Acknowledgment acknowledgment, @Nullable Co .acceptIfNotNull(KafkaHeaders.CONSUMER, consumer, (key, val) -> rawHeaders.put(key, val)); } + @SuppressWarnings("NullAway") // Dataflow analysis limitation + default void commonHeaders(@Nullable ShareAcknowledgment acknowledgment, @Nullable ShareConsumer consumer, Map rawHeaders, + @Nullable Object theKey, Object topic, Object partition, Object offset, + @Nullable Object timestampType, Object timestamp) { + + rawHeaders.put(KafkaHeaders.RECEIVED_TOPIC, topic); + rawHeaders.put(KafkaHeaders.RECEIVED_PARTITION, partition); + rawHeaders.put(KafkaHeaders.OFFSET, offset); + rawHeaders.put(KafkaHeaders.TIMESTAMP_TYPE, timestampType); + rawHeaders.put(KafkaHeaders.RECEIVED_TIMESTAMP, timestamp); + JavaUtils.INSTANCE + .acceptIfNotNull(KafkaHeaders.RECEIVED_KEY, theKey, (key, val) -> rawHeaders.put(key, val)) + .acceptIfNotNull(KafkaHeaders.GROUP_ID, MessageConverter.getGroupId(), + (key, val) -> rawHeaders.put(key, val)) + .acceptIfNotNull(KafkaHeaders.ACKNOWLEDGMENT, acknowledgment, (key, val) -> rawHeaders.put(key, val)) + .acceptIfNotNull(KafkaHeaders.CONSUMER, consumer, (key, val) -> rawHeaders.put(key, val)); + } + } diff --git a/spring-kafka/src/main/java/org/springframework/kafka/support/converter/MessagingMessageConverter.java b/spring-kafka/src/main/java/org/springframework/kafka/support/converter/MessagingMessageConverter.java index e1c2445cef..7b3c40930d 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/support/converter/MessagingMessageConverter.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/support/converter/MessagingMessageConverter.java @@ -25,6 +25,7 @@ import org.apache.commons.logging.LogFactory; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ShareConsumer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.header.Header; import org.apache.kafka.common.header.Headers; @@ -40,6 +41,7 @@ import org.springframework.kafka.support.KafkaHeaderMapper; import org.springframework.kafka.support.KafkaHeaders; import org.springframework.kafka.support.KafkaNull; +import org.springframework.kafka.support.ShareAcknowledgment; import org.springframework.kafka.support.SimpleKafkaHeaderMapper; import org.springframework.messaging.Message; import org.springframework.messaging.MessageHeaders; @@ -209,6 +211,35 @@ public Message toMessage(ConsumerRecord record, @Nullable Acknowledgmen return message; } + @Override + public Message toShareMessage(ConsumerRecord record, @Nullable ShareAcknowledgment acknowledgment, @Nullable ShareConsumer consumer, + @Nullable Type type) { + + KafkaMessageHeaders kafkaMessageHeaders = new KafkaMessageHeaders(this.generateMessageId, + this.generateTimestamp); + + Map rawHeaders = kafkaMessageHeaders.getRawHeaders(); + if (record.headers() != null) { + mapOrAddHeaders(record, rawHeaders); + } + String ttName = record.timestampType() != null ? record.timestampType().name() : null; + commonHeaders(acknowledgment, consumer, rawHeaders, record.key(), record.topic(), record.partition(), + record.offset(), ttName, record.timestamp()); + if (this.rawRecordHeader) { + rawHeaders.put(KafkaHeaders.RAW_DATA, record); + } + Message message = MessageBuilder.createMessage(extractAndConvertValue(record, type), kafkaMessageHeaders); + if (this.messagingConverter != null && !message.getPayload().equals(KafkaNull.INSTANCE)) { + Class clazz = type instanceof Class ? (Class) type : type instanceof ParameterizedType + ? (Class) ((ParameterizedType) type).getRawType() : Object.class; + Object payload = this.messagingConverter.fromMessage(message, clazz, type); + if (payload != null) { + message = new GenericMessage<>(payload, message.getHeaders()); + } + } + return message; + } + private void mapOrAddHeaders(ConsumerRecord record, Map rawHeaders) { if (this.headerMapper != null) { this.headerMapper.toHeaders(record.headers(), rawHeaders); diff --git a/spring-kafka/src/main/java/org/springframework/kafka/support/converter/RecordMessageConverter.java b/spring-kafka/src/main/java/org/springframework/kafka/support/converter/RecordMessageConverter.java index fc641570e1..5e8e327cf0 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/support/converter/RecordMessageConverter.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/support/converter/RecordMessageConverter.java @@ -20,11 +20,13 @@ import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ShareConsumer; import org.apache.kafka.clients.producer.ProducerRecord; import org.jspecify.annotations.NonNull; import org.jspecify.annotations.Nullable; import org.springframework.kafka.support.Acknowledgment; +import org.springframework.kafka.support.ShareAcknowledgment; import org.springframework.messaging.Message; /** @@ -47,6 +49,10 @@ public interface RecordMessageConverter extends MessageConverter { Message toMessage(ConsumerRecord record, @Nullable Acknowledgment acknowledgment, @Nullable Consumer consumer, @Nullable Type payloadType); + @NonNull + Message toShareMessage(ConsumerRecord record, @Nullable ShareAcknowledgment acknowledgment, @Nullable ShareConsumer consumer, + @Nullable Type payloadType); + /** * Convert a message to a producer record. * @param message the message. diff --git a/spring-kafka/src/test/java/org/springframework/kafka/annotation/EnableKafkaIntegrationTests.java b/spring-kafka/src/test/java/org/springframework/kafka/annotation/EnableKafkaIntegrationTests.java index aa8f67da07..75ecd35ad1 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/annotation/EnableKafkaIntegrationTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/annotation/EnableKafkaIntegrationTests.java @@ -58,6 +58,7 @@ import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.clients.consumer.OffsetCommitCallback; +import org.apache.kafka.clients.consumer.ShareConsumer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.errors.TopicExistsException; @@ -123,6 +124,7 @@ import org.springframework.kafka.support.KafkaHeaders; import org.springframework.kafka.support.KafkaNull; import org.springframework.kafka.support.SendResult; +import org.springframework.kafka.support.ShareAcknowledgment; import org.springframework.kafka.support.TopicPartitionOffset; import org.springframework.kafka.support.converter.JacksonJsonMessageConverter; import org.springframework.kafka.support.converter.JacksonProjectingMessageConverter; @@ -1304,6 +1306,11 @@ public Message toMessage(ConsumerRecord record, Acknowledgment acknowle throw new UnsupportedOperationException(); } + @Override + public @NonNull Message toShareMessage(ConsumerRecord record, @Nullable ShareAcknowledgment acknowledgment, @Nullable ShareConsumer consumer, @Nullable Type payloadType) { + return null; + } + @Override public ProducerRecord fromMessage(Message message, String defaultTopic) { throw new UnsupportedOperationException(); diff --git a/spring-kafka/src/test/java/org/springframework/kafka/listener/ShareKafkaListenerIntegrationTests.java b/spring-kafka/src/test/java/org/springframework/kafka/listener/ShareKafkaListenerIntegrationTests.java index 8ddadbcedb..8298258dba 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/listener/ShareKafkaListenerIntegrationTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/listener/ShareKafkaListenerIntegrationTests.java @@ -18,21 +18,25 @@ import java.util.Collection; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import org.apache.kafka.clients.admin.Admin; import org.apache.kafka.clients.admin.AlterConfigOp; -import org.apache.kafka.clients.admin.AlterConfigsResult; import org.apache.kafka.clients.admin.ConfigEntry; +import org.apache.kafka.clients.consumer.AcknowledgeType; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ShareConsumer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.config.ConfigResource; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringSerializer; +import org.jspecify.annotations.Nullable; import org.junit.jupiter.api.Test; import org.springframework.beans.factory.annotation.Autowired; @@ -46,6 +50,7 @@ import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.core.ProducerFactory; import org.springframework.kafka.core.ShareConsumerFactory; +import org.springframework.kafka.support.ShareAcknowledgment; import org.springframework.kafka.test.EmbeddedKafkaBroker; import org.springframework.kafka.test.context.EmbeddedKafka; import org.springframework.test.annotation.DirtiesContext; @@ -54,48 +59,146 @@ import static org.assertj.core.api.Assertions.assertThat; /** - * Integration tests for share Kafka listener. + * Enhanced integration tests for @KafkaListener with share consumer acknowledgment features. * * @author Soby Chacko * @since 4.0 */ @SpringJUnitConfig @DirtiesContext -@EmbeddedKafka(topics = "share-listener-integration-test", +@EmbeddedKafka(topics = { + "share-listener-basic-test", + "share-listener-explicit-ack-test", + "share-listener-consumer-aware-test", + "share-listener-ack-consumer-aware-test", + "share-listener-mixed-ack-test", + "share-listener-error-handling-test" +}, brokerProperties = { "share.coordinator.state.topic.replication.factor=1", "share.coordinator.state.topic.min.isr=1" }) class ShareKafkaListenerIntegrationTests { - private static final CountDownLatch latch = new CountDownLatch(1); - - private static final AtomicReference received = new AtomicReference<>(); - @Autowired EmbeddedKafkaBroker broker; + @Autowired + KafkaTemplate kafkaTemplate; + + @Test + void shouldSupportBasicShareKafkaListener() throws Exception { + final String topic = "share-listener-basic-test"; + final String groupId = "share-listener-basic-group"; + setShareAutoOffsetResetEarliest(this.broker.getBrokersAsString(), groupId); + + // Send test message + kafkaTemplate.send(topic, "basic-test-message"); + + // Wait for processing + assertThat(BasicTestListener.latch.await(10, TimeUnit.SECONDS)).isTrue(); + assertThat(BasicTestListener.received.get()).isEqualTo("basic-test-message"); + } + + @Test + void shouldSupportExplicitAcknowledgmentWithShareAcknowledgment() throws Exception { + final String topic = "share-listener-explicit-ack-test"; + final String groupId = "share-explicit-ack-group"; + setShareAutoOffsetResetEarliest(this.broker.getBrokersAsString(), groupId); + + // Send test messages + kafkaTemplate.send(topic, "accept", "accept-message"); + kafkaTemplate.send(topic, "release", "release-message"); + kafkaTemplate.send(topic, "reject", "reject-message"); + + // Wait for processing + assertThat(ExplicitAckTestListener.latch.await(15, TimeUnit.SECONDS)).isTrue(); + assertThat(ExplicitAckTestListener.redeliveryLatch.await(15, TimeUnit.SECONDS)).isTrue(); + + // Verify acknowledgment types were used correctly + assertThat(ExplicitAckTestListener.acknowledgmentTypes).containsKey("accept"); + assertThat(ExplicitAckTestListener.acknowledgmentTypes).containsKey("reject"); + assertThat(ExplicitAckTestListener.acknowledgmentTypes.get("accept")).isEqualTo(AcknowledgeType.ACCEPT); + assertThat(ExplicitAckTestListener.acknowledgmentTypes.get("reject")).isEqualTo(AcknowledgeType.REJECT); + + // The release message should have been redelivered and then accepted + assertThat(ExplicitAckTestListener.redeliveredAndAccepted.get()).isTrue(); + } + + @Test + void shouldSupportShareConsumerAwareListener() throws Exception { + final String topic = "share-listener-consumer-aware-test"; + final String groupId = "share-consumer-aware-group"; + setShareAutoOffsetResetEarliest(this.broker.getBrokersAsString(), groupId); + + // Send test message + kafkaTemplate.send(topic, "consumer-aware-message"); + + // Wait for processing + assertThat(ShareConsumerAwareTestListener.latch.await(10, TimeUnit.SECONDS)).isTrue(); + assertThat(ShareConsumerAwareTestListener.received.get()).isEqualTo("consumer-aware-message"); + assertThat(ShareConsumerAwareTestListener.consumerReceived.get()).isNotNull(); + } + + @Test + void shouldSupportAcknowledgingShareConsumerAwareListener() throws Exception { + final String topic = "share-listener-ack-consumer-aware-test"; + final String groupId = "share-ack-consumer-aware-group"; + setShareAutoOffsetResetEarliest(this.broker.getBrokersAsString(), groupId); + + // Send test message + kafkaTemplate.send(topic, "ack-consumer-aware-message"); + + // Wait for processing + assertThat(AckShareConsumerAwareTestListener.latch.await(10, TimeUnit.SECONDS)).isTrue(); + assertThat(AckShareConsumerAwareTestListener.received.get()).isEqualTo("ack-consumer-aware-message"); + assertThat(AckShareConsumerAwareTestListener.consumerReceived.get()).isNotNull(); + assertThat(AckShareConsumerAwareTestListener.acknowledgmentReceived.get()).isNotNull(); + assertThat(AckShareConsumerAwareTestListener.acknowledgmentReceived.get().isAcknowledged()).isTrue(); + } + @Test - void integrationTestShareKafkaListener() throws Exception { - final String topic = "share-listener-integration-test"; - final String groupId = "share-listener-test-group"; + void shouldHandleMixedAcknowledgmentScenarios() throws Exception { + final String topic = "share-listener-mixed-ack-test"; + final String groupId = "share-mixed-ack-group"; setShareAutoOffsetResetEarliest(this.broker.getBrokersAsString(), groupId); - Map props = new HashMap<>(); - props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, this.broker.getBrokersAsString()); - props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); - props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); - ProducerFactory pf = new DefaultKafkaProducerFactory<>(props); - KafkaTemplate template = new KafkaTemplate<>(pf); - template.setDefaultTopic(topic); - template.sendDefault("foo"); - assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue(); - assertThat(received.get()).isEqualTo("foo"); + // Send multiple test messages with different scenarios + kafkaTemplate.send(topic, "success1", "success-message-1"); + kafkaTemplate.send(topic, "success2", "success-message-2"); + kafkaTemplate.send(topic, "retry", "retry-message"); + + // Wait for processing + assertThat(MixedAckTestListener.processedLatch.await(15, TimeUnit.SECONDS)).isTrue(); + assertThat(MixedAckTestListener.retryLatch.await(15, TimeUnit.SECONDS)).isTrue(); + + // Verify correct processing + assertThat(MixedAckTestListener.processedCount.get()).isEqualTo(4); // 3 original + 1 retry + assertThat(MixedAckTestListener.successCount.get()).isEqualTo(3); // 2 success + 1 retry success + assertThat(MixedAckTestListener.retryCount.get()).isEqualTo(1); + } + + @Test + void shouldHandleProcessingErrorsCorrectly() throws Exception { + final String topic = "share-listener-error-handling-test"; + final String groupId = "share-error-handling-group"; + setShareAutoOffsetResetEarliest(this.broker.getBrokersAsString(), groupId); + + // Send messages that will trigger errors + kafkaTemplate.send(topic, "success", "success-message"); + kafkaTemplate.send(topic, "error", "error-message"); + kafkaTemplate.send(topic, "success2", "success-message-2"); + + // Wait for processing + assertThat(ErrorHandlingTestListener.latch.await(10, TimeUnit.SECONDS)).isTrue(); + + // Verify error handling + assertThat(ErrorHandlingTestListener.successCount.get()).isEqualTo(2); + assertThat(ErrorHandlingTestListener.errorCount.get()).isEqualTo(1); } /** - * Sets the share.auto.offset.reset group config to earliest for the given groupId, - * using the provided bootstrapServers. + * Sets the share.auto.offset.reset group config to earliest for the given groupId. */ private static void setShareAutoOffsetResetEarliest(String bootstrapServers, String groupId) throws Exception { Map adminProperties = new HashMap<>(); @@ -103,12 +206,10 @@ private static void setShareAutoOffsetResetEarliest(String bootstrapServers, Str try (Admin admin = Admin.create(adminProperties)) { ConfigResource configResource = new ConfigResource(ConfigResource.Type.GROUP, groupId); ConfigEntry configEntry = new ConfigEntry("share.auto.offset.reset", "earliest"); - Map> configs = java.util.Collections.singletonMap(configResource, - java.util.Collections.singleton(new AlterConfigOp(configEntry, AlterConfigOp.OpType.SET))); - AlterConfigsResult alterConfigsResult = admin.incrementalAlterConfigs(configs); - alterConfigsResult.all().get(); + Map> configs = Map.of(configResource, + List.of(new AlterConfigOp(configEntry, AlterConfigOp.OpType.SET))); + admin.incrementalAlterConfigs(configs).all().get(); } - } @Configuration @@ -119,33 +220,250 @@ static class TestConfig { public ShareConsumerFactory shareConsumerFactory(EmbeddedKafkaBroker broker) { Map configs = new HashMap<>(); configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, broker.getBrokersAsString()); - configs.put(ConsumerConfig.GROUP_ID_CONFIG, "share-listener-test-group"); configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); return new DefaultShareConsumerFactory<>(configs); } + @Bean + public ShareConsumerFactory explicitShareConsumerFactory(EmbeddedKafkaBroker broker) { + Map configs = new HashMap<>(); + configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, broker.getBrokersAsString()); + configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + configs.put("share.acknowledgement.mode", "explicit"); + return new DefaultShareConsumerFactory<>(configs); + } + @Bean public ShareKafkaListenerContainerFactory shareKafkaListenerContainerFactory( ShareConsumerFactory shareConsumerFactory) { - return new ShareKafkaListenerContainerFactory<>(shareConsumerFactory); } @Bean - public TestListener listener() { - return new TestListener(); + public ShareKafkaListenerContainerFactory explicitShareKafkaListenerContainerFactory( + ShareConsumerFactory explicitShareConsumerFactory) { + return new ShareKafkaListenerContainerFactory<>(explicitShareConsumerFactory); + } + + @Bean + public ProducerFactory producerFactory(EmbeddedKafkaBroker broker) { + Map props = new HashMap<>(); + props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, broker.getBrokersAsString()); + props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + return new DefaultKafkaProducerFactory<>(props); + } + + @Bean + public KafkaTemplate kafkaTemplate(ProducerFactory producerFactory) { + return new KafkaTemplate<>(producerFactory); + } + + // Test listeners as beans + @Bean + public BasicTestListener basicTestListener() { + return new BasicTestListener(); + } + + @Bean + public ExplicitAckTestListener explicitAckTestListener() { + return new ExplicitAckTestListener(); + } + + @Bean + public ShareConsumerAwareTestListener shareConsumerAwareTestListener() { + return new ShareConsumerAwareTestListener(); } + @Bean + public AckShareConsumerAwareTestListener ackShareConsumerAwareTestListener() { + return new AckShareConsumerAwareTestListener(); + } + + @Bean + public MixedAckTestListener mixedAckTestListener() { + return new MixedAckTestListener(); + } + + @Bean + public ErrorHandlingTestListener errorHandlingTestListener() { + return new ErrorHandlingTestListener(); + } } - static class TestListener { + // Test listener classes + static class BasicTestListener { - @KafkaListener(topics = "share-listener-integration-test", containerFactory = "shareKafkaListenerContainerFactory") + static final CountDownLatch latch = new CountDownLatch(1); + + static final AtomicReference received = new AtomicReference<>(); + + @KafkaListener(topics = "share-listener-basic-test", + groupId = "share-listener-basic-group", + containerFactory = "shareKafkaListenerContainerFactory") public void listen(ConsumerRecord record) { received.set(record.value()); latch.countDown(); } + + } + + static class ExplicitAckTestListener { + + static final CountDownLatch latch = new CountDownLatch(3); + + static final CountDownLatch redeliveryLatch = new CountDownLatch(1); + + static final Map acknowledgmentTypes = new HashMap<>(); + + static final AtomicReference redeliveredAndAccepted = new AtomicReference<>(false); + + @KafkaListener(topics = "share-listener-explicit-ack-test", + groupId = "share-explicit-ack-group", + containerFactory = "explicitShareKafkaListenerContainerFactory") + public void listen(ConsumerRecord record, ShareAcknowledgment acknowledgment, + ShareConsumer consumer) { + String key = record.key(); + + if ("accept".equals(key)) { + acknowledgment.acknowledge(AcknowledgeType.ACCEPT); + acknowledgmentTypes.put(key, AcknowledgeType.ACCEPT); + latch.countDown(); + } + else if ("release".equals(key)) { + if (!acknowledgmentTypes.containsKey("release-attempted")) { + // First attempt - release it + acknowledgment.acknowledge(AcknowledgeType.RELEASE); + acknowledgmentTypes.put("release-attempted", AcknowledgeType.RELEASE); + latch.countDown(); + } + else { + // Redelivered - accept it + acknowledgment.acknowledge(AcknowledgeType.ACCEPT); + redeliveredAndAccepted.set(true); + redeliveryLatch.countDown(); + } + } + else if ("reject".equals(key)) { + acknowledgment.acknowledge(AcknowledgeType.REJECT); + acknowledgmentTypes.put(key, AcknowledgeType.REJECT); + latch.countDown(); + } + } + } + + static class ShareConsumerAwareTestListener { + + static final CountDownLatch latch = new CountDownLatch(1); + + static final AtomicReference received = new AtomicReference<>(); + + static final AtomicReference> consumerReceived = new AtomicReference<>(); + + @KafkaListener(topics = "share-listener-consumer-aware-test", + groupId = "share-consumer-aware-group", + containerFactory = "shareKafkaListenerContainerFactory") + public void listen(ConsumerRecord record, ShareConsumer consumer) { + received.set(record.value()); + consumerReceived.set(consumer); + latch.countDown(); + } + } + + static class AckShareConsumerAwareTestListener { + + static final CountDownLatch latch = new CountDownLatch(1); + + static final AtomicReference received = new AtomicReference<>(); + + static final AtomicReference> consumerReceived = new AtomicReference<>(); + + static final AtomicReference acknowledgmentReceived = new AtomicReference<>(); + + @KafkaListener(topics = "share-listener-ack-consumer-aware-test", + groupId = "share-ack-consumer-aware-group", + containerFactory = "explicitShareKafkaListenerContainerFactory") + public void listen(ConsumerRecord record, + @Nullable ShareAcknowledgment acknowledgment, ShareConsumer consumer) { + received.set(record.value()); + consumerReceived.set(consumer); + acknowledgmentReceived.set(acknowledgment); + if (acknowledgment != null) { + acknowledgment.acknowledge(); // ACCEPT + } + latch.countDown(); + } + } + + static class MixedAckTestListener { + + static final CountDownLatch processedLatch = new CountDownLatch(3); + + static final CountDownLatch retryLatch = new CountDownLatch(1); + + static final AtomicInteger processedCount = new AtomicInteger(); + + static final AtomicInteger successCount = new AtomicInteger(); + + static final AtomicInteger retryCount = new AtomicInteger(); + + @KafkaListener(topics = "share-listener-mixed-ack-test", + groupId = "share-mixed-ack-group", + containerFactory = "explicitShareKafkaListenerContainerFactory") + public void listen(ConsumerRecord record, ShareAcknowledgment acknowledgment) { + String key = record.key(); + int count = processedCount.incrementAndGet(); + + if ("retry".equals(key)) { + if (retryCount.get() == 0) { + // First attempt - release for retry + retryCount.incrementAndGet(); + acknowledgment.acknowledge(AcknowledgeType.RELEASE); + processedLatch.countDown(); + } + else { + // Retry attempt - accept + successCount.incrementAndGet(); + acknowledgment.acknowledge(AcknowledgeType.ACCEPT); + retryLatch.countDown(); + } + } + else { + // Success messages + successCount.incrementAndGet(); + acknowledgment.acknowledge(AcknowledgeType.ACCEPT); + processedLatch.countDown(); + } + } + } + + static class ErrorHandlingTestListener { + + static final CountDownLatch latch = new CountDownLatch(3); + + static final AtomicInteger successCount = new AtomicInteger(); + + static final AtomicInteger errorCount = new AtomicInteger(); + + @KafkaListener(topics = "share-listener-error-handling-test", + groupId = "share-error-handling-group", + containerFactory = "explicitShareKafkaListenerContainerFactory") + public void listen(ConsumerRecord record, ShareAcknowledgment acknowledgment) { + String key = record.key(); + if ("error".equals(key)) { + errorCount.incrementAndGet(); + latch.countDown(); + // Let the error propagate - container should auto-reject + throw new RuntimeException("Simulated processing error"); + } + else { + successCount.incrementAndGet(); + acknowledgment.acknowledge(AcknowledgeType.ACCEPT); + latch.countDown(); + } + } } } diff --git a/spring-kafka/src/test/java/org/springframework/kafka/listener/ShareKafkaMessageListenerContainerConstraintTests.java b/spring-kafka/src/test/java/org/springframework/kafka/listener/ShareKafkaMessageListenerContainerConstraintTests.java new file mode 100644 index 0000000000..ee773d8326 --- /dev/null +++ b/spring-kafka/src/test/java/org/springframework/kafka/listener/ShareKafkaMessageListenerContainerConstraintTests.java @@ -0,0 +1,331 @@ +/* + * Copyright 2025-present the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.kafka.listener; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; + +import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.admin.AlterConfigOp; +import org.apache.kafka.clients.admin.ConfigEntry; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ShareConsumer; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.config.ConfigResource; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.jspecify.annotations.Nullable; +import org.junit.jupiter.api.Test; + +import org.springframework.kafka.core.DefaultShareConsumerFactory; +import org.springframework.kafka.support.ShareAcknowledgment; +import org.springframework.kafka.test.EmbeddedKafkaBroker; +import org.springframework.kafka.test.context.EmbeddedKafka; + +import static org.assertj.core.api.Assertions.assertThat; + +@EmbeddedKafka( + topics = { + "share-constraint-basic-test", + "share-constraint-partial-test", + "share-constraint-timeout-test", + "share-constraint-concurrent-test" + }, + partitions = 1, + brokerProperties = { + "share.coordinator.state.topic.replication.factor=1", + "share.coordinator.state.topic.min.isr=1" + } +) +class ShareKafkaMessageListenerContainerConstraintTests { + + @Test + void shouldBlockSubsequentPollsUntilAllRecordsAcknowledged(EmbeddedKafkaBroker broker) throws Exception { + String topic = "share-constraint-basic-test"; + String groupId = "share-constraint-basic-group"; + String bootstrapServers = broker.getBrokersAsString(); + + setShareAutoOffsetResetEarliest(bootstrapServers, groupId); + + // Produce first batch + produceTestRecords(bootstrapServers, topic, 3); + + Map consumerProps = createConsumerProps(bootstrapServers, groupId, true); + DefaultShareConsumerFactory factory = new DefaultShareConsumerFactory<>(consumerProps); + + ContainerProperties containerProps = new ContainerProperties(topic); + containerProps.setShareAcknowledgmentMode(ContainerProperties.ShareAcknowledgmentMode.EXPLICIT); + + CountDownLatch firstBatchLatch = new CountDownLatch(3); + CountDownLatch secondBatchLatch = new CountDownLatch(2); + List firstBatchAcks = Collections.synchronizedList(new ArrayList<>()); + AtomicInteger totalProcessed = new AtomicInteger(); + + containerProps.setMessageListener(new AcknowledgingShareConsumerAwareMessageListener() { + @Override + public void onShareRecord(ConsumerRecord record, + @Nullable ShareAcknowledgment acknowledgment, ShareConsumer consumer) { + + int count = totalProcessed.incrementAndGet(); + + if (count <= 3) { + // First batch - collect acknowledgments but don't acknowledge + firstBatchAcks.add(acknowledgment); + firstBatchLatch.countDown(); + } + else { + // Second batch - should only happen after first batch acknowledged + acknowledgment.acknowledge(); + secondBatchLatch.countDown(); + } + } + }); + + ShareKafkaMessageListenerContainer container = + new ShareKafkaMessageListenerContainer<>(factory, containerProps); + container.setBeanName("constraintBasicTestContainer"); + container.start(); + + try { + // Wait for first batch + assertThat(firstBatchLatch.await(15, TimeUnit.SECONDS)).isTrue(); + assertThat(firstBatchAcks).hasSize(3); + + // Produce second batch while first is pending + produceTestRecords(bootstrapServers, topic, 2); + + // Wait and verify second batch is NOT processed yet + Thread.sleep(3000); + assertThat(totalProcessed.get()).isEqualTo(3); + assertThat(secondBatchLatch.getCount()).isEqualTo(2); + + // Acknowledge first batch + for (ShareAcknowledgment ack : firstBatchAcks) { + ack.acknowledge(); + } + + // Now second batch should be processed + assertThat(secondBatchLatch.await(15, TimeUnit.SECONDS)).isTrue(); + assertThat(totalProcessed.get()).isEqualTo(5); + } + finally { + container.stop(); + } + } + + @Test + void shouldHandlePartialAcknowledgmentCorrectly(EmbeddedKafkaBroker broker) throws Exception { + String topic = "share-constraint-partial-test"; + String groupId = "share-constraint-partial-group"; + String bootstrapServers = broker.getBrokersAsString(); + + setShareAutoOffsetResetEarliest(bootstrapServers, groupId); + produceTestRecords(bootstrapServers, topic, 4); + + Map consumerProps = createConsumerProps(bootstrapServers, groupId, true); + DefaultShareConsumerFactory factory = new DefaultShareConsumerFactory<>(consumerProps); + + ContainerProperties containerProps = new ContainerProperties(topic); + containerProps.setShareAcknowledgmentMode(ContainerProperties.ShareAcknowledgmentMode.EXPLICIT); + + CountDownLatch batchLatch = new CountDownLatch(4); + CountDownLatch nextPollLatch = new CountDownLatch(1); + List batchAcks = Collections.synchronizedList(new ArrayList<>()); + AtomicInteger totalProcessed = new AtomicInteger(); + + containerProps.setMessageListener(new AcknowledgingShareConsumerAwareMessageListener() { + @Override + public void onShareRecord(ConsumerRecord record, + @Nullable ShareAcknowledgment acknowledgment, ShareConsumer consumer) { + + int count = totalProcessed.incrementAndGet(); + + if (count <= 4) { + batchAcks.add(acknowledgment); + batchLatch.countDown(); + } + else { + // This should only happen after all previous records acknowledged + acknowledgment.acknowledge(); + nextPollLatch.countDown(); + } + } + }); + + ShareKafkaMessageListenerContainer container = + new ShareKafkaMessageListenerContainer<>(factory, containerProps); + container.setBeanName("constraintPartialTestContainer"); + container.start(); + + try { + // Wait for batch to be processed + assertThat(batchLatch.await(15, TimeUnit.SECONDS)).isTrue(); + assertThat(batchAcks).hasSize(4); + + // Acknowledge only first 3 records + for (int i = 0; i < 3; i++) { + batchAcks.get(i).acknowledge(); + } + + // Produce more records + produceTestRecords(bootstrapServers, topic, 1); + + // Should not process new records while one is still pending + Thread.sleep(3000); + assertThat(totalProcessed.get()).isEqualTo(4); + + // Acknowledge the last pending record + batchAcks.get(3).acknowledge(); + + // Now should process new records + assertThat(nextPollLatch.await(15, TimeUnit.SECONDS)).isTrue(); + assertThat(totalProcessed.get()).isEqualTo(5); + + } + finally { + container.stop(); + } + } + + @Test + void shouldHandleConcurrentAcknowledgmentAttempts(EmbeddedKafkaBroker broker) throws Exception { + String topic = "share-constraint-concurrent-test"; + String groupId = "share-constraint-concurrent-group"; + String bootstrapServers = broker.getBrokersAsString(); + + setShareAutoOffsetResetEarliest(bootstrapServers, groupId); + produceTestRecords(bootstrapServers, topic, 1); + + Map consumerProps = createConsumerProps(bootstrapServers, groupId, true); + DefaultShareConsumerFactory factory = new DefaultShareConsumerFactory<>(consumerProps); + + ContainerProperties containerProps = new ContainerProperties(topic); + containerProps.setShareAcknowledgmentMode(ContainerProperties.ShareAcknowledgmentMode.EXPLICIT); + + CountDownLatch processedLatch = new CountDownLatch(1); + AtomicReference ackRef = new AtomicReference<>(); + AtomicInteger successfulAcks = new AtomicInteger(); + AtomicInteger failedAcks = new AtomicInteger(); + + containerProps.setMessageListener(new AcknowledgingShareConsumerAwareMessageListener() { + @Override + public void onShareRecord(ConsumerRecord record, + @Nullable ShareAcknowledgment acknowledgment, ShareConsumer consumer) { + ackRef.set(acknowledgment); + processedLatch.countDown(); + } + }); + + ShareKafkaMessageListenerContainer container = + new ShareKafkaMessageListenerContainer<>(factory, containerProps); + container.setBeanName("constraintConcurrentTestContainer"); + container.start(); + + try { + // Wait for record to be processed + assertThat(processedLatch.await(10, TimeUnit.SECONDS)).isTrue(); + ShareAcknowledgment ack = ackRef.get(); + assertThat(ack).isNotNull(); + + // Try to acknowledge the same record concurrently from multiple threads + int numThreads = 10; + ExecutorService executor = Executors.newFixedThreadPool(numThreads); + CountDownLatch threadLatch = new CountDownLatch(numThreads); + + for (int i = 0; i < numThreads; i++) { + executor.submit(() -> { + try { + ack.acknowledge(); + successfulAcks.incrementAndGet(); + } + catch (IllegalStateException e) { + failedAcks.incrementAndGet(); + } + finally { + threadLatch.countDown(); + } + }); + } + + assertThat(threadLatch.await(10, TimeUnit.SECONDS)).isTrue(); + executor.shutdown(); + + // Only one acknowledgment should succeed + assertThat(successfulAcks.get()).isEqualTo(1); + assertThat(failedAcks.get()).isEqualTo(numThreads - 1); + assertThat(ack.isAcknowledged()).isTrue(); + + } + finally { + container.stop(); + } + } + + // Utility methods + private Map createConsumerProps(String bootstrapServers, String groupId, boolean explicit) { + Map props = new HashMap<>(); + props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); + props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); + props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + if (explicit) { + props.put("share.acknowledgement.mode", "explicit"); + } + return props; + } + + private void produceTestRecords(String bootstrapServers, String topic, int count) throws Exception { + try (var producer = createProducer(bootstrapServers)) { + for (int i = 0; i < count; i++) { + producer.send(new ProducerRecord<>(topic, "key" + i, "value" + i)).get(); + } + } + } + + private KafkaProducer createProducer(String bootstrapServers) { + Map producerProps = new HashMap<>(); + producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); + producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + return new KafkaProducer<>(producerProps); + } + + private void setShareAutoOffsetResetEarliest(String bootstrapServers, String groupId) throws Exception { + Map adminProperties = new HashMap<>(); + adminProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); + ConfigEntry entry = new ConfigEntry("share.auto.offset.reset", "earliest"); + AlterConfigOp op = new AlterConfigOp(entry, AlterConfigOp.OpType.SET); + Map> configs = Map.of( + new ConfigResource(ConfigResource.Type.GROUP, groupId), List.of(op)); + try (Admin admin = Admin.create(adminProperties)) { + admin.incrementalAlterConfigs(configs).all().get(); + } + } +} diff --git a/spring-kafka/src/test/java/org/springframework/kafka/listener/ShareKafkaMessageListenerContainerIntegrationTests.java b/spring-kafka/src/test/java/org/springframework/kafka/listener/ShareKafkaMessageListenerContainerIntegrationTests.java index 7aff9fa602..087b38ab0a 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/listener/ShareKafkaMessageListenerContainerIntegrationTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/listener/ShareKafkaMessageListenerContainerIntegrationTests.java @@ -16,43 +16,58 @@ package org.springframework.kafka.listener; +import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Objects; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import org.apache.kafka.clients.admin.Admin; -import org.apache.kafka.clients.admin.AdminClient; import org.apache.kafka.clients.admin.AlterConfigOp; import org.apache.kafka.clients.admin.ConfigEntry; +import org.apache.kafka.clients.consumer.AcknowledgeType; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ShareConsumer; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.config.ConfigResource; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringSerializer; +import org.jspecify.annotations.Nullable; import org.junit.jupiter.api.Test; import org.springframework.kafka.core.DefaultShareConsumerFactory; +import org.springframework.kafka.support.ShareAcknowledgment; import org.springframework.kafka.test.EmbeddedKafkaBroker; import org.springframework.kafka.test.context.EmbeddedKafka; import static org.assertj.core.api.Assertions.assertThat; -/** - * Basic tests for {@link ShareKafkaMessageListenerContainer}. - * - * @author Soby Chacko - * @since 4.0 - */ @EmbeddedKafka( - topics = {"share-listener-integration-test"}, partitions = 1, - brokerProperties = { - "share.coordinator.state.topic.replication.factor=1", - "share.coordinator.state.topic.min.isr=1" - } + topics = { + "share-listener-integration-test", + "share-container-explicit-test", + "share-container-implicit-test", + "share-container-constraint-test", + "share-container-error-test", + "share-container-mixed-ack-test", + "share-container-lifecycle-test" + }, + partitions = 1, + brokerProperties = { + "share.coordinator.state.topic.replication.factor=1", + "share.coordinator.state.topic.min.isr=1" + } ) class ShareKafkaMessageListenerContainerIntegrationTests { @@ -96,28 +111,531 @@ void integrationTestShareKafkaMessageListenerContainer(EmbeddedKafkaBroker broke try { assertThat(latch.await(10, java.util.concurrent.TimeUnit.SECONDS) && "integration-test-value".equals(received.get())) - .as("Message should be received and have expected value") - .isTrue(); + .as("Message should be received and have expected value") + .isTrue(); + } + finally { + container.stop(); + } + } + + @Test + void shouldSupportExplicitAcknowledgmentMode(EmbeddedKafkaBroker broker) throws Exception { + String topic = "share-container-explicit-test"; + String groupId = "share-container-explicit-group"; + String bootstrapServers = broker.getBrokersAsString(); + + setShareAutoOffsetResetEarliest(bootstrapServers, groupId); + produceTestRecords(bootstrapServers, topic, 3); + + Map consumerProps = createConsumerProps(bootstrapServers, groupId, true); + DefaultShareConsumerFactory factory = new DefaultShareConsumerFactory<>(consumerProps); + + ContainerProperties containerProps = new ContainerProperties(topic); + containerProps.setShareAcknowledgmentMode(ContainerProperties.ShareAcknowledgmentMode.EXPLICIT); + + CountDownLatch latch = new CountDownLatch(3); + List received = Collections.synchronizedList(new ArrayList<>()); + List acknowledgments = Collections.synchronizedList(new ArrayList<>()); + + containerProps.setMessageListener(new AcknowledgingShareConsumerAwareMessageListener() { + @Override + public void onShareRecord(ConsumerRecord record, + @Nullable ShareAcknowledgment acknowledgment, ShareConsumer consumer) { + received.add(record.value()); + acknowledgments.add(acknowledgment); + + // Explicitly acknowledge the record + if (acknowledgment != null) { + acknowledgment.acknowledge(); // ACCEPT + } + + latch.countDown(); + } + }); + + ShareKafkaMessageListenerContainer container = + new ShareKafkaMessageListenerContainer<>(factory, containerProps); + container.setBeanName("explicitAckTestContainer"); + container.start(); + + try { + assertThat(latch.await(30, TimeUnit.SECONDS)).isTrue(); + assertThat(received).hasSize(3); + assertThat(acknowledgments).hasSize(3); + assertThat(acknowledgments).allMatch(Objects::nonNull); + assertThat(acknowledgments).allMatch(ack -> ack.isAcknowledged()); + assertThat(acknowledgments).allMatch(ack -> ack.getAcknowledgmentType() == AcknowledgeType.ACCEPT); + } + finally { + container.stop(); + } + } + + @Test + void shouldSupportImplicitAcknowledgmentMode(EmbeddedKafkaBroker broker) throws Exception { + String topic = "share-container-implicit-test"; + String groupId = "share-container-implicit-group"; + String bootstrapServers = broker.getBrokersAsString(); + + setShareAutoOffsetResetEarliest(bootstrapServers, groupId); + produceTestRecords(bootstrapServers, topic, 3); + + Map consumerProps = createConsumerProps(bootstrapServers, groupId, false); + DefaultShareConsumerFactory factory = new DefaultShareConsumerFactory<>(consumerProps); + + ContainerProperties containerProps = new ContainerProperties(topic); + // Default is implicit mode + + CountDownLatch latch = new CountDownLatch(3); + List received = Collections.synchronizedList(new ArrayList<>()); + + containerProps.setMessageListener(new AcknowledgingShareConsumerAwareMessageListener() { + @Override + public void onShareRecord(ConsumerRecord record, + @Nullable ShareAcknowledgment acknowledgment, ShareConsumer consumer) { + received.add(record.value()); + + // In implicit mode, acknowledgment should be null + assertThat(acknowledgment).isNull(); + + latch.countDown(); + } + }); + + ShareKafkaMessageListenerContainer container = + new ShareKafkaMessageListenerContainer<>(factory, containerProps); + container.setBeanName("implicitAckTestContainer"); + container.start(); + + try { + assertThat(latch.await(30, TimeUnit.SECONDS)).isTrue(); + assertThat(received).hasSize(3); + } + finally { + container.stop(); + } + } + + @Test + void shouldEnforceExplicitAcknowledgmentConstraints(EmbeddedKafkaBroker broker) throws Exception { + String topic = "share-container-constraint-test"; + String groupId = "share-container-constraint-group"; + String bootstrapServers = broker.getBrokersAsString(); + + setShareAutoOffsetResetEarliest(bootstrapServers, groupId); + produceTestRecords(bootstrapServers, topic, 3); + + Map consumerProps = createConsumerProps(bootstrapServers, groupId, true); + DefaultShareConsumerFactory factory = new DefaultShareConsumerFactory<>(consumerProps); + + ContainerProperties containerProps = new ContainerProperties(topic); + containerProps.setShareAcknowledgmentMode(ContainerProperties.ShareAcknowledgmentMode.EXPLICIT); + + CountDownLatch firstBatchLatch = new CountDownLatch(3); + CountDownLatch secondBatchLatch = new CountDownLatch(3); + AtomicInteger processedCount = new AtomicInteger(); + List pendingAcks = Collections.synchronizedList(new ArrayList<>()); + + containerProps.setMessageListener(new AcknowledgingShareConsumerAwareMessageListener() { + @Override + public void onShareRecord(ConsumerRecord record, + @Nullable ShareAcknowledgment acknowledgment, ShareConsumer consumer) { + + int count = processedCount.incrementAndGet(); + + if (count <= 3) { + // First batch - collect acknowledgments but don't acknowledge yet + pendingAcks.add(acknowledgment); + firstBatchLatch.countDown(); + } + else { + // Second batch - should only happen after first batch is acknowledged + acknowledgment.acknowledge(); + secondBatchLatch.countDown(); + } + } + }); + + ShareKafkaMessageListenerContainer container = + new ShareKafkaMessageListenerContainer<>(factory, containerProps); + container.setBeanName("constraintTestContainer"); + container.start(); + + try { + // Wait for first batch to be processed + assertThat(firstBatchLatch.await(15, TimeUnit.SECONDS)).isTrue(); + assertThat(pendingAcks).hasSize(3); + + // Wait a bit to ensure no more records are processed while acknowledgments are pending + Thread.sleep(2000); + assertThat(processedCount.get()).isEqualTo(3); + + // Acknowledge first batch + for (ShareAcknowledgment ack : pendingAcks) { + ack.acknowledge(); + } + + // Produce more records for second batch + produceTestRecords(bootstrapServers, topic, 3); + + // Now second batch should be processed + assertThat(secondBatchLatch.await(15, TimeUnit.SECONDS)).isTrue(); + assertThat(processedCount.get()).isEqualTo(6); + + } + finally { + container.stop(); + } + } + + @Test + void shouldHandleProcessingErrorsInExplicitMode(EmbeddedKafkaBroker broker) throws Exception { + String topic = "share-container-error-test"; + String groupId = "share-container-error-group"; + String bootstrapServers = broker.getBrokersAsString(); + + setShareAutoOffsetResetEarliest(bootstrapServers, groupId); + produceTestRecords(bootstrapServers, topic, 5); + + Map consumerProps = createConsumerProps(bootstrapServers, groupId, true); + DefaultShareConsumerFactory factory = new DefaultShareConsumerFactory<>(consumerProps); + + ContainerProperties containerProps = new ContainerProperties(topic); + containerProps.setShareAcknowledgmentMode(ContainerProperties.ShareAcknowledgmentMode.EXPLICIT); + + CountDownLatch latch = new CountDownLatch(5); + AtomicInteger errorCount = new AtomicInteger(); + AtomicInteger successCount = new AtomicInteger(); + + containerProps.setMessageListener(new AcknowledgingShareConsumerAwareMessageListener() { + @Override + public void onShareRecord(ConsumerRecord record, + @Nullable ShareAcknowledgment acknowledgment, ShareConsumer consumer) { + + // Simulate error for every 3rd record + if (record.value().endsWith("2")) { // value2 + errorCount.incrementAndGet(); + latch.countDown(); + throw new RuntimeException("Simulated processing error"); + } + else { + successCount.incrementAndGet(); + acknowledgment.acknowledge(); + } + + latch.countDown(); + } + }); + + ShareKafkaMessageListenerContainer container = + new ShareKafkaMessageListenerContainer<>(factory, containerProps); + container.setBeanName("errorTestContainer"); + container.start(); + + try { + assertThat(latch.await(30, TimeUnit.SECONDS)).isTrue(); + assertThat(errorCount.get()).isEqualTo(1); + assertThat(successCount.get()).isEqualTo(4); + } + finally { + container.stop(); + } + } + + @Test + void shouldSupportMixedAcknowledgmentTypes(EmbeddedKafkaBroker broker) throws Exception { + String topic = "share-container-mixed-ack-test"; + String groupId = "share-container-mixed-ack-group"; + String bootstrapServers = broker.getBrokersAsString(); + + setShareAutoOffsetResetEarliest(bootstrapServers, groupId); + + // Produce test records with different keys to identify them + try (var producer = createProducer(bootstrapServers)) { + producer.send(new ProducerRecord<>(topic, "accept", "accept-value")).get(); + producer.send(new ProducerRecord<>(topic, "release", "release-value")).get(); + producer.send(new ProducerRecord<>(topic, "reject", "reject-value")).get(); + } + + Map consumerProps = createConsumerProps(bootstrapServers, groupId, true); + DefaultShareConsumerFactory factory = new DefaultShareConsumerFactory<>(consumerProps); + + ContainerProperties containerProps = new ContainerProperties(topic); + containerProps.setShareAcknowledgmentMode(ContainerProperties.ShareAcknowledgmentMode.EXPLICIT); + + CountDownLatch firstRoundLatch = new CountDownLatch(3); + CountDownLatch redeliveryLatch = new CountDownLatch(1); + Map ackTypes = new ConcurrentHashMap<>(); + + containerProps.setMessageListener(new AcknowledgingShareConsumerAwareMessageListener() { + @Override + public void onShareRecord(ConsumerRecord record, + @Nullable ShareAcknowledgment acknowledgment, ShareConsumer consumer) { + + String key = record.key(); + + if ("accept".equals(key)) { + acknowledgment.acknowledge(AcknowledgeType.ACCEPT); + ackTypes.put(key, AcknowledgeType.ACCEPT); + firstRoundLatch.countDown(); + } + else if ("release".equals(key)) { + if (!ackTypes.containsKey("release-redelivered")) { + // First delivery - release it + acknowledgment.acknowledge(AcknowledgeType.RELEASE); + ackTypes.put("release-redelivered", AcknowledgeType.RELEASE); + firstRoundLatch.countDown(); + } + else { + // Redelivered - accept it + acknowledgment.acknowledge(AcknowledgeType.ACCEPT); + ackTypes.put(key, AcknowledgeType.ACCEPT); + redeliveryLatch.countDown(); + } + } + else if ("reject".equals(key)) { + acknowledgment.acknowledge(AcknowledgeType.REJECT); + ackTypes.put(key, AcknowledgeType.REJECT); + firstRoundLatch.countDown(); + } + } + }); + + ShareKafkaMessageListenerContainer container = + new ShareKafkaMessageListenerContainer<>(factory, containerProps); + container.setBeanName("mixedAckTestContainer"); + container.start(); + + try { + // Wait for first round of processing + assertThat(firstRoundLatch.await(15, TimeUnit.SECONDS)).isTrue(); + assertThat(ackTypes.get("accept")).isEqualTo(AcknowledgeType.ACCEPT); + assertThat(ackTypes.get("reject")).isEqualTo(AcknowledgeType.REJECT); + + // Wait for redelivery of released record + assertThat(redeliveryLatch.await(15, TimeUnit.SECONDS)).isTrue(); + assertThat(ackTypes.get("release")).isEqualTo(AcknowledgeType.ACCEPT); // what got released, was accepted eventually. + assertThat(ackTypes.get("release-redelivered")).isEqualTo(AcknowledgeType.RELEASE); + + } + finally { + container.stop(); + } + } + + @Test + void shouldSupportDifferentListenerTypes(EmbeddedKafkaBroker broker) throws Exception { + String topic = "share-container-implicit-test"; + String groupId = "share-container-listener-types-group"; + String bootstrapServers = broker.getBrokersAsString(); + + setShareAutoOffsetResetEarliest(bootstrapServers, groupId); + produceTestRecords(bootstrapServers, topic, 1); + + Map consumerProps = createConsumerProps(bootstrapServers, groupId, false); + DefaultShareConsumerFactory factory = new DefaultShareConsumerFactory<>(consumerProps); + + // Test 1: Basic MessageListener + testBasicMessageListener(factory, topic, bootstrapServers, groupId + "-basic"); + + // Test 2: ShareConsumerAwareMessageListener + testShareConsumerAwareListener(factory, topic, bootstrapServers, groupId + "-aware"); + + // Test 3: AcknowledgingShareConsumerAwareMessageListener in implicit mode + testAckListenerInImplicitMode(factory, topic, bootstrapServers, groupId + "-ack-implicit"); + } + + @Test + void shouldHandleContainerLifecycle(EmbeddedKafkaBroker broker) throws Exception { + String topic = "share-container-lifecycle-test"; + String groupId = "share-container-lifecycle-group"; + String bootstrapServers = broker.getBrokersAsString(); + + setShareAutoOffsetResetEarliest(bootstrapServers, groupId); + + Map consumerProps = createConsumerProps(bootstrapServers, groupId, true); + DefaultShareConsumerFactory factory = new DefaultShareConsumerFactory<>(consumerProps); + + ContainerProperties containerProps = new ContainerProperties(topic); + containerProps.setShareAcknowledgmentMode(ContainerProperties.ShareAcknowledgmentMode.EXPLICIT); + + AtomicBoolean listenerCalled = new AtomicBoolean(false); + containerProps.setMessageListener(new AcknowledgingShareConsumerAwareMessageListener() { + @Override + public void onShareRecord(ConsumerRecord record, + @Nullable ShareAcknowledgment acknowledgment, ShareConsumer consumer) { + listenerCalled.set(true); + acknowledgment.acknowledge(); + } + }); + + ShareKafkaMessageListenerContainer container = + new ShareKafkaMessageListenerContainer<>(factory, containerProps); + container.setBeanName("lifecycleTestContainer"); + + // Test initial state + assertThat(container.isRunning()).isFalse(); + + // Test start + container.start(); + assertThat(container.isRunning()).isTrue(); + + // Test processing + produceTestRecords(bootstrapServers, topic, 1); + Thread.sleep(3000); // Give time for processing + assertThat(listenerCalled.get()).isTrue(); + + // Test stop + container.stop(); + assertThat(container.isRunning()).isFalse(); + + // Test restart + listenerCalled.set(false); + container.start(); + assertThat(container.isRunning()).isTrue(); + + produceTestRecords(bootstrapServers, topic, 1); + Thread.sleep(3000); + assertThat(listenerCalled.get()).isTrue(); + + container.stop(); + } + + private void testBasicMessageListener(DefaultShareConsumerFactory factory, + String topic, String bootstrapServers, String groupId) throws Exception { + + setShareAutoOffsetResetEarliest(bootstrapServers, groupId); + produceTestRecords(bootstrapServers, topic, 1); + + ContainerProperties containerProps = new ContainerProperties(topic); + CountDownLatch latch = new CountDownLatch(1); + + containerProps.setMessageListener((MessageListener) record -> { + assertThat(record).isNotNull(); + latch.countDown(); + }); + + ShareKafkaMessageListenerContainer container = + new ShareKafkaMessageListenerContainer<>(factory, containerProps); + container.setBeanName("basicListenerTest"); + container.start(); + + try { + assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue(); } finally { container.stop(); } } - /** - * Sets the share.auto.offset.reset group config to earliest for the given groupId, - * using the provided bootstrapServers. - */ - private static void setShareAutoOffsetResetEarliest(String bootstrapServers, String groupId) throws Exception { + private void testShareConsumerAwareListener(DefaultShareConsumerFactory factory, + String topic, String bootstrapServers, String groupId) throws Exception { + + setShareAutoOffsetResetEarliest(bootstrapServers, groupId); + produceTestRecords(bootstrapServers, topic, 1); + + ContainerProperties containerProps = new ContainerProperties(topic); + CountDownLatch latch = new CountDownLatch(1); + AtomicReference> consumerRef = new AtomicReference<>(); + + containerProps.setMessageListener(new ShareConsumerAwareMessageListener() { + @Override + public void onShareRecord(ConsumerRecord record, ShareConsumer consumer) { + assertThat(record).isNotNull(); + assertThat(consumer).isNotNull(); + consumerRef.set(consumer); + latch.countDown(); + } + }); + + ShareKafkaMessageListenerContainer container = + new ShareKafkaMessageListenerContainer<>(factory, containerProps); + container.setBeanName("consumerAwareListenerTest"); + container.start(); + + try { + assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue(); + assertThat(consumerRef.get()).isNotNull(); + } + finally { + container.stop(); + } + } + + private void testAckListenerInImplicitMode(DefaultShareConsumerFactory factory, + String topic, String bootstrapServers, String groupId) throws Exception { + + setShareAutoOffsetResetEarliest(bootstrapServers, groupId); + produceTestRecords(bootstrapServers, topic, 1); + + ContainerProperties containerProps = new ContainerProperties(topic); + // Implicit mode (default) + CountDownLatch latch = new CountDownLatch(1); + + containerProps.setMessageListener(new AcknowledgingShareConsumerAwareMessageListener() { + @Override + public void onShareRecord(ConsumerRecord record, + @Nullable ShareAcknowledgment acknowledgment, ShareConsumer consumer) { + assertThat(record).isNotNull(); + assertThat(consumer).isNotNull(); + // In implicit mode, acknowledgment should be null + assertThat(acknowledgment).isNull(); + latch.countDown(); + } + }); + + ShareKafkaMessageListenerContainer container = + new ShareKafkaMessageListenerContainer<>(factory, containerProps); + container.setBeanName("ackListenerImplicitTest"); + container.start(); + + try { + assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue(); + } + finally { + container.stop(); + } + } + + // Utility methods + private Map createConsumerProps(String bootstrapServers, String groupId, boolean explicit) { + Map props = new HashMap<>(); + props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); + props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); + props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + if (explicit) { + props.put("share.acknowledgement.mode", "explicit"); + } + return props; + } + + private void produceTestRecords(String bootstrapServers, String topic, int count) throws Exception { + try (var producer = createProducer(bootstrapServers)) { + for (int i = 0; i < count; i++) { + producer.send(new ProducerRecord<>(topic, "key" + i, "value" + i)).get(); + } + } + } + + private KafkaProducer createProducer(String bootstrapServers) { + Map producerProps = new HashMap<>(); + producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); + producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + return new KafkaProducer<>(producerProps); + } + + private void setShareAutoOffsetResetEarliest(String bootstrapServers, String groupId) throws Exception { Map adminProperties = new HashMap<>(); - adminProperties.put("bootstrap.servers", bootstrapServers); + adminProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); ConfigEntry entry = new ConfigEntry("share.auto.offset.reset", "earliest"); AlterConfigOp op = new AlterConfigOp(entry, AlterConfigOp.OpType.SET); Map> configs = Map.of( new ConfigResource(ConfigResource.Type.GROUP, groupId), List.of(op)); - try (Admin admin = AdminClient.create(adminProperties)) { + try (Admin admin = Admin.create(adminProperties)) { admin.incrementalAlterConfigs(configs).all().get(); } } - } diff --git a/spring-kafka/src/test/java/org/springframework/kafka/listener/ShareKafkaMessageListenerContainerUnitTests.java b/spring-kafka/src/test/java/org/springframework/kafka/listener/ShareKafkaMessageListenerContainerUnitTests.java new file mode 100644 index 0000000000..6e54948e75 --- /dev/null +++ b/spring-kafka/src/test/java/org/springframework/kafka/listener/ShareKafkaMessageListenerContainerUnitTests.java @@ -0,0 +1,403 @@ +/* + * Copyright 2025-present the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.kafka.listener; + +import java.time.Duration; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.kafka.clients.consumer.AcknowledgeType; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.ShareConsumer; +import org.apache.kafka.common.TopicPartition; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +import org.springframework.kafka.core.ShareConsumerFactory; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.ArgumentMatchers.isNull; +import static org.mockito.ArgumentMatchers.notNull; +import static org.mockito.BDDMockito.given; +import static org.mockito.BDDMockito.willThrow; +import static org.mockito.Mockito.atLeastOnce; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; + +@ExtendWith(MockitoExtension.class) +public class ShareKafkaMessageListenerContainerUnitTests { + + @Mock + private ShareConsumerFactory shareConsumerFactory; + + @Mock + private ShareConsumer shareConsumer; + + @Mock + private MessageListener messageListener; + + @Mock + private ShareConsumerAwareMessageListener shareConsumerAwareListener; + + @Mock + private AcknowledgingShareConsumerAwareMessageListener ackListener; + + private ContainerProperties containerProperties; + + private ConsumerRecord testRecord; + + private ConsumerRecords testRecords; + + @Test + void shouldConfigureExplicitModeCorrectly() { + + ContainerProperties containerProperties = new ContainerProperties("test-topic"); + containerProperties.setShareAcknowledgmentMode(ContainerProperties.ShareAcknowledgmentMode.EXPLICIT); + containerProperties.setMessageListener(ackListener); + + ShareKafkaMessageListenerContainer container = + new ShareKafkaMessageListenerContainer<>(shareConsumerFactory, containerProperties); + + assertThat(container.getContainerProperties().getShareAcknowledgmentMode()) + .isEqualTo(ContainerProperties.ShareAcknowledgmentMode.EXPLICIT); + } + + @Test + void shouldConfigureImplicitModeByDefault() { + ContainerProperties containerProperties = new ContainerProperties("test-topic"); + containerProperties.setMessageListener(messageListener); + + ShareKafkaMessageListenerContainer container = + new ShareKafkaMessageListenerContainer<>(shareConsumerFactory, containerProperties); + + assertThat(container.getContainerProperties().getShareAcknowledgmentMode()) + .isEqualTo(ContainerProperties.ShareAcknowledgmentMode.IMPLICIT); + } + + @Test + void shouldInvokeBasicMessageListener() throws Exception { + // Setup test data + ConsumerRecord testRecord = new ConsumerRecord<>("test-topic", 0, 100L, "key", "value"); + Map>> records = new HashMap<>(); + records.put(new TopicPartition("test-topic", 0), List.of(testRecord)); + ConsumerRecords testRecords = new ConsumerRecords<>(records, Map.of()); + + // Setup mocks + given(shareConsumerFactory.createShareConsumer(any(), any())).willReturn(shareConsumer); + given(shareConsumer.poll(any(Duration.class))) + .willReturn(testRecords) + .willReturn(ConsumerRecords.empty()); + + ContainerProperties containerProperties = new ContainerProperties("test-topic"); + containerProperties.setMessageListener(messageListener); + + ShareKafkaMessageListenerContainer container = + new ShareKafkaMessageListenerContainer<>(shareConsumerFactory, containerProperties); + + container.start(); + Thread.sleep(1000); + container.stop(); + + verify(messageListener, atLeastOnce()).onMessage(testRecord); + verify(shareConsumer, atLeastOnce()).acknowledge(testRecord, AcknowledgeType.ACCEPT); + } + + @Test + void shouldInvokeShareConsumerAwareListener() throws Exception { + // Setup test data + ConsumerRecord testRecord = new ConsumerRecord<>("test-topic", 0, 100L, "key", "value"); + Map>> records = new HashMap<>(); + records.put(new TopicPartition("test-topic", 0), List.of(testRecord)); + ConsumerRecords testRecords = new ConsumerRecords<>(records, Map.of()); + + // Setup mocks + given(shareConsumerFactory.createShareConsumer(any(), any())).willReturn(shareConsumer); + given(shareConsumer.poll(any(Duration.class))) + .willReturn(testRecords) + .willReturn(ConsumerRecords.empty()); + + ContainerProperties containerProperties = new ContainerProperties("test-topic"); + containerProperties.setMessageListener(shareConsumerAwareListener); + + ShareKafkaMessageListenerContainer container = + new ShareKafkaMessageListenerContainer<>(shareConsumerFactory, containerProperties); + + container.start(); + Thread.sleep(1000); + container.stop(); + + verify(shareConsumerAwareListener, atLeastOnce()).onShareRecord(testRecord, shareConsumer); + verify(shareConsumer, atLeastOnce()).acknowledge(testRecord, AcknowledgeType.ACCEPT); + } + + @Test + void shouldInvokeAckListenerWithNullInImplicitMode() throws Exception { + // Setup test data + ConsumerRecord testRecord = new ConsumerRecord<>("test-topic", 0, 100L, "key", "value"); + Map>> records = new HashMap<>(); + records.put(new TopicPartition("test-topic", 0), List.of(testRecord)); + ConsumerRecords testRecords = new ConsumerRecords<>(records, Map.of()); + + // Setup mocks + given(shareConsumerFactory.createShareConsumer(any(), any())).willReturn(shareConsumer); + given(shareConsumer.poll(any(Duration.class))) + .willReturn(testRecords) + .willReturn(ConsumerRecords.empty()); + + ContainerProperties containerProperties = new ContainerProperties("test-topic"); + containerProperties.setMessageListener(ackListener); + // Default is implicit mode + + ShareKafkaMessageListenerContainer container = + new ShareKafkaMessageListenerContainer<>(shareConsumerFactory, containerProperties); + + container.start(); + Thread.sleep(1000); + container.stop(); + + verify(ackListener, atLeastOnce()).onShareRecord(eq(testRecord), isNull(), eq(shareConsumer)); + verify(shareConsumer, atLeastOnce()).acknowledge(testRecord, AcknowledgeType.ACCEPT); + } + + @Test + void shouldInvokeAckListenerWithAckInExplicitMode() throws Exception { + // Setup test data + ConsumerRecord testRecord = new ConsumerRecord<>("test-topic", 0, 100L, "key", "value"); + Map>> records = new HashMap<>(); + records.put(new TopicPartition("test-topic", 0), List.of(testRecord)); + ConsumerRecords testRecords = new ConsumerRecords<>(records, Map.of()); + + // Setup mocks + given(shareConsumerFactory.createShareConsumer(any(), any())).willReturn(shareConsumer); + given(shareConsumer.poll(any(Duration.class))) + .willReturn(testRecords) + .willReturn(ConsumerRecords.empty()); + + ContainerProperties containerProperties = new ContainerProperties("test-topic"); + containerProperties.setShareAcknowledgmentMode(ContainerProperties.ShareAcknowledgmentMode.EXPLICIT); + containerProperties.setMessageListener(ackListener); + + ShareKafkaMessageListenerContainer container = + new ShareKafkaMessageListenerContainer<>(shareConsumerFactory, containerProperties); + + container.start(); + Thread.sleep(1000); + container.stop(); + + verify(ackListener, atLeastOnce()).onShareRecord(eq(testRecord), notNull(), eq(shareConsumer)); + // No auto-acknowledgment in explicit mode + verify(shareConsumer, never()).acknowledge(any(), any()); + } + + @Test + void shouldHandleProcessingErrorInImplicitMode() throws Exception { + // Setup test data + ConsumerRecord testRecord = new ConsumerRecord<>("test-topic", 0, 100L, "key", "value"); + Map>> records = new HashMap<>(); + records.put(new TopicPartition("test-topic", 0), List.of(testRecord)); + ConsumerRecords testRecords = new ConsumerRecords<>(records, Map.of()); + + // Setup mocks + given(shareConsumerFactory.createShareConsumer(any(), any())).willReturn(shareConsumer); + given(shareConsumer.poll(any(Duration.class))) + .willReturn(testRecords) + .willReturn(ConsumerRecords.empty()); + willThrow(new RuntimeException("Processing error")).given(messageListener).onMessage(any()); + + ContainerProperties containerProperties = new ContainerProperties("test-topic"); + containerProperties.setMessageListener(messageListener); + + ShareKafkaMessageListenerContainer container = + new ShareKafkaMessageListenerContainer<>(shareConsumerFactory, containerProperties); + + container.start(); + Thread.sleep(1000); + container.stop(); + + verify(messageListener, atLeastOnce()).onMessage(testRecord); + // Should auto-reject on error + verify(shareConsumer, atLeastOnce()).acknowledge(testRecord, AcknowledgeType.REJECT); + } + + @Test + void shouldHandleProcessingErrorInExplicitMode() throws Exception { + // Setup test data + ConsumerRecord testRecord = new ConsumerRecord<>("test-topic", 0, 100L, "key", "value"); + Map>> records = new HashMap<>(); + records.put(new TopicPartition("test-topic", 0), List.of(testRecord)); + ConsumerRecords testRecords = new ConsumerRecords<>(records, Map.of()); + + // Setup mocks + given(shareConsumerFactory.createShareConsumer(any(), any())).willReturn(shareConsumer); + given(shareConsumer.poll(any(Duration.class))) + .willReturn(testRecords) + .willReturn(ConsumerRecords.empty()); + willThrow(new RuntimeException("Processing error")).given(ackListener).onShareRecord(any(), any(), any()); + + ContainerProperties containerProperties = new ContainerProperties("test-topic"); + containerProperties.setShareAcknowledgmentMode(ContainerProperties.ShareAcknowledgmentMode.EXPLICIT); + containerProperties.setMessageListener(ackListener); + + ShareKafkaMessageListenerContainer container = + new ShareKafkaMessageListenerContainer<>(shareConsumerFactory, containerProperties); + + container.start(); + Thread.sleep(1000); + container.stop(); + + verify(ackListener, atLeastOnce()).onShareRecord(eq(testRecord), notNull(), eq(shareConsumer)); + // Should auto-reject on error in explicit mode too + verify(shareConsumer, atLeastOnce()).acknowledge(testRecord, AcknowledgeType.REJECT); + } + + @Test + void shouldCommitAcknowledgments() throws Exception { + // Setup test data + ConsumerRecord testRecord = new ConsumerRecord<>("test-topic", 0, 100L, "key", "value"); + Map>> records = new HashMap<>(); + records.put(new TopicPartition("test-topic", 0), List.of(testRecord)); + ConsumerRecords testRecords = new ConsumerRecords<>(records, Map.of()); + + // Setup mocks + given(shareConsumerFactory.createShareConsumer(any(), any())).willReturn(shareConsumer); + given(shareConsumer.poll(any(Duration.class))) + .willReturn(testRecords) + .willReturn(ConsumerRecords.empty()); + + ContainerProperties containerProperties = new ContainerProperties("test-topic"); + containerProperties.setMessageListener(messageListener); + + ShareKafkaMessageListenerContainer container = + new ShareKafkaMessageListenerContainer<>(shareConsumerFactory, containerProperties); + + container.start(); + Thread.sleep(1000); + container.stop(); + + verify(shareConsumer, atLeastOnce()).commitSync(); + } + + @Test + void shouldSubscribeToTopics() { + given(shareConsumerFactory.createShareConsumer(any(), any())).willReturn(shareConsumer); + + ContainerProperties containerProperties = new ContainerProperties("test-topic"); + containerProperties.setMessageListener(messageListener); + + ShareKafkaMessageListenerContainer container = + new ShareKafkaMessageListenerContainer<>(shareConsumerFactory, containerProperties); + + container.start(); + + verify(shareConsumer).subscribe(List.of("test-topic")); + + container.stop(); + } + + @Test + void shouldHandleEmptyPollResults() throws Exception { + // Setup mocks for empty results + given(shareConsumerFactory.createShareConsumer(any(), any())).willReturn(shareConsumer); + given(shareConsumer.poll(any(Duration.class))).willReturn(ConsumerRecords.empty()); + + ContainerProperties containerProperties = new ContainerProperties("test-topic"); + containerProperties.setMessageListener(messageListener); + + ShareKafkaMessageListenerContainer container = + new ShareKafkaMessageListenerContainer<>(shareConsumerFactory, containerProperties); + + container.start(); + Thread.sleep(1000); + container.stop(); + + // No listener invocation for empty records + verify(messageListener, never()).onMessage(any()); + // No acknowledgments for empty records + verify(shareConsumer, never()).acknowledge(any(), any()); + // But commit should still be called + verify(shareConsumer, never()).commitSync(); + } + + @Test + void shouldCloseConsumerOnStop() throws Exception { + given(shareConsumerFactory.createShareConsumer(any(), any())).willReturn(shareConsumer); + + ContainerProperties containerProperties = new ContainerProperties("test-topic"); + containerProperties.setMessageListener(messageListener); + + ShareKafkaMessageListenerContainer container = + new ShareKafkaMessageListenerContainer<>(shareConsumerFactory, containerProperties); + + container.start(); + container.stop(); + Thread.sleep(100); + + verify(shareConsumer).close(); + } + + @Test + void shouldSupportContainerProperties() { + ContainerProperties containerProperties = new ContainerProperties("test-topic"); + containerProperties.setMessageListener(messageListener); + containerProperties.setShareAcknowledgmentMode(ContainerProperties.ShareAcknowledgmentMode.EXPLICIT); + + ShareKafkaMessageListenerContainer container = + new ShareKafkaMessageListenerContainer<>(shareConsumerFactory, containerProperties); + + assertThat(container.getContainerProperties().getShareAcknowledgmentMode()) + .isEqualTo(ContainerProperties.ShareAcknowledgmentMode.EXPLICIT); + } + + @Test + void shouldReportRunningState() { + given(shareConsumerFactory.createShareConsumer(any(), any())).willReturn(shareConsumer); + + ContainerProperties containerProperties = new ContainerProperties("test-topic"); + containerProperties.setMessageListener(messageListener); + + ShareKafkaMessageListenerContainer container = + new ShareKafkaMessageListenerContainer<>(shareConsumerFactory, containerProperties); + + assertThat(container.isRunning()).isFalse(); + + container.start(); + assertThat(container.isRunning()).isTrue(); + + container.stop(); + assertThat(container.isRunning()).isFalse(); + } + + @Test + void shouldSupportBeanNameSetting() { + ContainerProperties containerProperties = new ContainerProperties("test-topic"); + containerProperties.setMessageListener(messageListener); + + ShareKafkaMessageListenerContainer container = + new ShareKafkaMessageListenerContainer<>(shareConsumerFactory, containerProperties); + + container.setBeanName("testContainer"); + assertThat(container.getBeanName()).isEqualTo("testContainer"); + assertThat(container.getListenerId()).isEqualTo("testContainer"); + } + +}