Commit a20e11e

improve error logs + bugfix copy parent client configs
1 parent: d2861fc

File tree

10 files changed (+173 / -131 lines)


.idea/runConfigurations/Aiven_example.xml

Lines changed: 17 additions & 0 deletions
Some generated files are not rendered by default.

.idea/runConfigurations/kafka_clients_example.xml

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default.

.idea/runConfigurations/spring_example.xml

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default.

examples/kafka-clients-example/src/main/java/ai/superstream/examples/AivenKafkaExample.java

Lines changed: 67 additions & 34 deletions
@@ -8,8 +8,8 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.nio.charset.StandardCharsets;
 import java.util.Properties;
-import java.util.concurrent.ExecutionException;
 
 /**
  * Example application that uses the Kafka Clients API to produce messages securely over SSL (TLS).
@@ -81,12 +81,12 @@ public class AivenKafkaExample {
     private static final String KEYSTORE_TYPE = "PKCS12";
 
     private static final String CLIENT_ID = "superstream-example-producer";
-    private static final String COMPRESSION_TYPE = "gzip";
-    private static final Integer BATCH_SIZE = 16384;
+    private static final String COMPRESSION_TYPE = "none";
+    private static final Integer BATCH_SIZE = 15;
 
     private static final String TOPIC_NAME = "example-topic";
     private static final String MESSAGE_KEY = "test-key";
-    private static final String MESSAGE_VALUE = "Hello, Superstream!";
+    private static final String MESSAGE_VALUE = generateLargeCompressibleMessage();
 
     public static void main(String[] args) {
         // Get bootstrap servers from environment variable or use default
@@ -112,37 +112,70 @@ public static void main(String[] args) {
         // Set some basic configuration
         props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, COMPRESSION_TYPE);
         props.put(ProducerConfig.BATCH_SIZE_CONFIG, BATCH_SIZE);
-        logger.info("Creating producer with bootstrap servers: {}", bootstrapServers);
-        logger.info("Original producer configuration:");
-        props.forEach((k, v) -> logger.info("  {} = {}", k, v));
-
-        try (Producer<String, String> producer = new KafkaProducer<>(props)) {
-            // The Superstream Agent should have intercepted the producer creation
-            // and potentially optimized the configuration
-
-            // Log the actual configuration used by the producer
-            logger.info("Actual producer configuration (after potential Superstream optimization):");
-
-            // Get the actual configuration from the producer via reflection
-            java.lang.reflect.Field configField = producer.getClass().getDeclaredField("producerConfig");
-            configField.setAccessible(true);
-            org.apache.kafka.clients.producer.ProducerConfig actualConfig =
-                    (org.apache.kafka.clients.producer.ProducerConfig) configField.get(producer);
-
-            logger.info("  compression.type = {}", actualConfig.getString(ProducerConfig.COMPRESSION_TYPE_CONFIG));
-            logger.info("  batch.size = {}", actualConfig.getInt(ProducerConfig.BATCH_SIZE_CONFIG));
-
-            // Send a test message
-            logger.info("Sending message to topic {}: key={}, value={}", TOPIC_NAME, MESSAGE_KEY, MESSAGE_VALUE);
-            producer.send(new ProducerRecord<>(TOPIC_NAME, MESSAGE_KEY, MESSAGE_VALUE)).get();
-            logger.info("Message sent successfully!");
-        } catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
-            logger.error("Interrupted while sending message", e);
-        } catch (ExecutionException e) {
-            logger.error("Error sending message", e);
+
+        // Pass the immutable map directly to the KafkaProducer constructor
+        Producer<String, String> producer = new KafkaProducer<String, String>(props);
+
+        long recordCount = 10; // Number of messages to send
+        try {
+            while (true) {
+                // Send recordCount large messages to see compression benefits
+                for (int i = 1; i <= recordCount; i++) {
+                    String messageKey = MESSAGE_KEY + "-" + i;
+                    String messageValue = MESSAGE_VALUE + "-" + i + "-" + System.currentTimeMillis();
+                    producer.send(new ProducerRecord<>(TOPIC_NAME, messageKey, messageValue));
+                }
+
+                producer.flush();
+                Thread.sleep(150000);
+            }
         } catch (Exception e) {
-            logger.error("Unexpected error", e);
+            logger.error("Error sending message", e);
+        } finally {
+            producer.close();
+        }
+    }
+
+    private static String generateLargeCompressibleMessage() {
+        // Return a 1KB JSON string with repeating data that can be compressed well
+        StringBuilder json = new StringBuilder();
+        json.append("{\n");
+        json.append("  \"metadata\": {\n");
+        json.append("    \"id\": \"12345\",\n");
+        json.append("    \"type\": \"example\",\n");
+        json.append("    \"timestamp\": 1635954438000\n");
+        json.append("  },\n");
+        json.append("  \"data\": {\n");
+        json.append("    \"metrics\": [\n");
+
+        // Add repeating metrics data to reach ~1KB
+        for (int i = 0; i < 15; i++) {
+            if (i > 0)
+                json.append(",\n");
+            json.append("      {\n");
+            json.append("        \"name\": \"metric").append(i).append("\",\n");
+            json.append("        \"value\": ").append(i * 10).append(",\n");
+            json.append("        \"tags\": [\"tag1\", \"tag2\", \"tag3\"],\n");
+            json.append("        \"properties\": {\n");
+            json.append("          \"property1\": \"value1\",\n");
+            json.append("          \"property2\": \"value2\"\n");
+            json.append("        }\n");
+            json.append("      }");
         }
+
+        json.append("\n    ]\n");
+        json.append("  },\n");
+        json.append("  \"config\": {\n");
+        json.append("    \"sampling\": \"full\",\n");
+        json.append("    \"retention\": \"30d\",\n");
+        json.append("    \"compression\": true,\n");
+        json.append("    \"encryption\": false\n");
+        json.append("  }\n");
+        json.append("}");
+
+        String result = json.toString();
+        logger.debug("Generated compressible message of {} bytes", result.getBytes(StandardCharsets.UTF_8).length);
+
+        return result;
     }
 }
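
The example now starts from a deliberately unoptimized baseline — compression.type=none and a 15-byte batch.size — and sends a large, highly repetitive JSON payload, so any optimization the Superstream agent applies is easy to observe. As a quick standalone way to confirm that such a payload really is compressible, the following sketch (not part of the commit) measures the gzip ratio with JDK classes only; the inline payload is a stand-in for generateLargeCompressibleMessage():

import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPOutputStream;

public class CompressionCheck {
    public static void main(String[] args) throws Exception {
        // Stand-in payload: repetitive JSON like the example's generator produces.
        StringBuilder sb = new StringBuilder("{\"metrics\":[");
        for (int i = 0; i < 15; i++) {
            sb.append("{\"name\":\"metric").append(i)
              .append("\",\"tags\":[\"tag1\",\"tag2\",\"tag3\"]},");
        }
        sb.append("{}]}");
        byte[] raw = sb.toString().getBytes(StandardCharsets.UTF_8);

        // Compress once and compare sizes.
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (GZIPOutputStream gzip = new GZIPOutputStream(bos)) {
            gzip.write(raw);
        }
        System.out.printf("raw=%d bytes, gzip=%d bytes (%.0f%% smaller)%n",
                raw.length, bos.size(), 100.0 * (raw.length - bos.size()) / raw.length);
    }
}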

superstream-clients/dependency-reduced-pom.xml

Lines changed: 1 addition & 1 deletion
@@ -4,7 +4,7 @@
   <groupId>ai.superstream</groupId>
   <artifactId>superstream-clients</artifactId>
   <name>Superstream Kafka Client Optimizer</name>
-  <version>1.0.19</version>
+  <version>1.0.200</version>
   <description>A Java library that dynamically optimizes Kafka client configuration based on recommendations</description>
   <url>https://github.com/superstreamlabs/superstream-clients-java</url>
   <developers>

superstream-clients/pom.xml

Lines changed: 1 addition & 1 deletion
@@ -6,7 +6,7 @@
 
   <groupId>ai.superstream</groupId>
   <artifactId>superstream-clients</artifactId>
-  <version>1.0.19</version>
+  <version>1.0.200</version>
   <packaging>jar</packaging>
 
   <name>Superstream Kafka Client Optimizer</name>

superstream-clients/src/main/java/ai/superstream/core/ClientReporter.java

Lines changed: 4 additions & 43 deletions
@@ -3,6 +3,7 @@
 import ai.superstream.model.ClientMessage;
 import ai.superstream.util.NetworkUtils;
 import ai.superstream.util.SuperstreamLogger;
+import ai.superstream.util.KafkaPropertiesUtils;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.Producer;
@@ -50,8 +51,8 @@ public boolean reportClient(String bootstrapServers, Properties originalClientProperties,
             String error) {
         Properties properties = new Properties();
 
-        // Copy all authentication-related and essential properties from the original client
-        copyAuthenticationProperties(originalClientProperties, properties);
+        // Copy essential client configuration properties from the original client
+        KafkaPropertiesUtils.copyClientConfigurationProperties(originalClientProperties, properties);
 
         properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
         properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
@@ -92,7 +93,7 @@ public boolean reportClient(String bootstrapServers, Properties originalClientProperties,
             logger.debug("Successfully reported client information to {}", CLIENTS_TOPIC);
             return true;
         } catch (Exception e) {
-            logger.error("[ERR-026] Error reporting client information", e);
+            logger.error("[ERR-026] Error reporting client information. Error: {} - {}", e.getClass().getName(), e.getMessage(), e);
             return false;
         }
     }
@@ -158,46 +159,6 @@ private Map<String, Object> getCompleteProducerConfig(Map<String, Object> explicitConfig) {
         return completeConfig;
     }
 
-    // Helper method to copy authentication properties
-    public static void copyAuthenticationProperties(Properties source, Properties destination) {
-        if (source == null || destination == null) {
-            logger.warn("Cannot copy authentication properties: source or destination is null");
-            return;
-        }
-        // Authentication-related properties
-        String[] authProps = {
-                // Security protocol
-                "security.protocol",
-
-                // SSL properties
-                "ssl.truststore.location", "ssl.truststore.password",
-                "ssl.keystore.location", "ssl.keystore.password",
-                "ssl.key.password", "ssl.endpoint.identification.algorithm",
-                "ssl.truststore.type", "ssl.keystore.type", "ssl.secure.random.implementation",
-                "ssl.enabled.protocols", "ssl.cipher.suites",
-
-                // SASL properties
-                "sasl.mechanism", "sasl.jaas.config",
-                "sasl.client.callback.handler.class", "sasl.login.callback.handler.class",
-                "sasl.login.class", "sasl.kerberos.service.name",
-                "sasl.kerberos.kinit.cmd", "sasl.kerberos.ticket.renew.window.factor",
-                "sasl.kerberos.ticket.renew.jitter", "sasl.kerberos.min.time.before.relogin",
-                "sasl.login.refresh.window.factor", "sasl.login.refresh.window.jitter",
-                "sasl.login.refresh.min.period.seconds", "sasl.login.refresh.buffer.seconds",
-
-                // Other important properties to preserve
-                "request.timeout.ms", "retry.backoff.ms", "connections.max.idle.ms",
-                "reconnect.backoff.ms", "reconnect.backoff.max.ms"
-        };
-
-        // Copy all authentication properties if they exist in the source
-        for (String prop : authProps) {
-            if (source.containsKey(prop)) {
-                destination.put(prop, source.get(prop));
-            }
-        }
-    }
-
     /**
      * Get the version of the Superstream Clients library.
      * @return The version string
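
Both this class and MetadataConsumer previously carried their own copy of this helper; the commit message ("bugfix copy parent client configs") suggests the duplicates had drifted and parent client settings were not always carried over. The replacement lives in the new ai.superstream.util.KafkaPropertiesUtils, whose diff is not rendered in this view. A minimal sketch of what the shared utility plausibly looks like, based on the removed helpers — the exact key list is an assumption:

package ai.superstream.util;

import java.util.Properties;

// Hypothetical reconstruction of the new shared utility; the real class may
// copy a broader set of keys or derive them differently.
public final class KafkaPropertiesUtils {

    private KafkaPropertiesUtils() {
    }

    // Security and connection settings worth preserving when deriving an
    // internal producer/consumer from the parent client's configuration.
    private static final String[] CLIENT_CONFIG_KEYS = {
            "security.protocol",
            "ssl.truststore.location", "ssl.truststore.password",
            "ssl.keystore.location", "ssl.keystore.password",
            "ssl.key.password", "ssl.endpoint.identification.algorithm",
            "sasl.mechanism", "sasl.jaas.config",
            "request.timeout.ms", "retry.backoff.ms", "connections.max.idle.ms"
    };

    public static void copyClientConfigurationProperties(Properties source, Properties destination) {
        if (source == null || destination == null) {
            return; // nothing to copy
        }
        for (String key : CLIENT_CONFIG_KEYS) {
            if (source.containsKey(key)) {
                destination.put(key, source.get(key));
            }
        }
    }
}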

superstream-clients/src/main/java/ai/superstream/core/ClientStatsReporter.java

Lines changed: 5 additions & 4 deletions
@@ -4,6 +4,7 @@
 import ai.superstream.model.ClientStatsMessage;
 import ai.superstream.util.NetworkUtils;
 import ai.superstream.util.SuperstreamLogger;
+import ai.superstream.util.KafkaPropertiesUtils;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.kafka.clients.producer.KafkaProducer;
@@ -74,9 +75,9 @@ public ClientStatsReporter(String bootstrapServers, Properties clientProperties,
 
         this.statsCollector = new ClientStatsCollector();
 
-        // Copy authentication properties from the original client
+        // Copy essential client configuration properties from the original client
         this.producerProperties = new Properties();
-        ClientReporter.copyAuthenticationProperties(clientProperties, this.producerProperties);
+        KafkaPropertiesUtils.copyClientConfigurationProperties(clientProperties, this.producerProperties);
 
         // Set up basic producer properties
         this.producerProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
@@ -165,7 +166,7 @@ void drainInto(Producer<String, String> producer) {
             logger.debug("Producer {} stats sent: before={} bytes, after={} bytes",
                     clientId, totalBytesBefore, totalBytesAfter);
         } catch (Exception e) {
-            logger.error("[ERR-021] Failed to drain stats for client {}", clientId, e);
+            logger.error("[ERR-021] Failed to drain stats for client {}. Error: {} - {}", clientId, e.getClass().getName(), e.getMessage(), e);
         }
     }
 
@@ -225,7 +226,7 @@ private void run() {
             }
             producer.flush();
         } catch (Exception e) {
-            logger.error("[ERR-022] Cluster stats coordinator failed for {}, please make sure the Kafka user has read/write/describe permissions on superstream.* topics: {}. Full stack trace:", bootstrapServers, e.getMessage(), e);
+            logger.error("[ERR-022] Cluster stats coordinator failed for {}, please make sure the Kafka user has read/write/describe permissions on superstream.* topics. Error: {} - {}", bootstrapServers, e.getClass().getName(), e.getMessage(), e);
        }
    }
 }
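
Every reworked log line in this commit follows the same pattern: the exception class and message are embedded in the text through "Error: {} - {}" placeholders, and the exception object is still passed as the final argument. Because SLF4J treats a trailing Throwable that has no matching placeholder as the stack-trace argument, the result is a grep-friendly one-line summary plus the full trace. A minimal self-contained illustration (class name, error code, and failure are invented):

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class LoggingPatternDemo {
    private static final Logger logger = LoggerFactory.getLogger(LoggingPatternDemo.class);

    public static void main(String[] args) {
        try {
            throw new IllegalStateException("broker unreachable");
        } catch (Exception e) {
            // The two {} placeholders consume the first two arguments; the
            // trailing exception has no placeholder, so SLF4J prints its
            // stack trace after the formatted message.
            logger.error("[ERR-000] Operation failed. Error: {} - {}",
                    e.getClass().getName(), e.getMessage(), e);
        }
    }
}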

superstream-clients/src/main/java/ai/superstream/core/MetadataConsumer.java

Lines changed: 6 additions & 46 deletions
@@ -2,6 +2,7 @@
 
 import ai.superstream.model.MetadataMessage;
 import ai.superstream.util.SuperstreamLogger;
+import ai.superstream.util.KafkaPropertiesUtils;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
@@ -32,9 +33,8 @@ public class MetadataConsumer {
     public MetadataMessage getMetadataMessage(String bootstrapServers, Properties originalClientProperties) {
         Properties properties = new Properties();
 
-        // Copy all authentication-related and essential properties from the original
-        // client
-        copyAuthenticationProperties(originalClientProperties, properties);
+        // Copy essential client configuration properties from the original client
+        KafkaPropertiesUtils.copyClientConfigurationProperties(originalClientProperties, properties);
 
         properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
         properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
@@ -46,7 +46,7 @@ public MetadataMessage getMetadataMessage(String bootstrapServers, Properties originalClientProperties) {
         // Check if the metadata topic exists
         Set<String> topics = consumer.listTopics().keySet();
         if (!topics.contains(METADATA_TOPIC)) {
-            logger.error("[ERR-034] Superstream internal topic is missing. This topic is required for Superstream to function properly. Please contact the Superstream team for assistance.");
+            logger.error("[ERR-034] Superstream internal topic is missing. This topic is required for Superstream to function properly. Please make sure the Kafka user has read/write/describe permissions on superstream.* topics.");
             return null;
         }
 
@@ -78,51 +78,11 @@ public MetadataMessage getMetadataMessage(String bootstrapServers, Properties originalClientProperties) {
             String json = records.iterator().next().value();
             return objectMapper.readValue(json, MetadataMessage.class);
         } catch (IOException e) {
-            logger.error("[ERR-027] Unable to retrieve optimizations data from Superstream. This is required for optimization. Please contact the Superstream team if the issue persists.", e);
+            logger.error("[ERR-027] Unable to retrieve optimizations data from Superstream. This is required for optimization. Please contact the Superstream team if the issue persists. Error: {} - {}", e.getClass().getName(), e.getMessage(), e);
             return null;
         } catch (Exception e) {
-            logger.error("[ERR-028] Unable to retrieve optimizations data from Superstream. This is required for optimization. Please make sure the Kafka user has read/write/describe permissions on superstream.* topics.", e);
+            logger.error("[ERR-028] Unable to retrieve optimizations data from Superstream. This is required for optimization. Please make sure the Kafka user has read/write/describe permissions on superstream.* topics. Error: {} - {}", e.getClass().getName(), e.getMessage(), e);
             return null;
         }
     }
-
-    // Helper method to copy authentication properties
-    private void copyAuthenticationProperties(Properties source, Properties destination) {
-        if (source == null || destination == null) {
-            logger.error("[ERR-029] Cannot copy authentication properties: source or destination is null");
-            return;
-        }
-        // Authentication-related properties
-        String[] authProps = {
-                // Security protocol
-                "security.protocol",
-
-                // SSL properties
-                "ssl.truststore.location", "ssl.truststore.password",
-                "ssl.keystore.location", "ssl.keystore.password",
-                "ssl.key.password", "ssl.endpoint.identification.algorithm",
-                "ssl.truststore.type", "ssl.keystore.type", "ssl.secure.random.implementation",
-                "ssl.enabled.protocols", "ssl.cipher.suites",
-
-                // SASL properties
-                "sasl.mechanism", "sasl.jaas.config",
-                "sasl.client.callback.handler.class", "sasl.login.callback.handler.class",
-                "sasl.login.class", "sasl.kerberos.service.name",
-                "sasl.kerberos.kinit.cmd", "sasl.kerberos.ticket.renew.window.factor",
-                "sasl.kerberos.ticket.renew.jitter", "sasl.kerberos.min.time.before.relogin",
-                "sasl.login.refresh.window.factor", "sasl.login.refresh.window.jitter",
-                "sasl.login.refresh.min.period.seconds", "sasl.login.refresh.buffer.seconds",
-
-                // Other important properties to preserve
-                "request.timeout.ms", "retry.backoff.ms", "connections.max.idle.ms",
-                "reconnect.backoff.ms", "reconnect.backoff.max.ms"
-        };
-
-        // Copy all authentication properties if they exist in the source
-        for (String prop : authProps) {
-            if (source.containsKey(prop)) {
-                destination.put(prop, source.get(prop));
-            }
-        }
-    }
 }
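
ERR-022, ERR-028, and the reworded ERR-034 all point at the same operational remedy: grant the connecting Kafka user read/write/describe permissions on superstream.* topics. As a sketch of what that grant can look like with Kafka's AdminClient — the bootstrap address and principal below are placeholders, and a PREFIXED resource pattern covers every topic whose name starts with "superstream.":

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.common.acl.AccessControlEntry;
import org.apache.kafka.common.acl.AclBinding;
import org.apache.kafka.common.acl.AclOperation;
import org.apache.kafka.common.acl.AclPermissionType;
import org.apache.kafka.common.resource.PatternType;
import org.apache.kafka.common.resource.ResourcePattern;
import org.apache.kafka.common.resource.ResourceType;

import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

public class SuperstreamAclGrant {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder

        try (Admin admin = Admin.create(props)) {
            // PREFIXED covers every topic whose name starts with "superstream."
            ResourcePattern topics =
                    new ResourcePattern(ResourceType.TOPIC, "superstream.", PatternType.PREFIXED);

            List<AclBinding> bindings = new ArrayList<>();
            for (AclOperation op : new AclOperation[]{
                    AclOperation.READ, AclOperation.WRITE, AclOperation.DESCRIBE}) {
                // Placeholder principal; substitute the user the client connects as.
                bindings.add(new AclBinding(topics, new AccessControlEntry(
                        "User:superstream-client", "*", op, AclPermissionType.ALLOW)));
            }
            admin.createAcls(bindings).all().get();
        }
    }
}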
