2 changes: 1 addition & 1 deletion tests/ducktape/consumer_strategy.py
@@ -38,7 +38,6 @@ def create_consumer(self):
'enable.auto.commit': 'true',
'auto.commit.interval.ms': '5000'
}

consumer = Consumer(config)
return consumer

@@ -172,6 +171,7 @@ def create_consumer(self):
}

self._consumer_instance = AIOConsumer(config, max_workers=20)

return self._consumer_instance

def get_final_metrics(self):
6 changes: 5 additions & 1 deletion tests/ducktape/run_ducktape_test.py
@@ -26,6 +26,10 @@ def get_test_info(test_type):
'producer_sr': {
'file': 'test_producer_with_schema_registry.py',
'description': 'Producer with Schema Registry Tests'
},
'transactions': {
'file': 'test_transactions.py',
'description': 'Transactional Producer and Consumer Tests'
}
}
return test_info.get(test_type)
@@ -34,7 +38,7 @@ def get_test_info(test_type):
def main():
"""Run the ducktape test based on specified type"""
parser = argparse.ArgumentParser(description="Confluent Kafka Python - Ducktape Test Runner")
parser.add_argument('test_type', choices=['producer', 'consumer', 'producer_sr'],
parser.add_argument('test_type', choices=['producer', 'consumer', 'producer_sr', 'transactions'],
Member:

Will it be possible to run specific tests when using the "producer" string parameter? E.g.
ducktape tests/ducktape/test_producer.py::SimpleProducerTest.test_basic_produce

Member Author:

Currently I extracted the transaction tests into a standalone file, as I think those involve both producer and consumer.
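
A minimal sketch of how the runner's optional test_method argument could map onto ducktape's path::Class.method selector shown in the command above. The helper below is purely illustrative, not the PR's actual implementation; the selector syntax is the only assumption, taken from the thread.

# Hypothetical helper (not from the PR): translate the runner's arguments
# into a ducktape test selector of the form path/to/file.py::Class.method.
import subprocess

def build_target(test_file, test_method=None):
    base = f"tests/ducktape/{test_file}"
    return f"{base}::{test_method}" if test_method else base

def run_ducktape(test_file, test_method=None):
    # e.g. run_ducktape("test_producer.py", "SimpleProducerTest.test_basic_produce")
    subprocess.run(["ducktape", build_target(test_file, test_method)], check=True)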

help='Type of test to run')
parser.add_argument('test_method', nargs='?',
help='Specific test method to run (optional)')
Expand Down
263 changes: 263 additions & 0 deletions tests/ducktape/test_transactions.py
@@ -0,0 +1,263 @@
"""
Ducktape tests for transactions in async producer and consumer.
"""
import uuid
import asyncio
from ducktape.tests.test import Test

from confluent_kafka.aio._AIOProducer import AIOProducer
from tests.ducktape.consumer_strategy import AsyncConsumerStrategy, SyncConsumerStrategy
from tests.ducktape.services.kafka import KafkaClient
from confluent_kafka import Producer


class TransactionsTest(Test):
Member:

I think we need an overall design for how to perf test transactions. One option: run N transactions end to end for async vs. sync and compare performance.

Member Author:

Good point. I was mostly writing integration tests to verify behavior, but maybe we should benchmark transactions as well. Let me think about this.

Contributor:

Making transactions an optional configuration of the perf tests seems best as a follow-up: make the perf tests essentially a config-driven execution engine for the parameters of a performance scenario we can pick and choose for a given context.
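
A minimal sketch of the comparison discussed above, assuming only the awaitable AIOProducer transaction API exercised by the tests below; the transaction counts and timing harness are illustrative, not a proposed design.

# Illustrative benchmark sketch (not part of the PR): run N end-to-end
# transactions with the async producer and report committed transactions
# per second. A sync baseline would use Producer with the same config.
import time

async def bench_async_transactions(producer, topic, n_txns=100, msgs_per_txn=10):
    await producer.init_transactions()
    start = time.perf_counter()
    for i in range(n_txns):
        await producer.begin_transaction()
        for j in range(msgs_per_txn):
            await producer.produce(topic, value=f"txn{i}-msg{j}")
            await producer.poll(0.0)  # drive delivery callbacks, as the tests below do
        await producer.commit_transaction()
    elapsed = time.perf_counter() - start
    return n_txns / elapsed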

def __init__(self, test_context):
super(TransactionsTest, self).__init__(test_context=test_context)
self.kafka = KafkaClient(test_context, bootstrap_servers="localhost:9092")

def setUp(self):
if not self.kafka.verify_connection():
raise Exception("Cannot connect to Kafka at localhost:9092. Please ensure Kafka is running.")

def _new_topic(self, partitions: int = 1) -> str:
topic = f"tx-{uuid.uuid4()}"
self.kafka.create_topic(topic, partitions=partitions, replication_factor=1)
assert self.kafka.wait_for_topic(topic, max_wait_time=30)
return topic

def _create_transactional_producer(self, producer_type):
"""Create transactional producer based on type"""
# TODO: use producer_strategy to simplify code once it's merged
if producer_type == "sync":
config = {
'bootstrap.servers': self.kafka.bootstrap_servers(),
'batch.size': 65536,
'linger.ms': 1,
'compression.type': 'lz4',
'transactional.id': f'sync-tx-producer-{uuid.uuid4()}',
'acks': 'all',
'enable.idempotence': True
}
return Producer(config)
else: # async
config = {
'bootstrap.servers': self.kafka.bootstrap_servers(),
'batch.size': 65536,
'linger.ms': 1,
'compression.type': 'lz4',
'transactional.id': f'async-tx-producer-{uuid.uuid4()}',
'acks': 'all',
'enable.idempotence': True,
}
return AIOProducer(config, max_workers=10)

def _create_transactional_consumer(self, consumer_type, group_id=None):
"""Create read_committed consumer based on type using strategy pattern"""
if group_id is None:
group_id = f'tx-consumer-{uuid.uuid4()}'

# Overrides for transactional consumers
overrides = {
'isolation.level': 'read_committed',
'enable.auto.commit': False,
'auto.offset.reset': 'earliest',
}

if consumer_type == "sync":
strategy = SyncConsumerStrategy(self.kafka.bootstrap_servers(), group_id, self.logger)
return strategy.create_consumer(config_overrides=overrides)
else: # async
strategy = AsyncConsumerStrategy(self.kafka.bootstrap_servers(), group_id, self.logger)
return strategy.create_consumer(config_overrides=overrides)

async def _transactional_produce(self, producer, topic: str, values: list[str], partition: int | None = None):
"""Produce values within an active transaction, polling to drive delivery."""
for v in values:
kwargs = {'topic': topic, 'value': v}
if partition is not None:
kwargs['partition'] = partition
await producer.produce(**kwargs)
await producer.poll(0.0)

async def _seed_topic(self, topic: str, values: list[str]):
"""Seed a topic using transactional producer."""
producer = self._create_transactional_producer("sync")
producer.init_transactions()
producer.begin_transaction()
for v in values:
producer.produce(topic, value=v)
producer.poll(0.0)
producer.commit_transaction()
producer.flush()

# =========== Functional tests (async) ===========

def test_commit_transaction(self):
"""Committed transactional messages must be visible to read_committed consumer."""
async def run():
topic = self._new_topic()
producer = self._create_transactional_producer("async")
consumer = self._create_transactional_consumer("async")
try:
await producer.init_transactions()
await producer.begin_transaction()
await self._transactional_produce(producer, topic, [f'c{i}' for i in range(5)])
await producer.commit_transaction()

await consumer.subscribe([topic])
seen = []
for _ in range(20):
msg = await consumer.poll(timeout=1.0)
if not msg or msg.error():
continue
seen.append(msg.value().decode('utf-8'))
if len(seen) >= 5:
break
assert len(seen) == 5, f"expected 5 committed messages, got {len(seen)}"
finally:
await consumer.close()
asyncio.run(run())

def test_abort_transaction_then_retry_commit(self):
"""Aborted messages must be invisible, and retrying with new transaction must only commit visible results."""
async def run():
topic = self._new_topic()
producer = self._create_transactional_producer("async")
consumer = self._create_transactional_consumer("async")
try:
await producer.init_transactions()

# Abort case
await producer.begin_transaction()
await self._transactional_produce(producer, topic, [f'a{i}' for i in range(3)])
await producer.abort_transaction()

await consumer.subscribe([topic])
aborted_seen = []
for _ in range(10):
msg = await consumer.poll(timeout=1.0)
if not msg or msg.error():
continue
val = msg.value().decode('utf-8')
if val.startswith('a'):
aborted_seen.append(val)
assert not aborted_seen, f"aborted messages should be invisible, saw {aborted_seen}"

# Retry-commit flow
await producer.begin_transaction()
retry_vals = [f'r{i}' for i in range(3)]
await self._transactional_produce(producer, topic, retry_vals)
await producer.commit_transaction()

# Verify only retry values appear
seen = []
for _ in range(20):
msg = await consumer.poll(timeout=1.0)
if not msg or msg.error():
continue
val = msg.value().decode('utf-8')
seen.append(val)
if all(rv in seen for rv in retry_vals):
break
assert all(rv in seen for rv in retry_vals), f"expected retry values {retry_vals}, saw {seen}"
assert all(not s.startswith('a') for s in seen), f"should not see aborted values, saw {seen}"
finally:
await consumer.close()
asyncio.run(run())

def test_send_offsets_to_transaction(self):
"""Offsets committed atomically with produced results using send_offsets_to_transaction."""
async def run():
input_topic = self._new_topic()
output_topic = self._new_topic()

# Seed input
input_vals = [f'in{i}' for i in range(5)]
await self._seed_topic(input_topic, input_vals)

producer = self._create_transactional_producer("async")
consumer = self._create_transactional_consumer("async")
try:
await consumer.subscribe([input_topic])

# Consume a small batch from input
consumed = []
for _ in range(20):
msg = await consumer.poll(timeout=1.0)
if not msg or msg.error():
continue
consumed.append(msg)
if len(consumed) >= 3:
break
assert consumed, "expected to consume at least 1 message from input"

# Begin transaction: produce results and commit consumer offsets atomically
await producer.init_transactions()
await producer.begin_transaction()

out_vals = [f'out:{m.value().decode("utf-8")}' for m in consumed]
await self._transactional_produce(producer, output_topic, out_vals)

assignment = await consumer.assignment()
positions = await consumer.position(assignment)
group_metadata = await consumer.consumer_group_metadata()

await producer.send_offsets_to_transaction(positions, group_metadata)
await producer.commit_transaction()

# Verify output has results
out_consumer = self._create_transactional_consumer("async")
try:
await out_consumer.subscribe([output_topic])
seen = []
for _ in range(20):
msg = await out_consumer.poll(timeout=1.0)
if not msg or msg.error():
continue
seen.append(msg.value().decode('utf-8'))
if len(seen) >= len(out_vals):
break
assert set(seen) == set(out_vals), f"expected {out_vals}, saw {seen}"
finally:
await out_consumer.close()

# Verify committed offsets advanced to positions
committed = await consumer.committed(assignment)
for pos, comm in zip(positions, committed):
assert comm.offset >= pos.offset, f"committed {comm.offset} < position {pos.offset}"
finally:
await consumer.close()
asyncio.run(run())

def test_commit_multiple_topics_partitions(self):
"""Commit atomically across multiple topics/partitions."""
async def run():
topic_a = self._new_topic(partitions=2)
topic_b = self._new_topic(partitions=1)

producer = self._create_transactional_producer("async")
consumer = self._create_transactional_consumer("async")
try:
await producer.init_transactions()
await producer.begin_transaction()
# Produce across A partitions and B
await self._transactional_produce(producer, topic_a, ["a0-p0", "a1-p0"], partition=0)
await self._transactional_produce(producer, topic_a, ["a0-p1", "a1-p1"], partition=1)
await self._transactional_produce(producer, topic_b, ["b0", "b1"]) # default partition
await producer.commit_transaction()

await consumer.subscribe([topic_a, topic_b])
expected = {"a0-p0", "a1-p0", "a0-p1", "a1-p1", "b0", "b1"}
seen = set()
for _ in range(30):
msg = await consumer.poll(timeout=1.0)
if not msg or msg.error():
continue
seen.add(msg.value().decode('utf-8'))
if seen == expected:
break
assert seen == expected, f"expected {expected}, saw {seen}"
finally:
await consumer.close()
asyncio.run(run())
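
With the runner changes above, the new suite should be runnable end to end (assuming Kafka on localhost:9092, as setUp requires) via the new test type, e.g. python tests/ducktape/run_ducktape_test.py transactions, optionally followed by a specific test method name.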