diff --git a/src/rdkafka_cgrp.c b/src/rdkafka_cgrp.c index e0aa01bd1..a65976c7b 100644 --- a/src/rdkafka_cgrp.c +++ b/src/rdkafka_cgrp.c @@ -5415,6 +5415,64 @@ rd_kafka_cgrp_subscription_set(rd_kafka_cgrp_t *rkcg, return rd_atomic32_add(&rkcg->rkcg_subscription_version, 1); } +static rd_bool_t +rd_kafka_cgrp_stale_subscribed_topics_cache(rd_kafka_cgrp_t *rkcg) { + rd_bool_t stale = rd_false; + int32_t current_subscription_version = + rd_atomic32_get(&rkcg->rkcg_subscription_version); + rd_ts_t current_ts_metadata = rkcg->rkcg_rk->rk_ts_metadata; + + stale = rkcg->rkcg_subscribed_topics_cache.rk_ts_metadata != + current_ts_metadata || + rkcg->rkcg_subscribed_topics_cache.subscription_version != + current_subscription_version; + if (stale) { + rkcg->rkcg_subscribed_topics_cache.rk_ts_metadata = + current_ts_metadata; + rkcg->rkcg_subscribed_topics_cache.subscription_version = + current_subscription_version; + } + return stale; +} + +static rd_bool_t +rd_kafka_cgrp_set_subscribed_topics_from_subscription(rd_kafka_cgrp_t *rkcg) { + rd_list_t *tinfos; + rd_kafka_topic_partition_list_t *errored; + + if (!rd_kafka_cgrp_stale_subscribed_topics_cache(rkcg)) + return rd_false; /* Not stale, no change */ + + /* + * Unmatched topics will be added to the errored list. + */ + errored = rd_kafka_topic_partition_list_new(0); + + /* + * Create a list of the topics in metadata that matches our subscription + */ + tinfos = rd_list_new(rkcg->rkcg_subscription->cnt, + rd_kafka_topic_info_destroy_free); + + if (rkcg->rkcg_flags & RD_KAFKA_CGRP_F_WILDCARD_SUBSCRIPTION) + rd_kafka_metadata_topic_match(rkcg->rkcg_rk, tinfos, + rkcg->rkcg_subscription, errored); + else + rd_kafka_metadata_topic_filter( + rkcg->rkcg_rk, tinfos, rkcg->rkcg_subscription, errored); + + /* + * Propagate consumer errors for any non-existent or errored topics. + * The function takes ownership of errored. + */ + rd_kafka_propagate_consumer_topic_errors( + rkcg, errored, "Subscribed topic not available"); + + /* + * Update effective list of topics (takes ownership of \c tinfos) + */ + return rd_kafka_cgrp_update_subscribed_topics(rkcg, tinfos); +} /** * @brief Handle a new subscription that is modifying an existing subscription @@ -5427,11 +5485,10 @@ rd_kafka_cgrp_modify_subscription(rd_kafka_cgrp_t *rkcg, rd_kafka_topic_partition_list_t *rktparlist) { rd_kafka_topic_partition_list_t *unsubscribing_topics; rd_kafka_topic_partition_list_t *revoking; - rd_list_t *tinfos; - rd_kafka_topic_partition_list_t *errored; int metadata_age; int old_cnt = rkcg->rkcg_subscription->cnt; int32_t cgrp_subscription_version; + rd_bool_t changed = rd_false; /* Topics in rkcg_subscribed_topics that don't match any pattern in the new subscription. */ @@ -5486,27 +5543,10 @@ rd_kafka_cgrp_modify_subscription(rd_kafka_cgrp_t *rkcg, if (unsubscribing_topics) rd_kafka_topic_partition_list_destroy(unsubscribing_topics); - /* Create a list of the topics in metadata that matches the new - * subscription */ - tinfos = rd_list_new(rkcg->rkcg_subscription->cnt, - rd_kafka_topic_info_destroy_free); - - /* Unmatched topics will be added to the errored list. */ - errored = rd_kafka_topic_partition_list_new(0); - if (rkcg->rkcg_flags & RD_KAFKA_CGRP_F_WILDCARD_SUBSCRIPTION) - rd_kafka_metadata_topic_match(rkcg->rkcg_rk, tinfos, - rkcg->rkcg_subscription, errored); - else - rd_kafka_metadata_topic_filter( - rkcg->rkcg_rk, tinfos, rkcg->rkcg_subscription, errored); - - /* Propagate consumer errors for any non-existent or errored topics. - * The function takes ownership of errored. */ - rd_kafka_propagate_consumer_topic_errors( - rkcg, errored, "Subscribed topic not available"); + changed = rd_kafka_cgrp_set_subscribed_topics_from_subscription(rkcg); - if (rd_kafka_cgrp_update_subscribed_topics(rkcg, tinfos) && !revoking) { + if (changed && !revoking) { rd_kafka_cgrp_rejoin(rkcg, "Subscription modified"); return RD_KAFKA_RESP_ERR_NO_ERROR; } @@ -6900,7 +6940,6 @@ rd_kafka_cgrp_owned_but_not_exist_partitions(rd_kafka_cgrp_t *rkcg) { return result; } - /** * @brief Check if the latest metadata affects the current subscription: * - matched topic added @@ -6912,9 +6951,7 @@ rd_kafka_cgrp_owned_but_not_exist_partitions(rd_kafka_cgrp_t *rkcg) { */ void rd_kafka_cgrp_metadata_update_check(rd_kafka_cgrp_t *rkcg, rd_bool_t do_join) { - rd_list_t *tinfos; - rd_kafka_topic_partition_list_t *errored; - rd_bool_t changed; + rd_bool_t changed = rd_false; rd_kafka_assert(NULL, thrd_is_current(rkcg->rkcg_rk->rk_thread)); @@ -6924,36 +6961,7 @@ void rd_kafka_cgrp_metadata_update_check(rd_kafka_cgrp_t *rkcg, if (!rkcg->rkcg_subscription || rkcg->rkcg_subscription->cnt == 0) return; - /* - * Unmatched topics will be added to the errored list. - */ - errored = rd_kafka_topic_partition_list_new(0); - - /* - * Create a list of the topics in metadata that matches our subscription - */ - tinfos = rd_list_new(rkcg->rkcg_subscription->cnt, - rd_kafka_topic_info_destroy_free); - - if (rkcg->rkcg_flags & RD_KAFKA_CGRP_F_WILDCARD_SUBSCRIPTION) - rd_kafka_metadata_topic_match(rkcg->rkcg_rk, tinfos, - rkcg->rkcg_subscription, errored); - else - rd_kafka_metadata_topic_filter( - rkcg->rkcg_rk, tinfos, rkcg->rkcg_subscription, errored); - - - /* - * Propagate consumer errors for any non-existent or errored topics. - * The function takes ownership of errored. - */ - rd_kafka_propagate_consumer_topic_errors( - rkcg, errored, "Subscribed topic not available"); - - /* - * Update effective list of topics (takes ownership of \c tinfos) - */ - changed = rd_kafka_cgrp_update_subscribed_topics(rkcg, tinfos); + changed = rd_kafka_cgrp_set_subscribed_topics_from_subscription(rkcg); if (!do_join || (!changed && diff --git a/src/rdkafka_cgrp.h b/src/rdkafka_cgrp.h index 79a734f5f..181e91fbe 100644 --- a/src/rdkafka_cgrp.h +++ b/src/rdkafka_cgrp.h @@ -361,6 +361,17 @@ typedef struct rd_kafka_cgrp_s { rd_atomic32_t rkcg_subscription_version; /**< Subscription version */ + struct { + int32_t subscription_version; /**< Version of + * rkcg_subscription + * at time of + * last change. */ + rd_ts_t rk_ts_metadata; /**< Timestamp of + * last metadata + * request at last + * change. */ + } rkcg_subscribed_topics_cache; + /* Protected by rd_kafka_*lock() */ struct { rd_ts_t ts_rebalance; /* Timestamp of diff --git a/src/rdkafka_metadata.c b/src/rdkafka_metadata.c index d8d35daac..ae0f05ff1 100644 --- a/src/rdkafka_metadata.c +++ b/src/rdkafka_metadata.c @@ -1216,6 +1216,7 @@ rd_kafka_parse_Metadata_admin(rd_kafka_broker_t *rkb, "(admin request)"); } +typedef RD_MAP_TYPE(const char *, const char *) map_str_str_t; /** * @brief Add all topics in current cached full metadata @@ -1226,7 +1227,7 @@ rd_kafka_parse_Metadata_admin(rd_kafka_broker_t *rkb, * an available topic will be added to this list with * the appropriate error set. * - * @returns the number of topics matched and added to \p list + * @returns the number of topics matched and added to \p tinfos * * @locks none * @locality any @@ -1236,13 +1237,16 @@ rd_kafka_metadata_topic_match(rd_kafka_t *rk, rd_list_t *tinfos, const rd_kafka_topic_partition_list_t *match, rd_kafka_topic_partition_list_t *errored) { - int ti, i; + int i; size_t cnt = 0; rd_kafka_topic_partition_list_t *unmatched; - rd_list_t cached_topics; - const char *topic; + const struct rd_kafka_metadata_cache_entry *rkmce; + map_str_str_t map; rd_kafka_rdlock(rk); + map = (map_str_str_t)RD_MAP_INITIALIZER( + 0, rd_map_str_cmp, rd_map_str_hash, NULL /* topic list element */, + NULL /* topic list element */); /* To keep track of which patterns and topics in `match` that * did not match any topic (or matched an errored topic), we * create a set of all topics to match in `unmatched` and then @@ -1253,16 +1257,25 @@ rd_kafka_metadata_topic_match(rd_kafka_t *rk, /* For each topic in the cluster, scan through the match list * to find matching topic. */ - rd_list_init(&cached_topics, rk->rk_metadata_cache.rkmc_cnt, rd_free); - rd_kafka_metadata_cache_topics_to_list(rk, &cached_topics, rd_false); - RD_LIST_FOREACH(topic, &cached_topics, ti) { + TAILQ_FOREACH(rkmce, &rk->rk_metadata_cache.rkmc_expiry, rkmce_link) { const rd_kafka_metadata_topic_internal_t *mdti; - const rd_kafka_metadata_topic_t *mdt = - rd_kafka_metadata_cache_topic_get(rk, topic, &mdti, - rd_true /* valid */); - if (!mdt) + const rd_kafka_metadata_topic_t *mdt; + const char *topic = rkmce->rkmce_mtopic.topic; + rd_bool_t matched = rd_false; + + if (!RD_KAFKA_METADATA_CACHE_VALID(rkmce) || !topic || + RD_MAP_GET(&map, topic)) + /* We could have multiple cache entries + * with different topic id and same topic name + * in some cases */ continue; + RD_MAP_SET(&map, topic, topic); + + mdt = &rkmce->rkmce_mtopic; + mdti = &rkmce->rkmce_metadata_internal_topic; + + /* Ignore topics in blacklist */ if (rk->rk_conf.topic_blacklist && rd_kafka_pattern_match(rk->rk_conf.topic_blacklist, topic)) @@ -1279,6 +1292,15 @@ rd_kafka_metadata_topic_match(rd_kafka_t *rk, unmatched, match->elems[i].topic, RD_KAFKA_PARTITION_UA); + if (matched) + /* + * Just remove it from unmatched. + * Topic was already added to + * `tinfos` or `errored`. + */ + continue; + matched = rd_true; + if (mdt->err) { rd_kafka_topic_partition_list_add( errored, topic, RD_KAFKA_PARTITION_UA) @@ -1293,6 +1315,7 @@ rd_kafka_metadata_topic_match(rd_kafka_t *rk, cnt++; } } + RD_MAP_DESTROY(&map); rd_kafka_rdunlock(rk); /* Any topics/patterns still in unmatched did not match any @@ -1306,7 +1329,6 @@ rd_kafka_metadata_topic_match(rd_kafka_t *rk, } rd_kafka_topic_partition_list_destroy(unmatched); - rd_list_destroy(&cached_topics); return cnt; }