Skip to content

Commit c1b34dc

Browse files
author
Andrew Choi
authored
[WIP] Enable SSL Configurations for ISSUE #178 (#187)
This will be merged after the PR (#181) is merged. and then this PR rebased. Addresses the issue: ISSUE #178 🖌 Signed-off-by: Andrew Choi <[email protected]>
1 parent f1a21c9 commit c1b34dc

File tree

6 files changed

+36
-20
lines changed

6 files changed

+36
-20
lines changed

src/main/java/com/linkedin/kmf/apps/MultiClusterMonitor.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -73,8 +73,8 @@ private Map<String, Object> createMultiClusterTopicManagementServiceProps(Map<St
7373
@Override
7474
public void start() {
7575
_multiClusterTopicManagementService.start();
76-
CompletableFuture<Void> completableFuture = _multiClusterTopicManagementService.topicManagementReady();
77-
completableFuture.thenRun(() -> {
76+
CompletableFuture<Void> topicManagementReady = _multiClusterTopicManagementService.topicManagementReady();
77+
topicManagementReady.thenRun(() -> {
7878
_produceService.start();
7979
_consumeService.start();
8080
});

src/main/java/com/linkedin/kmf/apps/SingleClusterMonitor.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,9 @@ public class SingleClusterMonitor implements App {
5252
public SingleClusterMonitor(Map<String, Object> props, String name) throws Exception {
5353
_name = name;
5454
_topicManagementService = new TopicManagementService(props, name);
55+
5556
CompletableFuture<Void> topicPartitionReady = _topicManagementService.topicPartitionReady();
57+
5658
_produceService = new ProduceService(props, name);
5759
_consumeService = new ConsumeService(props, name, topicPartitionReady);
5860
}
@@ -78,16 +80,21 @@ public void stop() {
7880

7981
@Override
8082
public boolean isRunning() {
83+
boolean isRunning = true;
84+
8185
if (!_topicManagementService.isRunning()) {
86+
isRunning = false;
8287
LOG.info("_topicManagementService not running.");
8388
}
8489
if (!_produceService.isRunning()) {
90+
isRunning = false;
8591
LOG.info("_produceService not running.");
8692
}
8793
if (!_consumeService.isRunning()) {
94+
isRunning = false;
8895
LOG.info("_consumeService not Running.");
8996
}
90-
return _topicManagementService.isRunning() && _produceService.isRunning() && _consumeService.isRunning();
97+
return isRunning;
9198
}
9299

93100
@Override
@@ -279,7 +286,6 @@ public static void main(String[] args) throws Exception {
279286
props.put(TopicManagementServiceConfig.TOPIC_REPLICATION_FACTOR_CONFIG, res.getInt("replicationFactor"));
280287
if (res.getInt("rebalanceMs") != null)
281288
props.put(MultiClusterTopicManagementServiceConfig.REBALANCE_INTERVAL_MS_CONFIG, res.getInt("rebalanceMs"));
282-
283289
SingleClusterMonitor app = new SingleClusterMonitor(props, "single-cluster-monitor");
284290
app.start();
285291

src/main/java/com/linkedin/kmf/services/ConsumeService.java

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ public class ConsumeService implements Service {
6060
private final String _name;
6161
private ConsumeMetrics _sensors;
6262
private final KMBaseConsumer _consumer;
63-
private Thread _thread;
63+
private Thread _consumeThread;
6464
private final int _latencyPercentileMaxMs;
6565
private final int _latencyPercentileGranularityMs;
6666
private final AtomicBoolean _running;
@@ -104,6 +104,10 @@ public ConsumeService(Map<String, Object> props, String name, CompletableFuture<
104104

105105
// Assign config specified for consumer. This has the highest priority.
106106
consumerProps.putAll(consumerPropsOverride);
107+
108+
if (props.containsKey(ConsumeServiceConfig.CONSUMER_PROPS_CONFIG)) {
109+
props.forEach(consumerProps::putIfAbsent);
110+
}
107111
_consumer = (KMBaseConsumer) Class.forName(consumerClassName).getConstructor(String.class, Properties.class).newInstance(topic, consumerProps);
108112
topicPartitionReady.thenRun(() -> {
109113
MetricConfig metricConfig = new MetricConfig().samples(60).timeWindow(1000, TimeUnit.MILLISECONDS);
@@ -114,16 +118,15 @@ public ConsumeService(Map<String, Object> props, String name, CompletableFuture<
114118
tags.put("name", _name);
115119
_adminClient = AdminClient.create(props);
116120
_sensors = new ConsumeMetrics(metrics, tags, topic, topicPartitionReady);
117-
_thread = new Thread(() -> {
121+
_consumeThread = new Thread(() -> {
118122
try {
119123
consume();
120124
} catch (Exception e) {
121125
LOG.error(_name + "/ConsumeService failed", e);
122126
}
123127
}, _name + " consume-service");
124-
_thread.setDaemon(true);
128+
_consumeThread.setDaemon(true);
125129
});
126-
127130
}
128131

129132
private void consume() throws Exception {
@@ -185,7 +188,7 @@ record = _consumer.receive();
185188
@Override
186189
public synchronized void start() {
187190
if (_running.compareAndSet(false, true)) {
188-
_thread.start();
191+
_consumeThread.start();
189192
LOG.info("{}/ConsumeService started.", _name);
190193
}
191194
}
@@ -209,7 +212,7 @@ public void awaitShutdown() {
209212

210213
@Override
211214
public boolean isRunning() {
212-
return _running.get() && _thread.isAlive();
215+
return _running.get() && _consumeThread.isAlive();
213216
}
214217

215218
private class ConsumeMetrics {

src/main/java/com/linkedin/kmf/services/MultiClusterTopicManagementService.java

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,9 @@ public class MultiClusterTopicManagementService implements Service {
7979
private final int _scheduleIntervalMs;
8080
private final long _preferredLeaderElectionIntervalMs;
8181
private final ScheduledExecutorService _executor;
82-
private final CompletableFuture<Void> _completableFuture;
82+
83+
private final CompletableFuture<Void> _topicManagementReady;
84+
8385

8486
@SuppressWarnings("unchecked")
8587
public MultiClusterTopicManagementService(Map<String, Object> props, String serviceName) throws Exception {
@@ -91,14 +93,14 @@ public MultiClusterTopicManagementService(Map<String, Object> props, String serv
9193
_topicManagementByCluster = initializeTopicManagementHelper(propsByCluster, topic);
9294
_scheduleIntervalMs = config.getInt(MultiClusterTopicManagementServiceConfig.REBALANCE_INTERVAL_MS_CONFIG);
9395
_preferredLeaderElectionIntervalMs = config.getLong(MultiClusterTopicManagementServiceConfig.PREFERRED_LEADER_ELECTION_CHECK_INTERVAL_MS_CONFIG);
94-
_completableFuture = new CompletableFuture<>();
96+
_topicManagementReady = new CompletableFuture<>();
9597
_executor = Executors.newSingleThreadScheduledExecutor(
9698
r -> new Thread(r, _serviceName + "-multi-cluster-topic-management-service"));
9799
_topicPartitionReady.complete(null);
98100
}
99101

100102
public CompletableFuture<Void> topicManagementReady() {
101-
return _completableFuture;
103+
return _topicManagementReady;
102104
}
103105

104106
public CompletableFuture<Void> topicPartitionReady() {
@@ -178,7 +180,7 @@ public void run() {
178180
for (TopicManagementHelper helper : _topicManagementByCluster.values()) {
179181
helper.maybeAddPartitions(minPartitionNum);
180182
}
181-
_completableFuture.complete(null);
183+
_topicManagementReady.complete(null);
182184
for (Map.Entry<String, TopicManagementHelper> entry : _topicManagementByCluster.entrySet()) {
183185
String clusterName = entry.getKey();
184186
TopicManagementHelper helper = entry.getValue();
@@ -261,7 +263,9 @@ static class TopicManagementHelper {
261263
Map topicFactoryConfig = props.containsKey(TopicManagementServiceConfig.TOPIC_FACTORY_PROPS_CONFIG) ?
262264
(Map) props.get(TopicManagementServiceConfig.TOPIC_FACTORY_PROPS_CONFIG) : new HashMap();
263265
_topicFactory = (TopicFactory) Class.forName(topicFactoryClassName).getConstructor(Map.class).newInstance(topicFactoryConfig);
266+
264267
_adminClient = constructAdminClient(props);
268+
LOG.info("{} configs: {}", _adminClient.getClass().getSimpleName(), props);
265269
}
266270

267271
@SuppressWarnings("unchecked")
@@ -277,8 +281,6 @@ void maybeCreateTopic() throws Exception {
277281
}
278282

279283
AdminClient constructAdminClient(Map<String, Object> props) {
280-
props.putIfAbsent(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, _bootstrapServers);
281-
props.putIfAbsent(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, _requestTimeoutMs);
282284
return AdminClient.create(props);
283285
}
284286

src/main/java/com/linkedin/kmf/services/ProduceService.java

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,7 @@ public ProduceService(Map<String, Object> props, String name) throws Exception {
113113
throw new ConfigException("Override must not contain " + property + " config.");
114114
}
115115
}
116+
116117
_adminClient = AdminClient.create(props);
117118

118119
if (producerClass.equals(NewProducer.class.getCanonicalName()) || producerClass.equals(NewProducer.class.getSimpleName())) {
@@ -121,7 +122,7 @@ public ProduceService(Map<String, Object> props, String name) throws Exception {
121122
_producerClassName = producerClass;
122123
}
123124

124-
initializeProducer();
125+
initializeProducer(props);
125126

126127
_produceExecutor = Executors.newScheduledThreadPool(_threadsNum, new ProduceServiceThreadFactory());
127128
_handleNewPartitionsExecutor = Executors.newSingleThreadScheduledExecutor(new HandleNewPartitionsThreadFactory());
@@ -135,7 +136,7 @@ public ProduceService(Map<String, Object> props, String name) throws Exception {
135136
_sensors = new ProduceMetrics(metrics, tags);
136137
}
137138

138-
private void initializeProducer() throws Exception {
139+
private void initializeProducer(Map<String, Object> props) throws Exception {
139140
Properties producerProps = new Properties();
140141
// Assign default config. This has the lowest priority.
141142
producerProps.put(ProducerConfig.ACKS_CONFIG, "-1");
@@ -151,6 +152,10 @@ private void initializeProducer() throws Exception {
151152
// Assign config specified for producer. This has the highest priority.
152153
producerProps.putAll(_producerPropsOverride);
153154

155+
if (props.containsKey(ProduceServiceConfig.PRODUCER_PROPS_CONFIG)) {
156+
props.forEach(producerProps::putIfAbsent);
157+
}
158+
154159
_producer = (KMBaseProducer) Class.forName(_producerClassName).getConstructor(Properties.class).newInstance(producerProps);
155160
LOG.info("{}/ProduceService is initialized.", _name);
156161
}
@@ -380,7 +385,7 @@ public void run() {
380385
}
381386
_producer.close();
382387
try {
383-
initializeProducer();
388+
initializeProducer(new HashMap<>());
384389
} catch (Exception e) {
385390
LOG.error("Failed to restart producer.", e);
386391
throw new IllegalStateException(e);

src/main/java/com/linkedin/kmf/services/configs/ConsumeServiceConfig.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
import org.apache.kafka.common.config.AbstractConfig;
1515
import org.apache.kafka.common.config.ConfigDef;
1616

17+
1718
public class ConsumeServiceConfig extends AbstractConfig {
1819

1920
private static final ConfigDef CONFIG;
@@ -79,7 +80,6 @@ public class ConsumeServiceConfig extends AbstractConfig {
7980
20000,
8081
ConfigDef.Importance.MEDIUM,
8182
LATENCY_SLA_MS_DOC);
82-
8383
}
8484

8585
public ConsumeServiceConfig(Map<?, ?> props) {

0 commit comments

Comments
 (0)