28 changes: 26 additions & 2 deletions tests/ducktape/consumer_strategy.py
@@ -30,7 +30,7 @@ def get_final_metrics(self):


class SyncConsumerStrategy(ConsumerStrategy):
def create_consumer(self):
def create_consumer(self, config_overrides=None):
config = {
'bootstrap.servers': self.bootstrap_servers,
'group.id': self.group_id,
@@ -39,7 +39,19 @@ def create_consumer(self):
'auto.commit.interval.ms': '5000'
}

# Apply any test-specific overrides
if config_overrides:
config.update(config_overrides)

consumer = Consumer(config)

# Log the configuration for validation
if self.logger:
self.logger.info("=== SYNC CONSUMER CONFIGURATION ===")
for key, value in config.items():
self.logger.info(f"{key}: {value}")
self.logger.info("=" * 40)

return consumer

def get_final_metrics(self):
@@ -162,7 +174,7 @@ def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self._consumer_instance = None

def create_consumer(self):
def create_consumer(self, config_overrides=None):
config = {
'bootstrap.servers': self.bootstrap_servers,
'group.id': self.group_id,
@@ -171,7 +183,7 @@
'auto.commit.interval.ms': '5000'
}

# Apply any test-specific overrides
if config_overrides:
config.update(config_overrides)

self._consumer_instance = AIOConsumer(config, max_workers=20)

# Log the configuration for validation
if self.logger:
self.logger.info("=== ASYNC CONSUMER CONFIGURATION ===")
for key, value in config.items():
self.logger.info(f"{key}: {value}")
self.logger.info("=" * 41)

return self._consumer_instance

def get_final_metrics(self):
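The config_overrides hook added above is what the new transaction tests lean on. A minimal sketch of the call pattern, condensed from the _new_tx_consumer helper later in this PR (the override keys are the ones that helper passes; the wrapper function name here is illustrative, and everything else falls back to the defaults in create_consumer):

# Sketch only: build a read_committed async consumer through the strategy,
# letting create_consumer() merge these overrides into its default config.
from tests.ducktape.consumer_strategy import AsyncConsumerStrategy


def make_read_committed_consumer(bootstrap_servers, group_id, logger):
    strategy = AsyncConsumerStrategy(bootstrap_servers, group_id, logger)
    return strategy.create_consumer(config_overrides={
        'isolation.level': 'read_committed',  # deliver only committed transactional messages
        'enable.auto.commit': False,          # offsets get committed via the transaction instead
    })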
77 changes: 77 additions & 0 deletions tests/ducktape/producer_strategy.py
@@ -0,0 +1,77 @@
"""
Producer strategies for testing sync and async Kafka producers.

This module contains strategy classes that encapsulate the different producer
implementations (sync vs async) with consistent interfaces for testing.
"""


class ProducerStrategy:
"""Base class for producer strategies"""
def __init__(self, bootstrap_servers, logger):
self.bootstrap_servers = bootstrap_servers
self.logger = logger
self.metrics = None

def create_producer(self):
raise NotImplementedError()

def produce_messages(
self,
topic_name,
test_duration,
start_time,
message_formatter,
delivered_container,
failed_container=None,
):
raise NotImplementedError()


class AsyncProducerStrategy(ProducerStrategy):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self._producer_instance = None

def create_producer(self, config_overrides=None):
from confluent_kafka.aio import AIOProducer
# Enable logging for AIOProducer
import logging
logging.basicConfig(level=logging.INFO)

config = {
'bootstrap.servers': self.bootstrap_servers,
# Optimized configuration for low-latency, high-throughput (same as sync)
'queue.buffering.max.messages': 1000000, # 1M messages (sufficient)
'queue.buffering.max.kbytes': 1048576, # 1GB (default)
'batch.size': 65536, # 64KB batches
'batch.num.messages': 50000, # 50K messages per batch
'message.max.bytes': 2097152, # 2MB max message size
'linger.ms': 1, # Low latency
'compression.type': 'lz4', # Fast compression
'acks': 1, # Leader only
'retries': 3, # Retry failed sends
'delivery.timeout.ms': 30000, # 30s
'max.in.flight.requests.per.connection': 5
}

# Apply any test-specific overrides
if config_overrides:
config.update(config_overrides)

# Get producer configuration from strategy attributes
max_workers = getattr(self, 'max_workers', 4)
batch_size = getattr(self, 'batch_size', 1000) # Optimal batch size for low latency

# Use updated defaults with configurable parameters
self._producer_instance = AIOProducer(config, max_workers=max_workers)

# Log the configuration for validation
self.logger.info("=== ASYNC PRODUCER CONFIGURATION ===")
for key, value in config.items():
self.logger.info(f"{key}: {value}")
self.logger.info(f"max_workers: {max_workers}")
self.logger.info(f"batch_size: {batch_size}")
self.logger.info("=" * 41)

return self._producer_instance
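On the producer side, the transaction tests layer transactional settings on top of these optimized defaults through the same hook. A condensed sketch, mirroring _new_tx_producer in test_transactions.py below (the wrapper function name is illustrative):

# Sketch only: the overrides replace the default acks=1 above with acks=all and
# add the transactional.id / idempotence settings that transactions require.
import uuid

from tests.ducktape.producer_strategy import AsyncProducerStrategy


def make_tx_producer(bootstrap_servers, logger):
    strategy = AsyncProducerStrategy(bootstrap_servers, logger)
    return strategy.create_producer(config_overrides={
        'transactional.id': f'tx-producer-{uuid.uuid4()}',
        'enable.idempotence': True,
        'acks': 'all',
    })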
6 changes: 5 additions & 1 deletion tests/ducktape/run_ducktape_test.py
@@ -26,6 +26,10 @@ def get_test_info(test_type):
'producer_sr': {
'file': 'test_producer_with_schema_registry.py',
'description': 'Producer with Schema Registry Tests'
},
'transactions': {
'file': 'test_transactions.py',
'description': 'Transactional Producer and Consumer Tests'
}
}
return test_info.get(test_type)
@@ -34,7 +38,7 @@ def get_test_info(test_type):
def main():
"""Run the ducktape test based on specified type"""
parser = argparse.ArgumentParser(description="Confluent Kafka Python - Ducktape Test Runner")
parser.add_argument('test_type', choices=['producer', 'consumer', 'producer_sr'],
parser.add_argument('test_type', choices=['producer', 'consumer', 'producer_sr', 'transactions'],
Member:
Will it be possible to run specific tests if we use the "producer" string parameter? E.g.:
ducktape tests/ducktape/test_producer.py::SimpleProducerTest.test_basic_produce

Member Author:
Currently I extracted the transaction tests into a standalone file, as I think those involve both producer + consumer.

help='Type of test to run')
parser.add_argument('test_method', nargs='?',
help='Specific test method to run (optional)')
238 changes: 238 additions & 0 deletions tests/ducktape/test_transactions.py
@@ -0,0 +1,238 @@
"""
Ducktape tests for transactions in async producer and consumer.
"""
import uuid
import asyncio
from ducktape.tests.test import Test

from tests.ducktape.producer_strategy import AsyncProducerStrategy
from tests.ducktape.consumer_strategy import AsyncConsumerStrategy
from tests.ducktape.services.kafka import KafkaClient
from confluent_kafka.aio import AIOConsumer


class TransactionsTest(Test):
Member:
I think we need an overall design for how to perf-test transactions. One option: run n transactions end to end for async vs sync and compare performance.

Member Author:
Good point - I was mostly doing integration tests to verify behavior, but maybe we should benchmark transactions as well. Let me think about this.
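A minimal sketch of the comparison suggested above: time n end-to-end transactions with the synchronous confluent_kafka.Producer and with AIOProducer, then compare. This assumes the awaitable AIOProducer transactional calls behave as exercised by the tests in this file; the helper names, message counts, and the bare AIOProducer(config) constructor are illustrative assumptions:

import time
import uuid

from confluent_kafka import Producer
from confluent_kafka.aio import AIOProducer


def _tx_config(bootstrap_servers, prefix):
    # Shared transactional settings, matching the overrides used by _new_tx_producer.
    return {
        'bootstrap.servers': bootstrap_servers,
        'transactional.id': f'{prefix}-{uuid.uuid4()}',
        'enable.idempotence': True,
        'acks': 'all',
    }


def bench_sync_transactions(bootstrap_servers, topic, n_txns, msgs_per_txn=10):
    """Time n_txns committed transactions with the synchronous Producer."""
    producer = Producer(_tx_config(bootstrap_servers, 'bench-sync'))
    producer.init_transactions()
    start = time.perf_counter()
    for _ in range(n_txns):
        producer.begin_transaction()
        for i in range(msgs_per_txn):
            producer.produce(topic, value=f'sync-{i}')
        producer.commit_transaction()
    return time.perf_counter() - start


async def bench_async_transactions(bootstrap_servers, topic, n_txns, msgs_per_txn=10):
    """Time n_txns committed transactions with AIOProducer, mirroring the sync loop."""
    producer = AIOProducer(_tx_config(bootstrap_servers, 'bench-async'))
    await producer.init_transactions()
    start = time.perf_counter()
    for _ in range(n_txns):
        await producer.begin_transaction()
        for i in range(msgs_per_txn):
            await producer.produce(topic=topic, value=f'async-{i}')
        await producer.commit_transaction()
    return time.perf_counter() - start

Running both for the same n_txns and reporting transactions per second would give a first-order sync vs async comparison.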

def __init__(self, test_context):
super(TransactionsTest, self).__init__(test_context=test_context)
self.kafka = KafkaClient(test_context, bootstrap_servers="localhost:9092")

def setUp(self):
if not self.kafka.verify_connection():
raise Exception("Cannot connect to Kafka at localhost:9092. Please ensure Kafka is running.")

def _new_topic(self, partitions: int = 1) -> str:
topic = f"tx-{uuid.uuid4()}"
self.kafka.create_topic(topic, partitions=partitions, replication_factor=1)
assert self.kafka.wait_for_topic(topic, max_wait_time=30)
return topic

def _new_tx_producer(self):
"""Create an async Producer via strategy with transactional config."""
strategy = AsyncProducerStrategy(self.kafka.bootstrap_servers(), self.logger)
overrides = {
'transactional.id': f'tx-producer-{uuid.uuid4()}',
'acks': 'all',
'enable.idempotence': True,
}
return strategy.create_producer(config_overrides=overrides)

def _new_tx_consumer(self) -> AIOConsumer:
group_id = f'tx-consumer-{uuid.uuid4()}'
strategy = AsyncConsumerStrategy(self.kafka.bootstrap_servers(), group_id, self.logger)
overrides = {
'isolation.level': 'read_committed',
'enable.auto.commit': False,
}
return strategy.create_consumer(config_overrides=overrides)

async def _transactional_produce(self, producer, topic: str, values: list[str], partition: int | None = None):
"""Produce values within an active transaction, polling to drive delivery."""
for v in values:
kwargs = {'topic': topic, 'value': v}
if partition is not None:
kwargs['partition'] = partition
await producer.produce(**kwargs)
await producer.poll(0.0)

async def _seed_topic(self, topic: str, values: list[str]):
"""Seed a topic using async producer for consistency."""
strategy = AsyncProducerStrategy(self.kafka.bootstrap_servers(), self.logger)
producer = strategy.create_producer()
for v in values:
await producer.produce(topic, value=v)
await producer.poll(0.0)
# best-effort flush
if hasattr(producer, 'flush'):
try:
await producer.flush()
except Exception:
pass

def test_commit_transaction(self):
"""Committed transactional messages must be visible to read_committed consumer."""
async def run():
topic = self._new_topic()
producer = self._new_tx_producer()
consumer = self._new_tx_consumer()
try:
await producer.init_transactions()
await producer.begin_transaction()
await self._transactional_produce(producer, topic, [f'c{i}' for i in range(5)])
await producer.commit_transaction()

await consumer.subscribe([topic])
seen = []
for _ in range(20):
msg = await consumer.poll(timeout=1.0)
if not msg or msg.error():
continue
seen.append(msg.value().decode('utf-8'))
if len(seen) >= 5:
break
assert len(seen) == 5, f"expected 5 committed messages, got {len(seen)}"
finally:
await consumer.close()
asyncio.run(run())

def test_abort_transaction_then_retry_commit(self):
"""Aborted messages must be invisible, and retrying with new transaction must only commit visible results."""
async def run():
topic = self._new_topic()
producer = self._new_tx_producer()
consumer = self._new_tx_consumer()
try:
await producer.init_transactions()

# Abort case
await producer.begin_transaction()
await self._transactional_produce(producer, topic, [f'a{i}' for i in range(3)])
await producer.abort_transaction()

await consumer.subscribe([topic])
aborted_seen = []
for _ in range(10):
msg = await consumer.poll(timeout=1.0)
if not msg or msg.error():
continue
val = msg.value().decode('utf-8')
if val.startswith('a'):
aborted_seen.append(val)
assert not aborted_seen, f"aborted messages should be invisible, saw {aborted_seen}"

# Retry-commit flow
await producer.begin_transaction()
retry_vals = [f'r{i}' for i in range(3)]
await self._transactional_produce(producer, topic, retry_vals)
await producer.commit_transaction()

# Verify only retry values appear
seen = []
for _ in range(20):
msg = await consumer.poll(timeout=1.0)
if not msg or msg.error():
continue
val = msg.value().decode('utf-8')
seen.append(val)
if all(rv in seen for rv in retry_vals):
break
assert all(rv in seen for rv in retry_vals), f"expected retry values {retry_vals}, saw {seen}"
assert all(not s.startswith('a') for s in seen), f"should not see aborted values, saw {seen}"
finally:
await consumer.close()
asyncio.run(run())

def test_send_offsets_to_transaction(self):
"""Offsets committed atomically with produced results using send_offsets_to_transaction."""
async def run():
input_topic = self._new_topic()
output_topic = self._new_topic()

# Seed input
input_vals = [f'in{i}' for i in range(5)]
await self._seed_topic(input_topic, input_vals)

producer = self._new_tx_producer()
consumer = self._new_tx_consumer()
try:
await consumer.subscribe([input_topic])

# Consume a small batch from input
consumed = []
for _ in range(20):
msg = await consumer.poll(timeout=1.0)
if not msg or msg.error():
continue
consumed.append(msg)
if len(consumed) >= 3:
break
assert consumed, "expected to consume at least 1 message from input"

# Begin transaction: produce results and commit consumer offsets atomically
await producer.init_transactions()
await producer.begin_transaction()

out_vals = [f'out:{m.value().decode("utf-8")}' for m in consumed]
await self._transactional_produce(producer, output_topic, out_vals)

assignment = await consumer.assignment()
positions = await consumer.position(assignment)
group_metadata = await consumer.consumer_group_metadata()

await producer.send_offsets_to_transaction(positions, group_metadata)
await producer.commit_transaction()

# Verify output has results
out_consumer = self._new_tx_consumer()
try:
await out_consumer.subscribe([output_topic])
seen = []
for _ in range(20):
msg = await out_consumer.poll(timeout=1.0)
if not msg or msg.error():
continue
seen.append(msg.value().decode('utf-8'))
if len(seen) >= len(out_vals):
break
assert set(seen) == set(out_vals), f"expected {out_vals}, saw {seen}"
finally:
await out_consumer.close()

# Verify committed offsets advanced to positions
committed = await consumer.committed(assignment)
for pos, comm in zip(positions, committed):
assert comm.offset >= pos.offset, f"committed {comm.offset} < position {pos.offset}"
finally:
await consumer.close()
asyncio.run(run())

def test_commit_multiple_topics_partitions(self):
"""Commit atomically across multiple topics/partitions."""
async def run():
topic_a = self._new_topic(partitions=2)
topic_b = self._new_topic(partitions=1)

producer = self._new_tx_producer()
consumer = self._new_tx_consumer()
try:
await producer.init_transactions()
await producer.begin_transaction()
# Produce across A partitions and B
await self._transactional_produce(producer, topic_a, ["a0-p0", "a1-p0"], partition=0)
await self._transactional_produce(producer, topic_a, ["a0-p1", "a1-p1"], partition=1)
await self._transactional_produce(producer, topic_b, ["b0", "b1"]) # default partition
await producer.commit_transaction()

await consumer.subscribe([topic_a, topic_b])
expected = {"a0-p0", "a1-p0", "a0-p1", "a1-p1", "b0", "b1"}
seen = set()
for _ in range(30):
msg = await consumer.poll(timeout=1.0)
if not msg or msg.error():
continue
seen.add(msg.value().decode('utf-8'))
if seen == expected:
break
assert seen == expected, f"expected {expected}, saw {seen}"
finally:
await consumer.close()
asyncio.run(run())