Skip to content

Commit fcab9ce

Browse files
bugfix - parsing the BS servers
1 parent f8e8ebc commit fcab9ce

File tree

6 files changed

+37
-7
lines changed

6 files changed

+37
-7
lines changed

.idea/runConfigurations/kafka_clients_example.xml

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

.idea/runConfigurations/spring_example.xml

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

examples/kafka-clients-example/src/main/java/ai/superstream/examples/KafkaProducerExample.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,10 @@ public class KafkaProducerExample {
2929
public static void main(String[] args) {
3030
// Build the configuration map first using a mutable map
3131
Map<String, Object> mutableProps = new java.util.HashMap<>();
32-
mutableProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, DEFAULT_BOOTSTRAP_SERVERS);
33-
mutableProps.put(ProducerConfig.CLIENT_ID_CONFIG, CLIENT_ID);
32+
// mutableProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, DEFAULT_BOOTSTRAP_SERVERS);
33+
// mutableProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, Arrays.asList(DEFAULT_BOOTSTRAP_SERVERS));
34+
mutableProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, List.of(DEFAULT_BOOTSTRAP_SERVERS));
35+
// mutableProps.put(ProducerConfig.CLIENT_ID_CONFIG, CLIENT_ID);
3436
mutableProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
3537
mutableProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
3638
mutableProps.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, COMPRESSION_TYPE);
@@ -42,7 +44,6 @@ public static void main(String[] args) {
4244

4345
// Pass the immutable map directly to the KafkaProducer constructor
4446
Producer<String, String> producer = new KafkaProducer<String, String>(mutableProps);
45-
4647
long recordCount = 50; // Number of messages to send
4748
try {
4849
while (true) {

superstream-clients/dependency-reduced-pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
<groupId>ai.superstream</groupId>
55
<artifactId>superstream-clients</artifactId>
66
<name>Superstream Kafka Client Optimizer</name>
7-
<version>1.0.17</version>
7+
<version>1.0.18</version>
88
<description>A Java library that dynamically optimizes Kafka client configuration based on recommendations</description>
99
<url>https://github.com/superstreamlabs/superstream-clients-java</url>
1010
<developers>

superstream-clients/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66

77
<groupId>ai.superstream</groupId>
88
<artifactId>superstream-clients</artifactId>
9-
<version>1.0.17</version>
9+
<version>1.0.18</version>
1010
<packaging>jar</packaging>
1111

1212
<name>Superstream Kafka Client Optimizer</name>

superstream-clients/src/main/java/ai/superstream/agent/KafkaProducerInterceptor.java

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1127,5 +1127,34 @@ public synchronized void putAll(java.util.Map<?,?> m) {
11271127
put(e.getKey(), e.getValue());
11281128
}
11291129
}
1130+
1131+
@Override
1132+
public String getProperty(String key) {
1133+
Object value = backing.get(key);
1134+
if (value == null) {
1135+
return super.getProperty(key);
1136+
}
1137+
1138+
// Handle special case for bootstrap.servers which can be any Collection<String>
1139+
if ("bootstrap.servers".equals(key) && value instanceof java.util.Collection) {
1140+
try {
1141+
@SuppressWarnings("unchecked")
1142+
java.util.Collection<String> serverCollection = (java.util.Collection<String>) value;
1143+
return String.join(",", serverCollection);
1144+
} catch (ClassCastException e) {
1145+
// If the collection doesn't contain strings, fall back to toString()
1146+
logger.debug("bootstrap.servers collection contains non-String elements, falling back to toString()");
1147+
}
1148+
}
1149+
1150+
// For all other cases, convert to String
1151+
return value.toString();
1152+
}
1153+
1154+
@Override
1155+
public String getProperty(String key, String defaultValue) {
1156+
String result = getProperty(key);
1157+
return result != null ? result : defaultValue;
1158+
}
11301159
}
11311160
}

0 commit comments

Comments
 (0)