Skip to content

Commit b6f4d5f

Browse files
support optimized/original config and topics as part of stats report
1 parent e26c987 commit b6f4d5f

File tree

5 files changed

+273
-307
lines changed

5 files changed

+273
-307
lines changed

superstream-clients/src/main/java/ai/superstream/agent/KafkaProducerInterceptor.java

Lines changed: 141 additions & 80 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,10 @@ public class KafkaProducerInterceptor {
4848
public static final ThreadLocal<java.util.Deque<Properties>> TL_PROPS_STACK =
4949
ThreadLocal.withInitial(java.util.ArrayDeque::new);
5050

51+
// ThreadLocal stack to pass original/optimized configuration maps from optimization phase to reporter creation.
52+
public static final ThreadLocal<java.util.Deque<ConfigInfo>> TL_CFG_STACK =
53+
ThreadLocal.withInitial(java.util.ArrayDeque::new);
54+
5155
// Static initializer to start the shared collector if enabled
5256
static {
5357
if (!DISABLED) {
@@ -211,6 +215,16 @@ public static void onExit(@Advice.This Object producer) {
211215
producerMetricsMap.put(producerId, metricsInfo);
212216
clientStatsReporters.put(producerId, reporter);
213217

218+
// Pop configuration info from ThreadLocal stack (if any) and attach to reporter
219+
java.util.Deque<ConfigInfo> cfgStack = TL_CFG_STACK.get();
220+
ConfigInfo cfgInfo = cfgStack.isEmpty()? null : cfgStack.pop();
221+
if (cfgStack.isEmpty()) {
222+
TL_CFG_STACK.remove();
223+
}
224+
if (cfgInfo != null) {
225+
reporter.setConfigurations(cfgInfo.originalConfig, cfgInfo.optimizedConfig);
226+
}
227+
214228
logger.debug("Producer {} registered with shared metrics collector", producerId);
215229
}
216230
} catch (Exception e) {
@@ -662,6 +676,89 @@ public boolean collectMetricsForProducer(String producerId, ProducerMetricsInfo
662676
info.updateLastStats(
663677
new CompressionStats(totalOutgoingBytes, prevStats.uncompressedBytes + uncompressedBytes));
664678

679+
// Extract a snapshot of all producer metrics to include in stats reporting
680+
java.util.Map<String, Double> allMetricsSnapshot = new java.util.HashMap<>();
681+
try {
682+
java.util.Map<?, ?> rawMetricsMap = extractMetricsMap(metrics);
683+
if (rawMetricsMap != null) {
684+
for (java.util.Map.Entry<?, ?> mEntry : rawMetricsMap.entrySet()) {
685+
Object mKey = mEntry.getKey();
686+
String group = null;
687+
String namePart;
688+
String keyString = null;
689+
if (mKey == null) continue;
690+
691+
if (mKey.getClass().getName().endsWith("MetricName")) {
692+
try {
693+
java.lang.reflect.Method nameMethod = findMethod(mKey.getClass(), "name");
694+
java.lang.reflect.Method groupMethod = findMethod(mKey.getClass(), "group");
695+
namePart = (nameMethod != null) ? nameMethod.invoke(mKey).toString() : mKey.toString();
696+
group = (groupMethod != null) ? groupMethod.invoke(mKey).toString() : "";
697+
if (!"producer-metrics".equals(group)) {
698+
continue; // skip non-producer groups
699+
}
700+
keyString = namePart; // store without the producer-metrics prefix
701+
} catch (Exception ignored) {}
702+
} else if (mKey instanceof String) {
703+
keyString = mKey.toString();
704+
if (!keyString.startsWith("producer-metrics")) {
705+
continue; // skip
706+
}
707+
// strip the prefix (and the following dot if present)
708+
if (keyString.startsWith("producer-metrics.")) {
709+
keyString = keyString.substring("producer-metrics.".length());
710+
} else if ("producer-metrics".equals(keyString)) {
711+
continue; // unlikely but skip bare prefix
712+
}
713+
}
714+
if (keyString == null) continue;
715+
double mVal = extractMetricValue(mEntry.getValue());
716+
if (!Double.isNaN(mVal)) {
717+
allMetricsSnapshot.put(keyString, mVal);
718+
}
719+
}
720+
}
721+
} catch (Exception snapshotEx) {
722+
// ignore snapshot errors
723+
}
724+
725+
// Update reporter with latest metrics snapshot
726+
reporter.updateProducerMetrics(allMetricsSnapshot);
727+
728+
// Aggregate topics written by this producer from producer-topic-metrics
729+
java.util.Set<String> newTopics = new java.util.HashSet<>();
730+
try {
731+
java.util.Map<?,?> rawMapForTopics = extractMetricsMap(metrics);
732+
if (rawMapForTopics != null) {
733+
for (java.util.Map.Entry<?,?> me : rawMapForTopics.entrySet()) {
734+
Object k = me.getKey();
735+
if (k == null) continue;
736+
if (k.getClass().getName().endsWith("MetricName")) {
737+
try {
738+
java.lang.reflect.Method groupMethod = findMethod(k.getClass(), "group");
739+
java.lang.reflect.Method tagsMethod = findMethod(k.getClass(), "tags");
740+
if (groupMethod != null && tagsMethod != null) {
741+
groupMethod.setAccessible(true);
742+
String g = groupMethod.invoke(k).toString();
743+
if ("producer-topic-metrics".equals(g)) {
744+
tagsMethod.setAccessible(true);
745+
Object tagObj = tagsMethod.invoke(k);
746+
if (tagObj instanceof java.util.Map) {
747+
Object topicObj = ((java.util.Map<?,?>)tagObj).get("topic");
748+
if (topicObj != null) newTopics.add(topicObj.toString());
749+
}
750+
}
751+
}
752+
} catch (Exception ignore) {}
753+
}
754+
}
755+
}
756+
} catch (Exception ignore) {}
757+
758+
if (!newTopics.isEmpty()) {
759+
reporter.addTopics(newTopics);
760+
}
761+
665762
// Report the compression statistics for this interval (delta)
666763
reporter.recordBatch(uncompressedBytes, compressedBytes);
667764

@@ -704,11 +801,11 @@ public double getCompressionRatio(Object metrics) {
704801
* Find direct compression metrics in the metrics map.
705802
*/
706803
private double findDirectCompressionMetric(Map<?, ?> metricsMap) {
707-
// Look for compression metrics directly in the map
804+
// Look for compression metrics in the *producer-metrics* group only
708805
for (Map.Entry<?, ?> entry : metricsMap.entrySet()) {
709806
Object key = entry.getKey();
710807

711-
// Handle keys that are MetricName objects
808+
// Handle MetricName keys
712809
if (key.getClass().getName().endsWith("MetricName")) {
713810
try {
714811
Method nameMethod = findMethod(key.getClass(), "name");
@@ -721,33 +818,28 @@ private double findDirectCompressionMetric(Map<?, ?> metricsMap) {
721818
String name = nameMethod.invoke(key).toString();
722819
String group = groupMethod.invoke(key).toString();
723820

724-
// Check for common compression metrics
725-
if ((group.equals("producer-metrics") || group.equals("producer-topic-metrics")) &&
726-
(name.equals("compression-rate-avg") || name.equals("record-compression-rate") ||
727-
name.equals("compression-ratio"))) {
821+
// Only accept metrics from producer-metrics group
822+
if (group.equals("producer-metrics") &&
823+
(name.equals("compression-rate-avg") || name.equals("compression-ratio"))) {
728824

729-
logger.debug("Found compression metric: {}.{}", group, name);
730825
double value = extractMetricValue(entry.getValue());
731826
if (value > 0) {
732-
logger.debug("Compression ratio value: {}", value);
827+
logger.debug("Found producer-metrics compression metric: {} -> {}", name, value);
733828
return value;
734829
}
735830
}
736831
}
737-
} catch (Exception e) {
738-
// Ignore and continue checking other keys
832+
} catch (Exception ignored) {
739833
}
740834
}
741835
// Handle String keys
742836
else if (key instanceof String) {
743837
String keyStr = (String) key;
744-
if ((keyStr.contains("producer-metrics") || keyStr.contains("producer-topic-metrics")) &&
745-
(keyStr.contains("compression-rate") || keyStr.contains("compression-ratio"))) {
746-
747-
logger.debug("Found compression metric with string key: {}", keyStr);
838+
if (keyStr.startsWith("producer-metrics") &&
839+
(keyStr.contains("compression-rate-avg") || keyStr.contains("compression-ratio"))) {
748840
double value = extractMetricValue(entry.getValue());
749841
if (value > 0) {
750-
logger.debug("Compression ratio value: {}", value);
842+
logger.debug("Found producer-metrics compression metric (string key): {} -> {}", keyStr, value);
751843
return value;
752844
}
753845
}
@@ -757,101 +849,57 @@ else if (key instanceof String) {
757849
}
758850

759851
/**
760-
* Get the total outgoing bytes across all nodes from the metrics object.
761-
* This metric exists per broker node and represents bytes after compression.
852+
* Get the total outgoing bytes for the *producer* (after compression).
853+
* Uses producer-metrics group only to keep numbers per-producer rather than per-node.
762854
*/
763855
private long getOutgoingBytesTotal(Object metrics) {
764856
try {
765-
// Extract the metrics map from the Metrics object
766857
Map<?, ?> metricsMap = extractMetricsMap(metrics);
767858
if (metricsMap != null) {
768-
// The outgoing-byte-total is in the producer-node-metrics group
769-
String targetGroup = "producer-node-metrics";
770-
String targetMetric = "outgoing-byte-total";
771-
long totalBytes = 0;
772-
boolean foundAnyNodeMetric = false;
859+
String targetGroup = "producer-metrics";
860+
String[] candidateNames = {"outgoing-byte-total", "byte-total"};
773861

774-
// Iterate through all metrics
775862
for (Map.Entry<?, ?> entry : metricsMap.entrySet()) {
776863
Object key = entry.getKey();
777864

778-
// Handle MetricName objects
865+
// MetricName keys
779866
if (key.getClass().getName().endsWith("MetricName")) {
780867
try {
781868
Method nameMethod = findMethod(key.getClass(), "name");
782869
Method groupMethod = findMethod(key.getClass(), "group");
783-
Method tagsMethod = findMethod(key.getClass(), "tags");
784-
785870
if (nameMethod != null && groupMethod != null) {
786871
nameMethod.setAccessible(true);
787872
groupMethod.setAccessible(true);
788-
789873
String name = nameMethod.invoke(key).toString();
790874
String group = groupMethod.invoke(key).toString();
791875

792-
// If this is a node metric with outgoing bytes
793-
if (group.equals(targetGroup) && name.equals(targetMetric)) {
794-
foundAnyNodeMetric = true;
795-
796-
double value = extractMetricValue(entry.getValue());
797-
798-
// Get the node-id from tags if possible
799-
String nodeId = "unknown";
800-
if (tagsMethod != null) {
801-
tagsMethod.setAccessible(true);
802-
Object tags = tagsMethod.invoke(key);
803-
if (tags instanceof Map) {
804-
Object nodeIdObj = ((Map<?, ?>) tags).get("node-id");
805-
if (nodeIdObj != null) {
806-
nodeId = nodeIdObj.toString();
876+
if (group.equals(targetGroup)) {
877+
for (String n : candidateNames) {
878+
if (n.equals(name)) {
879+
double val = extractMetricValue(entry.getValue());
880+
if (val > 0) {
881+
logger.debug("Found producer-metrics {} = {}", name, val);
882+
return (long) val;
807883
}
808884
}
809885
}
810-
811-
logger.debug("Found outgoing bytes for node {}: {}", nodeId, value);
812-
totalBytes += (long) value;
813-
}
814-
815-
// Fall back to producer-metrics.byte-total if needed
816-
if (!foundAnyNodeMetric && group.equals("producer-metrics") &&
817-
(name.equals("byte-total") || name.equals("outgoing-byte-total"))) {
818-
double value = extractMetricValue(entry.getValue());
819-
logger.debug("Found fallback byte metric: {}={}", name, value);
820-
// Save this value but keep looking for node-specific metrics
821-
if (totalBytes == 0) {
822-
totalBytes = (long) value;
823-
}
824886
}
825887
}
826-
} catch (Exception e) {
827-
logger.debug("Error extracting metrics: {}", e.getMessage());
828-
}
829-
}
830-
// Handle String keys
831-
else if (key instanceof String) {
888+
} catch (Exception ignored) {}
889+
} else if (key instanceof String) {
832890
String keyStr = (String) key;
833-
if (keyStr.contains(targetGroup) && keyStr.contains(targetMetric)) {
834-
foundAnyNodeMetric = true;
835-
double value = extractMetricValue(entry.getValue());
836-
logger.debug("Found outgoing bytes with string key {}: {}", keyStr, value);
837-
totalBytes += (long) value;
891+
if (keyStr.startsWith(targetGroup) && (keyStr.contains("outgoing-byte-total") || keyStr.contains("byte-total"))) {
892+
double val = extractMetricValue(entry.getValue());
893+
if (val > 0) {
894+
logger.debug("Found producer-metrics byte counter (string key) {} = {}", keyStr, val);
895+
return (long) val;
896+
}
838897
}
839898
}
840899
}
841-
842-
if (totalBytes > 0) {
843-
if (foundAnyNodeMetric) {
844-
logger.debug("Total outgoing bytes across all nodes: {}", totalBytes);
845-
} else {
846-
logger.debug("Using fallback byte metric total: {}", totalBytes);
847-
}
848-
return totalBytes;
849-
}
850-
} else {
851-
logger.debug("Could not extract metrics map from: {}", metrics.getClass().getName());
852900
}
853901
} catch (Exception e) {
854-
logger.debug("Error getting outgoing bytes total: {}", e.getMessage(), e);
902+
logger.debug("Error getting outgoing bytes total from producer-metrics: {}", e.getMessage());
855903
}
856904

857905
return 0;
@@ -942,4 +990,17 @@ public CompressionStats(long compressedBytes, long uncompressedBytes) {
942990
this.uncompressedBytes = uncompressedBytes;
943991
}
944992
}
993+
994+
/**
995+
* Holder for original and optimized configuration maps passed between optimization
996+
* phase and stats reporter creation using ThreadLocal.
997+
*/
998+
public static class ConfigInfo {
999+
public final java.util.Map<String,Object> originalConfig;
1000+
public final java.util.Map<String,Object> optimizedConfig;
1001+
public ConfigInfo(java.util.Map<String,Object> orig, java.util.Map<String,Object> opt) {
1002+
this.originalConfig = orig;
1003+
this.optimizedConfig = opt;
1004+
}
1005+
}
9451006
}

superstream-clients/src/main/java/ai/superstream/core/ClientStatsCollector.java

Lines changed: 0 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -25,47 +25,6 @@ public void recordBatch(long uncompressedSize, long compressedSize) {
2525
totalBytesAfterCompression.addAndGet(compressedSize);
2626
}
2727

28-
/**
29-
* Gets the current compression ratio.
30-
* A value of 1.0 means no compression, while smaller values indicate better compression.
31-
* For example, 0.5 means data was compressed to half its original size.
32-
*
33-
* @return The compression ratio (compressed size / uncompressed size)
34-
*/
35-
public double getCompressionRatio() {
36-
long uncompressedSize = totalBytesBeforeCompression.get();
37-
if (uncompressedSize == 0) {
38-
return 1.0; // No data recorded yet, return 1.0 (no compression)
39-
}
40-
return (double) totalBytesAfterCompression.get() / uncompressedSize;
41-
}
42-
43-
/**
44-
* Gets the total bytes written before compression since the last reset.
45-
*
46-
* @return Total bytes before compression
47-
*/
48-
public long getTotalBytesBeforeCompression() {
49-
return totalBytesBeforeCompression.get();
50-
}
51-
52-
/**
53-
* Gets the total bytes written after compression since the last reset.
54-
*
55-
* @return Total bytes after compression
56-
*/
57-
public long getTotalBytesAfterCompression() {
58-
return totalBytesAfterCompression.get();
59-
}
60-
61-
/**
62-
* Resets all statistics counters after they've been reported.
63-
*/
64-
public void reset() {
65-
totalBytesBeforeCompression.set(0);
66-
totalBytesAfterCompression.set(0);
67-
}
68-
6928
/**
7029
* Captures current statistics and resets counters atomically.
7130
* This prevents race conditions between reading values and resetting them.

0 commit comments

Comments
 (0)