Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
118 changes: 63 additions & 55 deletions src/rdkafka_cgrp.c
Original file line number Diff line number Diff line change
Expand Up @@ -5415,6 +5415,64 @@ rd_kafka_cgrp_subscription_set(rd_kafka_cgrp_t *rkcg,
return rd_atomic32_add(&rkcg->rkcg_subscription_version, 1);
}

static rd_bool_t
rd_kafka_cgrp_stale_subscribed_topics_cache(rd_kafka_cgrp_t *rkcg) {
rd_bool_t stale = rd_false;
int32_t current_subscription_version =
rd_atomic32_get(&rkcg->rkcg_subscription_version);
rd_ts_t current_ts_metadata = rkcg->rkcg_rk->rk_ts_metadata;

stale = rkcg->rkcg_subscribed_topics_cache.rk_ts_metadata !=
current_ts_metadata ||
rkcg->rkcg_subscribed_topics_cache.subscription_version !=
current_subscription_version;
if (stale) {
rkcg->rkcg_subscribed_topics_cache.rk_ts_metadata =
current_ts_metadata;
rkcg->rkcg_subscribed_topics_cache.subscription_version =
current_subscription_version;
}
return stale;
}

static rd_bool_t
rd_kafka_cgrp_set_subscribed_topics_from_subscription(rd_kafka_cgrp_t *rkcg) {
rd_list_t *tinfos;
rd_kafka_topic_partition_list_t *errored;

if (!rd_kafka_cgrp_stale_subscribed_topics_cache(rkcg))
return rd_false; /* Not stale, no change */

/*
* Unmatched topics will be added to the errored list.
*/
errored = rd_kafka_topic_partition_list_new(0);

/*
* Create a list of the topics in metadata that matches our subscription
*/
tinfos = rd_list_new(rkcg->rkcg_subscription->cnt,
rd_kafka_topic_info_destroy_free);

if (rkcg->rkcg_flags & RD_KAFKA_CGRP_F_WILDCARD_SUBSCRIPTION)
rd_kafka_metadata_topic_match(rkcg->rkcg_rk, tinfos,
rkcg->rkcg_subscription, errored);
else
rd_kafka_metadata_topic_filter(
rkcg->rkcg_rk, tinfos, rkcg->rkcg_subscription, errored);

/*
* Propagate consumer errors for any non-existent or errored topics.
* The function takes ownership of errored.
*/
rd_kafka_propagate_consumer_topic_errors(
rkcg, errored, "Subscribed topic not available");

/*
* Update effective list of topics (takes ownership of \c tinfos)
*/
return rd_kafka_cgrp_update_subscribed_topics(rkcg, tinfos);
}

/**
* @brief Handle a new subscription that is modifying an existing subscription
Expand All @@ -5427,11 +5485,10 @@ rd_kafka_cgrp_modify_subscription(rd_kafka_cgrp_t *rkcg,
rd_kafka_topic_partition_list_t *rktparlist) {
rd_kafka_topic_partition_list_t *unsubscribing_topics;
rd_kafka_topic_partition_list_t *revoking;
rd_list_t *tinfos;
rd_kafka_topic_partition_list_t *errored;
int metadata_age;
int old_cnt = rkcg->rkcg_subscription->cnt;
int32_t cgrp_subscription_version;
rd_bool_t changed = rd_false;

/* Topics in rkcg_subscribed_topics that don't match any pattern in
the new subscription. */
Expand Down Expand Up @@ -5486,27 +5543,10 @@ rd_kafka_cgrp_modify_subscription(rd_kafka_cgrp_t *rkcg,
if (unsubscribing_topics)
rd_kafka_topic_partition_list_destroy(unsubscribing_topics);

/* Create a list of the topics in metadata that matches the new
* subscription */
tinfos = rd_list_new(rkcg->rkcg_subscription->cnt,
rd_kafka_topic_info_destroy_free);

/* Unmatched topics will be added to the errored list. */
errored = rd_kafka_topic_partition_list_new(0);

if (rkcg->rkcg_flags & RD_KAFKA_CGRP_F_WILDCARD_SUBSCRIPTION)
rd_kafka_metadata_topic_match(rkcg->rkcg_rk, tinfos,
rkcg->rkcg_subscription, errored);
else
rd_kafka_metadata_topic_filter(
rkcg->rkcg_rk, tinfos, rkcg->rkcg_subscription, errored);

/* Propagate consumer errors for any non-existent or errored topics.
* The function takes ownership of errored. */
rd_kafka_propagate_consumer_topic_errors(
rkcg, errored, "Subscribed topic not available");
changed = rd_kafka_cgrp_set_subscribed_topics_from_subscription(rkcg);

if (rd_kafka_cgrp_update_subscribed_topics(rkcg, tinfos) && !revoking) {
if (changed && !revoking) {
rd_kafka_cgrp_rejoin(rkcg, "Subscription modified");
return RD_KAFKA_RESP_ERR_NO_ERROR;
}
Expand Down Expand Up @@ -6900,7 +6940,6 @@ rd_kafka_cgrp_owned_but_not_exist_partitions(rd_kafka_cgrp_t *rkcg) {
return result;
}


/**
* @brief Check if the latest metadata affects the current subscription:
* - matched topic added
Expand All @@ -6912,9 +6951,7 @@ rd_kafka_cgrp_owned_but_not_exist_partitions(rd_kafka_cgrp_t *rkcg) {
*/
void rd_kafka_cgrp_metadata_update_check(rd_kafka_cgrp_t *rkcg,
rd_bool_t do_join) {
rd_list_t *tinfos;
rd_kafka_topic_partition_list_t *errored;
rd_bool_t changed;
rd_bool_t changed = rd_false;

rd_kafka_assert(NULL, thrd_is_current(rkcg->rkcg_rk->rk_thread));

Expand All @@ -6924,36 +6961,7 @@ void rd_kafka_cgrp_metadata_update_check(rd_kafka_cgrp_t *rkcg,
if (!rkcg->rkcg_subscription || rkcg->rkcg_subscription->cnt == 0)
return;

/*
* Unmatched topics will be added to the errored list.
*/
errored = rd_kafka_topic_partition_list_new(0);

/*
* Create a list of the topics in metadata that matches our subscription
*/
tinfos = rd_list_new(rkcg->rkcg_subscription->cnt,
rd_kafka_topic_info_destroy_free);

if (rkcg->rkcg_flags & RD_KAFKA_CGRP_F_WILDCARD_SUBSCRIPTION)
rd_kafka_metadata_topic_match(rkcg->rkcg_rk, tinfos,
rkcg->rkcg_subscription, errored);
else
rd_kafka_metadata_topic_filter(
rkcg->rkcg_rk, tinfos, rkcg->rkcg_subscription, errored);


/*
* Propagate consumer errors for any non-existent or errored topics.
* The function takes ownership of errored.
*/
rd_kafka_propagate_consumer_topic_errors(
rkcg, errored, "Subscribed topic not available");

/*
* Update effective list of topics (takes ownership of \c tinfos)
*/
changed = rd_kafka_cgrp_update_subscribed_topics(rkcg, tinfos);
changed = rd_kafka_cgrp_set_subscribed_topics_from_subscription(rkcg);

if (!do_join ||
(!changed &&
Expand Down
11 changes: 11 additions & 0 deletions src/rdkafka_cgrp.h
Original file line number Diff line number Diff line change
Expand Up @@ -361,6 +361,17 @@ typedef struct rd_kafka_cgrp_s {

rd_atomic32_t rkcg_subscription_version; /**< Subscription version */

struct {
int32_t subscription_version; /**< Version of
* rkcg_subscription
* at time of
* last change. */
rd_ts_t rk_ts_metadata; /**< Timestamp of
* last metadata
* request at last
* change. */
} rkcg_subscribed_topics_cache;

/* Protected by rd_kafka_*lock() */
struct {
rd_ts_t ts_rebalance; /* Timestamp of
Expand Down
46 changes: 34 additions & 12 deletions src/rdkafka_metadata.c
Original file line number Diff line number Diff line change
Expand Up @@ -1216,6 +1216,7 @@ rd_kafka_parse_Metadata_admin(rd_kafka_broker_t *rkb,
"(admin request)");
}

typedef RD_MAP_TYPE(const char *, const char *) map_str_str_t;

/**
* @brief Add all topics in current cached full metadata
Expand All @@ -1226,7 +1227,7 @@ rd_kafka_parse_Metadata_admin(rd_kafka_broker_t *rkb,
* an available topic will be added to this list with
* the appropriate error set.
*
* @returns the number of topics matched and added to \p list
* @returns the number of topics matched and added to \p tinfos
*
* @locks none
* @locality any
Expand All @@ -1236,13 +1237,16 @@ rd_kafka_metadata_topic_match(rd_kafka_t *rk,
rd_list_t *tinfos,
const rd_kafka_topic_partition_list_t *match,
rd_kafka_topic_partition_list_t *errored) {
int ti, i;
int i;
size_t cnt = 0;
rd_kafka_topic_partition_list_t *unmatched;
rd_list_t cached_topics;
const char *topic;
const struct rd_kafka_metadata_cache_entry *rkmce;
map_str_str_t map;

rd_kafka_rdlock(rk);
map = (map_str_str_t)RD_MAP_INITIALIZER(
0, rd_map_str_cmp, rd_map_str_hash, NULL /* topic list element */,
NULL /* topic list element */);
/* To keep track of which patterns and topics in `match` that
* did not match any topic (or matched an errored topic), we
* create a set of all topics to match in `unmatched` and then
Expand All @@ -1253,16 +1257,25 @@ rd_kafka_metadata_topic_match(rd_kafka_t *rk,

/* For each topic in the cluster, scan through the match list
* to find matching topic. */
rd_list_init(&cached_topics, rk->rk_metadata_cache.rkmc_cnt, rd_free);
rd_kafka_metadata_cache_topics_to_list(rk, &cached_topics, rd_false);
RD_LIST_FOREACH(topic, &cached_topics, ti) {
TAILQ_FOREACH(rkmce, &rk->rk_metadata_cache.rkmc_expiry, rkmce_link) {
const rd_kafka_metadata_topic_internal_t *mdti;
const rd_kafka_metadata_topic_t *mdt =
rd_kafka_metadata_cache_topic_get(rk, topic, &mdti,
rd_true /* valid */);
if (!mdt)
const rd_kafka_metadata_topic_t *mdt;
const char *topic = rkmce->rkmce_mtopic.topic;
rd_bool_t matched = rd_false;

if (!RD_KAFKA_METADATA_CACHE_VALID(rkmce) || !topic ||
RD_MAP_GET(&map, topic))
/* We could have multiple cache entries
* with different topic id and same topic name
* in some cases */
continue;

RD_MAP_SET(&map, topic, topic);

mdt = &rkmce->rkmce_mtopic;
Copy link

Copilot AI Sep 23, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[nitpick] Extra space between 'mdt' and '=' creates inconsistent formatting. Should be single space like other assignments.

Suggested change
mdt = &rkmce->rkmce_mtopic;
mdt = &rkmce->rkmce_mtopic;

Copilot uses AI. Check for mistakes.

mdti = &rkmce->rkmce_metadata_internal_topic;


/* Ignore topics in blacklist */
if (rk->rk_conf.topic_blacklist &&
rd_kafka_pattern_match(rk->rk_conf.topic_blacklist, topic))
Expand All @@ -1279,6 +1292,15 @@ rd_kafka_metadata_topic_match(rd_kafka_t *rk,
unmatched, match->elems[i].topic,
RD_KAFKA_PARTITION_UA);

if (matched)
/*
* Just remove it from unmatched.
* Topic was already added to
* `tinfos` or `errored`.
*/
continue;
matched = rd_true;

if (mdt->err) {
rd_kafka_topic_partition_list_add(
errored, topic, RD_KAFKA_PARTITION_UA)
Expand All @@ -1293,6 +1315,7 @@ rd_kafka_metadata_topic_match(rd_kafka_t *rk,
cnt++;
}
}
RD_MAP_DESTROY(&map);
rd_kafka_rdunlock(rk);

/* Any topics/patterns still in unmatched did not match any
Expand All @@ -1306,7 +1329,6 @@ rd_kafka_metadata_topic_match(rd_kafka_t *rk,
}

rd_kafka_topic_partition_list_destroy(unmatched);
rd_list_destroy(&cached_topics);

return cnt;
}
Expand Down