Skip to content

Commit a66f2db

Browse files
improve logging
1 parent a20e11e commit a66f2db

File tree

9 files changed

+101
-12
lines changed

9 files changed

+101
-12
lines changed

.idea/runConfigurations/Aiven_example.xml

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

.idea/runConfigurations/kafka_clients_example.xml

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

.idea/runConfigurations/spring_example.xml

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

examples/kafka-clients-example/src/main/java/ai/superstream/examples/KafkaProducerExample.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,15 +29,16 @@ public class KafkaProducerExample {
2929
public static void main(String[] args) {
3030
// Build the configuration map first using a mutable map
3131
Map<String, Object> mutableProps = new java.util.HashMap<>();
32-
// mutableProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, DEFAULT_BOOTSTRAP_SERVERS);
32+
mutableProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, DEFAULT_BOOTSTRAP_SERVERS);
3333
// mutableProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, Arrays.asList(DEFAULT_BOOTSTRAP_SERVERS));
34-
mutableProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, List.of(DEFAULT_BOOTSTRAP_SERVERS));
34+
// mutableProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, List.of(DEFAULT_BOOTSTRAP_SERVERS));
3535
// mutableProps.put(ProducerConfig.CLIENT_ID_CONFIG, CLIENT_ID);
3636
mutableProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
3737
mutableProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
3838
mutableProps.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, COMPRESSION_TYPE);
3939
mutableProps.put(ProducerConfig.BATCH_SIZE_CONFIG, BATCH_SIZE);
4040
mutableProps.put(ProducerConfig.LINGER_MS_CONFIG, 500);
41+
mutableProps.put(ProducerConfig.CLIENT_DNS_LOOKUP_CONFIG, "use_all_dns_ips");
4142

4243
// Wrap the map to make it immutable – simulates a user supplying an unmodifiable configuration object
4344
Map<String, Object> props = java.util.Collections.unmodifiableMap(mutableProps);

superstream-clients/dependency-reduced-pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
<groupId>ai.superstream</groupId>
55
<artifactId>superstream-clients</artifactId>
66
<name>Superstream Kafka Client Optimizer</name>
7-
<version>1.0.200</version>
7+
<version>1.0.201</version>
88
<description>A Java library that dynamically optimizes Kafka client configuration based on recommendations</description>
99
<url>https://github.com/superstreamlabs/superstream-clients-java</url>
1010
<developers>

superstream-clients/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66

77
<groupId>ai.superstream</groupId>
88
<artifactId>superstream-clients</artifactId>
9-
<version>1.0.200</version>
9+
<version>1.0.201</version>
1010
<packaging>jar</packaging>
1111

1212
<name>Superstream Kafka Client Optimizer</name>

superstream-clients/src/main/java/ai/superstream/core/ClientReporter.java

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,25 @@ public boolean reportClient(String bootstrapServers, Properties originalClientPr
6363
properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384); // 16KB batch size
6464
properties.put(ProducerConfig.LINGER_MS_CONFIG, 1000);
6565

66+
// Log the configuration before creating producer
67+
if (SuperstreamLogger.isDebugEnabled()) {
68+
StringBuilder configLog = new StringBuilder("Creating internal ClientReporter producer with configuration: ");
69+
properties.forEach((key, value) -> {
70+
// Mask sensitive values
71+
if (key.toString().toLowerCase().contains("password") ||
72+
key.toString().toLowerCase().contains("sasl.jaas.config")) {
73+
configLog.append(key).append("=[MASKED], ");
74+
} else {
75+
configLog.append(key).append("=").append(value).append(", ");
76+
}
77+
});
78+
// Remove trailing comma and space
79+
if (configLog.length() > 2) {
80+
configLog.setLength(configLog.length() - 2);
81+
}
82+
logger.debug(configLog.toString());
83+
}
84+
6685
try (Producer<String, String> producer = new KafkaProducer<>(properties)) {
6786
// Create the client message
6887
ClientMessage message = new ClientMessage(
@@ -93,7 +112,13 @@ public boolean reportClient(String bootstrapServers, Properties originalClientPr
93112
logger.debug("Successfully reported client information to {}", CLIENTS_TOPIC);
94113
return true;
95114
} catch (Exception e) {
96-
logger.error("[ERR-026] Error reporting client information. Error: {} - {}", e.getClass().getName(), e.getMessage(), e);
115+
// Convert stack trace to string
116+
java.io.StringWriter sw = new java.io.StringWriter();
117+
java.io.PrintWriter pw = new java.io.PrintWriter(sw);
118+
e.printStackTrace(pw);
119+
String stackTrace = sw.toString().replaceAll("\\r?\\n", " ");
120+
logger.error("[ERR-026] Error reporting client information. Error: {} - {}. Stack trace: {}",
121+
e.getClass().getName(), e.getMessage(), stackTrace);
97122
return false;
98123
}
99124
}

superstream-clients/src/main/java/ai/superstream/core/ClientStatsReporter.java

Lines changed: 34 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -166,7 +166,13 @@ void drainInto(Producer<String, String> producer) {
166166
logger.debug("Producer {} stats sent: before={} bytes, after={} bytes",
167167
clientId, totalBytesBefore, totalBytesAfter);
168168
} catch (Exception e) {
169-
logger.error("[ERR-021] Failed to drain stats for client {}. Error: {} - {}", clientId, e.getClass().getName(), e.getMessage(), e);
169+
// Convert stack trace to string
170+
java.io.StringWriter sw = new java.io.StringWriter();
171+
java.io.PrintWriter pw = new java.io.PrintWriter(sw);
172+
e.printStackTrace(pw);
173+
String stackTrace = sw.toString().replaceAll("\\r?\\n", " ");
174+
logger.error("[ERR-021] Failed to drain stats for client {}. Error: {} - {}. Stack trace: {}",
175+
clientId, e.getClass().getName(), e.getMessage(), stackTrace);
170176
}
171177
}
172178

@@ -220,13 +226,39 @@ void addReporter(ClientStatsReporter r) {
220226
private void run() {
221227
if (reporters.isEmpty())
222228
return;
229+
230+
// Log the configuration before creating producer
231+
if (SuperstreamLogger.isDebugEnabled()) {
232+
StringBuilder configLog = new StringBuilder("Creating internal ClientStatsReporter producer with configuration: ");
233+
baseProps.forEach((key, value) -> {
234+
// Mask sensitive values
235+
if (key.toString().toLowerCase().contains("password") ||
236+
key.toString().toLowerCase().contains("sasl.jaas.config")) {
237+
configLog.append(key).append("=[MASKED], ");
238+
} else {
239+
configLog.append(key).append("=").append(value).append(", ");
240+
}
241+
});
242+
// Remove trailing comma and space
243+
if (configLog.length() > 2) {
244+
configLog.setLength(configLog.length() - 2);
245+
}
246+
logger.debug(configLog.toString());
247+
}
248+
223249
try (Producer<String, String> producer = new KafkaProducer<>(baseProps)) {
224250
for (ClientStatsReporter r : reporters) {
225251
r.drainInto(producer);
226252
}
227253
producer.flush();
228254
} catch (Exception e) {
229-
logger.error("[ERR-022] Cluster stats coordinator failed for {}, please make sure the Kafka user has read/write/describe permissions on superstream.* topics. Error: {} - {}", bootstrapServers, e.getClass().getName(), e.getMessage(), e);
255+
// Convert stack trace to string
256+
java.io.StringWriter sw = new java.io.StringWriter();
257+
java.io.PrintWriter pw = new java.io.PrintWriter(sw);
258+
e.printStackTrace(pw);
259+
String stackTrace = sw.toString().replaceAll("\\r?\\n", " ");
260+
logger.error("[ERR-022] Cluster stats coordinator failed for {}, please make sure the Kafka user has read/write/describe permissions on superstream.* topics. Error: {} - {}. Stack trace: {}",
261+
bootstrapServers, e.getClass().getName(), e.getMessage(), stackTrace);
230262
}
231263
}
232264
}

superstream-clients/src/main/java/ai/superstream/core/MetadataConsumer.java

Lines changed: 33 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,25 @@ public MetadataMessage getMetadataMessage(String bootstrapServers, Properties or
4242
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
4343
properties.put(ConsumerConfig.CLIENT_ID_CONFIG, KafkaProducerInterceptor.SUPERSTREAM_LIBRARY_PREFIX + "metadata-consumer");
4444

45+
// Log the configuration before creating consumer
46+
if (SuperstreamLogger.isDebugEnabled()) {
47+
StringBuilder configLog = new StringBuilder("Creating internal MetadataConsumer with configuration: ");
48+
properties.forEach((key, value) -> {
49+
// Mask sensitive values
50+
if (key.toString().toLowerCase().contains("password") ||
51+
key.toString().toLowerCase().contains("sasl.jaas.config")) {
52+
configLog.append(key).append("=[MASKED], ");
53+
} else {
54+
configLog.append(key).append("=").append(value).append(", ");
55+
}
56+
});
57+
// Remove trailing comma and space
58+
if (configLog.length() > 2) {
59+
configLog.setLength(configLog.length() - 2);
60+
}
61+
logger.debug(configLog.toString());
62+
}
63+
4564
try (Consumer<String, String> consumer = new KafkaConsumer<>(properties)) {
4665
// Check if the metadata topic exists
4766
Set<String> topics = consumer.listTopics().keySet();
@@ -78,10 +97,22 @@ public MetadataMessage getMetadataMessage(String bootstrapServers, Properties or
7897
String json = records.iterator().next().value();
7998
return objectMapper.readValue(json, MetadataMessage.class);
8099
} catch (IOException e) {
81-
logger.error("[ERR-027] Unable to retrieve optimizations data from Superstream. This is required for optimization. Please contact the Superstream team if the issue persists. Error: {} - {}", e.getClass().getName(), e.getMessage(), e);
100+
// Convert stack trace to string
101+
java.io.StringWriter sw = new java.io.StringWriter();
102+
java.io.PrintWriter pw = new java.io.PrintWriter(sw);
103+
e.printStackTrace(pw);
104+
String stackTrace = sw.toString().replaceAll("\\r?\\n", " ");
105+
logger.error("[ERR-027] Unable to retrieve optimizations data from Superstream. This is required for optimization. Please contact the Superstream team if the issue persists. Error: {} - {}. Stack trace: {}",
106+
e.getClass().getName(), e.getMessage(), stackTrace);
82107
return null;
83108
} catch (Exception e) {
84-
logger.error("[ERR-028] Unable to retrieve optimizations data from Superstream. This is required for optimization. Please make sure the Kafka user has read/write/describe permissions on superstream.* topics. Error: {} - {}", e.getClass().getName(), e.getMessage(), e);
109+
// Convert stack trace to string
110+
java.io.StringWriter sw = new java.io.StringWriter();
111+
java.io.PrintWriter pw = new java.io.PrintWriter(sw);
112+
e.printStackTrace(pw);
113+
String stackTrace = sw.toString().replaceAll("\\r?\\n", " ");
114+
logger.error("[ERR-028] Unable to retrieve optimizations data from Superstream. This is required for optimization. Please make sure the Kafka user has read/write/describe permissions on superstream.* topics. Error: {} - {}. Stack trace: {}",
115+
e.getClass().getName(), e.getMessage(), stackTrace);
85116
return null;
86117
}
87118
}

0 commit comments

Comments (0)