Update AsyncIO producer architecture to improve performance #2044
Changes to the `AIOProducer` class:

@@ -32,26 +32,27 @@ def __init__(self, producer_conf, max_workers=1, executor=None):

```python
        wrap_common_callbacks(asyncio.get_running_loop(), producer_conf)

        self._producer = confluent_kafka.Producer(producer_conf)

        # Internal metrics tracking
        self._metrics_callback = None
        self._send_times = {}  # Track send times for latency calculation

    def set_metrics_callback(self, callback):
        """Set callback for delivery metrics: callback(latency_ms, topic, partition, success)"""
        self._metrics_callback = callback

    # ========================================================================
    # HYBRID OPERATIONS - Blocking behavior depends on parameters
    # ========================================================================

    async def poll(self, timeout=0, *args, **kwargs):
        """Processes delivery callbacks from librdkafka - blocking behavior depends on timeout

        This method triggers any pending delivery callbacks (on_delivery) that have been
        queued by librdkafka when messages are delivered or fail to deliver.

        Args:
            timeout: Timeout in seconds for waiting for callbacks:
                - 0 = non-blocking, return immediately after processing available callbacks
                - >0 = block up to timeout seconds waiting for new callbacks to arrive
                - -1 = block indefinitely until callbacks are available

        Returns:
            Number of callbacks processed during this call
        """
        if timeout > 0 or timeout == -1:
            # Blocking call - use ThreadPool to avoid blocking event loop
            return await self._call(self._producer.poll, timeout, *args, **kwargs)
        else:
            ...
```
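As a minimal usage sketch of the two timeout modes (the helper function here is hypothetical, not part of this PR):

```python
async def drain_callbacks(producer):
    # Non-blocking: fire any delivery callbacks already queued and return
    processed = await producer.poll(0)

    # Blocking: wait up to 1 second for new callbacks; this runs in the
    # producer's thread pool, so the event loop stays responsive
    if processed == 0:
        processed = await producer.poll(1.0)
    return processed
```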
@@ -74,36 +75,26 @@ async def produce(self, topic, value=None, key=None, *args, **kwargs):

```python
    async def produce(self, topic, value=None, key=None, *args, **kwargs):
        """
        ...
        """
        # Get current running event loop
        result = asyncio.get_running_loop().create_future()

        # Track send time for latency calculation if metrics are enabled
        send_time = None
        message_id = None
        if self._metrics_callback:
            import time
            send_time = time.time()
            message_id = f"{topic}:{key}:{id(result)}"  # Unique message identifier
            self._send_times[message_id] = send_time

        # Store user's original callback if provided
        user_callback = kwargs.get('on_delivery')

        # Pre-bind variables to avoid closure overhead
        def on_delivery(err, msg):
            if err:
                # Handle delivery failure
                if self._metrics_callback and message_id:
                    latency_ms = 0.0
                    if message_id in self._send_times:
                        latency_ms = (time.time() - self._send_times[message_id]) * 1000
                        del self._send_times[message_id]
                    self._metrics_callback(latency_ms, topic, 0, False)  # success=False
                result.set_exception(_KafkaException(err))
            else:
                # Handle delivery success
                if self._metrics_callback and message_id:
                    latency_ms = 0.0
                    if message_id in self._send_times:
                        latency_ms = (time.time() - self._send_times[message_id]) * 1000
                        del self._send_times[message_id]
                    self._metrics_callback(latency_ms, msg.topic(), msg.partition(), True)  # success=True
                result.set_result(msg)

                # Call user's callback on successful delivery
                if user_callback:
                    try:
                        user_callback(err, msg)  # err is None here
                    except Exception:
                        # Log but don't propagate user callback errors to avoid breaking delivery confirmation
                        pass

        kwargs['on_delivery'] = on_delivery
        self._producer.produce(topic, value, key, *args, **kwargs)
```
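With this callback-chaining scheme, the awaited `produce()` hands back a future that resolves on delivery (as the benchmark usage later in this diff suggests). A sketch of the caller side, with hypothetical topic and payload values:

```python
async def send_one(producer):
    def my_callback(err, msg):
        # In this design, err is always None here: user callbacks are
        # invoked only on the successful-delivery path
        print(f"delivered to {msg.topic()}[{msg.partition()}]")

    delivery = await producer.produce('demo-topic', value='hello',
                                      key='k1', on_delivery=my_callback)
    msg = await delivery  # resolves with the Message, or raises KafkaException
    return msg
```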
A review comment on the consumer's use of `rd_kafka_consume_batch_queue` (`n = (Py_ssize_t)rd_kafka_consume_batch_queue(rkqu, ...`):

Right, as happened in the consume benchmark. In the consumer we cannot implement a single-message poll on top of a larger consume call, because when we call consume the full batch can be auto-stored; we would need manual offset management, and we don't want to do anything complex here, just follow the existing API. So we can simply tell users to use the consume call for high throughput.

For the produce call, instead, we can accumulate the calls coming from produce and, once a previous batch has finished sending, send the next one. This will change the ordering of messages a bit, but in the context of concurrent production we aren't guaranteeing ordering anyway.
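A rough sketch of that accumulate-and-send idea (illustrative only; the class and method names are invented, and a real implementation would need proper error handling and shutdown):

```python
import asyncio

class BatchingProducerSketch:
    """Accumulate produce() calls; send the next batch once the previous one finishes."""

    def __init__(self, producer):
        self._producer = producer   # assumed to expose an awaitable produce()
        self._pending = []          # calls accumulated while a batch is in flight
        self._flusher = None

    async def produce(self, topic, value, key=None):
        fut = asyncio.get_running_loop().create_future()
        self._pending.append((topic, value, key, fut))
        if self._flusher is None or self._flusher.done():
            self._flusher = asyncio.ensure_future(self._send_batches())
        return await fut  # resolves once this message's batch has been sent

    async def _send_batches(self):
        while self._pending:
            batch, self._pending = self._pending, []  # snapshot the current batch
            for topic, value, key, fut in batch:
                try:
                    delivery = await self._producer.produce(topic, value=value, key=key)
                    fut.set_result(delivery)
                except Exception as e:
                    fut.set_exception(e)
```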
Changes to the benchmark producer strategies:

@@ -24,12 +24,28 @@ def produce_messages(self, topic_name, test_duration, start_time, message_formatter, delivered_container, failed_container=None):

```python
class SyncProducerStrategy(ProducerStrategy):
    def create_producer(self, config_overrides=None):
        config = {
            'bootstrap.servers': self.bootstrap_servers
        }

        # Apply any test-specific overrides
        if config_overrides:
            config.update(config_overrides)

        producer = Producer(config)

        # Log the configuration for validation
        self.logger.info("=== SYNC PRODUCER CONFIGURATION ===")
        for key, value in config.items():
            self.logger.info(f"{key}: {value}")
        self.logger.info("=" * 40)

        return producer

    def produce_messages(self, topic_name, test_duration, start_time, message_formatter, delivered_container, failed_container=None):
        config_overrides = getattr(self, 'config_overrides', None)
        producer = self.create_producer(config_overrides)
        messages_sent = 0
        send_times = {}  # Track send times for latency calculation
```
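For example, a test could exercise different batching behavior by setting overrides before the run; the keys below are standard librdkafka producer settings, chosen purely for illustration, and the constructor arguments are hypothetical:

```python
strategy = SyncProducerStrategy(bootstrap_servers='localhost:9092')  # hypothetical args
strategy.config_overrides = {
    'linger.ms': 5,              # allow up to 5 ms of batching before send
    'compression.type': 'lz4',   # trade CPU for smaller payloads
}
```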
@@ -78,8 +94,10 @@ def delivery_callback(err, msg):

```python
            produce_times.append(time.time() - produce_start)
            messages_sent += 1

            # Use configured polling interval (default to 50 if not set)
            poll_interval = getattr(self, 'poll_interval', 50)
            if messages_sent % poll_interval == 0:
                poll_start = time.time()
                producer.poll(0)
                poll_times.append(time.time() - poll_start)
```
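Because the interval is read with `getattr`, a test can tune it per run without touching the strategy class; for instance (hypothetical value):

```python
strategy.poll_interval = 10  # poll more often: callbacks fire sooner, at some throughput cost
```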
@@ -112,39 +130,52 @@ def delivery_callback(err, msg):

```python
        print("=" * 45)

        return messages_sent

    def get_final_metrics(self):
        """Return final metrics summary for the sync producer"""
        if self.metrics:
            return self.metrics
        return None
```

Review comment on `get_final_metrics`: technically not needed, since a function without an explicit return returns None by default, but it is fine to be explicit.
The async strategy gets the same config-override treatment:

```python
class AsyncProducerStrategy(ProducerStrategy):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self._producer_instance = None

    def create_producer(self, config_overrides=None):
        from confluent_kafka.aio import AIOProducer
        # Enable logging for AIOProducer
        import logging
        logging.basicConfig(level=logging.INFO)

        config = {
            'bootstrap.servers': self.bootstrap_servers
        }

        # Apply any test-specific overrides
        if config_overrides:
            config.update(config_overrides)

        self._producer_instance = AIOProducer(config, max_workers=20)

        # Set up internal metrics callback if metrics are enabled
        if self.metrics:
            def metrics_callback(latency_ms, topic, partition, success):
                if success:
                    self.metrics.record_delivered(latency_ms, topic=topic, partition=partition)
                else:
                    self.metrics.record_failed(topic=topic, partition=partition)
            self._producer_instance.set_metrics_callback(metrics_callback)

        # Log the configuration for validation
        self.logger.info("=== ASYNC PRODUCER CONFIGURATION ===")
        for key, value in config.items():
            self.logger.info(f"{key}: {value}")
        self.logger.info("=" * 41)

        return self._producer_instance
```
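For reference, since `AIOProducer.__init__` also takes an `executor` parameter, the same thread-pool sizing could presumably be expressed explicitly (a sketch under that assumption, not code from this PR):

```python
from concurrent.futures import ThreadPoolExecutor
from confluent_kafka.aio import AIOProducer

# Assumes executor accepts a ThreadPoolExecutor, as the __init__ signature suggests
executor = ThreadPoolExecutor(max_workers=20)  # pool for blocking librdkafka calls
producer = AIOProducer({'bootstrap.servers': 'localhost:9092'}, executor=executor)
```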
```python
    def produce_messages(self, topic_name, test_duration, start_time, message_formatter, delivered_container, failed_container=None):

        async def async_produce():
            config_overrides = getattr(self, 'config_overrides', None)
            producer = self.create_producer(config_overrides)
            messages_sent = 0
            pending_futures = []
            send_times = {}  # Track send times for latency calculation

            # Temporary metrics for timing sections
            produce_times = []
```
@@ -155,24 +186,39 @@ async def async_produce():

```python
                message_value, message_key = message_formatter(messages_sent)

                try:
                    # Record sent message for metrics and track send time
                    if self.metrics:
                        message_size = len(message_value.encode('utf-8')) + len(message_key.encode('utf-8'))
                        self.metrics.record_sent(message_size, topic=topic_name, partition=0)
                        send_times[message_key] = time.time()  # Track send time for latency

                    # Create metrics callback for this message
                    def create_metrics_callback(msg_key):
                        def metrics_callback(err, msg):
                            if self.metrics and not err:
                                # Calculate latency if we have send time
                                latency_ms = 0.0
                                if msg_key in send_times:
                                    latency_ms = (time.time() - send_times[msg_key]) * 1000
                                    del send_times[msg_key]
                                self.metrics.record_delivered(latency_ms, topic=msg.topic(), partition=msg.partition())
                        return metrics_callback

                    # Produce message with metrics callback
                    produce_start = time.time()
                    delivery_future = await producer.produce(
                        topic=topic_name,
                        value=message_value,
                        key=message_key,
                        on_delivery=create_metrics_callback(message_key) if self.metrics else None
                    )
                    produce_times.append(time.time() - produce_start)
                    pending_futures.append((delivery_future, message_key))  # Store delivery future
                    messages_sent += 1

                    # Use configured polling interval (default to 100 for async)
                    poll_interval = getattr(self, 'poll_interval', 100)
                    if messages_sent % poll_interval == 0:
                        poll_start = time.time()
                        await producer.poll(0)
                        poll_times.append(time.time() - poll_start)
```
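The `create_metrics_callback` factory is there to bind `msg_key` eagerly; closing over `message_key` directly would late-bind the loop variable and attribute latencies to the wrong message. An equivalent formulation with `functools.partial` (a sketch, not part of the PR):

```python
import functools
import time

def on_delivered(metrics, send_times, msg_key, err, msg):
    # Same logic as metrics_callback, with msg_key bound at produce time
    if metrics and not err:
        latency_ms = 0.0
        if msg_key in send_times:
            latency_ms = (time.time() - send_times.pop(msg_key)) * 1000
        metrics.record_delivered(latency_ms, topic=msg.topic(), partition=msg.partition())

# usage inside the produce loop:
#   on_delivery=functools.partial(on_delivered, self.metrics, send_times, message_key)
```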
@@ -219,3 +265,9 @@ async def async_produce():

```python
        loop = asyncio.get_event_loop()
        return loop.run_until_complete(async_produce())

    def get_final_metrics(self):
        """Return final metrics summary for the async producer"""
        if self.metrics:
            return self.metrics
        return None
```
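One small caveat: calling `asyncio.get_event_loop()` from synchronous code with no running loop has been deprecated since Python 3.10, so if each benchmark run is one-shot, the same call could be written as (a sketch):

```python
return asyncio.run(async_produce())
```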