Skip to content

Commit 2ae902a

Browse files
author
Andrew Choi
authored
Use createTopicIfNotExist instead of AdminClient (#248)
Use createTopicIfNotExist instead of AdminClient connects the _topicFactory interface’s createTopicIfNotExist. 🙏🏻 Signed-off-by: Andrew Choi <[email protected]>
1 parent ffa7d59 commit 2ae902a

File tree

3 files changed

+15
-12
lines changed

3 files changed

+15
-12
lines changed

src/main/java/com/linkedin/kmf/common/Utils.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import org.apache.avro.io.Encoder;
3232
import org.apache.avro.io.JsonEncoder;
3333
import org.apache.kafka.clients.admin.AdminClient;
34+
import org.apache.kafka.clients.admin.CreateTopicsResult;
3435
import org.apache.kafka.clients.admin.NewTopic;
3536
import org.apache.kafka.common.errors.TopicExistsException;
3637
import org.json.JSONObject;
@@ -98,7 +99,12 @@ public static int createTopicIfNotExists(String topic, short replicationFactor,
9899

99100
List<NewTopic> topics = new ArrayList<>();
100101
topics.add(newTopic);
101-
adminClient.createTopics(topics);
102+
CreateTopicsResult result = adminClient.createTopics(topics);
103+
104+
// waits for this topic creation future to complete, and then returns its result.
105+
result.values().get(topic).get();
106+
LOG.info("CreateTopicsResult: {}.", result.values());
107+
102108
} catch (TopicExistsException e) {
103109
/* There is a race condition with the consumer. */
104110
LOG.debug("Monitoring topic " + topic + " already exists in the cluster.", e);

src/main/java/com/linkedin/kmf/services/MultiClusterTopicManagementService.java

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,6 @@
3636
import kafka.zk.KafkaZkClient;
3737
import org.apache.kafka.clients.admin.AdminClient;
3838
import org.apache.kafka.clients.admin.AdminClientConfig;
39-
import org.apache.kafka.clients.admin.CreateTopicsResult;
4039
import org.apache.kafka.clients.admin.ElectLeadersOptions;
4140
import org.apache.kafka.clients.admin.ElectLeadersResult;
4241
import org.apache.kafka.clients.admin.NewPartitions;
@@ -230,7 +229,6 @@ static class TopicManagementHelper {
230229
private final int _replicationFactor;
231230
private final double _minPartitionsToBrokersRatio;
232231
private final int _minPartitionNum;
233-
private final TopicFactory _topicFactory;
234232
private final Properties _topicProperties;
235233
private boolean _preferredLeaderElectionRequested;
236234
private final int _requestTimeoutMs;
@@ -240,6 +238,7 @@ static class TopicManagementHelper {
240238
boolean _topicCreationEnabled;
241239
AdminClient _adminClient;
242240
String _topic;
241+
TopicFactory _topicFactory;
243242

244243

245244
@SuppressWarnings("unchecked")
@@ -277,11 +276,8 @@ void maybeCreateTopic() throws Exception {
277276
int numPartitions = Math.max((int) Math.ceil(brokerCount * _minPartitionsToBrokersRatio), minPartitionNum());
278277
NewTopic newTopic = new NewTopic(_topic, numPartitions, (short) _replicationFactor);
279278
newTopic.configs((Map) _topicProperties);
280-
CreateTopicsResult createTopicsResult = _adminClient.createTopics(Collections.singletonList(newTopic));
281-
282-
// waits for this topic creation future to complete, and then returns its result.
283-
createTopicsResult.values().get(_topic).get();
284-
LOGGER.info("CreateTopicsResult: {}.", createTopicsResult.values());
279+
_topicFactory.createTopicIfNotExist(_topic, (short) _replicationFactor, _minPartitionsToBrokersRatio,
280+
_topicProperties, _adminClient);
285281
}
286282
}
287283

src/test/java/com/linkedin/kmf/services/MultiClusterTopicManagementServiceTest.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010

1111
package com.linkedin.kmf.services;
1212

13+
import com.linkedin.kmf.topicfactory.TopicFactory;
1314
import java.util.Collections;
1415
import java.util.HashSet;
1516
import java.util.Map;
@@ -59,6 +60,7 @@ private void startTest() {
5960
_topicManagementHelper = Mockito.mock(MultiClusterTopicManagementService.TopicManagementHelper.class);
6061
_topicManagementHelper._topic = SERVICE_TEST_TOPIC;
6162
_topicManagementHelper._adminClient = Mockito.mock(AdminClient.class);
63+
_topicManagementHelper._topicFactory = Mockito.mock(TopicFactory.class);
6264
_topicManagementHelper._topicCreationEnabled = true;
6365
}
6466

@@ -94,6 +96,7 @@ protected void MultiClusterTopicManagementServiceTopicCreationTest() throws Exce
9496
*/
9597
@Override
9698
public Void answer(InvocationOnMock invocation) throws Throwable {
99+
97100
Mockito.when(_topicManagementHelper._adminClient.describeTopics(Collections.singleton(SERVICE_TEST_TOPIC)))
98101
.thenReturn(Mockito.mock(DescribeTopicsResult.class));
99102
Mockito.when(
@@ -115,10 +118,8 @@ public Void answer(InvocationOnMock invocation) throws Throwable {
115118
}
116119
};
117120

118-
Mockito.when(_topicManagementHelper._adminClient.createTopics(Mockito.anyCollection())
119-
.values()
120-
.get(SERVICE_TEST_TOPIC)
121-
.get()).thenAnswer(createKafkaTopicFutureAnswer);
121+
Mockito.when(_topicManagementHelper._topicFactory.createTopicIfNotExist(Mockito.anyString(), Mockito.anyShort(),
122+
Mockito.anyDouble(), Mockito.any(), Mockito.any())).thenAnswer(createKafkaTopicFutureAnswer);
122123

123124
_topicManagementHelper.maybeCreateTopic();
124125

0 commit comments

Comments
 (0)