Skip to content

Commit de080dc

Browse files
Merge pull request #8 from superstreamlabs/master
release
2 parents (33d6afc + bcc6077) — merge commit de080dc

File tree

8 files changed

+994
-83
lines changed

8 files changed

+994
-83
lines changed

Jenkinsfile

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,19 @@ pipeline {
1616
}
1717

1818
stages {
19+
1920
stage('Prepare Environment') {
21+
when {
22+
anyOf {
23+
allOf {
24+
branch 'master'
25+
triggeredBy 'UserIdCause' // Manual trigger on master
26+
}
27+
allOf {
28+
branch 'latest'
29+
}
30+
}
31+
}
2032
steps {
2133
script {
2234
sh 'git config --global --add safe.directory $(pwd)'
@@ -41,8 +53,11 @@ pipeline {
4153

4254
stage('Beta Release') {
4355
when {
44-
branch 'master'
45-
}
56+
allOf {
57+
branch 'master'
58+
triggeredBy 'UserIdCause' // Manual "Build Now"
59+
}
60+
}
4661
steps {
4762
sh '''
4863
sed -i -E 's/^(name *= *")superstream-clients(")/\\1superstream-clients-beta\\2/' pyproject.toml
@@ -106,7 +121,7 @@ pipeline {
106121
}
107122
withCredentials([string(credentialsId: 'gh_token', variable: 'GH_TOKEN')]) {
108123
sh """
109-
gh release create $versionTag dist/superstream_confluent_kafka-${env.versionTag}.tar.gz --generate-notes
124+
gh release create $versionTag dist/superstream_clients-${env.versionTag}.tar.gz --generate-notes
110125
"""
111126
}
112127
}

README.md

Lines changed: 1 addition & 65 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,6 @@ Superstream Clients works as a Python import hook that intercepts Kafka producer
1515

1616
## Supported Libraries
1717

18-
Works with any Python library that implements Kafka producers, including:
19-
2018
- kafka-python
2119
- aiokafka
2220
- confluent-kafka
@@ -31,51 +29,6 @@ Works with any Python library that implements Kafka producers, including:
3129
- **Dynamic configuration**: Applies optimized settings based on topic-specific recommendations
3230
- **Intelligent optimization**: Identifies the most impactful topics to optimize
3331
- **Graceful fallback**: Falls back to default settings if optimization fails
34-
- **Minimal overhead**: Uses a single lightweight background thread (or async coroutine for aiokafka)
35-
36-
## Important: Producer Configuration Requirements
37-
38-
When initializing your Kafka producers, please ensure you pass the configuration as a mutable object. The Superstream library needs to modify the producer configuration to apply optimizations. The following initialization patterns are supported:
39-
40-
**Supported (Recommended)**:
41-
```python
42-
# Using kafka-python
43-
from kafka import KafkaProducer
44-
producer = KafkaProducer(
45-
bootstrap_servers=['localhost:9092'],
46-
compression_type='snappy',
47-
batch_size=16384
48-
)
49-
50-
# Using aiokafka
51-
from aiokafka import AIOKafkaProducer
52-
producer = AIOKafkaProducer(
53-
bootstrap_servers='localhost:9092',
54-
compression_type='snappy',
55-
batch_size=16384
56-
)
57-
58-
# Using confluent-kafka
59-
from confluent_kafka import Producer
60-
producer = Producer({
61-
'bootstrap.servers': 'localhost:9092',
62-
'compression.type': 'snappy',
63-
'batch.size': 16384
64-
})
65-
```
66-
67-
**Not Supported**:
68-
```python
69-
# Using frozen dictionaries or immutable configurations
70-
from types import MappingProxyType
71-
config = MappingProxyType({
72-
'bootstrap.servers': 'localhost:9092'
73-
})
74-
producer = KafkaProducer(**config)
75-
```
76-
77-
### Why This Matters
78-
The Superstream library needs to modify your producer's configuration to apply optimizations based on your cluster's characteristics. This includes adjusting settings like compression, batch size, and other performance parameters. When the configuration is immutable, these optimizations cannot be applied.
7932

8033
## Installation
8134

@@ -87,24 +40,7 @@ That's it! Superclient will now automatically load and optimize all Kafka produc
8740

8841
## Usage
8942

90-
After installation, superclient works automatically. Just use your Kafka clients as usual:
91-
92-
```python
93-
# kafka-python
94-
from kafka import KafkaProducer
95-
producer = KafkaProducer(bootstrap_servers='localhost:9092')
96-
# Automatically optimized!
97-
98-
# confluent-kafka
99-
from confluent_kafka import Producer
100-
producer = Producer({'bootstrap.servers': 'localhost:9092'})
101-
# Automatically optimized!
102-
103-
# aiokafka
104-
from aiokafka import AIOKafkaProducer
105-
producer = AIOKafkaProducer(bootstrap_servers='localhost:9092')
106-
# Automatically optimized!
107-
```
43+
After installation, superclient works automatically. Just use your Kafka clients as usual.
10844

10945
### Docker Integration
11046

examples/requirements.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
11
kafka-python==2.2.14
2-
confluent-kafka==2.3.0
2+
confluent-kafka==2.11.0
33
aiokafka==0.10.0
44
aws-msk-iam-sasl-signer-python==1.0.2

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"
44

55
[project]
66
name = "superstream-clients"
7-
version = "0.1.5"
7+
version = "1.0.0"
88
description = "Superstream optimisation library for Kafka producers"
99
authors = [{name = "Superstream Labs", email = "[email protected]"}]
1010
license = "Apache-2.0"

superclient/agent/interceptor.py

Lines changed: 113 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,13 @@
11
"""Producer interception functionality."""
22

33
import os
4+
import uuid
45
from typing import Any, Dict
6+
import importlib
57

68
from ..util.logger import get_logger
79
from ..util.config import get_topics_list, is_disabled
10+
from ..util.metrics import configure_confluent_stats_callback
811
from .metadata import fetch_metadata_sync, optimal_cfg, _DEFAULTS
912
from ..core.reporter import send_clients_msg
1013
from ..core.manager import normalize_bootstrap
@@ -243,8 +246,102 @@ async def stop_patch(*a, **kw):
243246
await original_stop(*a, **kw)
244247

245248
self.stop = stop_patch
249+
250+
# Patch the SendProduceReqHandler.create_request method for this specific producer
251+
sender_mod = importlib.import_module("aiokafka.producer.sender")
252+
253+
# Only patch once globally
254+
if not hasattr(sender_mod.SendProduceReqHandler, '_superstream_patched'):
255+
orig_create_request = sender_mod.SendProduceReqHandler.create_request
256+
257+
def create_request_with_metrics(self_handler):
258+
# Call the original method to get the request, but collect metrics
259+
# self_handler._batches: Dict[TopicPartition, MessageBatch]
260+
261+
# Quick check: if sender has no tracker, it's an internal producer - skip metrics
262+
if not hasattr(self_handler._sender, '_superstream_tracker'):
263+
return orig_create_request(self_handler)
264+
265+
tracker = self_handler._sender._superstream_tracker
266+
267+
# Additional check: skip internal producers by client_id
268+
if tracker is None or tracker.client_id.startswith(_SUPERLIB_PREFIX):
269+
return orig_create_request(self_handler)
270+
271+
# Per-producer totals
272+
total_uncompressed = 0
273+
total_compressed = 0
274+
total_records = 0
275+
topic_stats = {}
276+
for tp, batch in self_handler._batches.items():
277+
# Get record count from the batch
278+
record_count = batch.record_count
279+
280+
# Get compressed size from the batch buffer
281+
compressed = 0
282+
try:
283+
compressed = len(batch.get_data_buffer())
284+
except Exception:
285+
pass
286+
287+
# Estimate uncompressed size based on record count
288+
# Since we can't easily access the original message data at this point,
289+
# we'll use a reasonable estimate based on the batch size and record count
290+
if record_count > 0:
291+
# Estimate uncompressed size based on compressed size and typical compression ratios
292+
# This is an approximation since we can't access the original message data
293+
estimated_compression_ratio = 0.7 # Assume 30% compression
294+
uncompressed = int(compressed / estimated_compression_ratio)
295+
else:
296+
uncompressed = 0
297+
298+
total_uncompressed += uncompressed
299+
total_compressed += compressed
300+
total_records += record_count
301+
# Per-topic
302+
if tp.topic not in topic_stats:
303+
topic_stats[tp.topic] = {'uncompressed': 0, 'compressed': 0, 'records': 0}
304+
topic_stats[tp.topic]['uncompressed'] += uncompressed
305+
topic_stats[tp.topic]['compressed'] += compressed
306+
topic_stats[tp.topic]['records'] += record_count
307+
# Update tracker
308+
if total_records > 0:
309+
tracker._superstream_metrics = getattr(tracker, '_superstream_metrics', {})
310+
m = tracker._superstream_metrics
311+
# Accumulate totals (aggregative counters)
312+
m['outgoing-byte-total'] = m.get('outgoing-byte-total', 0) + total_compressed
313+
m['record-send-total'] = m.get('record-send-total', 0) + total_records
314+
m['uncompressed-byte-total'] = m.get('uncompressed-byte-total', 0) + total_uncompressed
315+
316+
# Calculate rates from aggregated totals
317+
m['compression-rate-avg'] = (m['outgoing-byte-total'] / m['uncompressed-byte-total']) if m['uncompressed-byte-total'] else 1.0
318+
m['record-size-avg'] = (m['uncompressed-byte-total'] / m['record-send-total']) if m['record-send-total'] else 0
319+
320+
# Per-topic
321+
m['topics'] = m.get('topics', {})
322+
for topic, stats in topic_stats.items():
323+
t = m['topics'].setdefault(topic, {'byte-total': 0, 'record-send-total': 0, 'uncompressed-total': 0})
324+
# Accumulate totals (aggregative counters)
325+
t['byte-total'] = t.get('byte-total', 0) + stats['compressed']
326+
t['record-send-total'] = t.get('record-send-total', 0) + stats['records']
327+
t['uncompressed-total'] = t.get('uncompressed-total', 0) + stats['uncompressed']
328+
329+
# Calculate compression rate from aggregated totals
330+
t['compression-rate'] = (t['byte-total'] / t['uncompressed-total']) if t['uncompressed-total'] else 1.0
331+
332+
tracker._superstream_metrics = m
333+
return orig_create_request(self_handler)
334+
335+
sender_mod.SendProduceReqHandler.create_request = create_request_with_metrics
336+
sender_mod.SendProduceReqHandler._superstream_patched = True
337+
246338
self._superstream_patch = True
247339
orig_init(self, *args, **kwargs)
340+
341+
# Store tracker reference in the sender for metrics collection
342+
if hasattr(self, '_sender'):
343+
self._sender._superstream_tracker = tr
344+
248345
send_clients_msg(tr, error_msg)
249346

250347
# Log success message based on whether defaults were used
@@ -345,10 +442,18 @@ def __init__(self, conf: Dict[str, Any], *args, **kwargs):
345442
logger.debug("Overriding configuration: {} ((not set) -> {})", k, v)
346443
conf[k] = v
347444

445+
446+
# Generate UUID for this producer
447+
tracker_uuid = str(uuid.uuid4())
448+
449+
# Configure stats callback for metrics collection
450+
conf = configure_confluent_stats_callback(conf, tracker_uuid)
451+
348452
# Create the producer with optimized configuration
349453
self._producer = Producer(conf, *args, **kwargs)
350454

351455
report_interval = metadata.get("report_interval_ms") if metadata else _DEFAULT_REPORT_INTERVAL_MS
456+
# Create tracker with the generated UUID
352457
self._tracker = ProducerTracker(
353458
lib="confluent",
354459
producer=self._producer,
@@ -357,10 +462,12 @@ def __init__(self, conf: Dict[str, Any], *args, **kwargs):
357462
orig_cfg=orig_cfg,
358463
opt_cfg=opt_cfg,
359464
report_interval_ms=int(report_interval or _DEFAULT_REPORT_INTERVAL_MS),
360-
error=error_msg, # Store error message in tracker
465+
error=error_msg,
361466
metadata=metadata,
362467
topics_env=topics_env,
468+
uuid=tracker_uuid, # Use the generated UUID
363469
)
470+
364471
Heartbeat.register_tracker(self._tracker)
365472

366473
send_clients_msg(self._tracker, error_msg)
@@ -397,12 +504,15 @@ def __del__(self):
397504
self._superstream_closed = True
398505
self._tracker.close()
399506
Heartbeat.unregister_tracker(self._tracker.uuid)
507+
508+
# Remove metrics extractor from registry
509+
from ..util.metrics import remove_producer_metrics_extractor
510+
remove_producer_metrics_extractor(self._tracker.uuid)
511+
400512
logger.debug("Superstream tracking stopped for confluent-kafka producer with client_id: {}",
401513
getattr(self._tracker, 'client_id', 'unknown'))
402514
except Exception as e:
403515
logger.error("Error during automatic cleanup: {}", e)
404-
else:
405-
logger.debug("Producer already cleaned up or no tracker found")
406516

407517
def __getattr__(self, name):
408518
"""Delegate all other attributes to the underlying producer."""

0 commit comments

Comments (0)