7373 */
7474@ SuppressWarnings ({"rawtypes" , "unchecked" })
7575public class MultiClusterTopicManagementService implements Service {
76- private static final Logger LOG = LoggerFactory .getLogger (MultiClusterTopicManagementService .class );
76+ private static final Logger LOGGER = LoggerFactory .getLogger (MultiClusterTopicManagementService .class );
7777 private static final String METRIC_GROUP_NAME = "topic-management-service" ;
7878 private final CompletableFuture <Void > _topicPartitionResult = new CompletableFuture <>();
7979 private final AtomicBoolean _isRunning = new AtomicBoolean (false );
@@ -126,15 +126,15 @@ public synchronized void start() {
126126 Runnable pleRunnable = new PreferredLeaderElectionRunnable ();
127127 _executor .scheduleWithFixedDelay (pleRunnable , _preferredLeaderElectionIntervalMs , _preferredLeaderElectionIntervalMs ,
128128 TimeUnit .MILLISECONDS );
129- LOG .info ("{}/MultiClusterTopicManagementService started." , _serviceName );
129+ LOGGER .info ("{}/MultiClusterTopicManagementService started." , _serviceName );
130130 }
131131 }
132132
133133 @ Override
134134 public synchronized void stop () {
135135 if (_isRunning .compareAndSet (true , false )) {
136136 _executor .shutdown ();
137- LOG .info ("{}/MultiClusterTopicManagementService stopped." , _serviceName );
137+ LOGGER .info ("{}/MultiClusterTopicManagementService stopped." , _serviceName );
138138 }
139139 }
140140
@@ -148,9 +148,9 @@ public void awaitShutdown() {
148148 try {
149149 _executor .awaitTermination (Integer .MAX_VALUE , TimeUnit .MILLISECONDS );
150150 } catch (InterruptedException e ) {
151- LOG .info ("Thread interrupted when waiting for {}/MultiClusterTopicManagementService to shutdown" , _serviceName );
151+ LOGGER .info ("Thread interrupted when waiting for {}/MultiClusterTopicManagementService to shutdown" , _serviceName );
152152 }
153- LOG .info ("{}/MultiClusterTopicManagementService shutdown completed" , _serviceName );
153+ LOGGER .info ("{}/MultiClusterTopicManagementService shutdown completed" , _serviceName );
154154 }
155155
156156
@@ -165,7 +165,7 @@ public void run() {
165165 }
166166
167167 /*
168- * The partition number of the monitor topics should be the minimum partition number that satisifies the following conditions:
168+ * The partition number of the monitor topics should be the minimum partition number that satisfies the following conditions:
169169 * - partition number of the monitor topics across all monitored clusters should be the same
170170 * - partitionNum / brokerNum >= user-configured partitionsToBrokersRatio.
171171 * - partitionNum >= user-configured minPartitionNum
@@ -185,13 +185,13 @@ public void run() {
185185 try {
186186 helper .maybeReassignPartitionAndElectLeader ();
187187 } catch (IOException | KafkaException e ) {
188- LOG .warn (_serviceName + "/MultiClusterTopicManagementService will retry later in cluster " + clusterName , e );
188+ LOGGER .warn (_serviceName + "/MultiClusterTopicManagementService will retry later in cluster " + clusterName , e );
189189 }
190190 }
191191 } catch (Throwable t ) {
192192 // Need to catch throwable because there is scala API that can throw NoSuchMethodError in runtime
193193 // and such error is not caught by compilation
194- LOG .error (_serviceName + "/MultiClusterTopicManagementService will stop due to error." , t );
194+ LOGGER .error (_serviceName + "/MultiClusterTopicManagementService will stop due to error." , t );
195195 stop ();
196196 }
197197 }
@@ -211,18 +211,20 @@ public void run() {
211211 try {
212212 helper .maybeElectLeader ();
213213 } catch (IOException | KafkaException e ) {
214- LOG .warn (_serviceName + "/MultiClusterTopicManagementService will retry later in cluster " + clusterName , e );
214+ LOGGER .warn (_serviceName + "/MultiClusterTopicManagementService will retry later in cluster " + clusterName , e );
215215 }
216216 }
217217 } catch (Throwable t ) {
218218 /* Need to catch throwable because there is scala API that can throw NoSuchMethodError in runtime
219219 and such error is not caught by compilation. */
220- LOG .error (_serviceName + "/MultiClusterTopicManagementService will stop due to error." , t );
220+ LOGGER .error (_serviceName
221+ + "/MultiClusterTopicManagementService/PreferredLeaderElectionRunnable will stop due to an error." , t );
221222 stop ();
222223 }
223224 }
224225 }
225226
227+ @ SuppressWarnings ("FieldCanBeLocal" )
226228 static class TopicManagementHelper {
227229 private final boolean _topicCreationEnabled ;
228230 private final String _topic ;
@@ -233,9 +235,9 @@ static class TopicManagementHelper {
233235 private final TopicFactory _topicFactory ;
234236 private final Properties _topicProperties ;
235237 private boolean _preferredLeaderElectionRequested ;
236- private int _requestTimeoutMs ;
237- private List _bootstrapServers ;
238- private final AdminClient _adminClient ;
238+ private final int _requestTimeoutMs ;
239+ private final List _bootstrapServers ;
240+ AdminClient _adminClient ;
239241
240242
241243 @ SuppressWarnings ("unchecked" )
@@ -263,7 +265,7 @@ static class TopicManagementHelper {
263265 _topicFactory = (TopicFactory ) Class .forName (topicFactoryClassName ).getConstructor (Map .class ).newInstance (topicFactoryConfig );
264266
265267 _adminClient = constructAdminClient (props );
266- LOG .info ("{} configs: {}" , _adminClient .getClass ().getSimpleName (), props );
268+ LOGGER .info ("{} configs: {}" , _adminClient .getClass ().getSimpleName (), props );
267269 }
268270
269271 @ SuppressWarnings ("unchecked" )
@@ -274,7 +276,10 @@ void maybeCreateTopic() throws Exception {
274276 NewTopic newTopic = new NewTopic (_topic , numPartitions , (short ) _replicationFactor );
275277 newTopic .configs ((Map ) _topicProperties );
276278 CreateTopicsResult createTopicsResult = _adminClient .createTopics (Collections .singletonList (newTopic ));
277- LOG .info ("CreateTopicsResult: {}." , createTopicsResult .values ());
279+
280+ // waits for this topic creation future to complete, and then returns its result.
281+ createTopicsResult .values ().get (_topic ).get ();
282+ LOGGER .info ("CreateTopicsResult: {}." , createTopicsResult .values ());
278283 }
279284 }
280285
@@ -288,13 +293,14 @@ int minPartitionNum() throws InterruptedException, ExecutionException {
288293 }
289294
290295 void maybeAddPartitions (int minPartitionNum ) throws ExecutionException , InterruptedException {
291- Collection <String > topicNames = _adminClient . listTopics (). names (). get ();
292- Map < String , KafkaFuture < TopicDescription >> kafkaFutureMap = _adminClient .describeTopics (topicNames ).values ();
296+ Map <String , KafkaFuture < TopicDescription >> kafkaFutureMap =
297+ _adminClient .describeTopics (Collections . singleton ( _topic ) ).values ();
293298 KafkaFuture <TopicDescription > topicDescriptions = kafkaFutureMap .get (_topic );
294299 List <TopicPartitionInfo > partitions = topicDescriptions .get ().partitions ();
300+
295301 int partitionNum = partitions .size ();
296302 if (partitionNum < minPartitionNum ) {
297- LOG .info ("{} will increase partition of the topic {} in the cluster from {}"
303+ LOGGER .info ("{} will increase partition of the topic {} in the cluster from {}"
298304 + " to {}." , this .getClass ().toString (), _topic , partitionNum , minPartitionNum );
299305 Set <Integer > blackListedBrokers = _topicFactory .getBlackListedBrokers (_zkConnect );
300306 List <List <Integer >> replicaAssignment = new ArrayList <>(new ArrayList <>());
@@ -339,13 +345,13 @@ void maybeReassignPartitionAndElectLeader() throws Exception {
339345 int expectedReplicationFactor = Math .max (currentReplicationFactor , _replicationFactor );
340346
341347 if (_replicationFactor < currentReplicationFactor )
342- LOG .debug (
348+ LOGGER .debug (
343349 "Configured replication factor {} is smaller than the current replication factor {} of the topic {} in cluster." ,
344350 _replicationFactor , currentReplicationFactor , _topic );
345351
346352 if (expectedReplicationFactor > currentReplicationFactor && !zkClient
347353 .reassignPartitionsInProgress ()) {
348- LOG .info (
354+ LOGGER .info (
349355 "MultiClusterTopicManagementService will increase the replication factor of the topic {} in cluster"
350356 + "from {} to {}" , _topic , currentReplicationFactor , expectedReplicationFactor );
351357 reassignPartitions (zkClient , brokers , _topic , partitionInfoList .size (),
@@ -362,15 +368,15 @@ void maybeReassignPartitionAndElectLeader() throws Exception {
362368 expectedProperties .put (key , _topicProperties .get (key ));
363369
364370 if (!currentProperties .equals (expectedProperties )) {
365- LOG .info ("MultiClusterTopicManagementService will overwrite properties of the topic {} "
371+ LOGGER .info ("MultiClusterTopicManagementService will overwrite properties of the topic {} "
366372 + "in cluster from {} to {}." , _topic , currentProperties , expectedProperties );
367373 zkClient .setOrCreateEntityConfigs (ConfigType .Topic (), _topic , expectedProperties );
368374 }
369375
370376 if (partitionInfoList .size () >= brokers .size () &&
371377 someBrokerNotPreferredLeader (partitionInfoList , brokers ) && !zkClient
372378 .reassignPartitionsInProgress ()) {
373- LOG .info ("{} will reassign partitions of the topic {} in cluster." ,
379+ LOGGER .info ("{} will reassign partitions of the topic {} in cluster." ,
374380 this .getClass ().toString (), _topic );
375381 reassignPartitions (zkClient , brokers , _topic , partitionInfoList .size (),
376382 expectedReplicationFactor );
@@ -380,7 +386,7 @@ void maybeReassignPartitionAndElectLeader() throws Exception {
380386 if (partitionInfoList .size () >= brokers .size () &&
381387 someBrokerNotElectedLeader (partitionInfoList , brokers )) {
382388 if (!partitionReassigned || !zkClient .reassignPartitionsInProgress ()) {
383- LOG .info (
389+ LOGGER .info (
384390 "MultiClusterTopicManagementService will trigger preferred leader election for the topic {} in "
385391 + "cluster." , _topic
386392 );
@@ -403,7 +409,7 @@ void maybeElectLeader() throws Exception {
403409 if (!zkClient .reassignPartitionsInProgress ()) {
404410 List <TopicPartitionInfo > partitionInfoList = _adminClient
405411 .describeTopics (Collections .singleton (_topic )).all ().get ().get (_topic ).partitions ();
406- LOG .info (
412+ LOGGER .info (
407413 "MultiClusterTopicManagementService will trigger requested preferred leader election for the"
408414 + " topic {} in cluster." , _topic );
409415 triggerPreferredLeaderElection (partitionInfoList , _topic );
@@ -424,7 +430,7 @@ private void triggerPreferredLeaderElection(List<TopicPartitionInfo> partitionIn
424430 Set <TopicPartition > topicPartitions = new HashSet <>(partitions );
425431 ElectLeadersResult electLeadersResult = _adminClient .electLeaders (electionType , topicPartitions , newOptions );
426432
427- LOG .info ("{}: triggerPreferredLeaderElection - {}" , this .getClass ().toString (), electLeadersResult .all ().get ());
433+ LOGGER .info ("{}: triggerPreferredLeaderElection - {}" , this .getClass ().toString (), electLeadersResult .all ().get ());
428434 }
429435
430436 private static void reassignPartitions (KafkaZkClient zkClient , Collection <Node > brokers , String topic , int partitionCount , int replicationFactor ) {
@@ -448,9 +454,9 @@ private static void reassignPartitions(KafkaZkClient zkClient, Collection<Node>
448454 String currentAssignmentJson = formatAsReassignmentJson (topic , currentAssignment );
449455 String newAssignmentJson = formatAsReassignmentJson (topic , assignedReplicas );
450456
451- LOG .info ("Reassign partitions for topic " + topic );
452- LOG .info ("Current partition replica assignment " + currentAssignmentJson );
453- LOG .info ("New partition replica assignment " + newAssignmentJson );
457+ LOGGER .info ("Reassign partitions for topic " + topic );
458+ LOGGER .info ("Current partition replica assignment " + currentAssignmentJson );
459+ LOGGER .info ("New partition replica assignment " + newAssignmentJson );
454460 zkClient .createPartitionReassignment (newAssignment );
455461 }
456462
@@ -461,7 +467,7 @@ static int getReplicationFactor(List<TopicPartitionInfo> partitionInfoList) {
461467 int replicationFactor = partitionInfoList .get (0 ).replicas ().size ();
462468 for (TopicPartitionInfo partitionInfo : partitionInfoList ) {
463469 if (replicationFactor != partitionInfo .replicas ().size ()) {
464- LOG .warn ("Partitions of the topic have different replication factor." );
470+ LOGGER .warn ("Partitions of the topic have different replication factor." );
465471 return -1 ;
466472 }
467473 }
0 commit comments