Skip to content

Commit d155fbc

Browse files
null checks + examples and readme fixes
1 parent 3bf8fc3 commit d155fbc

18 files changed

+263
-156
lines changed

.idea/runConfigurations/kafka_clients_example.xml

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

.idea/runConfigurations/spring_example.xml

Lines changed: 2 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

README.md

Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,97 @@ Works with any Java library that depends on `kafka-clients`, including:
3131
- **Intelligent optimization**: Identifies the most impactful topics to optimize
3232
- **Graceful fallback**: Falls back to default settings if optimization fails
3333

34+
## Important: Producer Configuration Requirements
35+
36+
When initializing your Kafka producers, please ensure you pass the configuration as a mutable object. The Superstream library needs to modify the producer configuration to apply optimizations. The following initialization patterns are supported:
37+
38+
**Supported (Recommended)**:
39+
```java
40+
// Using Properties (recommended)
41+
Properties props = new Properties();
42+
props.put("bootstrap.servers", "localhost:9092");
43+
// ... other properties ...
44+
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
45+
46+
// Using a regular HashMap
47+
Map<String, Object> config = new HashMap<>();
48+
config.put("bootstrap.servers", "localhost:9092");
49+
// ... other properties ...
50+
KafkaProducer<String, String> producer = new KafkaProducer<>(config);
51+
52+
// Using Spring's @Value annotations and configuration loading
53+
@Configuration
54+
public class KafkaConfig {
55+
@Value("${spring.kafka.bootstrap-servers}")
56+
private String bootstrapServers;
57+
// ... other properties ...
58+
59+
@Bean
60+
public ProducerFactory<String, String> producerFactory() {
61+
Map<String, Object> configProps = new HashMap<>();
62+
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
63+
// ... other properties ...
64+
return new DefaultKafkaProducerFactory<>(configProps);
65+
}
66+
}
67+
```
68+
69+
**Not Supported**:
70+
```java
71+
// Using Collections.unmodifiableMap
72+
Map<String, Object> config = Collections.unmodifiableMap(new HashMap<>());
73+
KafkaProducer<String, String> producer = new KafkaProducer<>(config);
74+
75+
// Using Map.of() (creates unmodifiable map)
76+
KafkaProducer<String, String> producer = new KafkaProducer<>(
77+
Map.of("bootstrap.servers", "localhost:9092")
78+
);
79+
80+
// Using ProducerConfig.originals() which returns an unmodifiable copy
81+
ProducerConfig config = new ProducerConfig(props);
82+
KafkaProducer<String, String> producer = new KafkaProducer<>(config.originals());
83+
84+
// Using KafkaTemplate's getProducerFactory().getConfigurationProperties()
85+
// which returns an unmodifiable map
86+
KafkaTemplate<String, String> template = new KafkaTemplate<>(producerFactory);
87+
KafkaProducer<String, String> producer = new KafkaProducer<>(
88+
template.getProducerFactory().getConfigurationProperties()
89+
);
90+
```
91+
92+
### Spring Applications
93+
Spring applications that use `@Value` annotations and Spring's configuration loading (like `application.yml` or `application.properties`) are fully supported. The Superstream library will be able to modify the configuration when it's loaded into a mutable `Map` or `Properties` object in your Spring configuration class.
94+
95+
Example of supported Spring configuration:
96+
```yaml
97+
# application.yml
98+
spring:
99+
kafka:
100+
producer:
101+
properties:
102+
compression.type: snappy
103+
batch.size: 16384
104+
linger.ms: 1
105+
```
106+
107+
```java
108+
@Configuration
109+
public class KafkaConfig {
110+
@Value("${spring.kafka.producer.properties.compression.type}")
111+
private String compressionType;
112+
113+
@Bean
114+
public ProducerFactory<String, String> producerFactory() {
115+
Map<String, Object> configProps = new HashMap<>();
116+
configProps.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, compressionType);
117+
return new DefaultKafkaProducerFactory<>(configProps);
118+
}
119+
}
120+
```
121+
122+
### Why This Matters
123+
The Superstream library needs to modify your producer's configuration to apply optimizations based on your cluster's characteristics. This includes adjusting settings like compression, batch size, and other performance parameters. When the configuration is immutable, these optimizations cannot be applied.
124+
34125
## Installation
35126

36127
*Superstream package*: https://central.sonatype.com/artifact/ai.superstream/superstream-clients-java/overview
Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
package ai.superstream;
2+
3+
import org.springframework.beans.factory.annotation.Value;
4+
import org.springframework.context.annotation.Bean;
5+
import org.springframework.context.annotation.Configuration;
6+
import org.springframework.kafka.core.KafkaTemplate;
7+
import org.springframework.kafka.core.ProducerFactory;
8+
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
9+
import org.apache.kafka.clients.producer.ProducerConfig;
10+
11+
import java.util.HashMap;
12+
import java.util.Map;
13+
14+
@Configuration
15+
public class KafkaProducerConfig {
16+
17+
@Value("${spring.kafka.bootstrap-servers}")
18+
private String bootstrapServers;
19+
20+
@Value("${spring.kafka.producer.key-serializer}")
21+
private String keySerializer;
22+
23+
@Value("${spring.kafka.producer.value-serializer}")
24+
private String valueSerializer;
25+
26+
@Value("${spring.kafka.producer.client-id}")
27+
private String clientId;
28+
29+
@Value("${spring.kafka.producer.properties.compression.type}")
30+
private String compressionType;
31+
32+
@Value("${spring.kafka.producer.properties.batch.size}")
33+
private String batchSize;
34+
35+
@Value("${spring.kafka.producer.properties.linger.ms}")
36+
private String lingerMs;
37+
38+
@Bean
39+
public ProducerFactory<String, String> producerFactory() {
40+
Map<String, Object> configProps = new HashMap<>();
41+
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
42+
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, keySerializer);
43+
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueSerializer);
44+
configProps.put(ProducerConfig.CLIENT_ID_CONFIG, clientId);
45+
configProps.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, compressionType);
46+
configProps.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize);
47+
configProps.put(ProducerConfig.LINGER_MS_CONFIG, lingerMs);
48+
49+
return new DefaultKafkaProducerFactory<>(configProps);
50+
}
51+
52+
@Bean
53+
public KafkaTemplate<String, String> kafkaTemplate() {
54+
return new KafkaTemplate<>(producerFactory());
55+
}
56+
}
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
package ai.superstream;
2+
3+
import org.springframework.beans.factory.annotation.Value;
4+
import org.springframework.kafka.core.KafkaTemplate;
5+
import org.springframework.stereotype.Service;
6+
7+
@Service
8+
public class KafkaProducerService {
9+
10+
private final KafkaTemplate<String, String> kafkaTemplate;
11+
12+
@Value("${kafka.topic}")
13+
private String topic;
14+
15+
public KafkaProducerService(KafkaTemplate<String, String> kafkaTemplate) {
16+
this.kafkaTemplate = kafkaTemplate;
17+
}
18+
19+
public void sendMessage(String message) {
20+
kafkaTemplate.send(topic, message);
21+
}
22+
23+
public void sendMessageWithKey(String key, String message) {
24+
kafkaTemplate.send(topic, key, message);
25+
}
26+
}
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
package ai.superstream;
2+
3+
import org.springframework.boot.CommandLineRunner;
4+
import org.springframework.stereotype.Component;
5+
6+
@Component
7+
public class MessageSender implements CommandLineRunner {
8+
9+
private final KafkaProducerService producerService;
10+
11+
public MessageSender(KafkaProducerService producerService) {
12+
this.producerService = producerService;
13+
}
14+
15+
@Override
16+
public void run(String... args) {
17+
// Send a simple message
18+
producerService.sendMessage("Hello from Spring Kafka Example!");
19+
20+
// Send a message with a key
21+
producerService.sendMessageWithKey("user-1", "Message for user 1");
22+
producerService.sendMessageWithKey("user-2", "Message for user 2");
23+
24+
System.out.println("Messages sent successfully!");
25+
}
26+
}
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
package ai.superstream;
2+
3+
import org.springframework.boot.SpringApplication;
4+
import org.springframework.boot.autoconfigure.SpringBootApplication;
5+
6+
@SpringBootApplication
7+
public class SpringKafkaExample {
8+
public static void main(String[] args) {
9+
SpringApplication.run(SpringKafkaExample.class, args);
10+
}
11+
}

examples/spring-kafka-example/src/main/java/ai/superstream/examples/SpringKafkaExampleApplication.java

Lines changed: 0 additions & 44 deletions
This file was deleted.

examples/spring-kafka-example/src/main/java/ai/superstream/examples/TemplateKafkaProducer.java

Lines changed: 0 additions & 33 deletions
This file was deleted.

examples/spring-kafka-example/src/main/java/ai/superstream/examples/config/KafkaConfig.java

Lines changed: 0 additions & 31 deletions
This file was deleted.

0 commit comments

Comments
 (0)