Commit 5c38a99

Replace last use of ZkClient by AdminClient equivalent (#319)
Change the topic configuration update to use AdminClient.

With the first commit of this PR, the behavior looks like this (line breaks added for clarity; for every config, the existing value is listed and then the future value):

[2020-10-09 19:28:21,161] INFO MultiClusterTopicManagementService will overwrite properties of the topic xinfra-monitor-topic in cluster from Config(entries=[
ConfigEntry(name=compression.type, value=producer, source=DYNAMIC_TOPIC_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[]),
ConfigEntry(name=confluent.value.schema.validation, value=false, source=DYNAMIC_TOPIC_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[]),
ConfigEntry(name=leader.replication.throttled.replicas, value=, source=DYNAMIC_TOPIC_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[]),
ConfigEntry(name=message.downconversion.enable, value=true, source=DYNAMIC_TOPIC_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[]),
ConfigEntry(name=min.insync.replicas, value=1, source=DYNAMIC_TOPIC_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[]),
ConfigEntry(name=segment.jitter.ms, value=0, source=DYNAMIC_TOPIC_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[]),
ConfigEntry(name=cleanup.policy, value=delete, source=DYNAMIC_TOPIC_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[]),
ConfigEntry(name=flush.ms, value=9223372036854775807, source=DYNAMIC_TOPIC_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[]),
ConfigEntry(name=follower.replication.throttled.replicas, value=, source=DYNAMIC_TOPIC_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[]),
ConfigEntry(name=segment.bytes, value=1073741824, source=DYNAMIC_TOPIC_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[]),
ConfigEntry(name=retention.ms, value=3700000, source=DYNAMIC_TOPIC_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[]),
ConfigEntry(name=flush.messages, value=9223372036854775807, source=DYNAMIC_TOPIC_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[]),
ConfigEntry(name=message.format.version, value=2.4-IV1, source=DYNAMIC_TOPIC_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[]),
ConfigEntry(name=file.delete.delay.ms, value=60000, source=DYNAMIC_TOPIC_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[]),
ConfigEntry(name=max.compaction.lag.ms, value=9223372036854775807, source=DYNAMIC_TOPIC_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[]),
ConfigEntry(name=max.message.bytes, value=1000012, source=DYNAMIC_TOPIC_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[]),
ConfigEntry(name=min.compaction.lag.ms, value=0, source=DYNAMIC_TOPIC_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[]),
ConfigEntry(name=message.timestamp.type, value=CreateTime, source=DYNAMIC_TOPIC_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[]),
ConfigEntry(name=preallocate, value=false, source=DYNAMIC_TOPIC_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[]),
ConfigEntry(name=confluent.placement.constraints, value=, source=DYNAMIC_TOPIC_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[]),
ConfigEntry(name=min.cleanable.dirty.ratio, value=0.5, source=DYNAMIC_TOPIC_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[]),
ConfigEntry(name=index.interval.bytes, value=4096, source=DYNAMIC_TOPIC_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[]),
ConfigEntry(name=unclean.leader.election.enable, value=false, source=DYNAMIC_TOPIC_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[]),
ConfigEntry(name=retention.bytes, value=-1, source=DYNAMIC_TOPIC_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[]),
ConfigEntry(name=delete.retention.ms, value=86400000, source=DYNAMIC_TOPIC_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[]),
ConfigEntry(name=segment.ms, value=604800000, source=DYNAMIC_TOPIC_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[]),
ConfigEntry(name=confluent.key.schema.validation, value=false, source=DYNAMIC_TOPIC_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[]),
ConfigEntry(name=message.timestamp.difference.max.ms, value=9223372036854775807, source=DYNAMIC_TOPIC_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[]),
ConfigEntry(name=segment.index.bytes, value=10485760, source=DYNAMIC_TOPIC_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[])
]) to Config(entries=[
ConfigEntry(name=compression.type, value=producer, source=DYNAMIC_TOPIC_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[]),
ConfigEntry(name=confluent.value.schema.validation, value=false, source=DYNAMIC_TOPIC_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[]),
ConfigEntry(name=leader.replication.throttled.replicas, value=, source=DYNAMIC_TOPIC_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[]),
ConfigEntry(name=message.downconversion.enable, value=true, source=DYNAMIC_TOPIC_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[]),
ConfigEntry(name=min.insync.replicas, value=1, source=DYNAMIC_TOPIC_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[]),
ConfigEntry(name=segment.jitter.ms, value=0, source=DYNAMIC_TOPIC_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[]),
ConfigEntry(name=cleanup.policy, value=delete, source=DYNAMIC_TOPIC_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[]),
ConfigEntry(name=flush.ms, value=9223372036854775807, source=DYNAMIC_TOPIC_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[]),
ConfigEntry(name=follower.replication.throttled.replicas, value=, source=DYNAMIC_TOPIC_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[]),
ConfigEntry(name=segment.bytes, value=1073741824, source=DYNAMIC_TOPIC_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[]),
ConfigEntry(name=retention.ms, value=3600000, source=UNKNOWN, isSensitive=false, isReadOnly=false, synonyms=[]),
ConfigEntry(name=flush.messages, value=9223372036854775807, source=DYNAMIC_TOPIC_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[]),
ConfigEntry(name=message.format.version, value=2.4-IV1, source=DYNAMIC_TOPIC_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[]),
ConfigEntry(name=file.delete.delay.ms, value=60000, source=DYNAMIC_TOPIC_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[]),
ConfigEntry(name=max.compaction.lag.ms, value=9223372036854775807, source=DYNAMIC_TOPIC_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[]),
ConfigEntry(name=max.message.bytes, value=1000012, source=DYNAMIC_TOPIC_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[]),
ConfigEntry(name=min.compaction.lag.ms, value=0, source=DYNAMIC_TOPIC_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[]),
ConfigEntry(name=message.timestamp.type, value=CreateTime, source=DYNAMIC_TOPIC_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[]),
ConfigEntry(name=preallocate, value=false, source=DYNAMIC_TOPIC_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[]),
ConfigEntry(name=confluent.placement.constraints, value=, source=DYNAMIC_TOPIC_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[]),
ConfigEntry(name=min.cleanable.dirty.ratio, value=0.5, source=DYNAMIC_TOPIC_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[]),
ConfigEntry(name=index.interval.bytes, value=4096, source=DYNAMIC_TOPIC_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[]),
ConfigEntry(name=unclean.leader.election.enable, value=false, source=DYNAMIC_TOPIC_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[]),
ConfigEntry(name=retention.bytes, value=-1, source=DYNAMIC_TOPIC_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[]),
ConfigEntry(name=delete.retention.ms, value=86400000, source=DYNAMIC_TOPIC_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[]),
ConfigEntry(name=segment.ms, value=604800000, source=DYNAMIC_TOPIC_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[]),
ConfigEntry(name=confluent.key.schema.validation, value=false, source=DYNAMIC_TOPIC_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[]),
ConfigEntry(name=message.timestamp.difference.max.ms, value=9223372036854775807, source=DYNAMIC_TOPIC_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[]),
ConfigEntry(name=segment.index.bytes, value=10485760, source=DYNAMIC_TOPIC_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[])
]). (com.linkedin.xinfra.monitor.services.MultiClusterTopicManagementService)

With the second commit, the behavior looks like this (only the modifications that will be applied are listed, with their new values):

[2020-10-09 19:26:30,786] INFO MultiClusterTopicManagementService will overwrite properties of the topic xinfra-monitor-topic in cluster with [AlterConfigOp{opType=SET, configEntry=ConfigEntry(name=retention.ms, value=3600000, source=UNKNOWN, isSensitive=false, isReadOnly=false, synonyms=[])}]. (com.linkedin.xinfra.monitor.services.MultiClusterTopicManagementService)
1 parent 8d8a33b commit 5c38a99
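
For readers who want the gist before reading the diff: the new code path reads the topic's current configuration through AdminClient.describeConfigs, builds an AlterConfigOp of type SET for each user-specified property whose value differs, and applies only those ops with AdminClient.incrementalAlterConfigs. The sketch below is a minimal, self-contained illustration of that pattern, not the exact code in MultiClusterTopicManagementService; the bootstrap server is a placeholder, the topic name and retention.ms value are taken from the log above, and the comparison here is on plain string values rather than whole ConfigEntry objects.

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;

public class TopicConfigSyncSketch {
  public static void main(String[] args) throws Exception {
    Properties clientProps = new Properties();
    clientProps.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder broker

    // Desired topic-level overrides; retention.ms mirrors the value in the commit message above.
    Properties desired = new Properties();
    desired.put("retention.ms", "3600000");

    String topic = "xinfra-monitor-topic";

    try (AdminClient adminClient = AdminClient.create(clientProps)) {
      ConfigResource resource = new ConfigResource(ConfigResource.Type.TOPIC, topic);

      // Read the topic's current configuration.
      Config current = adminClient.describeConfigs(Collections.singleton(resource))
          .all().get().get(resource);

      // Collect SET ops only for entries whose value actually differs.
      Collection<AlterConfigOp> ops = new ArrayList<>();
      for (Map.Entry<Object, Object> entry : desired.entrySet()) {
        String name = String.valueOf(entry.getKey());
        String value = String.valueOf(entry.getValue());
        ConfigEntry existing = current.get(name);
        if (existing == null || !value.equals(existing.value())) {
          ops.add(new AlterConfigOp(new ConfigEntry(name, value), AlterConfigOp.OpType.SET));
        }
      }

      // Apply only the changed entries; everything else keeps its current value.
      if (!ops.isEmpty()) {
        adminClient.incrementalAlterConfigs(Collections.singletonMap(resource, ops)).all().get();
      }
    }
  }
}

This also reflects the logging change: when nothing differs, the collection of ops is empty and no update (and no log line) is produced.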


src/main/java/com/linkedin/xinfra/monitor/services/MultiClusterTopicManagementService.java

Lines changed: 63 additions & 69 deletions
@@ -36,11 +36,12 @@
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
 import kafka.admin.BrokerMetadata;
-import kafka.server.ConfigType;
-import kafka.zk.KafkaZkClient;
 import org.apache.kafka.clients.admin.AdminClient;
 import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.admin.AlterConfigOp;
 import org.apache.kafka.clients.admin.AlterPartitionReassignmentsResult;
+import org.apache.kafka.clients.admin.Config;
+import org.apache.kafka.clients.admin.ConfigEntry;
 import org.apache.kafka.clients.admin.CreatePartitionsResult;
 import org.apache.kafka.clients.admin.ElectLeadersResult;
 import org.apache.kafka.clients.admin.NewPartitionReassignment;
@@ -54,8 +55,7 @@
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.TopicPartitionInfo;
 import org.apache.kafka.common.config.ConfigException;
-import org.apache.kafka.common.security.JaasUtils;
-import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.config.ConfigResource;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import scala.Option$;
@@ -416,70 +416,68 @@ private Set<Node> getAvailableBrokers() throws ExecutionException, InterruptedEx
   }
 
   void maybeReassignPartitionAndElectLeader() throws ExecutionException, InterruptedException, TimeoutException {
-    try (KafkaZkClient zkClient = KafkaZkClient.apply(_zkConnect, JaasUtils.isZkSecurityEnabled(),
-        Utils.ZK_SESSION_TIMEOUT_MS, Utils.ZK_CONNECTION_TIMEOUT_MS, Integer.MAX_VALUE, Time.SYSTEM,
-        METRIC_GROUP_NAME, "SessionExpireListener", null)) {
-
-      List<TopicPartitionInfo> partitionInfoList =
-          _adminClient.describeTopics(Collections.singleton(_topic)).all().get().get(_topic).partitions();
-      Collection<Node> brokers = this.getAvailableBrokers();
-      boolean partitionReassigned = false;
-      if (partitionInfoList.size() == 0) {
-        throw new IllegalStateException("Topic " + _topic + " does not exist in cluster.");
-      }
+    List<TopicPartitionInfo> partitionInfoList =
+        _adminClient.describeTopics(Collections.singleton(_topic)).all().get().get(_topic).partitions();
+    Collection<Node> brokers = this.getAvailableBrokers();
+    boolean partitionReassigned = false;
+    if (partitionInfoList.size() == 0) {
+      throw new IllegalStateException("Topic " + _topic + " does not exist in cluster.");
+    }
 
-      int currentReplicationFactor = getReplicationFactor(partitionInfoList);
-      int expectedReplicationFactor = Math.max(currentReplicationFactor, _replicationFactor);
+    int currentReplicationFactor = getReplicationFactor(partitionInfoList);
+    int expectedReplicationFactor = Math.max(currentReplicationFactor, _replicationFactor);
 
-      if (_replicationFactor < currentReplicationFactor) {
-        LOGGER.debug(
-            "Configured replication factor {} is smaller than the current replication factor {} of the topic {} in cluster.",
-            _replicationFactor, currentReplicationFactor, _topic);
-      }
+    if (_replicationFactor < currentReplicationFactor) {
+      LOGGER.debug(
+          "Configured replication factor {} is smaller than the current replication factor {} of the topic {} in cluster.",
+          _replicationFactor, currentReplicationFactor, _topic);
+    }
 
-      if (expectedReplicationFactor > currentReplicationFactor && Utils.ongoingPartitionReassignments(_adminClient)
-          .isEmpty()) {
-        LOGGER.info(
-            "MultiClusterTopicManagementService will increase the replication factor of the topic {} in cluster"
-                + "from {} to {}", _topic, currentReplicationFactor, expectedReplicationFactor);
-        reassignPartitions(_adminClient, brokers, _topic, partitionInfoList.size(), expectedReplicationFactor);
+    if (expectedReplicationFactor > currentReplicationFactor && Utils.ongoingPartitionReassignments(_adminClient)
+        .isEmpty()) {
+      LOGGER.info(
+          "MultiClusterTopicManagementService will increase the replication factor of the topic {} in cluster"
+              + "from {} to {}", _topic, currentReplicationFactor, expectedReplicationFactor);
+      reassignPartitions(_adminClient, brokers, _topic, partitionInfoList.size(), expectedReplicationFactor);
 
-        partitionReassigned = true;
-      }
+      partitionReassigned = true;
+    }
 
-      // Update the properties of the monitor topic if any config is different from the user-specified config
-      Properties currentProperties = zkClient.getEntityConfigs(ConfigType.Topic(), _topic);
-      Properties expectedProperties = new Properties();
-      for (Object key : currentProperties.keySet()) {
-        expectedProperties.put(key, currentProperties.get(key));
-      }
-      for (Object key : _topicProperties.keySet()) {
-        expectedProperties.put(key, _topicProperties.get(key));
+    // Update the properties of the monitor topic if any config is different from the user-specified config
+    ConfigResource topicConfigResource = new ConfigResource(ConfigResource.Type.TOPIC, _topic);
+    Config currentConfig = _adminClient.describeConfigs(Collections.singleton(topicConfigResource)).all().get().get(topicConfigResource);
+    Collection<AlterConfigOp> alterConfigOps = new ArrayList<>();
+    for (Map.Entry<Object, Object> entry : _topicProperties.entrySet()) {
+      String name = String.valueOf(entry.getKey());
+      ConfigEntry configEntry = new ConfigEntry(name, String.valueOf(entry.getValue()));
+      if (!configEntry.equals(currentConfig.get(name))) {
+        alterConfigOps.add(new AlterConfigOp(configEntry, AlterConfigOp.OpType.SET));
       }
+    }
 
-      if (!currentProperties.equals(expectedProperties)) {
-        LOGGER.info("MultiClusterTopicManagementService will overwrite properties of the topic {} "
-            + "in cluster from {} to {}.", _topic, currentProperties, expectedProperties);
-        zkClient.setOrCreateEntityConfigs(ConfigType.Topic(), _topic, expectedProperties);
-      }
+    if (!alterConfigOps.isEmpty()) {
+      LOGGER.info("MultiClusterTopicManagementService will overwrite properties of the topic {} "
+          + "in cluster with {}.", _topic, alterConfigOps);
+      Map<ConfigResource, Collection<AlterConfigOp>> configs = Collections.singletonMap(topicConfigResource, alterConfigOps);
+      _adminClient.incrementalAlterConfigs(configs);
+    }
 
-      if (partitionInfoList.size() >= brokers.size() && someBrokerNotPreferredLeader(partitionInfoList, brokers)
-          && Utils.ongoingPartitionReassignments(_adminClient).isEmpty()) {
-        LOGGER.info("{} will reassign partitions of the topic {} in cluster.", this.getClass().toString(), _topic);
-        reassignPartitions(_adminClient, brokers, _topic, partitionInfoList.size(), expectedReplicationFactor);
+    if (partitionInfoList.size() >= brokers.size() && someBrokerNotPreferredLeader(partitionInfoList, brokers)
+        && Utils.ongoingPartitionReassignments(_adminClient).isEmpty()) {
+      LOGGER.info("{} will reassign partitions of the topic {} in cluster.", this.getClass().toString(), _topic);
+      reassignPartitions(_adminClient, brokers, _topic, partitionInfoList.size(), expectedReplicationFactor);
 
-        partitionReassigned = true;
-      }
+      partitionReassigned = true;
+    }
 
-      if (partitionInfoList.size() >= brokers.size() && someBrokerNotElectedLeader(partitionInfoList, brokers)) {
-        if (!partitionReassigned || Utils.ongoingPartitionReassignments(_adminClient).isEmpty()) {
-          LOGGER.info("MultiClusterTopicManagementService will trigger preferred leader election for the topic {} in "
-              + "cluster.", _topic);
-          triggerPreferredLeaderElection(partitionInfoList, _topic);
-          _preferredLeaderElectionRequested = false;
-        } else {
-          _preferredLeaderElectionRequested = true;
-        }
+    if (partitionInfoList.size() >= brokers.size() && someBrokerNotElectedLeader(partitionInfoList, brokers)) {
+      if (!partitionReassigned || Utils.ongoingPartitionReassignments(_adminClient).isEmpty()) {
+        LOGGER.info("MultiClusterTopicManagementService will trigger preferred leader election for the topic {} in "
+            + "cluster.", _topic);
+        triggerPreferredLeaderElection(partitionInfoList, _topic);
+        _preferredLeaderElectionRequested = false;
+      } else {
+        _preferredLeaderElectionRequested = true;
       }
     }
   }
@@ -489,17 +487,13 @@ void maybeElectLeader() throws Exception {
       return;
     }
 
-    try (KafkaZkClient zkClient = KafkaZkClient.apply(_zkConnect, JaasUtils.isZkSecurityEnabled(),
-        Utils.ZK_SESSION_TIMEOUT_MS, Utils.ZK_CONNECTION_TIMEOUT_MS, Integer.MAX_VALUE, Time.SYSTEM,
-        METRIC_GROUP_NAME, "SessionExpireListener", null)) {
-      if (Utils.ongoingPartitionReassignments(_adminClient).isEmpty()) {
-        List<TopicPartitionInfo> partitionInfoList =
-            _adminClient.describeTopics(Collections.singleton(_topic)).all().get().get(_topic).partitions();
-        LOGGER.info("MultiClusterTopicManagementService will trigger requested preferred leader election for the"
-            + " topic {} in cluster.", _topic);
-        triggerPreferredLeaderElection(partitionInfoList, _topic);
-        _preferredLeaderElectionRequested = false;
-      }
+    if (Utils.ongoingPartitionReassignments(_adminClient).isEmpty()) {
+      List<TopicPartitionInfo> partitionInfoList =
+          _adminClient.describeTopics(Collections.singleton(_topic)).all().get().get(_topic).partitions();
+      LOGGER.info("MultiClusterTopicManagementService will trigger requested preferred leader election for the"
+          + " topic {} in cluster.", _topic);
+      triggerPreferredLeaderElection(partitionInfoList, _topic);
+      _preferredLeaderElectionRequested = false;
+    }
   }
 
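After the service has run, one way to sanity-check the new path is to describe the topic configuration again and confirm that retention.ms carries the value from the monitor's topic properties. This is a hypothetical verification snippet, assuming a broker at localhost:9092 and the xinfra-monitor-topic shown in the commit message:

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.common.config.ConfigResource;

public class VerifyTopicRetention {
  public static void main(String[] args) throws Exception {
    Properties clientProps = new Properties();
    clientProps.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder broker

    try (AdminClient adminClient = AdminClient.create(clientProps)) {
      ConfigResource resource = new ConfigResource(ConfigResource.Type.TOPIC, "xinfra-monitor-topic");
      Config config = adminClient.describeConfigs(Collections.singleton(resource))
          .all().get().get(resource);
      // Expected to print 3600000 after the update shown in the commit message.
      System.out.println("retention.ms = " + config.get("retention.ms").value());
    }
  }
}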