Commit c3d9719

support custom class that extends producer + return an error when not possible to extract props
1 parent 6569151 commit c3d9719
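
For context on the commit message: the scenario it targets is an application whose producer is a user-defined subclass of KafkaProducer rather than KafkaProducer itself, so the configuration has to be extracted from the subclass (with an error reported when that is not possible). A minimal sketch of such a subclass; the class, topic, and key names are illustrative only and not part of this commit:

// Illustrative sketch only: AuditingProducer, the topic, and the key/value
// below are hypothetical and not taken from this commit.
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class AuditingProducer extends KafkaProducer<String, String> {

    public AuditingProducer(Map<String, Object> config) {
        // Delegates to the regular KafkaProducer constructor; the agent would
        // read the configuration here, or report an error when it cannot.
        super(config);
    }

    public static void main(String[] args) {
        Map<String, Object> config = new HashMap<>();
        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        try (AuditingProducer producer = new AuditingProducer(config)) {
            producer.send(new ProducerRecord<>("example-topic", "audit-key", "audit-value"));
            producer.flush();
        }
    }
}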

7 files changed: +157 −93 lines changed


examples/kafka-clients-example/src/main/java/ai/superstream/examples/KafkaProducerExample.java

Lines changed: 15 additions & 35 deletions
@@ -9,68 +9,47 @@
 import org.slf4j.LoggerFactory;
 
 import java.nio.charset.StandardCharsets;
-import java.util.Properties;
+import java.util.*;
 
-/**
- * Example application that uses the Kafka Clients API to produce messages.
- * Run with:
- * java -javaagent:path/to/superstream-clients-1.0.0.jar
- *   -Dlogback.configurationFile=logback.xml -jar
- *   kafka-clients-example-1.0.0-jar-with-dependencies.jar
- *
- * Prerequisites:
- * 1. A Kafka server with the following topics:
- *   - superstream.metadata_v1 - with a configuration message
- *   - superstream.clients - for client reports
- *   - example-topic - for test messages
- *
- * Environment variables:
- * - KAFKA_BOOTSTRAP_SERVERS: The Kafka bootstrap servers (default:
- *   localhost:9092)
- * - SUPERSTREAM_TOPICS_LIST: Comma-separated list of topics to optimize for
- *   (default: example-topic)
- */
 public class KafkaProducerExample {
     private static final Logger logger = LoggerFactory.getLogger(KafkaProducerExample.class);
 
     // === Configuration Constants ===
     private static final String DEFAULT_BOOTSTRAP_SERVERS = "localhost:9092";
 
     private static final String CLIENT_ID = "superstream-example-producer";
-    private static final String COMPRESSION_TYPE = "zstd"; // Changed from gzip to snappy for better visibility
-    private static final Integer BATCH_SIZE = 1048576; // 1MB batch size
+    private static final String COMPRESSION_TYPE = "none";
+    private static final Integer BATCH_SIZE = 10;
 
     private static final String TOPIC_NAME = "example-topic";
     private static final String MESSAGE_KEY = "test-key";
     // Create a larger message that will compress well
     private static final String MESSAGE_VALUE = generateLargeCompressibleMessage();
 
     public static void main(String[] args) {
-        Properties props = new Properties();
+        // Create a Map with the configuration
+        Map<String, Object> props = new HashMap<>();
         props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, DEFAULT_BOOTSTRAP_SERVERS);
-        props.put("client.id", CLIENT_ID);
+        props.put(ProducerConfig.CLIENT_ID_CONFIG, CLIENT_ID);
         props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
         props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
         props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, COMPRESSION_TYPE);
         props.put(ProducerConfig.BATCH_SIZE_CONFIG, BATCH_SIZE);
-        props.put(ProducerConfig.LINGER_MS_CONFIG, 500); // Force batching by waiting 500ms
+        props.put(ProducerConfig.LINGER_MS_CONFIG, 500);
+
 
         long recordCount = 50; // Number of messages to send
         try (Producer<String, String> producer = new KafkaProducer<>(props)) {
             while (true) {
                 // Send 50 large messages to see compression benefits
-                logger.info("Starting to send {} large messages...", recordCount);
                 for (int i = 1; i <= recordCount; i++) {
                     String messageKey = MESSAGE_KEY + "-" + i;
                     String messageValue = MESSAGE_VALUE + "-" + i + "-" + System.currentTimeMillis();
                     producer.send(new ProducerRecord<>(TOPIC_NAME, messageKey, messageValue));
                 }
 
-                logger.info("All 50 large messages queued successfully! Adding a producer.flush() to send them all at once...");
                 producer.flush();
-                Thread.sleep(7000000);
-                logger.info("Waking up and preparing to send the next batch of messages");
-                // return;
+                Thread.sleep(100000);
             }
         } catch (Exception e) {
             logger.error("Error sending message", e);
@@ -88,10 +67,11 @@ private static String generateLargeCompressibleMessage() {
         json.append(" },\n");
         json.append(" \"data\": {\n");
         json.append(" \"metrics\": [\n");
-
+
         // Add repeating metrics data to reach ~1KB
         for (int i = 0; i < 15; i++) {
-            if (i > 0) json.append(",\n");
+            if (i > 0)
+                json.append(",\n");
             json.append(" {\n");
             json.append(" \"name\": \"metric").append(i).append("\",\n");
             json.append(" \"value\": ").append(i * 10).append(",\n");
@@ -102,7 +82,7 @@ private static String generateLargeCompressibleMessage() {
             json.append(" }\n");
             json.append(" }");
         }
-
+
         json.append("\n ]\n");
         json.append(" },\n");
         json.append(" \"config\": {\n");
@@ -112,10 +92,10 @@ private static String generateLargeCompressibleMessage() {
         json.append(" \"encryption\": false\n");
         json.append(" }\n");
         json.append("}");
-
+
         String result = json.toString();
         logger.debug("Generated compressible message of {} bytes", result.getBytes(StandardCharsets.UTF_8).length);
-
+
         return result;
     }
 }
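
The change above swaps the example's Properties-based configuration for a Map<String, Object>. KafkaProducer has constructor overloads for both forms, so either container works; a minimal side-by-side sketch, with a placeholder broker address:

// Both configuration styles below construct the same kind of producer; the
// broker address is a placeholder.
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;

public class ProducerConstructionSketch {
    public static void main(String[] args) {
        // Map-based configuration, as the example now uses.
        Map<String, Object> map = new HashMap<>();
        map.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        map.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        map.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        try (KafkaProducer<String, String> fromMap = new KafkaProducer<>(map)) {
            // fromMap is ready to send records.
        }

        // Properties-based configuration, as the example used before this commit.
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        try (KafkaProducer<String, String> fromProps = new KafkaProducer<>(props)) {
            // fromProps behaves identically; only the config container differs.
        }
    }
}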

examples/spring-kafka-example/src/main/java/ai/superstream/examples/TemplateKafkaProducer.java

Lines changed: 14 additions & 3 deletions
@@ -15,8 +15,19 @@
 public class TemplateKafkaProducer extends KafkaProducer<String, String> {
 
     public TemplateKafkaProducer(KafkaTemplate<String, String> kafkaTemplate) {
-        // Delegate to the regular KafkaProducer(Map<String,Object>) constructor
-        // by extracting the configuration from the template's producer factory.
-        super(kafkaTemplate.getProducerFactory().getConfigurationProperties());
+        // Spring returns an unmodifiable Map; create a mutable copy so the
+        // Superstream optimiser (and KafkaProducer itself) can adjust values.
+        super(toProperties(kafkaTemplate.getProducerFactory().getConfigurationProperties()));
+    }
+
+    private static java.util.Properties toProperties(java.util.Map<String, Object> config) {
+        java.util.Map<String,Object> mutable = toMutableMap(config);
+        java.util.Properties props = new java.util.Properties();
+        props.putAll(mutable);
+        return props;
+    }
+
+    private static java.util.Map<String, Object> toMutableMap(java.util.Map<String, Object> config) {
+        return (java.util.Map<String, Object>) config;
     }
 }
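
One note on the helpers in this diff: toMutableMap only casts the incoming map, so the defensive copy described in the comment actually happens in toProperties, where Properties.putAll copies the entries out of Spring's unmodifiable configuration map. A condensed sketch of that same idea, illustrative only and not part of the commit:

// Condensed illustration (not part of the commit): Properties.putAll performs
// the copy, so the result is mutable even when the source map is unmodifiable.
import java.util.Map;
import java.util.Properties;

final class ProducerConfigCopy {
    private ProducerConfigCopy() {
    }

    static Properties toProperties(Map<String, Object> config) {
        Properties props = new Properties();
        props.putAll(config); // copies entries out of the (possibly unmodifiable) map
        return props;
    }
}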

superstream-clients/dependency-reduced-pom.xml

Lines changed: 1 addition & 1 deletion
@@ -4,7 +4,7 @@
   <groupId>ai.superstream</groupId>
   <artifactId>superstream-clients</artifactId>
   <name>Superstream Kafka Client Optimizer</name>
-  <version>1.0.14</version>
+  <version>1.0.15</version>
   <description>A Java library that dynamically optimizes Kafka client configuration based on recommendations</description>
   <url>https://github.com/superstreamlabs/superstream-clients-java</url>
   <developers>

superstream-clients/pom.xml

Lines changed: 1 addition & 1 deletion
@@ -6,7 +6,7 @@
 
   <groupId>ai.superstream</groupId>
   <artifactId>superstream-clients</artifactId>
-  <version>1.0.14</version>
+  <version>1.0.15</version>
   <packaging>jar</packaging>
 
   <name>Superstream Kafka Client Optimizer</name>

0 commit comments
