Skip to content

Commit c9f73bf

Browse files
fixing bug of not being able to serialize Object in messages
1 parent eb0fdbe commit c9f73bf

File tree

7 files changed

+38
-34
lines changed

7 files changed

+38
-34
lines changed

examples/kafka-clients-example/src/main/java/ai/superstream/examples/KafkaProducerExample.java

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -29,9 +29,7 @@ public class KafkaProducerExample {
2929
public static void main(String[] args) {
3030
// Build the configuration map first using a mutable map
3131
Map<String, Object> mutableProps = new java.util.HashMap<>();
32-
// mutableProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, DEFAULT_BOOTSTRAP_SERVERS);
33-
mutableProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, Arrays.asList(DEFAULT_BOOTSTRAP_SERVERS));
34-
// mutableProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, List.of(DEFAULT_BOOTSTRAP_SERVERS));
32+
mutableProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, Arrays.asList(DEFAULT_BOOTSTRAP_SERVERS));
3533
mutableProps.put(ProducerConfig.CLIENT_ID_CONFIG, CLIENT_ID);
3634
mutableProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
3735
mutableProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
@@ -41,13 +39,13 @@ public static void main(String[] args) {
4139
mutableProps.put(ProducerConfig.CLIENT_DNS_LOOKUP_CONFIG, "use_all_dns_ips");
4240

4341
// Wrap the map to make it immutable – simulates a user supplying an unmodifiable configuration object
44-
Map<String, Object> props = java.util.Collections.unmodifiableMap(mutableProps);
42+
// Map<String, Object> props = java.util.Collections.unmodifiableMap(mutableProps);
4543

4644
// Pass the immutable map directly to the KafkaProducer constructor
4745
Producer<String, String> producer = new KafkaProducer<String, String>(mutableProps);
4846

4947
mutableProps.put(ProducerConfig.CLIENT_ID_CONFIG, CLIENT_ID+"1");
50-
Producer<String, String> producer1 = new KafkaProducer<String, String>(mutableProps);
48+
Producer<String, String> producer1 = new KafkaProducer<String, String>(mutableProps);
5149
long recordCount = 50; // Number of messages to send
5250
try {
5351
while (true) {
@@ -57,7 +55,7 @@ public static void main(String[] args) {
5755
String messageValue = MESSAGE_VALUE + "-" + i + "-" + System.currentTimeMillis();
5856
producer.send(new ProducerRecord<>(TOPIC_NAME, messageKey, messageValue));
5957
producer.send(new ProducerRecord<>(TOPIC_NAME+"1", messageKey, messageValue));
60-
producer1.send(new ProducerRecord<>(TOPIC_NAME+"1", messageKey, messageValue));
58+
producer1.send(new ProducerRecord<>(TOPIC_NAME+"1", messageKey, messageValue));
6159
}
6260

6361
producer.flush();
@@ -67,6 +65,7 @@ public static void main(String[] args) {
6765
logger.error("Error sending message", e);
6866
} finally {
6967
producer.close();
68+
producer1.close();
7069
}
7170
}
7271

superstream-clients/src/main/java/ai/superstream/core/ClientReporter.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,9 @@
2525
public class ClientReporter {
2626
private static final SuperstreamLogger logger = SuperstreamLogger.getLogger(ClientReporter.class);
2727
private static final String CLIENTS_TOPIC = "superstream.clients";
28-
private static final ObjectMapper objectMapper = new ObjectMapper();
28+
private static final ObjectMapper objectMapper = new ObjectMapper()
29+
.configure(com.fasterxml.jackson.databind.SerializationFeature.FAIL_ON_EMPTY_BEANS, false)
30+
.configure(com.fasterxml.jackson.databind.DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
2931
private static final String CLIENT_VERSION = getClientVersion();
3032
private static final String LANGUAGE = "Java";
3133
private static final String CLIENT_TYPE = "producer"; // for now support only producers
@@ -112,7 +114,7 @@ public boolean reportClient(String bootstrapServers, Properties originalClientPr
112114
logger.debug("Successfully reported client information to {}", CLIENTS_TOPIC);
113115
return true;
114116
} catch (Exception e) {
115-
logger.error("[ERR-026] Error reporting client information.", e);
117+
logger.error("[ERR-026] Error reporting client information: {}", e.getMessage(), e);
116118
return false;
117119
}
118120
}

superstream-clients/src/main/java/ai/superstream/core/ClientStatsReporter.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,9 @@
2929
public class ClientStatsReporter {
3030
private static final SuperstreamLogger logger = SuperstreamLogger.getLogger(ClientStatsReporter.class);
3131
private static final String CLIENTS_TOPIC = "superstream.clients";
32-
private static final ObjectMapper objectMapper = new ObjectMapper();
32+
private static final ObjectMapper objectMapper = new ObjectMapper()
33+
.configure(com.fasterxml.jackson.databind.SerializationFeature.FAIL_ON_EMPTY_BEANS, false)
34+
.configure(com.fasterxml.jackson.databind.DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
3335
// Default reporting interval (5 minutes) – overridden when metadata provides a different value
3436
private static final long DEFAULT_REPORT_INTERVAL_MS = 300000; // 5 minutes
3537
private static final String DISABLED_ENV_VAR = "SUPERSTREAM_DISABLED";
@@ -180,7 +182,7 @@ void drainInto(Producer<String, String> producer) {
180182
logger.debug("Producer {} stats sent: before={} bytes, after={} bytes",
181183
clientId, totalBytesBefore, totalBytesAfter);
182184
} catch (Exception e) {
183-
logger.error("[ERR-021] Failed to drain stats for client {}.", clientId, e);
185+
logger.error("[ERR-021] Failed to drain stats for client {}: {}", clientId, e.getMessage(), e);
184186
}
185187
}
186188

@@ -344,8 +346,9 @@ private void run() {
344346
r.drainInto(producer);
345347
}
346348
producer.flush();
349+
logger.debug("Successfully reported cluster stats to {}", CLIENTS_TOPIC);
347350
} catch (Exception e) {
348-
logger.error("[ERR-022] Cluster stats coordinator failed for {}, please make sure the Kafka user has read/write/describe permissions on superstream.* topics.", bootstrapServers, e);
351+
logger.error("[ERR-022] Cluster stats coordinator failed for {}, please make sure the Kafka user has read/write/describe permissions on superstream.* topics: {}", bootstrapServers, e.getMessage(), e);
349352
}
350353
}
351354
}

superstream-clients/src/main/java/ai/superstream/core/MetadataConsumer.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,9 @@
2222
public class MetadataConsumer {
2323
private static final SuperstreamLogger logger = SuperstreamLogger.getLogger(MetadataConsumer.class);
2424
private static final String METADATA_TOPIC = "superstream.metadata_v1";
25-
private static final ObjectMapper objectMapper = new ObjectMapper();
25+
private static final ObjectMapper objectMapper = new ObjectMapper()
26+
.configure(com.fasterxml.jackson.databind.SerializationFeature.FAIL_ON_EMPTY_BEANS, false)
27+
.configure(com.fasterxml.jackson.databind.DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
2628

2729
/**
2830
* Get the metadata message from the Kafka cluster.
@@ -97,10 +99,10 @@ public MetadataMessage getMetadataMessage(String bootstrapServers, Properties or
9799
String json = records.iterator().next().value();
98100
return objectMapper.readValue(json, MetadataMessage.class);
99101
} catch (IOException e) {
100-
logger.error("[ERR-027] Unable to retrieve optimizations data from Superstream. This is required for optimization. Please contact the Superstream team if the issue persists.", e);
102+
logger.error("[ERR-027] Unable to retrieve optimizations data from Superstream. This is required for optimization. Please contact the Superstream team if the issue persists: {}", e.getMessage(), e);
101103
return null;
102104
} catch (Exception e) {
103-
logger.error("[ERR-028] Unable to retrieve optimizations data from Superstream. This is required for optimization. Please make sure the Kafka user has read/write/describe permissions on superstream.* topics.", e);
105+
logger.error("[ERR-028] Unable to retrieve optimizations data from Superstream. This is required for optimization. Please make sure the Kafka user has read/write/describe permissions on superstream.* topics: {}", e.getMessage(), e);
104106
return null;
105107
}
106108
}

superstream-clients/src/main/java/ai/superstream/core/SuperstreamManager.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -199,7 +199,7 @@ public boolean optimizeProducer(String bootstrapServers, String clientId, Proper
199199
}
200200
return true;
201201
} catch (Exception e) {
202-
logger.error("[ERR-030] Failed to optimize producer configuration", e);
202+
logger.error("[ERR-030] Failed to optimize producer configuration: {}", e.getMessage(), e);
203203
return false;
204204
} finally {
205205
// Always clear the flag when done
@@ -323,7 +323,7 @@ public void reportClientInformation(String bootstrapServers, Properties original
323323
logger.error("[ERR-032] Failed to report client information to the superstream.clients topic");
324324
}
325325
} catch (Exception e) {
326-
logger.error("[ERR-031] Error reporting client information", e);
326+
logger.error("[ERR-031] Error reporting client information: {}", e.getMessage(), e);
327327
}
328328
}
329329
}

superstream-clients/src/main/java/ai/superstream/util/NetworkUtils.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ public static String getLocalIpAddress() {
5252
cachedIpAddress = localHost.getHostAddress();
5353
return cachedIpAddress;
5454
} catch (SocketException | UnknownHostException e) {
55-
logger.error("[ERR-033] Failed to determine local IP address", e);
55+
logger.error("[ERR-033] Failed to determine local IP address: {}", e.getMessage(), e);
5656
return "";
5757
}
5858
}
@@ -72,7 +72,7 @@ public static String getHostname() {
7272
cachedHostname = localHost.getHostName();
7373
return cachedHostname;
7474
} catch (UnknownHostException e) {
75-
logger.error("[ERR-091] Failed to determine local hostname", e);
75+
logger.error("[ERR-091] Failed to determine local hostname: {}", e.getMessage(), e);
7676
return "";
7777
}
7878
}

superstream-clients/src/main/java/ai/superstream/util/SuperstreamLogger.java

Lines changed: 14 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -65,26 +65,24 @@ public void warn(String message, Object... args) {
6565
System.out.println(formatLogMessage("WARN", formatArgs(message, args)));
6666
}
6767

68-
/**
69-
* Log an error message.
70-
*/
71-
public void error(String message) {
72-
System.err.println(formatLogMessage("ERROR", message));
73-
}
74-
7568
/**
7669
* Log an error message with parameters.
7770
*/
7871
public void error(String message, Object... args) {
79-
System.err.println(formatLogMessage("ERROR", formatArgs(message, args)));
80-
}
81-
82-
/**
83-
* Log an error message with an exception.
84-
*/
85-
public void error(String message, Throwable throwable) {
86-
String formattedMessage = formatExceptionMessage(message, throwable);
87-
System.err.println(formatLogMessage("ERROR", formattedMessage));
72+
if (args != null && args.length > 0 && args[args.length - 1] instanceof Throwable) {
73+
// If the last argument is a Throwable, format it properly
74+
Throwable throwable = (Throwable) args[args.length - 1];
75+
// Remove the Throwable from args array
76+
Object[] messageArgs = new Object[args.length - 1];
77+
System.arraycopy(args, 0, messageArgs, 0, args.length - 1);
78+
// Format the message with the remaining args
79+
String formattedMessage = formatArgs(message, messageArgs);
80+
// Format the exception message
81+
String formattedExceptionMessage = formatExceptionMessage(formattedMessage, throwable);
82+
System.err.println(formatLogMessage("ERROR", formattedExceptionMessage));
83+
} else {
84+
System.err.println(formatLogMessage("ERROR", formatArgs(message, args)));
85+
}
8886
}
8987

9088
/**

0 commit comments

Comments
 (0)