Skip to content

Commit 14aa285

Browse files
author
Andrew Choi
authored
Measurement of Commit Latency Service Metrics (#225)
Measurement of Commit Latency Service Metrics 1 - Adds setCommitStartTimeMs() functionality 2 - Adds public void recordCommitComplete() for finishing the recording of consumer offset commit. 3 - Implements public void setCommitStartTimeMs(long time) for setting in milliseconds the start time of consumer offset commit. 4 - Implements public long commitStartTimeMs() for retrieving the start time of consumer offset commit.
1 parent c23ad61 commit 14aa285

File tree

4 files changed

+72
-16
lines changed

4 files changed

+72
-16
lines changed

src/main/java/com/linkedin/kmf/services/CommitLatencyMetrics.java

Lines changed: 52 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,25 +30,76 @@ public class CommitLatencyMetrics {
3030
private static final String METRIC_GROUP_NAME = "commit-latency-service";
3131
private static final Logger LOG = LoggerFactory.getLogger(CommitLatencyMetrics.class);
3232
private final Sensor _commitOffsetLatency;
33+
private long _commitStartTimeMs;
34+
private volatile boolean _inProgressCommit;
3335

3436
/**
3537
* Metrics for Calculating the offset commit latency of a consumer.
3638
* @param metrics the commit offset metrics
3739
* @param tags the tags associated, i.e) kmf.services:name=single-cluster-monitor
3840
*/
3941
CommitLatencyMetrics(Metrics metrics, Map<String, String> tags, int latencyPercentileMaxMs, int latencyPercentileGranularityMs) {
42+
_inProgressCommit = false;
4043
_commitOffsetLatency = metrics.sensor("commit-offset-latency");
4144
_commitOffsetLatency.add(new MetricName("commit-offset-latency-ms-avg", METRIC_GROUP_NAME, "The average latency in ms of committing offset", tags), new Avg());
4245
_commitOffsetLatency.add(new MetricName("commit-offset-latency-ms-max", METRIC_GROUP_NAME, "The maximum latency in ms of committing offset", tags), new Max());
4346

47+
if (latencyPercentileGranularityMs == 0) {
48+
throw new IllegalArgumentException("The latency percentile granularity was incorrectly passed a zero value.");
49+
}
50+
51+
// 2 extra buckets exist which are respectively designated for values which are less than 0.0 or larger than max.
4452
int bucketNum = latencyPercentileMaxMs / latencyPercentileGranularityMs + 2;
4553
int sizeInBytes = bucketNum * 4;
4654
_commitOffsetLatency.add(new Percentiles(sizeInBytes, latencyPercentileMaxMs, Percentiles.BucketSizing.CONSTANT,
4755
new Percentile(new MetricName("commit-offset-latency-ms-99th", METRIC_GROUP_NAME, "The 99th percentile latency of committing offset", tags), 99.0),
4856
new Percentile(new MetricName("commit-offset-latency-ms-999th", METRIC_GROUP_NAME, "The 99.9th percentile latency of committing offset", tags), 99.9),
4957
new Percentile(new MetricName("commit-offset-latency-ms-9999th", METRIC_GROUP_NAME, "The 99.99th percentile latency of committing offset", tags), 99.99)));
50-
5158
LOG.info("{} was constructed successfully.", this.getClass().getSimpleName());
59+
}
5260

61+
/**
62+
* start the recording of consumer offset commit
63+
* @throws Exception if the offset commit is already in progress.
64+
*/
65+
public void recordCommitStart() throws Exception {
66+
if (!_inProgressCommit) {
67+
this.setCommitStartTimeMs(System.currentTimeMillis());
68+
_inProgressCommit = true;
69+
} else {
70+
// inProgressCommit is already set to TRUE;
71+
throw new Exception("Offset commit is already in progress.");
72+
}
73+
}
74+
75+
/**
76+
* finish the recording of consumer offset commit
77+
*/
78+
public void recordCommitComplete() {
79+
if (_inProgressCommit) {
80+
long commitCompletedMs = System.currentTimeMillis();
81+
long commitStartMs = this.commitStartTimeMs();
82+
this._commitOffsetLatency.record(commitCompletedMs - commitStartMs);
83+
_inProgressCommit = false;
84+
} else {
85+
// inProgressCommit is already set to FALSE;
86+
LOG.error("Offset commit is not in progress. CommitLatencyMetrics shouldn't completing a record commit here.");
87+
}
88+
}
89+
90+
/**
91+
* set in milliseconds the start time of consumer offset commit
92+
* @param time commit start time in ms
93+
*/
94+
public void setCommitStartTimeMs(long time) {
95+
_commitStartTimeMs = time;
96+
}
97+
98+
/**
99+
* retrieve the start time of consumer offset commit
100+
* @return _commitStartTimeMs
101+
*/
102+
public long commitStartTimeMs() {
103+
return _commitStartTimeMs;
53104
}
54105
}

src/main/java/com/linkedin/kmf/services/ConsumeService.java

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -50,11 +50,12 @@ public class ConsumeService implements Service {
5050
private static Metrics metrics;
5151
private final AtomicBoolean _running;
5252
private final KMBaseConsumer _baseConsumer;
53-
private int _latencySlaMs;
53+
private final int _latencySlaMs;
5454
private ConsumeMetrics _sensors;
5555
private Thread _consumeThread;
56-
private AdminClient _adminClient;
56+
private final AdminClient _adminClient;
5757
private CommitAvailabilityMetrics _commitAvailabilityMetrics;
58+
private CommitLatencyMetrics _commitLatencyMetrics;
5859
private String _topic;
5960
private String _name;
6061
private static final String METRIC_GROUP_NAME = "consume-service";
@@ -79,6 +80,7 @@ public ConsumeService(String name,
7980
tags.put(TAGS_NAME, name);
8081
_topic = consumerFactory.topic();
8182
_sensors = new ConsumeMetrics(metrics, tags, consumerFactory.latencyPercentileMaxMs(), consumerFactory.latencyPercentileGranularityMs());
83+
_commitLatencyMetrics = new CommitLatencyMetrics(metrics, tags, consumerFactory.latencyPercentileMaxMs(), consumerFactory.latencyPercentileGranularityMs());
8284
_commitAvailabilityMetrics = new CommitAvailabilityMetrics(metrics, tags);
8385
_consumeThread = new Thread(() -> {
8486
try {
@@ -125,7 +127,6 @@ record = _baseConsumer.receive();
125127
continue;
126128
}
127129
int partition = record.partition();
128-
129130
/* Commit availability and commit latency service */
130131
try {
131132
/* Call commitAsync, wait for a NON-NULL return value (see https://issues.apache.org/jira/browse/KAFKA-6183) */
@@ -137,24 +138,24 @@ public void onComplete(Map<TopicPartition, OffsetAndMetadata> topicPartitionOffs
137138
_commitAvailabilityMetrics._failedCommitOffsets.record();
138139
} else {
139140
_commitAvailabilityMetrics._offsetsCommitted.record();
141+
_commitLatencyMetrics.recordCommitComplete();
140142
}
141143
}
142144
};
143145

144146
/* Current timestamp to perform subtraction*/
145147
long currTimeMillis = System.currentTimeMillis();
146148

147-
/* 5 seconds consumer offset commit interval. */
149+
/* 4 seconds consumer offset commit interval. */
148150
long timeDiffMillis = TimeUnit.SECONDS.toMillis(COMMIT_TIME_INTERVAL);
149151

150152
if (currTimeMillis - _baseConsumer.lastCommitted() >= timeDiffMillis) {
151153
/* commit the consumer offset asynchronously with a callback. */
152154
_baseConsumer.commitAsync(commitCallback);
153-
155+
_commitLatencyMetrics.recordCommitStart();
154156
/* Record the current time for the committed consumer offset */
155157
_baseConsumer.updateLastCommit();
156158
}
157-
158159
} catch (Exception exception) {
159160
LOG.error("Exception while trying to perform an asynchronous commit.", exception);
160161
_commitAvailabilityMetrics._failedCommitOffsets.record();

src/main/java/com/linkedin/kmf/services/GraphiteMetricsReporterService.java

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -51,13 +51,17 @@ public GraphiteMetricsReporterService(Map<String, Object> props, String name)
5151

5252
@Override
5353
public synchronized void start() {
54-
_executor.scheduleAtFixedRate(() -> {
55-
try {
56-
reportMetrics();
57-
} catch (Exception e) {
58-
LOG.error(_name + "/GraphiteMetricsReporterService failed to report metrics", e);
54+
_executor.scheduleAtFixedRate(new Runnable() {
55+
@Override
56+
public void run() {
57+
try {
58+
GraphiteMetricsReporterService.this.reportMetrics();
59+
} catch (Exception e) {
60+
LOG.error(_name + "/GraphiteMetricsReporterService failed to report metrics",
61+
e);
62+
}
5963
}
60-
}, _reportIntervalSec, _reportIntervalSec, TimeUnit.SECONDS
64+
}, _reportIntervalSec, _reportIntervalSec, TimeUnit.SECONDS
6165
);
6266
LOG.info("{}/GraphiteMetricsReporterService started", _name);
6367
}

src/test/java/com/linkedin/kmf/services/ConsumeServiceTest.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ public class ConsumeServiceTest {
4343
private static final String TAGS_NAME = "name";
4444
private static final String METRIC_GROUP_NAME = "commit-availability-service";
4545
/* thread start delay in seconds */
46-
private static final long THREAD_START_DELAY = 4;
46+
private static final long THREAD_START_DELAY_SECONDS = 4;
4747
private static final String TAG_NAME_VALUE = "name";
4848
private static final long MOCK_LAST_COMMITTED_OFFSET = System.currentTimeMillis();
4949
private static final int PARTITION = 2;
@@ -83,7 +83,7 @@ public void commitAvailabilityTest() throws Exception {
8383
Assert.assertTrue(consumeService.isRunning());
8484

8585
/* in milliseconds */
86-
long threadStartDelay = 1000 * THREAD_START_DELAY;
86+
long threadStartDelay = TimeUnit.SECONDS.toMillis(THREAD_START_DELAY_SECONDS);
8787

8888
/* Thread.sleep safe to do here instead of ScheduledExecutorService
8989
* We want to sleep current thread so that consumeService can start running for enough seconds. */
@@ -112,7 +112,7 @@ public void commitLatencyTest() throws Exception {
112112
Assert.assertTrue(consumeService.isRunning());
113113

114114
/* in milliseconds */
115-
long threadStartDelay = TimeUnit.SECONDS.toMillis(THREAD_START_DELAY);
115+
long threadStartDelay = TimeUnit.SECONDS.toMillis(THREAD_START_DELAY_SECONDS);
116116

117117
/* Thread.sleep safe to do here instead of ScheduledExecutorService
118118
* We want to sleep current thread so that consumeService can start running for enough seconds. */

0 commit comments

Comments
 (0)