Skip to content

Commit 680dd6f

Browse files
add immidiate stats report after producer creation
1 parent c9f73bf commit 680dd6f

File tree

5 files changed

+23
-8
lines changed

5 files changed

+23
-8
lines changed

.idea/runConfigurations/kafka_clients_example.xml

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

superstream-clients/dependency-reduced-pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
<groupId>ai.superstream</groupId>
55
<artifactId>superstream-clients</artifactId>
66
<name>Superstream Kafka Client Optimizer</name>
7-
<version>1.0.202</version>
7+
<version>1.0.203</version>
88
<description>A Java library that dynamically optimizes Kafka client configuration based on recommendations</description>
99
<url>https://github.com/superstreamlabs/superstream-clients-java</url>
1010
<developers>

superstream-clients/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66

77
<groupId>ai.superstream</groupId>
88
<artifactId>superstream-clients</artifactId>
9-
<version>1.0.202</version>
9+
<version>1.0.203</version>
1010
<packaging>jar</packaging>
1111

1212
<name>Superstream Kafka Client Optimizer</name>

superstream-clients/src/main/java/ai/superstream/agent/KafkaProducerInterceptor.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -345,6 +345,13 @@ public static void onExit(@Advice.This Object producer) {
345345
reporter.setConfigurations(completeConfig, new java.util.HashMap<>());
346346
}
347347

348+
// Trigger immediate metrics collection for this producer
349+
try {
350+
sharedCollector.collectMetricsForProducer(producerId, metricsInfo);
351+
} catch (Exception e) {
352+
logger.error("[ERR-047] Failed to collect immediate metrics for new producer {}: {}", producerId, e.getMessage(), e);
353+
}
354+
348355
logger.debug("Producer {} registered with shared metrics collector", producerId);
349356
}
350357
} catch (Exception e) {

superstream-clients/src/main/java/ai/superstream/core/ClientStatsReporter.java

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -160,10 +160,8 @@ void drainInto(Producer<String, String> producer) {
160160
message.setOriginalConfiguration(originalConfig != null ? originalConfig : new java.util.HashMap<>());
161161
message.setOptimizedConfiguration(optimizedConfig != null ? optimizedConfig : new java.util.HashMap<>());
162162

163-
// Attach topics list
164-
if (!topicsWritten.isEmpty()) {
165-
message.setTopics(new java.util.ArrayList<>(topicsWritten));
166-
}
163+
// Attach topics list - always set it, empty if no topics
164+
message.setTopics(new java.util.ArrayList<>(topicsWritten));
167165

168166
// When building the ClientStatsMessage, set the most impactful topic if available
169167
if (mostImpactfulTopic != null) {
@@ -178,7 +176,6 @@ void drainInto(Producer<String, String> producer) {
178176
ProducerRecord<String, String> record = new ProducerRecord<>(CLIENTS_TOPIC, json);
179177
producer.send(record);
180178

181-
// Log at INFO level that stats have been sent for this producer
182179
logger.debug("Producer {} stats sent: before={} bytes, after={} bytes",
183180
clientId, totalBytesBefore, totalBytesAfter);
184181
} catch (Exception e) {
@@ -308,6 +305,17 @@ private static class ClusterStatsCoordinator {
308305

309306
void addReporter(ClientStatsReporter r) {
310307
reporters.add(r);
308+
// Schedule immediate run for this specific reporter
309+
scheduler.schedule(() -> {
310+
try (Producer<String, String> producer = new KafkaProducer<>(baseProps)) {
311+
r.drainInto(producer);
312+
producer.flush();
313+
} catch (Exception e) {
314+
logger.error("[ERR-046] Failed to send immediate stats for new reporter: {}", e.getMessage(), e);
315+
}
316+
}, 0, TimeUnit.MILLISECONDS);
317+
318+
// Only schedule the periodic task if not already scheduled
311319
if (scheduled.compareAndSet(false, true)) {
312320
scheduler.scheduleAtFixedRate(this::run, reportIntervalMs, reportIntervalMs, TimeUnit.MILLISECONDS);
313321
}

0 commit comments

Comments
 (0)