-
Notifications
You must be signed in to change notification settings - Fork 930
Add more async consumer unit & integration tests #2052
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: async
Are you sure you want to change the base?
Conversation
🎉 All Contributor License Agreements have been signed. Ready to merge. |
This comment has been minimized.
This comment has been minimized.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Minor change request, glad to see wider edge case coverage
def add_partitions(self, topic_name, new_partition_count): | ||
"""Add partitions to an existing topic""" | ||
try: | ||
from confluent_kafka.admin import AdminClient, NewPartitions |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
move imports to top, no need for JIT here
finally: | ||
await consumer1.close() | ||
|
||
asyncio.run(async_rebalance_test()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
A quick look at ducktape looked like it doesn't support async test defs to avoid this wrapper. Maybe we could make an async supporting wrapper class that does this in the Metaclass? Could contribute it back to enable async tests in ducktape later on. Can be a follow-up task -- don't block this PR on it
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes ducktape doesn't support it right now. Agreed - it's kinda verbose, and I can add a backlog ticket to work on after the main tasks are done
tests/ducktape/test_consumer.py
Outdated
await consumer3.subscribe([topic_name], on_assign=track_rebalance) | ||
|
||
# Poll all consumers until they detect new partitions and rebalance | ||
for _ in range(15): # Max 15 seconds for partition discovery |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Might be longer given the consumers each poll as well for 1 second consecutively correct?
tests/ducktape/test_consumer.py
Outdated
|
||
asyncio.run(async_topic_change_test()) | ||
|
||
# TODO: verify if the current behavior is correct/intended |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would be surprised if it swallowed a callback error without reraising tbh
What
Add new unit and integration tests to async consumer:
test_async_consumer_joins_and_leaves_rebalance
: Tests the complete lifecycle of consumers joining and leaving a consumer group, ensuring proper partition redistribution during rebalancing eventstest_async_topic_partition_changes_rebalance
: Tests dynamic partition addition to existing topics and verifies that consumers properly detect and rebalance across the new partitionstest_async_callback_exception_behavior
: Tests how async consumer handles exceptions in callback (currently propagating the failure and failing the consumer). TODO: check whether this behavior is intendedChecklist
References
JIRA: https://confluentinc.atlassian.net/browse/NONJAVACLI-3988
Test & Review
Open questions / Follow-ups