Skip to content

Commit cbba9a9

Browse files
author
Andrew Choi
authored
Commit Availability Service (#196)
**Commit Availability Service.** 1 - Addresses: Broken ConsumeService on latest release (#195). 2 - Addresses: #204. 3 - Performs consumer offset commit availability checks using metric sensors; runs periodically, every x seconds. 4 - In doing this, a public interface `ConsumerFactory` is created to build the consumer and pass it into `ConsumeService`. WIP as a follow-up PR: Commit `Latency` Service. Signed-off-by: Andrew Choi <[email protected]>
1 parent 4bf4803 commit cbba9a9

24 files changed

+762
-196
lines changed

build.gradle

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -43,6 +43,7 @@ allprojects {
4343
compile 'com.signalfx.public:signalfx-codahale:0.0.47'
4444
compile group: 'org.apache.kafka', name: 'kafka_2.12', version: '2.3.1'
4545
compile group: 'org.apache.kafka', name: 'kafka-clients', version: '2.3.1'
46+
testCompile 'org.mockito:mockito-core:2.24.0'
4647
testCompile 'org.testng:testng:6.8.8'
4748
}
4849

config/kafka-monitor.properties

Lines changed: 42 additions & 14 deletions
Original file line number | Diff line number | Diff line change
@@ -38,8 +38,8 @@
3838
# or ServiceClassName. The key for the test/service in the json map is used as name to
3939
# identify the test/service in the log or JMX metrics, which is useful if multiple
4040
# test/service with the same class.name are run in the same Kafka Monitor process.
41-
#
42-
# If using Secure Socket Layer for security protocol, SSL properties must be defined under
41+
#
42+
# If using Secure Socket Layer for security protocol, SSL properties must be defined under
4343
# produce.producer.props, consume.consumer.props, as well as single-cluster-monitor props
4444

4545
{
@@ -82,21 +82,49 @@
8282
"class.name": "com.linkedin.kmf.services.DefaultMetricsReporterService",
8383
"report.interval.sec": 1,
8484
"report.metrics.list": [
85-
"kmf:type=kafka-monitor:offline-runnable-count",
86-
"kmf.services:type=produce-service,name=*:produce-availability-avg",
87-
"kmf.services:type=consume-service,name=*:consume-availability-avg",
88-
"kmf.services:type=produce-service,name=*:records-produced-total",
89-
"kmf.services:type=consume-service,name=*:records-consumed-total",
90-
"kmf.services:type=consume-service,name=*:records-lost-total",
91-
"kmf.services:type=consume-service,name=*:records-lost-rate",
92-
"kmf.services:type=consume-service,name=*:records-duplicated-total",
93-
"kmf.services:type=consume-service,name=*:records-delay-ms-avg",
94-
"kmf.services:type=produce-service,name=*:records-produced-rate",
95-
"kmf.services:type=produce-service,name=*:produce-error-rate",
96-
"kmf.services:type=consume-service,name=*:consume-error-rate"
85+
"kmf:type=kafka-monitor,name=*:offline-runnable-count",
86+
"kmf.services:type=produce-service,name=*:produce-availability-avg",
87+
"kmf.services:type=consume-service,name=*:consume-availability-avg",
88+
"kmf.services:type=commit-availability-service,name=*:offsets-committed-avg",
89+
"kmf.services:type=commit-availability-service,name=*:commit-latency-avg",
90+
"kmf.services:type=commit-availability-service,name=*:commit-availability-avg",
91+
"kmf.services:type=commit-availability-service,name=*:failed-commit-offsets-avg",
92+
"kmf.services:type=produce-service,name=*:records-produced-total",
93+
"kmf.services:type=consume-service,name=*:records-consumed-total",
94+
"kmf.services:type=consume-service,name=*:records-lost-total",
95+
"kmf.services:type=consume-service,name=*:records-lost-rate",
96+
"kmf.services:type=consume-service,name=*:records-duplicated-total",
97+
"kmf.services:type=consume-service,name=*:records-delay-ms-avg",
98+
"kmf.services:type=produce-service,name=*:records-produced-rate",
99+
"kmf.services:type=produce-service,name=*:produce-error-rate",
100+
"kmf.services:type=consume-service,name=*:consume-error-rate",
101+
"kmf.services:type=commit-availability-service,name=*:offsets-committed-total",
102+
"kmf.services:type=commit-availability-service,name=*:failed-commit-offsets-total"
97103
]
98104
}
99105

106+
# Example produce-service to produce messages to cluster
107+
# "produce-service": {
108+
# "class.name": "com.linkedin.kmf.services.ProduceService",
109+
# "topic": "kafka-monitor-topic",
110+
# "zookeeper.connect": "localhost:2181",
111+
# "bootstrap.servers": "localhost:9092",
112+
# "consume.latency.sla.ms": "20000",
113+
# "consume.consumer.props": {
114+
# }
115+
# },
116+
117+
# Example consume-service to consume messages
118+
# "consume-service": {
119+
# "class.name": "com.linkedin.kmf.services.ConsumeService",
120+
# "topic": "kafka-monitor-topic",
121+
# "zookeeper.connect": "localhost:2181",
122+
# "bootstrap.servers": "localhost:9092",
123+
# "consume.latency.sla.ms": "20000",
124+
# "consume.consumer.props": {
125+
# }
126+
# },
127+
100128
# Example statsd-service to report metrics
101129
# "statsd-service": {
102130
# "class.name": "com.linkedin.kmf.services.StatsdMetricsReporterService",

src/main/java/com/linkedin/kmf/KafkaMonitor.java

Lines changed: 28 additions & 7 deletions
Original file line number | Diff line number | Diff line change
@@ -11,12 +11,15 @@
1111

1212
import com.fasterxml.jackson.databind.ObjectMapper;
1313
import com.linkedin.kmf.apps.App;
14+
import com.linkedin.kmf.services.ConsumerFactoryImpl;
1415
import com.linkedin.kmf.services.Service;
1516
import java.io.BufferedReader;
1617
import java.io.FileReader;
18+
import java.lang.reflect.Constructor;
1719
import java.util.ArrayList;
1820
import java.util.List;
1921
import java.util.Map;
22+
import java.util.concurrent.CompletableFuture;
2023
import java.util.concurrent.ConcurrentHashMap;
2124
import java.util.concurrent.ConcurrentMap;
2225
import java.util.concurrent.Executors;
@@ -31,7 +34,6 @@
3134
import org.slf4j.Logger;
3235
import org.slf4j.LoggerFactory;
3336

34-
3537
/**
3638
* This is the main entry point of the monitor. It reads the configuration and manages the life cycle of the monitoring
3739
* applications.
@@ -63,13 +65,24 @@ public KafkaMonitor(Map<String, Map> testProps) throws Exception {
6365
throw new IllegalArgumentException(name + " is not configured with " + CLASS_NAME_CONFIG);
6466
String className = (String) props.get(CLASS_NAME_CONFIG);
6567

66-
Class<?> cls = Class.forName(className);
67-
if (App.class.isAssignableFrom(cls)) {
68+
Class<?> aClass = Class.forName(className);
69+
if (App.class.isAssignableFrom(aClass)) {
6870
App test = (App) Class.forName(className).getConstructor(Map.class, String.class).newInstance(props, name);
6971
_apps.put(name, test);
70-
} else if (Service.class.isAssignableFrom(cls)) {
71-
Service service = (Service) Class.forName(className).getConstructor(Map.class, String.class).newInstance(props, name);
72-
_services.put(name, service);
72+
} else if (Service.class.isAssignableFrom(aClass)) {
73+
Constructor<?>[] constructors = Class.forName(className).getConstructors();
74+
if (this.constructorContainsFuture(constructors)) {
75+
CompletableFuture<Void> completableFuture = new CompletableFuture<>();
76+
completableFuture.complete(null);
77+
ConsumerFactoryImpl consumerFactory = new ConsumerFactoryImpl(props);
78+
Service service = (Service) Class.forName(className)
79+
.getConstructor(String.class, CompletableFuture.class, ConsumerFactoryImpl.class)
80+
.newInstance(name, completableFuture, consumerFactory);
81+
_services.put(name, service);
82+
} else {
83+
Service service = (Service) Class.forName(className).getConstructor(Map.class, String.class).newInstance(props, name);
84+
_services.put(name, service);
85+
}
7386
} else {
7487
throw new IllegalArgumentException(className + " should implement either " + App.class.getSimpleName() + " or " + Service.class.getSimpleName());
7588
}
@@ -83,6 +96,15 @@ public KafkaMonitor(Map<String, Map> testProps) throws Exception {
8396
(config, now) -> _offlineRunnables.size());
8497
}
8598

99+
private boolean constructorContainsFuture(Constructor<?>[] constructors) {
100+
for (int n = 0; n < constructors[0].getParameterTypes().length; ++n) {
101+
if (constructors[0].getParameterTypes()[n].equals(CompletableFuture.class)) {
102+
return true;
103+
}
104+
}
105+
return false;
106+
}
107+
86108
public synchronized void start() {
87109
if (!_isRunning.compareAndSet(false, true)) {
88110
return;
@@ -151,7 +173,6 @@ public static void main(String[] args) throws Exception {
151173
return;
152174
}
153175

154-
155176
StringBuilder buffer = new StringBuilder();
156177
try (BufferedReader br = new BufferedReader(new FileReader(args[0].trim()))) {
157178
String line;

src/main/java/com/linkedin/kmf/apps/MultiClusterMonitor.java

Lines changed: 3 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -12,6 +12,7 @@
1212

1313
import com.linkedin.kmf.apps.configs.MultiClusterMonitorConfig;
1414
import com.linkedin.kmf.services.ConsumeService;
15+
import com.linkedin.kmf.services.ConsumerFactoryImpl;
1516
import com.linkedin.kmf.services.MultiClusterTopicManagementService;
1617
import com.linkedin.kmf.services.ProduceService;
1718
import java.util.HashMap;
@@ -42,7 +43,8 @@ public MultiClusterMonitor(Map<String, Object> props, String name) throws Except
4243
_multiClusterTopicManagementService = new MultiClusterTopicManagementService(createMultiClusterTopicManagementServiceProps(props, config), name);
4344
CompletableFuture<Void> topicPartitionReady = _multiClusterTopicManagementService.topicPartitionResult();
4445
_produceService = new ProduceService(createProduceServiceProps(props, config), name);
45-
_consumeService = new ConsumeService(createConsumeServiceProps(props, config), name, topicPartitionReady);
46+
ConsumerFactoryImpl consumerFactory = new ConsumerFactoryImpl(createConsumeServiceProps(props, config));
47+
_consumeService = new ConsumeService(name, topicPartitionReady, consumerFactory);
4648
}
4749

4850
@SuppressWarnings("unchecked")

src/main/java/com/linkedin/kmf/apps/SingleClusterMonitor.java

Lines changed: 20 additions & 9 deletions
Original file line number | Diff line number | Diff line change
@@ -11,16 +11,19 @@
1111
package com.linkedin.kmf.apps;
1212

1313
import com.linkedin.kmf.services.ConsumeService;
14+
import com.linkedin.kmf.services.ConsumerFactoryImpl;
1415
import com.linkedin.kmf.services.DefaultMetricsReporterService;
1516
import com.linkedin.kmf.services.JettyService;
1617
import com.linkedin.kmf.services.JolokiaService;
1718
import com.linkedin.kmf.services.ProduceService;
19+
import com.linkedin.kmf.services.Service;
1820
import com.linkedin.kmf.services.TopicManagementService;
1921
import com.linkedin.kmf.services.configs.ConsumeServiceConfig;
2022
import com.linkedin.kmf.services.configs.DefaultMetricsReporterServiceConfig;
2123
import com.linkedin.kmf.services.configs.MultiClusterTopicManagementServiceConfig;
2224
import com.linkedin.kmf.services.configs.ProduceServiceConfig;
2325
import com.linkedin.kmf.services.configs.TopicManagementServiceConfig;
26+
import java.util.ArrayList;
2427
import java.util.Arrays;
2528
import java.util.HashMap;
2629
import java.util.List;
@@ -49,14 +52,20 @@ public class SingleClusterMonitor implements App {
4952
private final ProduceService _produceService;
5053
private final ConsumeService _consumeService;
5154
private final String _name;
55+
private final List<Service> _allServices;
5256

5357
public SingleClusterMonitor(Map<String, Object> props, String name) throws Exception {
5458
_name = name;
5559
_topicManagementService = new TopicManagementService(props, name);
56-
5760
CompletableFuture<Void> topicPartitionReady = _topicManagementService.topicPartitionResult();
5861
_produceService = new ProduceService(props, name);
59-
_consumeService = new ConsumeService(props, name, topicPartitionReady);
62+
ConsumerFactoryImpl consumerFactory = new ConsumerFactoryImpl(props);
63+
_consumeService = new ConsumeService(name, topicPartitionReady, consumerFactory);
64+
int servicesInitialCapacity = 4;
65+
_allServices = new ArrayList<>(servicesInitialCapacity);
66+
_allServices.add(_topicManagementService);
67+
_allServices.add(_produceService);
68+
_allServices.add(_consumeService);
6069
}
6170

6271
@Override
@@ -72,9 +81,9 @@ public void start() {
7281

7382
@Override
7483
public void stop() {
75-
_topicManagementService.stop();
76-
_produceService.stop();
77-
_consumeService.stop();
84+
for (Service service : _allServices) {
85+
service.stop();
86+
}
7887
LOG.info(_name + "/SingleClusterMonitor stopped.");
7988
}
8089

@@ -99,9 +108,9 @@ public boolean isRunning() {
99108

100109
@Override
101110
public void awaitShutdown() {
102-
_topicManagementService.awaitShutdown();
103-
_produceService.awaitShutdown();
104-
_consumeService.awaitShutdown();
111+
for (Service service : _allServices) {
112+
service.awaitShutdown();
113+
}
105114
}
106115

107116
/** Get the command-line argument parser. */
@@ -305,7 +314,9 @@ public static void main(String[] args) throws Exception {
305314
"kmf.services:type=consume-service,name=*:records-delay-ms-avg",
306315
"kmf.services:type=produce-service,name=*:records-produced-rate",
307316
"kmf.services:type=produce-service,name=*:produce-error-rate",
308-
"kmf.services:type=consume-service,name=*:consume-error-rate"
317+
"kmf.services:type=consume-service,name=*:consume-error-rate",
318+
"kmf.services:type=consume-service,name=*:commit-latency-avg",
319+
"kmf.services:type=consume-service,name=*:commit-availability-avg"
309320
);
310321
props.put(DefaultMetricsReporterServiceConfig.REPORT_METRICS_CONFIG, metrics);
311322

src/main/java/com/linkedin/kmf/common/ConfigDocumentationGenerator.java

Lines changed: 3 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -17,12 +17,15 @@
1717
import java.lang.reflect.Field;
1818
import org.apache.kafka.common.config.AbstractConfig;
1919
import org.apache.kafka.common.config.ConfigDef;
20+
import org.slf4j.Logger;
21+
import org.slf4j.LoggerFactory;
2022

2123

2224
/**
2325
* Generates the table of configuration parameters, their documentation strings and default values.
2426
*/
2527
public class ConfigDocumentationGenerator {
28+
private static final Logger LOG = LoggerFactory.getLogger(ConfigDocumentationGenerator.class);
2629

2730
private static void printHelp() {
2831
System.out.println("ConfigDocumentationGenerator outputDirectory configClassNames...");

src/main/java/com/linkedin/kmf/consumer/KMBaseConsumer.java

Lines changed: 18 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -10,6 +10,12 @@
1010

1111
package com.linkedin.kmf.consumer;
1212

13+
import java.util.Map;
14+
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
15+
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
16+
import org.apache.kafka.common.TopicPartition;
17+
18+
1319
/**
1420
* A base consumer used to abstract different consumer classes.
1521
*
@@ -20,6 +26,18 @@ public interface KMBaseConsumer {
2026

2127
BaseConsumerRecord receive() throws Exception;
2228

29+
void commitAsync();
30+
31+
void commitAsync(final Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCommitCallback callback);
32+
33+
void commitAsync(OffsetCommitCallback callback);
34+
35+
OffsetAndMetadata committed(TopicPartition tp);
36+
2337
void close();
2438

39+
long lastCommitted();
40+
41+
void updateLastCommit();
42+
2543
}

src/main/java/com/linkedin/kmf/consumer/NewConsumer.java

Lines changed: 38 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -13,9 +13,15 @@
1313
import java.time.Duration;
1414
import java.util.Collections;
1515
import java.util.Iterator;
16+
import java.util.Map;
1617
import java.util.Properties;
1718
import org.apache.kafka.clients.consumer.ConsumerRecord;
1819
import org.apache.kafka.clients.consumer.KafkaConsumer;
20+
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
21+
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
22+
import org.apache.kafka.common.TopicPartition;
23+
import org.slf4j.Logger;
24+
import org.slf4j.LoggerFactory;
1925

2026
/*
2127
* Wrap around the new consumer from Apache Kafka and implement the #KMBaseConsumer interface
@@ -24,6 +30,8 @@ public class NewConsumer implements KMBaseConsumer {
2430

2531
private final KafkaConsumer<String, String> _consumer;
2632
private Iterator<ConsumerRecord<String, String>> _recordIter;
33+
private static final Logger LOG = LoggerFactory.getLogger(NewConsumer.class);
34+
private static long lastCommitted;
2735

2836
public NewConsumer(String topic, Properties consumerProperties) {
2937
_consumer = new KafkaConsumer<>(consumerProperties);
@@ -39,9 +47,39 @@ public BaseConsumerRecord receive() {
3947
return new BaseConsumerRecord(record.topic(), record.partition(), record.offset(), record.key(), record.value());
4048
}
4149

50+
@Override
51+
public void commitAsync() {
52+
_consumer.commitAsync();
53+
}
54+
55+
@Override
56+
public void commitAsync(final Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCommitCallback callback) {
57+
_consumer.commitAsync(offsets, callback);
58+
}
59+
60+
@Override
61+
public void commitAsync(OffsetCommitCallback callback) {
62+
_consumer.commitAsync(callback);
63+
}
64+
65+
@Override
66+
public OffsetAndMetadata committed(TopicPartition tp) {
67+
return _consumer.committed(tp);
68+
}
69+
4270
@Override
4371
public void close() {
4472
_consumer.close();
4573
}
4674

75+
@Override
76+
public long lastCommitted() {
77+
return lastCommitted;
78+
}
79+
80+
@Override
81+
public void updateLastCommit() {
82+
lastCommitted = System.currentTimeMillis();
83+
}
84+
4785
}

0 commit comments

Comments (0)