Skip to content

Commit 71fdab1

Browse files
MINOR: describeTopics should pass the timeout to the describeCluster call (#20375)
This PR ensures that describeTopics correctly propagates its timeoutMs setting to the underlying describeCluster call. Integration tests were added to verify that the API now fails with a TimeoutException when brokers do not respond within the configured timeout. Reviewers: Ken Huang <[email protected]>, TengYao Chi <[email protected]>, Chia-Ping Tsai <[email protected]>
1 parent 4e0d8c9 commit 71fdab1

File tree

2 files changed

+26
-2
lines changed

2 files changed

+26
-2
lines changed

clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2334,7 +2334,7 @@ private Map<String, KafkaFuture<TopicDescription>> handleDescribeTopicsByNamesWi
23342334
}
23352335

23362336
// First, we need to retrieve the node info.
2337-
DescribeClusterResult clusterResult = describeCluster();
2337+
DescribeClusterResult clusterResult = describeCluster(new DescribeClusterOptions().timeoutMs(options.timeoutMs()));
23382338
clusterResult.nodes().whenComplete(
23392339
(nodes, exception) -> {
23402340
if (exception != null) {

clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -551,7 +551,8 @@ public void testCloseAdminClient() {
551551
* Test if admin client can be closed in the callback invoked when
552552
* an api call completes. If calling {@link Admin#close()} in callback, AdminClient thread hangs
553553
*/
554-
@Test @Timeout(10)
554+
@Test
555+
@Timeout(10)
555556
public void testCloseAdminClientInCallback() throws InterruptedException {
556557
MockTime time = new MockTime();
557558
AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(time, mockCluster(3, 0));
@@ -11668,4 +11669,27 @@ private static StreamsGroupDescribeResponseData makeFullStreamsGroupDescribeResp
1166811669
.setAssignmentEpoch(1));
1166911670
return data;
1167011671
}
11672+
11673+
@Test
11674+
@Timeout(30)
11675+
public void testDescribeTopicsTimeoutWhenNoBrokerResponds() throws Exception {
11676+
try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(
11677+
mockCluster(1, 0),
11678+
AdminClientConfig.RETRIES_CONFIG, "0",
11679+
AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "30000")) {
11680+
env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
11681+
11682+
// Not using prepareResponse is equivalent to "no brokers respond".
11683+
long start = System.currentTimeMillis();
11684+
DescribeTopicsResult result = env.adminClient().describeTopics(List.of("test-topic"), new DescribeTopicsOptions().timeoutMs(200));
11685+
Map<String, KafkaFuture<TopicDescription>> topicDescriptionMap = result.topicNameValues();
11686+
KafkaFuture<TopicDescription> topicDescription = topicDescriptionMap.get("test-topic");
11687+
ExecutionException exception = assertThrows(ExecutionException.class, topicDescription::get);
11688+
// Duration should be greater than or equal to 200 ms but less than 30000 ms.
11689+
long duration = System.currentTimeMillis() - start;
11690+
11691+
assertInstanceOf(TimeoutException.class, exception.getCause());
11692+
assertTrue(duration >= 150L && duration < 30000);
11693+
}
11694+
}
1167111695
}

0 commit comments

Comments
 (0)