[KIP-848] Added online upgrade and downgrade test #2012

base: master
Conversation
🎉 All Contributor License Agreements have been signed. Ready to merge.
Pull Request Overview
This PR adds comprehensive integration tests for Kafka consumer upgrade and downgrade scenarios, specifically testing the transition between classic and consumer group protocols. The tests verify that consumers can successfully upgrade from classic protocol to consumer protocol and downgrade back, while maintaining proper partition assignment and message consumption throughout the process.
Key changes:
- Added new integration test file with upgrade/downgrade test logic for multiple partition assignment strategies
- Enhanced cluster fixture with topic deletion capability and improved message seeding
- Refactored common utility functions for better group protocol configuration handling
Reviewed Changes
Copilot reviewed 3 out of 3 changed files in this pull request and generated 5 comments.
| File | Description |
|---|---|
| `tests/integration/consumer/test_consumer_upgrade_downgrade.py` | New test file implementing consumer protocol upgrade/downgrade tests with partition assignment validation |
| `tests/integration/cluster_fixture.py` | Added `delete_topic` method and improved `seed_topic` to generate proper test keys |
| `tests/common/__init__.py` | Refactored group protocol configuration utilities with new helper method and improved conditional logic |
```python
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limit
```
The comment appears to be truncated. It should likely read '# limitations under the License.' to complete the Apache License header.
```suggestion
# limitations under the License.
```
Yes it's truncated, please fix
```python
# def produce_messages(producer, topic, partitions, num_messages):
#     for i in range(num_messages):
#         key = "key-{}".format(i)
#         value = "value-{}".format(i)
#         partition = i % partitions
#         producer.produce(topic, key=key, value=value, partition=partition)
#     producer.flush()
```
This commented-out function should be removed as it's not being used and adds clutter to the codebase.
```suggestion
# Removed the commented-out `produce_messages` function to reduce clutter.
```
Remove it as you're using `seed_topic`.
""" | ||
Test consumer upgrade and downgrade. | ||
""" | ||
topic = kafka_cluster.create_topic_and_wait_propogation(topic_prefix, |
The method name contains a typo: 'propogation' should be 'propagation'.
```suggestion
topic = kafka_cluster.create_topic_and_wait_propagation(topic_prefix,
```
Maybe this time let's fix it instead of propagating the typo. It's fine given it's not public API.
You can use a different `topic` name, given it's also the group name: for example, `topic_prefix + partition_assignment_strategy`.
Copilot encountered an error and was unable to review this pull request. You can try again by re-requesting a review.
Thanks @pranavrth ! Here are my comments:
""" | ||
Test consumer upgrade and downgrade. | ||
""" | ||
topic = kafka_cluster.create_topic_and_wait_propogation(topic_prefix, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe this time let's fix it instead of propagating the typo. It's fine given it's not public API.
# distributed under the License is distributed on an "AS IS" BASIS, | ||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
# See the License for the specific language governing permissions and | ||
# limit |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes it's truncated, please fix
# def produce_messages(producer, topic, partitions, num_messages): | ||
# for i in range(num_messages): | ||
# key = "key-{}".format(i) | ||
# value = "value-{}".format(i) | ||
# partition = i % partitions | ||
# producer.produce(topic, key=key, value=value, partition=partition) | ||
# producer.flush() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Remove it as you're using seed_topic
```diff
@@ -273,7 +286,7 @@ def seed_topic(self, topic, value_source=None, key_source=None, header_source=None
             value_source = ['test-data{}'.format(i) for i in range(0, 100)]

         if key_source is None:
-            key_source = [None]
+            key_source = ['test-key{}'.format(i) for i in range(0, 100)]
```
Let's avoid changing this function, which is used in other places, and pass a `key_source` and `value_source` this time; or otherwise, if you change it, let's change the other invocations so you don't add keys to those tests.
```python
# Produce some messages to the topic
kafka_cluster.seed_topic(topic)
list_offsets(admin_client, topic, number_of_partitions)
```
The `list_offsets` call can be removed: given we're producing and then consuming all messages, consuming them all already verifies that all of them were produced.
```python
def check_consumer(kafka_cluster, consumers, admin_client, topic, expected_protocol):
    total_msg_read = 0
    while len(consumers[-1].assignment()) != number_of_partitions // len(consumers):
```
`number_of_partitions // len(consumers)` is used multiple times; you can add a variable `expected_partitions_per_consumer`.
Here it's checking only the last consumer's assignment, and it works only because there are two consumers and the number of partitions divides evenly by the number of consumers. To be generic you can check, for all consumers, that `diff = len(consumer.assignment()) - expected_partitions_per_consumer` is 0 or 1.
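A self-contained sketch of that generic balance check (pure Python, no Kafka involved; plain lists of partition ids stand in for each consumer's `assignment()` result):

```python
def assignment_is_balanced(assignments, number_of_partitions):
    # With n partitions and k consumers, each consumer should own
    # floor(n/k) or floor(n/k) + 1 partitions.
    expected_partitions_per_consumer = number_of_partitions // len(assignments)
    return all(
        0 <= len(assignment) - expected_partitions_per_consumer <= 1
        for assignment in assignments
    )

# 6 partitions over 4 consumers: 2+2+1+1 is balanced, 3+1+1+1 is not.
balanced = assignment_is_balanced([[0, 1], [2, 3], [4], [5]], 6)
not_balanced = assignment_is_balanced([[0, 1, 2], [3], [4], [5]], 6)
```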
```python
        consumer.poll(0.1)

    for consumer in consumers:
        assert len(consumer.assignment()) == number_of_partitions // len(consumers)
```
This way we're only checking that each consumer has the expected number of partitions, but there could be duplicate assignments and unassigned partitions while the counts still come out correct. You can add every assigned partition to a set called, for example, `all_partitions` and verify that `len(all_partitions) == number_of_partitions` (no unassigned partition); you can also verify that the total sum of `len(assignment)` equals `number_of_partitions` (no duplicate assignment).
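A minimal sketch of that completeness check (again plain Python, with lists of partition ids standing in for real assignments):

```python
def assignment_is_complete(assignments, number_of_partitions):
    # Collect every assigned partition into one set.
    all_partitions = set()
    total_assigned = 0
    for assignment in assignments:
        all_partitions.update(assignment)
        total_assigned += len(assignment)
    # Equal set size  => no partition left unassigned;
    # equal total count => no partition assigned twice.
    return (len(all_partitions) == number_of_partitions
            and total_assigned == number_of_partitions)

complete = assignment_is_complete([[0, 1], [2, 3]], 4)
duplicated = assignment_is_complete([[0, 1], [1, 2]], 4)  # 1 assigned twice, 3 missing
```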
```python
consumer_conf = {'group.id': topic,
                 'auto.offset.reset': 'earliest',
                 'group.protocol': 'classic'}
consumer_conf['partition.assignment.strategy'] = partition_assignment_strategy
```
Instead of deleting and replacing properties on the dict you could create two dicts:

```python
consumer_conf = {'group.id': topic,
                 'auto.offset.reset': 'earliest'}
classic_consumer_conf = {
    'group.protocol': 'classic',
    'partition.assignment.strategy': partition_assignment_strategy,
    **consumer_conf
}
consumer_consumer_conf = {
    'group.protocol': 'consumer',
    **consumer_conf
}
```
```python
consumer = kafka_cluster.consumer(consumer_conf)
assert consumer is not None
consumer.subscribe([topic])
check_consumer(kafka_cluster, [consumer], admin_client, topic, ConsumerGroupType.CLASSIC)
del consumer_conf['partition.assignment.strategy']

# Now simulate an upgrade by creating a new consumer with 'consumer' protocol
consumer_conf['group.protocol'] = 'consumer'
consumer2 = kafka_cluster.consumer(consumer_conf)
assert consumer2 is not None
consumer2.subscribe([topic])
check_consumer(kafka_cluster, [consumer, consumer2], admin_client, topic, ConsumerGroupType.CONSUMER)

# Now simulate a downgrade by deleting the second consumer and keeping only 'classic' consumer
consumer2.close()
check_consumer(kafka_cluster, [consumer], admin_client, topic, ConsumerGroupType.CLASSIC)

consumer.close()
```
This part is limited to two consumers and it's not checking the case where all classic consumers are removed. What you can do is have more consumers, say `consumers_cnt = 5`, then run `check_consumer` after each step:

- First you add all classic ones and verify that the protocol is `CLASSIC`.
- One by one you `pop` the first consumer, close it, and add a new consumer with `CONSUMER` protocol. You verify that the protocol is `CONSUMER`, because it changes as soon as the first such consumer is added.
- When all consumers are on the new protocol you do the same in reverse, this time replacing `consumer`-protocol consumers with `classic` consumers. You verify that the protocol stays `CONSUMER` until after the last replacement, where it returns to `CLASSIC`.
- Finally you close all consumers for the next test run.
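The transitions this rolling replacement should produce can be illustrated with a toy model (pure Python, not real Kafka behavior; it assumes a mixed group reports `CONSUMER` as soon as any member uses the new protocol, per KIP-848's mixed-group semantics):

```python
CLASSIC, CONSUMER = 'classic', 'consumer'

def group_type(members):
    # Assumption: a mixed group runs the new protocol; only an
    # all-classic group reports CLASSIC.
    return CONSUMER if CONSUMER in members else CLASSIC

consumers_cnt = 5
members = [CLASSIC] * consumers_cnt
observed = [group_type(members)]

# Rolling upgrade: pop the first member, "close" it, add a CONSUMER one.
for _ in range(consumers_cnt):
    members.pop(0)
    members.append(CONSUMER)
    observed.append(group_type(members))

# Rolling downgrade: replace each member with a classic one again.
for _ in range(consumers_cnt):
    members.pop(0)
    members.append(CLASSIC)
    observed.append(group_type(members))

# CLASSIC at the start, CONSUMER for every intermediate step, and
# CLASSIC again only after the last classic replacement.
```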
""" | ||
Test consumer upgrade and downgrade. | ||
""" | ||
topic = kafka_cluster.create_topic_and_wait_propogation(topic_prefix, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You can use a different topic
name, given it's also the group name.
topic_prefix + partition_assignment_strategy
.
No description provided.