From 73560dec675fdbb5877fcff80f0135232c418f33 Mon Sep 17 00:00:00 2001 From: gihong-park Date: Sun, 3 Aug 2025 23:41:08 +0900 Subject: [PATCH] Add caching and logging control options for KTable materialization - Add cachingDisabled and loggingDisabled properties to KafkaStreamsConsumerProperties - Update getMaterialized method to apply caching and logging settings - Fixes gh-3094 Signed-off-by: gihong-park --- .../AbstractKafkaStreamsBinderProcessor.java | 27 ++++++- .../KafkaStreamsConsumerProperties.java | 27 +++++++ ...appingsProviderAutoConfigurationTests.java | 79 +++++++++++++++++++ .../configuration-options.adoc | 13 +++ 4 files changed, 142 insertions(+), 4 deletions(-) diff --git a/binders/kafka-binder/spring-cloud-stream-binder-kafka-streams/src/main/java/org/springframework/cloud/stream/binder/kafka/streams/AbstractKafkaStreamsBinderProcessor.java b/binders/kafka-binder/spring-cloud-stream-binder-kafka-streams/src/main/java/org/springframework/cloud/stream/binder/kafka/streams/AbstractKafkaStreamsBinderProcessor.java index 30d11b0f25..555c3e2cf6 100644 --- a/binders/kafka-binder/spring-cloud-stream-binder-kafka-streams/src/main/java/org/springframework/cloud/stream/binder/kafka/streams/AbstractKafkaStreamsBinderProcessor.java +++ b/binders/kafka-binder/spring-cloud-stream-binder-kafka-streams/src/main/java/org/springframework/cloud/stream/binder/kafka/streams/AbstractKafkaStreamsBinderProcessor.java @@ -92,6 +92,7 @@ /** * @author Soby Chacko * @author Ralf Wiedmann + * @author Gihong Park * @since 3.0.0 */ public abstract class AbstractKafkaStreamsBinderProcessor implements ApplicationContextAware { @@ -542,13 +543,31 @@ private KTable materializedAs(StreamsBuilder streamsBuilder, String final Consumed consumed = getConsumed(kafkaStreamsConsumerProperties, k, v, autoOffsetReset); return streamsBuilder.table(this.bindingServiceProperties.getBindingDestination(destination), - consumed, getMaterialized(storeName, k, v)); + consumed, getMaterialized(storeName, k, v, kafkaStreamsConsumerProperties.isCachingDisabled(), kafkaStreamsConsumerProperties.isLoggingDisabled())); } private Materialized> getMaterialized( - String storeName, Serde k, Serde v) { - return Materialized.>as(storeName) + String storeName, Serde k, Serde v, Boolean isCachingDisabled, Boolean isLoggingDisabled) { + Materialized> materialized = + Materialized.>as(storeName) .withKeySerde(k).withValueSerde(v); + + if (isCachingDisabled != null) { + if (isCachingDisabled) { + materialized = materialized.withCachingDisabled(); + } + else { + materialized = materialized.withCachingEnabled(); + } + } + + if (isLoggingDisabled != null) { + if (isLoggingDisabled) { + materialized = materialized.withLoggingDisabled(); + } + } + + return materialized; } private GlobalKTable materializedAsGlobalKTable( @@ -558,7 +577,7 @@ private GlobalKTable materializedAsGlobalKTable( return streamsBuilder.globalTable( this.bindingServiceProperties.getBindingDestination(destination), consumed, - getMaterialized(storeName, k, v)); + getMaterialized(storeName, k, v, kafkaStreamsConsumerProperties.isCachingDisabled(), kafkaStreamsConsumerProperties.isLoggingDisabled())); } private GlobalKTable getGlobalKTable(KafkaStreamsConsumerProperties kafkaStreamsConsumerProperties, diff --git a/binders/kafka-binder/spring-cloud-stream-binder-kafka-streams/src/main/java/org/springframework/cloud/stream/binder/kafka/streams/properties/KafkaStreamsConsumerProperties.java b/binders/kafka-binder/spring-cloud-stream-binder-kafka-streams/src/main/java/org/springframework/cloud/stream/binder/kafka/streams/properties/KafkaStreamsConsumerProperties.java index 616fb207f5..0ec656eefa 100644 --- a/binders/kafka-binder/spring-cloud-stream-binder-kafka-streams/src/main/java/org/springframework/cloud/stream/binder/kafka/streams/properties/KafkaStreamsConsumerProperties.java +++ b/binders/kafka-binder/spring-cloud-stream-binder-kafka-streams/src/main/java/org/springframework/cloud/stream/binder/kafka/streams/properties/KafkaStreamsConsumerProperties.java @@ -24,6 +24,7 @@ * * @author Marius Bogoevici * @author Soby Chacko + * @author Gihong Park */ public class KafkaStreamsConsumerProperties extends KafkaConsumerProperties { @@ -44,6 +45,16 @@ public class KafkaStreamsConsumerProperties extends KafkaConsumerProperties { */ private String materializedAs; + /** + * Disable caching for materialized KTable. + */ + private boolean cachingDisabled; + + /** + * Disable logging for materialized KTable. + */ + private boolean loggingDisabled; + /** * Per input binding deserialization handler. */ @@ -109,6 +120,22 @@ public void setMaterializedAs(String materializedAs) { this.materializedAs = materializedAs; } + public boolean isCachingDisabled() { + return this.cachingDisabled; + } + + public void setCachingDisabled(boolean cachingDisabled) { + this.cachingDisabled = cachingDisabled; + } + + public boolean isLoggingDisabled() { + return this.loggingDisabled; + } + + public void setLoggingDisabled(boolean loggingDisabled) { + this.loggingDisabled = loggingDisabled; + } + public String getTimestampExtractorBeanName() { return timestampExtractorBeanName; } diff --git a/binders/kafka-binder/spring-cloud-stream-binder-kafka-streams/src/test/java/org/springframework/cloud/stream/binder/kafka/streams/ExtendedBindingHandlerMappingsProviderAutoConfigurationTests.java b/binders/kafka-binder/spring-cloud-stream-binder-kafka-streams/src/test/java/org/springframework/cloud/stream/binder/kafka/streams/ExtendedBindingHandlerMappingsProviderAutoConfigurationTests.java index c5b3cb9744..81bea35f23 100644 --- a/binders/kafka-binder/spring-cloud-stream-binder-kafka-streams/src/test/java/org/springframework/cloud/stream/binder/kafka/streams/ExtendedBindingHandlerMappingsProviderAutoConfigurationTests.java +++ b/binders/kafka-binder/spring-cloud-stream-binder-kafka-streams/src/test/java/org/springframework/cloud/stream/binder/kafka/streams/ExtendedBindingHandlerMappingsProviderAutoConfigurationTests.java @@ -77,6 +77,85 @@ void defaultsRespectedWhenCustomBindingProperties() { }); } + @Test + void cachingAndLoggingDisabledPropertiesWork() { + this.contextRunner + .withPropertyValues( + "spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.caching-disabled: true", + "spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.logging-disabled: true") + .run((context) -> { + assertThat(context) + .hasNotFailed() + .hasSingleBean(KafkaStreamsExtendedBindingProperties.class); + KafkaStreamsExtendedBindingProperties extendedBindingProperties = context.getBean(KafkaStreamsExtendedBindingProperties.class); + assertThat(extendedBindingProperties.getExtendedConsumerProperties("process-in-0")) + .hasFieldOrPropertyWithValue("cachingDisabled", true) + .hasFieldOrPropertyWithValue("loggingDisabled", true); + }); + } + + @Test + void cachingAndLoggingDefaultValues() { + this.contextRunner.run((context) -> { + assertThat(context) + .hasNotFailed() + .hasSingleBean(KafkaStreamsExtendedBindingProperties.class); + KafkaStreamsExtendedBindingProperties extendedBindingProperties = context.getBean(KafkaStreamsExtendedBindingProperties.class); + assertThat(extendedBindingProperties.getExtendedConsumerProperties("process-in-0")) + .hasFieldOrPropertyWithValue("cachingDisabled", false) + .hasFieldOrPropertyWithValue("loggingDisabled", false); + }); + } + + @Test + void onlyCachingDisabledProperty() { + this.contextRunner + .withPropertyValues( + "spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.caching-disabled: true") + .run((context) -> { + assertThat(context) + .hasNotFailed() + .hasSingleBean(KafkaStreamsExtendedBindingProperties.class); + KafkaStreamsExtendedBindingProperties extendedBindingProperties = context.getBean(KafkaStreamsExtendedBindingProperties.class); + assertThat(extendedBindingProperties.getExtendedConsumerProperties("process-in-0")) + .hasFieldOrPropertyWithValue("cachingDisabled", true) + .hasFieldOrPropertyWithValue("loggingDisabled", false); + }); + } + + @Test + void onlyLoggingDisabledProperty() { + this.contextRunner + .withPropertyValues( + "spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.logging-disabled: true") + .run((context) -> { + assertThat(context) + .hasNotFailed() + .hasSingleBean(KafkaStreamsExtendedBindingProperties.class); + KafkaStreamsExtendedBindingProperties extendedBindingProperties = context.getBean(KafkaStreamsExtendedBindingProperties.class); + assertThat(extendedBindingProperties.getExtendedConsumerProperties("process-in-0")) + .hasFieldOrPropertyWithValue("cachingDisabled", false) + .hasFieldOrPropertyWithValue("loggingDisabled", true); + }); + } + + @Test + void defaultAndBindingSpecificCachingLoggingProperties() { + this.contextRunner + .withPropertyValues( + "spring.cloud.stream.kafka.streams.default.consumer.caching-disabled: true", + "spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.logging-disabled: true") + .run((context) -> { + assertThat(context) + .hasNotFailed() + .hasSingleBean(KafkaStreamsExtendedBindingProperties.class); + KafkaStreamsExtendedBindingProperties extendedBindingProperties = context.getBean(KafkaStreamsExtendedBindingProperties.class); + assertThat(extendedBindingProperties.getExtendedConsumerProperties("process-in-0")) + .hasFieldOrPropertyWithValue("cachingDisabled", true) + .hasFieldOrPropertyWithValue("loggingDisabled", true); + }); + } + @EnableAutoConfiguration static class KafkaStreamsTestApp { } diff --git a/docs/modules/ROOT/pages/kafka/kafka-streams-binder/configuration-options.adoc b/docs/modules/ROOT/pages/kafka/kafka-streams-binder/configuration-options.adoc index 970d8a747e..d9e062374b 100644 --- a/docs/modules/ROOT/pages/kafka/kafka-streams-binder/configuration-options.adoc +++ b/docs/modules/ROOT/pages/kafka/kafka-streams-binder/configuration-options.adoc @@ -146,6 +146,19 @@ state store to materialize when using incoming KTable types + Default: `none`. +cachingDisabled:: +Disable caching for materialized KTable. +When set to `true`, calls `withCachingDisabled()` on the Materialized object. +When set to `false`, calls `withCachingEnabled()` on the Materialized object. ++ +Default: `false`. + +loggingDisabled:: +Disable logging for materialized KTable. +When set to `true`, calls `withLoggingDisabled()` on the Materialized object. ++ +Default: `false`. + useNativeDecoding:: flag to enable/disable native decoding +