Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
Show all changes
42 commits
Select commit Hold shift + click to select a range
5266c79
Add async producer with metrics
k-raina Sep 9, 2025
d03a13c
Add producer strategy
k-raina Sep 9, 2025
c4306ea
Fix decorator
k-raina Sep 9, 2025
91998de
Try await in AsyncStrategy Produce.produce call
k-raina Sep 10, 2025
469a6b4
Rearchetect AIOProducer
k-raina Sep 10, 2025
6e9d67e
Pass producer config to strategy
k-raina Sep 11, 2025
63bfdd7
Pass polling interval
k-raina Sep 11, 2025
77315ec
Address feedback
k-raina Sep 11, 2025
762ebaa
Address feedback
k-raina Sep 11, 2025
babeabc
Temp commit to check latency of adding function to threadpool
k-raina Sep 12, 2025
be25ab4
Add produce batch api to producer
k-raina Sep 15, 2025
3aa9656
Implement AIO with produce_batch
k-raina Sep 15, 2025
1a1b26f
Add buffer timeout code
k-raina Sep 18, 2025
8e1f2c4
Reorganise code
k-raina Sep 18, 2025
ce534cf
Add capability of sync and async callbacks
k-raina Sep 18, 2025
287026d
Add unit tests
k-raina Sep 18, 2025
7ac679c
Add code comments for better understanding
k-raina Sep 18, 2025
ffadbaa
Add per message callback and callback pool
k-raina Sep 18, 2025
b066a15
Move batch to Producerbatchprocessor
k-raina Sep 18, 2025
80a0696
Minor
k-raina Sep 18, 2025
efcf6d4
Per message callbacks should be called for failed messages in batch
k-raina Sep 19, 2025
9014db9
refactor: extract callback handling into separate AsyncCallbackHandle…
k-raina Sep 19, 2025
1c36d3d
refactor: inject AsyncCallbackHandler into CallbackPool for better in…
k-raina Sep 19, 2025
35edc28
refactor: extract KafkaBatchExecutor for dedicated Kafka operations
k-raina Sep 19, 2025
fd772b4
refactor: eliminate AIOProducer dependency from BatchProcessor for tr…
k-raina Sep 19, 2025
215379d
refactor: complete Steps 5-7 - BufferManager, MessageBatch, and clean…
k-raina Sep 19, 2025
fe0df1c
refactor: rename BufferManager to BufferTimeoutManager for clarity
k-raina Sep 19, 2025
c44fbe8
refactor: organize producer components into dedicated src/aio/produce…
k-raina Sep 19, 2025
1562df2
refactor: merge CallbackPool and AsyncCallbackHandler into unified Ca…
k-raina Sep 19, 2025
2ab6208
Cleanup architecture
k-raina Sep 19, 2025
e91f163
refactor: rename classes for better clarity and consistency
k-raina Sep 19, 2025
e839926
Clean meta files
k-raina Sep 19, 2025
6a236e5
cleanup: remove duplicate test files in tests/aio/producer
k-raina Sep 19, 2025
b779fb8
cleanup: remove unrelated test result files and improve gitignore
k-raina Sep 19, 2025
e295d28
Remove callback from AIOProducer
k-raina Sep 23, 2025
6bdfb16
Implement minor feedback
k-raina Sep 23, 2025
533fdd3
Minor cleanup
k-raina Sep 23, 2025
a40466b
Cleanup tests
k-raina Sep 23, 2025
c63000c
Minor fix
k-raina Sep 24, 2025
950593f
Merge branch 'async' into kraina-add-async-sync-comparison
k-raina Sep 24, 2025
c3f38f3
Remove depreciated tests
k-raina Sep 24, 2025
1087737
Fix flake8
k-raina Sep 24, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 23 additions & 32 deletions src/confluent_kafka/aio/_AIOProducer.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,26 +32,27 @@ def __init__(self, producer_conf, max_workers=1, executor=None):
wrap_common_callbacks(asyncio.get_running_loop(), producer_conf)

self._producer = confluent_kafka.Producer(producer_conf)

# Internal metrics tracking
self._metrics_callback = None
self._send_times = {} # Track send times for latency calculation

def set_metrics_callback(self, callback):
"""Set callback for delivery metrics: callback(latency_ms, topic, partition, success)"""
self._metrics_callback = callback

# ========================================================================
# HYBRID OPERATIONS - Blocking behavior depends on parameters
# ========================================================================

async def poll(self, timeout=0, *args, **kwargs):
"""Processes callbacks - blocking behavior depends on timeout
"""Processes delivery callbacks from librdkafka - blocking behavior depends on timeout

This method triggers any pending delivery callbacks (on_delivery) that have been
queued by librdkafka when messages are delivered or fail to deliver.

Args:
timeout: 0 = non-blocking, >0 = block up to timeout seconds, -1 = block indefinitely
timeout: Timeout in seconds for waiting for callbacks:
- 0 = non-blocking, return immediately after processing available callbacks
- >0 = block up to timeout seconds waiting for new callbacks to arrive
- -1 = block indefinitely until callbacks are available

Returns:
Number of callbacks processed during this call
"""
if timeout > 0:
if timeout > 0 or timeout == -1:
# Blocking call - use ThreadPool to avoid blocking event loop
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do you say the auto poll was blocking the event loop before this change? the wait is executed in one of the threads of the ThreadPoolExecutor

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

He is referencing that the future assignment was getting blocked by poll because it was awaiting on that future instead of returning it. I had suggested trying returning the future without awaiting but I think he was still hitting contention. We should talk through that more

return await self._call(self._producer.poll, timeout, *args, **kwargs)
else:
Expand All @@ -74,36 +75,26 @@ async def produce(self, topic, value=None, key=None, *args, **kwargs):
"""
# Get current running event loop
result = asyncio.get_running_loop().create_future()

# Track send time for latency calculation if metrics are enabled
send_time = None
message_id = None
if self._metrics_callback:
import time
send_time = time.time()
message_id = f"{topic}:{key}:{id(result)}" # Unique message identifier
self._send_times[message_id] = send_time

# Store user's original callback if provided
user_callback = kwargs.get('on_delivery')

# Pre-bind variables to avoid closure overhead
def on_delivery(err, msg):
if err:
# Handle delivery failure
if self._metrics_callback and message_id:
latency_ms = 0.0
if message_id in self._send_times:
latency_ms = (time.time() - self._send_times[message_id]) * 1000
del self._send_times[message_id]
self._metrics_callback(latency_ms, topic, 0, False) # success=False
result.set_exception(_KafkaException(err))
else:
# Handle delivery success
if self._metrics_callback and message_id:
latency_ms = 0.0
if message_id in self._send_times:
latency_ms = (time.time() - self._send_times[message_id]) * 1000
del self._send_times[message_id]
self._metrics_callback(latency_ms, msg.topic(), msg.partition(), True) # success=True
result.set_result(msg)

# Call user's callback on successful delivery
if user_callback:
try:
user_callback(err, msg) # err is None here
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd do the check here for if it's a sync or async function with asyncio.iscoroutinefunction(func) and await if it's true. Since the parent context is async it's likely the user may pass a async callback without thinking much about it.

except Exception:
# Log but don't propagate user callback errors to avoid breaking delivery confirmation
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't swallow / catch general exceptions. This leads to silent failures and can catch system signals that should terminate. If the user really wants to each eat exceptions they can within the function they pass through.

pass

kwargs['on_delivery'] = on_delivery
self._producer.produce(topic, value, key, *args, **kwargs)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The producer.produce isn't exactly non-blocking, if everything goes well it is, but in rd_kafka_producev there are locks, rd_kafka_wrlock(rk);, rd_kafka_topic_rdlock(rkt);, mtx_lock(&rk->rk_curr_msgs.lock); a custom partitioner (that at the moment cannot be used in Python because of a deadlock to solve in librdkafka), some interceptors rd_kafka_interceptors_on_send, rd_kafka_interceptors_on_acknowledgement that` could have any code and be used later for tracing.

We can see if it's possible to implement rd_kafka_produce_batch in Python so if we have N futures still not completed we accumulate to a list and then when one of them finishes send the list of messages (through the ThreadPoolExecutor)

Copy link
Member

@fangnx fangnx Sep 11, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you think introducing batch-produce at the Python client layer (my understanding is that librdkafka already batches messages internally) can drastically improve the async produce() performance, similar to async consume() w/ batching, as the ThreadPool overhead will be amortized?

n = (Py_ssize_t)rd_kafka_consume_batch_queue(rkqu,

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right, as happened in the consume benchmark. Just in the consumer we cannot implement a single message poll with a larger consume call because when we call consume the full batch can be auto stored, we would need to use manual offset management and we don't want to do complex things here, just follow existing API, so we can just tell users to use the consume call for high throughput.
For the produce call instead we can accumulate the calls coming from produce and when finished sending a previous batch we send the next one. This will change a bit the ordering of messages but in the context of concurrent production we aren't ensuring it.

Expand Down
94 changes: 73 additions & 21 deletions tests/ducktape/producer_strategy.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,28 @@ def produce_messages(self, topic_name, test_duration, start_time, message_format


class SyncProducerStrategy(ProducerStrategy):
def create_producer(self):
producer = Producer({'bootstrap.servers': self.bootstrap_servers})
def create_producer(self, config_overrides=None):
config = {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fun trick, you can do ** expansion to inline update dicts

{'bootstrap.servers': self.bootstrap_servers, **(config_overrides or {})}

'bootstrap.servers': self.bootstrap_servers
}

# Apply any test-specific overrides
if config_overrides:
config.update(config_overrides)

producer = Producer(config)

# Log the configuration for validation
self.logger.info("=== SYNC PRODUCER CONFIGURATION ===")
for key, value in config.items():
self.logger.info(f"{key}: {value}")
self.logger.info("=" * 40)

return producer

def produce_messages(self, topic_name, test_duration, start_time, message_formatter, delivered_container, failed_container=None):
producer = self.create_producer()
config_overrides = getattr(self, 'config_overrides', None)
producer = self.create_producer(config_overrides)
messages_sent = 0
send_times = {} # Track send times for latency calculation

Expand Down Expand Up @@ -78,8 +94,10 @@ def delivery_callback(err, msg):
produce_times.append(time.time() - produce_start)
messages_sent += 1

# Poll every 100 messages to prevent buffer overflow
if messages_sent % 100 == 0:
# Use configured polling interval (default to 50 if not set)
poll_interval = getattr(self, 'poll_interval', 50)

if messages_sent % poll_interval == 0:
poll_start = time.time()
producer.poll(0)
poll_times.append(time.time() - poll_start)
Expand Down Expand Up @@ -112,39 +130,52 @@ def delivery_callback(err, msg):
print("=" * 45)

return messages_sent

def get_final_metrics(self):
"""Return final metrics summary for the sync producer"""
if self.metrics:
return self.metrics
return None
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Technically not needed since no return defaults to None return but ok to be explicit



class AsyncProducerStrategy(ProducerStrategy):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self._producer_instance = None

def create_producer(self):
def create_producer(self, config_overrides=None):
from confluent_kafka.aio import AIOProducer
# Enable logging for AIOProducer
import logging
logging.basicConfig(level=logging.INFO)

self._producer_instance = AIOProducer({'bootstrap.servers': self.bootstrap_servers}, max_workers=20)
config = {
'bootstrap.servers': self.bootstrap_servers
}

# Set up internal metrics callback if metrics are enabled
if self.metrics:
def metrics_callback(latency_ms, topic, partition, success):
if success:
self.metrics.record_delivered(latency_ms, topic=topic, partition=partition)
else:
self.metrics.record_failed(topic=topic, partition=partition)
self._producer_instance.set_metrics_callback(metrics_callback)
# Apply any test-specific overrides
if config_overrides:
config.update(config_overrides)

self._producer_instance = AIOProducer(config, max_workers=20)

# Log the configuration for validation
self.logger.info("=== ASYNC PRODUCER CONFIGURATION ===")
for key, value in config.items():
self.logger.info(f"{key}: {value}")
self.logger.info("=" * 41)

return self._producer_instance


def produce_messages(self, topic_name, test_duration, start_time, message_formatter, delivered_container, failed_container=None):

async def async_produce():
producer = self.create_producer()
config_overrides = getattr(self, 'config_overrides', None)
producer = self.create_producer(config_overrides)
messages_sent = 0
pending_futures = []
send_times = {} # Track send times for latency calculation

# Temporary metrics for timing sections
produce_times = []
Expand All @@ -155,24 +186,39 @@ async def async_produce():
message_value, message_key = message_formatter(messages_sent)

try:
# Record sent message for metrics
# Record sent message for metrics and track send time
if self.metrics:
message_size = len(message_value.encode('utf-8')) + len(message_key.encode('utf-8'))
self.metrics.record_sent(message_size, topic=topic_name, partition=0)
send_times[message_key] = time.time() # Track send time for latency

# Produce message
# Create metrics callback for this message
def create_metrics_callback(msg_key):
def metrics_callback(err, msg):
if self.metrics and not err:
# Calculate latency if we have send time
latency_ms = 0.0
if msg_key in send_times:
latency_ms = (time.time() - send_times[msg_key]) * 1000
del send_times[msg_key]
self.metrics.record_delivered(latency_ms, topic=msg.topic(), partition=msg.partition())
return metrics_callback

# Produce message with metrics callback
produce_start = time.time()
delivery_future = await producer.produce(
topic=topic_name,
value=message_value,
key=message_key
key=message_key,
on_delivery=create_metrics_callback(message_key) if self.metrics else None
)
produce_times.append(time.time() - produce_start)
pending_futures.append((delivery_future, message_key)) # Store delivery future
messages_sent += 1

# Poll every 100 messages to prevent buffer overflow
if messages_sent % 100 == 0:
# Use configured polling interval (default to 100 for async)
poll_interval = getattr(self, 'poll_interval', 100)
if messages_sent % poll_interval == 0:
poll_start = time.time()
await producer.poll(0)
poll_times.append(time.time() - poll_start)
Expand Down Expand Up @@ -219,3 +265,9 @@ async def async_produce():

loop = asyncio.get_event_loop()
return loop.run_until_complete(async_produce())

def get_final_metrics(self):
"""Return final metrics summary for the async producer"""
if self.metrics:
return self.metrics
return None
68 changes: 50 additions & 18 deletions tests/ducktape/test_producer.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,16 @@ def setUp(self):

self.logger.info("Successfully connected to Kafka")

def createProducer(self, producer_type):
def create_producer(self, producer_type, config_overrides=None):
"""Create appropriate producer strategy based on type"""
if producer_type == "sync":
return SyncProducerStrategy(self.kafka.bootstrap_servers(), self.logger)
strategy = SyncProducerStrategy(self.kafka.bootstrap_servers(), self.logger)
else: # async
return AsyncProducerStrategy(self.kafka.bootstrap_servers(), self.logger)
strategy = AsyncProducerStrategy(self.kafka.bootstrap_servers(), self.logger)

# Store config overrides for later use in create_producer
strategy.config_overrides = config_overrides
return strategy

@matrix(producer_type=["sync", "async"])
def test_basic_produce(self, producer_type):
Expand All @@ -57,7 +61,7 @@ def test_basic_produce(self, producer_type):
bounds = MetricsBounds()

# Create appropriate producer strategy
strategy = self.createProducer(producer_type)
strategy = self.create_producer(producer_type)

# Assign metrics collector to strategy
strategy.metrics = metrics
Expand Down Expand Up @@ -128,7 +132,7 @@ def test_produce_multiple_batches(self, producer_type, test_duration):
bounds.min_throughput_msg_per_sec = 50.0 # Lower threshold for short tests

# Create appropriate producer strategy
strategy = self.createProducer(producer_type)
strategy = self.create_producer(producer_type)

# Assign metrics collector to strategy
strategy.metrics = metrics
Expand All @@ -140,7 +144,7 @@ def test_produce_multiple_batches(self, producer_type, test_duration):

# Message formatter for batch test
def message_formatter(msg_num):
return f"Batch message {msg_num}", f"batch-key-{msg_num % 10}"
return f"Batch message {msg_num}", f"batch-key-{msg_num}"

# Containers for results
delivered_messages = []
Expand Down Expand Up @@ -168,11 +172,14 @@ def message_formatter(msg_num):
print_metrics_report(metrics_summary, is_valid, violations)

if final_metrics:
self.logger.info(f"=== AIOProducer Built-in Metrics ===")
self.logger.info(f"Runtime: {final_metrics['runtime_seconds']:.2f}s")
self.logger.info(f"Success Rate: {final_metrics['success_rate_percent']:.1f}%")
self.logger.info(f"Throughput: {final_metrics['throughput_msg_per_sec']:.1f} msg/sec")
self.logger.info(f"Latency: Avg={final_metrics['latency_avg_ms']:.1f}ms")
# Get the actual metrics dictionary
producer_metrics_summary = final_metrics.get_summary()
if producer_metrics_summary:
self.logger.info(f"=== Producer Built-in Metrics ===")
self.logger.info(f"Runtime: {producer_metrics_summary['duration_seconds']:.2f}s")
self.logger.info(f"Success Rate: {producer_metrics_summary['success_rate']:.3f}")
self.logger.info(f"Throughput: {producer_metrics_summary['send_throughput_msg_per_sec']:.1f} msg/sec")
self.logger.info(f"Latency: Avg={producer_metrics_summary['avg_latency_ms']:.1f}ms")

# Enhanced assertions using metrics
assert messages_sent > 0, "No messages were sent"
Expand Down Expand Up @@ -209,13 +216,35 @@ def test_produce_with_compression(self, producer_type, compression_type):
bounds.min_throughput_msg_per_sec = 5.0 # Lower threshold for large messages
bounds.max_p95_latency_ms = 5000.0 # Allow higher latency for compression

# Create appropriate producer strategy
strategy = self.createProducer(producer_type)
# Create appropriate producer strategy with compression config
compression_config = {}
if compression_type != 'none':
compression_config['compression.type'] = compression_type

# Configure polling intervals based on compression type and producer type
if producer_type == 'async':
polling_config = {
'gzip': 10, # Poll every 10 messages for gzip (frequent)
'snappy': 50, # Poll every 50 messages for snappy (moderate)
'none': 100 # Poll every 100 messages for none (standard)
}
else: # sync
# Sync producers need more frequent polling to prevent buffer overflow as throughput is very high
polling_config = {
'gzip': 5, # Poll every 5 messages for gzip (most frequent)
'snappy': 25, # Poll every 25 messages for snappy (moderate)
'none': 50 # Poll every 50 messages for none (standard)
}
poll_interval = polling_config.get(compression_type, 50 if producer_type == 'sync' else 100)

strategy = self.create_producer(producer_type, compression_config)
strategy.poll_interval = poll_interval

# Assign metrics collector to strategy
strategy.metrics = metrics

self.logger.info(f"Testing {producer_type} producer with {compression_type} compression for {test_duration} seconds")
self.logger.info(f"Using polling interval: {poll_interval} messages per poll")

# Start metrics collection
metrics.start()
Expand Down Expand Up @@ -253,11 +282,14 @@ def message_formatter(msg_num):
print_metrics_report(metrics_summary, is_valid, violations)

if final_metrics:
self.logger.info(f"=== AIOProducer Built-in Metrics ===")
self.logger.info(f"Runtime: {final_metrics['runtime_seconds']:.2f}s")
self.logger.info(f"Success Rate: {final_metrics['success_rate_percent']:.1f}%")
self.logger.info(f"Throughput: {final_metrics['throughput_msg_per_sec']:.1f} msg/sec")
self.logger.info(f"Latency: Avg={final_metrics['latency_avg_ms']:.1f}ms")
# Get the actual metrics dictionary
producer_metrics_summary = final_metrics.get_summary()
if producer_metrics_summary:
self.logger.info(f"=== Producer Built-in Metrics ===")
self.logger.info(f"Runtime: {producer_metrics_summary['duration_seconds']:.2f}s")
self.logger.info(f"Success Rate: {producer_metrics_summary['success_rate']:.3f}")
self.logger.info(f"Throughput: {producer_metrics_summary['send_throughput_msg_per_sec']:.1f} msg/sec")
self.logger.info(f"Latency: Avg={producer_metrics_summary['avg_latency_ms']:.1f}ms")

# Enhanced assertions using metrics
assert messages_sent > 0, "No messages were sent"
Expand Down