Conversation

pranavrth
Member

No description provided.

@Copilot Copilot AI review requested due to automatic review settings July 17, 2025 07:11
@pranavrth pranavrth requested review from MSeal and a team as code owners July 17, 2025 07:11
@confluent-cla-assistant

🎉 All Contributor License Agreements have been signed. Ready to merge.
Please push an empty commit if you would like to re-run the checks to verify CLA status for all contributors.

@pranavrth pranavrth changed the title Added online upgrade and downgrade test [KIP-848] Added online upgrade and downgrade test Jul 17, 2025

@Copilot Copilot AI left a comment


Pull Request Overview

This PR adds comprehensive integration tests for Kafka consumer upgrade and downgrade scenarios, specifically testing the transition between classic and consumer group protocols. The tests verify that consumers can successfully upgrade from classic protocol to consumer protocol and downgrade back, while maintaining proper partition assignment and message consumption throughout the process.

Key changes:

  • Added new integration test file with upgrade/downgrade test logic for multiple partition assignment strategies
  • Enhanced cluster fixture with topic deletion capability and improved message seeding
  • Refactored common utility functions for better group protocol configuration handling

Reviewed Changes

Copilot reviewed 3 out of 3 changed files in this pull request and generated 5 comments.

File Description
tests/integration/consumer/test_consumer_upgrade_downgrade.py: New test file implementing consumer protocol upgrade/downgrade tests with partition assignment validation
tests/integration/cluster_fixture.py: Added delete_topic method and improved seed_topic to generate proper test keys
tests/common/__init__.py: Refactored group protocol configuration utilities with a new helper method and improved conditional logic

# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limit

Copilot AI Jul 17, 2025


The comment appears to be truncated. It should likely read '# limitations under the License.' to complete the Apache License header.

Suggested change
# limit
# limitations under the License.


emasab (Contributor)

Yes it's truncated, please fix

Comment on lines +56 to +62
# def produce_messages(producer, topic, partitions, num_messages):
# for i in range(num_messages):
# key = "key-{}".format(i)
# value = "value-{}".format(i)
# partition = i % partitions
# producer.produce(topic, key=key, value=value, partition=partition)
# producer.flush()

Copilot AI Jul 17, 2025


This commented-out function should be removed as it's not being used and adds clutter to the codebase.

Suggested change
# def produce_messages(producer, topic, partitions, num_messages):
# for i in range(num_messages):
# key = "key-{}".format(i)
# value = "value-{}".format(i)
# partition = i % partitions
# producer.produce(topic, key=key, value=value, partition=partition)
# producer.flush()
# Removed the commented-out `produce_messages` function to reduce clutter.


emasab (Contributor)

Remove it as you're using seed_topic

"""
Test consumer upgrade and downgrade.
"""
topic = kafka_cluster.create_topic_and_wait_propogation(topic_prefix,

Copilot AI Jul 17, 2025


The method name contains a typo: 'propogation' should be 'propagation'.

Suggested change
topic = kafka_cluster.create_topic_and_wait_propogation(topic_prefix,
topic = kafka_cluster.create_topic_and_wait_propagation(topic_prefix,


emasab (Contributor)

Maybe this time let's fix it instead of propagating the typo. It's fine given it's not public API.

emasab (Contributor)

You can use a different topic name, given it's also the group name: topic_prefix + partition_assignment_strategy.


@Copilot Copilot AI left a comment


Copilot encountered an error and was unable to review this pull request. You can try again by re-requesting a review.

@sonarqube-confluent

Passed

Analysis Details

4 Issues

  • Bugs: 0
  • Vulnerabilities: 0
  • Code Smells: 4

Coverage and Duplications

  • Coverage: no coverage information (66.00% estimated after merge)
  • Duplications: no duplication information (5.20% estimated after merge)

Project ID: confluent-kafka-python

View in SonarQube


@emasab emasab left a comment


Thanks @pranavrth! Here are my comments:

"""
Test consumer upgrade and downgrade.
"""
topic = kafka_cluster.create_topic_and_wait_propogation(topic_prefix,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe this time let's fix it instead of propagating the typo. It's fine given it's not public API.

# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limit
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes it's truncated, please fix

Comment on lines +56 to +62
# def produce_messages(producer, topic, partitions, num_messages):
# for i in range(num_messages):
# key = "key-{}".format(i)
# value = "value-{}".format(i)
# partition = i % partitions
# producer.produce(topic, key=key, value=value, partition=partition)
# producer.flush()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove it as you're using seed_topic

@@ -273,7 +286,7 @@ def seed_topic(self, topic, value_source=None, key_source=None, header_source=None)
             value_source = ['test-data{}'.format(i) for i in range(0, 100)]

         if key_source is None:
-            key_source = [None]
+            key_source = ['test-key{}'.format(i) for i in range(0, 100)]
emasab (Contributor)

Let's avoid changing this function, which is used in other places, and pass a key_source and value_source this time; or, if you do change it, update the other invocations so you don't add keys to those tests.
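
A minimal sketch of that approach, using seed_topic's signature from the diff above; the explicit sources mirror the fixture's defaults and keep them unchanged for other tests:

    # Pass explicit sources from the new test instead of changing the
    # fixture's defaults for every caller.
    value_source = ['test-data{}'.format(i) for i in range(0, 100)]
    key_source = ['test-key{}'.format(i) for i in range(0, 100)]
    kafka_cluster.seed_topic(topic, value_source=value_source, key_source=key_source)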


# Produce some messages to the topic
kafka_cluster.seed_topic(topic)
list_offsets(admin_client, topic, number_of_partitions)
emasab (Contributor)

The list_offsets call can be removed: given we're producing and then consuming all messages, consumption already verifies that all of them were produced.


def check_consumer(kafka_cluster, consumers, admin_client, topic, expected_protocol):
total_msg_read = 0
while len(consumers[-1].assignment()) != number_of_partitions // len(consumers):
emasab (Contributor)

number_of_partitions // len(consumers) is used multiple times; you can add a variable expected_partitions_per_consumer.

emasab (Contributor)

Here it's checking only the last consumer's assignment, and it works only because there are two consumers and the number of partitions divides evenly by the number of consumers. To be generic, you can check that diff = len(consumer.assignment()) - expected_partitions_per_consumer is 0 or 1 for all consumers.
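
A minimal sketch of that generic check, assuming number_of_partitions and the consumers list are in scope (the helper name is illustrative):

    expected_partitions_per_consumer = number_of_partitions // len(consumers)

    def assignment_is_balanced(consumers, expected_partitions_per_consumer):
        # Each consumer should own the floor, or floor + 1 when the
        # partition count does not divide evenly.
        for consumer in consumers:
            diff = len(consumer.assignment()) - expected_partitions_per_consumer
            if diff not in (0, 1):
                return False
        return True

    while not assignment_is_balanced(consumers, expected_partitions_per_consumer):
        for consumer in consumers:
            consumer.poll(0.1)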

consumer.poll(0.1)

for consumer in consumers:
assert len(consumer.assignment()) == number_of_partitions // len(consumers)
emasab (Contributor)

This way we're only checking that they have the expected number of partitions, but there could be duplicate assignments and unassigned partitions while the count still comes out correct. You can add the partitions to a set called, for example, all_partitions and verify that len(all_partitions) == number_of_partitions (no unassigned partition); you can also verify that the total sum of len(assignment) equals number_of_partitions (no duplicate assignment).
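
A sketch of that stronger post-condition, assuming the assignments contain confluent_kafka TopicPartition objects:

    all_partitions = set()
    total_assigned = 0
    for consumer in consumers:
        assignment = consumer.assignment()
        total_assigned += len(assignment)
        all_partitions.update((tp.topic, tp.partition) for tp in assignment)

    # No unassigned partitions ...
    assert len(all_partitions) == number_of_partitions
    # ... and no duplicate assignments.
    assert total_assigned == number_of_partitions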

Comment on lines +101 to +104
consumer_conf = {'group.id': topic,
'auto.offset.reset': 'earliest',
'group.protocol': 'classic'}
consumer_conf['partition.assignment.strategy'] = partition_assignment_strategy
emasab (Contributor)

Instead of deleting and replacing properties on the dict, you could create two dicts:

    consumer_conf = {'group.id': topic,
                     'auto.offset.reset': 'earliest'}
    classic_consumer_conf = {
        'group.protocol': 'classic',
        'partition.assignment.strategy': partition_assignment_strategy,
        **consumer_conf
    }
    consumer_consumer_conf = {
        'group.protocol': 'consumer',
        **consumer_conf
    }
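
A hypothetical usage of those two dicts: each phase builds its consumer from the right config, with no mutation in between.

    # Illustrative names; each phase picks the matching config dict.
    classic_consumer = kafka_cluster.consumer(classic_consumer_conf)
    upgraded_consumer = kafka_cluster.consumer(consumer_consumer_conf)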

Comment on lines +105 to +122
consumer = kafka_cluster.consumer(consumer_conf)
assert consumer is not None
consumer.subscribe([topic])
check_consumer(kafka_cluster, [consumer], admin_client, topic, ConsumerGroupType.CLASSIC)
del consumer_conf['partition.assignment.strategy']

# Now simulate an upgrade by creating a new consumer with 'consumer' protocol
consumer_conf['group.protocol'] = 'consumer'
consumer2 = kafka_cluster.consumer(consumer_conf)
assert consumer2 is not None
consumer2.subscribe([topic])
check_consumer(kafka_cluster, [consumer, consumer2], admin_client, topic, ConsumerGroupType.CONSUMER)

# Now simulate a downgrade by deleting the second consumer and keeping only 'classic' consumer
consumer2.close()
check_consumer(kafka_cluster, [consumer], admin_client, topic, ConsumerGroupType.CLASSIC)

consumer.close()
emasab (Contributor)

This part is limited to two consumers, and it's not checking the case where all classic consumers are removed. What you can do is have more consumers, say consumers_cnt = 5, then run check_consumer after each step (see the sketch after this list):

  1. First you add all the classic consumers and verify that the protocol is CLASSIC.
  2. One by one you pop the first consumer, close it, and add a new consumer with the CONSUMER protocol. You verify that the protocol is CONSUMER, because it changes as soon as the first such consumer is added.
  3. When all consumers are on the new protocol you do the same again, this time replacing consumer-protocol consumers with classic ones. You verify that the protocol stays CONSUMER until after the last replacement, where it returns to CLASSIC.
  4. You close all consumers for the next test run.
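
A sketch of that rotation, assuming the test's check_consumer helper, the classic_consumer_conf / consumer_consumer_conf dicts from the previous comment, and a hypothetical new_member helper:

    consumers_cnt = 5

    def new_member(conf):
        # Hypothetical helper: create a consumer from conf and subscribe it.
        consumer = kafka_cluster.consumer(conf)
        consumer.subscribe([topic])
        return consumer

    # Step 1: all classic members, the group protocol is CLASSIC.
    consumers = [new_member(classic_consumer_conf) for _ in range(consumers_cnt)]
    check_consumer(kafka_cluster, consumers, admin_client, topic,
                   ConsumerGroupType.CLASSIC)

    # Step 2: replace members one by one; the group switches to CONSUMER
    # as soon as the first new-protocol member joins.
    for _ in range(consumers_cnt):
        consumers.pop(0).close()
        consumers.append(new_member(consumer_consumer_conf))
        check_consumer(kafka_cluster, consumers, admin_client, topic,
                       ConsumerGroupType.CONSUMER)

    # Step 3: replace them back with classic members; the protocol stays
    # CONSUMER until the last new-protocol member leaves, then returns
    # to CLASSIC.
    for i in range(consumers_cnt):
        consumers.pop(0).close()
        consumers.append(new_member(classic_consumer_conf))
        expected = (ConsumerGroupType.CLASSIC if i == consumers_cnt - 1
                    else ConsumerGroupType.CONSUMER)
        check_consumer(kafka_cluster, consumers, admin_client, topic, expected)

    # Step 4: close everything for the next test run.
    for consumer in consumers:
        consumer.close()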

"""
Test consumer upgrade and downgrade.
"""
topic = kafka_cluster.create_topic_and_wait_propogation(topic_prefix,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can use a different topic name, given it's also the group name.
topic_prefix + partition_assignment_strategy.
