Skip to content

Conversation

fangnx
Copy link
Member

@fangnx fangnx commented Sep 16, 2025

What

Add new unit and integration tests to async consumer:

  • test_async_consumer_joins_and_leaves_rebalance: Tests the complete lifecycle of consumers joining and leaving a consumer group, ensuring proper partition redistribution during rebalancing events
  • test_async_topic_partition_changes_rebalance: Tests dynamic partition addition to existing topics and verifies that consumers properly detect and rebalance across the new partitions
  • test_async_callback_exception_behavior: Tests how async consumer handles exceptions in callback (currently propagating the failure and failing the consumer). TODO: check whether this behavior is intended
  • TestAIOConsumer: new unit tests for error handling

Checklist

  • Contains customer facing changes? Including API/behavior changes
  • Did you add sufficient unit test and/or integration test coverage for this PR?
    • If not, please explain why it is not required

References

JIRA: https://confluentinc.atlassian.net/browse/NONJAVACLI-3988

Test & Review

Open questions / Follow-ups

@confluent-cla-assistant
Copy link

🎉 All Contributor License Agreements have been signed. Ready to merge.
Please push an empty commit if you would like to re-run the checks to verify CLA status for all contributors.

@sonarqube-confluent

This comment has been minimized.

@fangnx fangnx changed the title WIP: Add more async consumer integration tests Add more async consumer unit & integration tests Sep 16, 2025
@fangnx fangnx marked this pull request as ready for review September 16, 2025 21:42
@fangnx fangnx requested review from MSeal and a team as code owners September 16, 2025 21:42
Copy link
Contributor

@MSeal MSeal left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Minor change request, glad to see wider edge case coverage

def add_partitions(self, topic_name, new_partition_count):
"""Add partitions to an existing topic"""
try:
from confluent_kafka.admin import AdminClient, NewPartitions
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

move imports to top, no need for JIT here

finally:
await consumer1.close()

asyncio.run(async_rebalance_test())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A quick look at ducktape looked like it doesn't support async test defs to avoid this wrapper. Maybe we could make an async supporting wrapper class that does this in the Metaclass? Could contribute it back to enable async tests in ducktape later on. Can be a follow-up task -- don't block this PR on it

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes ducktape doesn't support it right now. Agreed - it's kinda verbose, and I can add a backlog ticket to work on after the main tasks are done

await consumer3.subscribe([topic_name], on_assign=track_rebalance)

# Poll all consumers until they detect new partitions and rebalance
for _ in range(15): # Max 15 seconds for partition discovery
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Might be longer given the consumers each poll as well for 1 second consecutively correct?


asyncio.run(async_topic_change_test())

# TODO: verify if the current behavior is correct/intended
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would be surprised if it swallowed a callback error without reraising tbh

@fangnx fangnx requested a review from MSeal September 17, 2025 17:20
@sonarqube-confluent
Copy link

Failed

  • 67.50% Coverage on New Code (is less than 80.00%)

Analysis Details

22 Issues

  • Bug 2 Bugs
  • Vulnerability 0 Vulnerabilities
  • Code Smell 20 Code Smells

Coverage and Duplications

  • Coverage 67.50% Coverage (66.10% Estimated after merge)
  • Duplications No duplication information (5.10% Estimated after merge)

Project ID: confluent-kafka-python

View in SonarQube

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants