diff --git a/core/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/binder/ProducerProperties.java b/core/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/binder/ProducerProperties.java index 9713d18288..c22f0037b2 100644 --- a/core/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/binder/ProducerProperties.java +++ b/core/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/binder/ProducerProperties.java @@ -83,6 +83,8 @@ public class ProducerProperties { private boolean dynamicPartitionUpdatesEnabled = false; + private boolean splitMode = false; + public String getBindingName() { return bindingName; } @@ -215,6 +217,14 @@ public void setDynamicPartitionUpdatesEnabled(boolean enabled) { this.dynamicPartitionUpdatesEnabled = enabled; } + public boolean isSplitMode() { + return splitMode; + } + + public void setSplitMode(boolean splitMode) { + this.splitMode = splitMode; + } + static class ExpressionSerializer extends JsonSerializer { @Override diff --git a/core/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/function/FunctionConfiguration.java b/core/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/function/FunctionConfiguration.java index b6af8f7a03..0c81222a07 100644 --- a/core/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/function/FunctionConfiguration.java +++ b/core/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/function/FunctionConfiguration.java @@ -29,6 +29,7 @@ import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Set; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; @@ -614,6 +615,13 @@ private void bindFunctionToDestinations(BindableProxyFactory bindableProxyFactor }); } else { + boolean isSplitMode = outputBindingNames.stream() + .map(bindingName -> this.serviceProperties.getBindings().get(bindingName).getProducer()) + .filter(Objects::nonNull) + .anyMatch(ProducerProperties::isSplitMode); + + function.setEnableSplitting(isSplitMode); + String outputDestinationName = this.determineOutputDestinationName(0, bindableProxyFactory, function.isConsumer()); if (!ObjectUtils.isEmpty(inputBindingNames)) { String inputDestinationName = inputBindingNames.iterator().next();