-
Notifications
You must be signed in to change notification settings - Fork 482
Default to capturing Kafka consumer metrics once per second #6192
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 21 commits
93fb82a
dc50905
ee80890
23c3970
0388f52
8945be4
f73c6f6
045293d
9f34b96
f09fe85
e6fe16a
dbec829
351837c
3713dbe
1c38647
80d07f9
77d3c5e
2b94899
73165d1
e8afcea
7fd2ba4
ab36134
50f4712
33350b1
1b45481
80f3100
9c8fe7c
3f94cb1
478628d
2736dbc
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -13,9 +13,11 @@ use std::collections::BTreeMap; | |
| use std::convert; | ||
| use std::fs::File; | ||
| use std::io::Read; | ||
| use std::option::Option; | ||
| use std::sync::Mutex; | ||
|
|
||
| use anyhow::bail; | ||
| use chrono; | ||
| use log::{debug, error, info, warn}; | ||
| use rdkafka::client::ClientContext; | ||
| use rdkafka::consumer::{BaseConsumer, Consumer, ConsumerContext}; | ||
|
|
@@ -36,11 +38,12 @@ enum ValType { | |
|
|
||
| // Describes Kafka cluster configurations users can suppply using `CREATE | ||
| // SOURCE...WITH (option_list)`. | ||
| // TODO(sploiselle): Support overriding keys, default values. | ||
| // TODO(sploiselle): Support overriding keys. | ||
| struct Config { | ||
| name: &'static str, | ||
| val_type: ValType, | ||
| transform: fn(String) -> String, | ||
| default: fn() -> Option<String>, | ||
|
||
| } | ||
|
|
||
| impl Config { | ||
|
|
@@ -49,6 +52,7 @@ impl Config { | |
| name, | ||
| val_type, | ||
| transform: convert::identity, | ||
| default: || -> Option<String> { None }, | ||
| } | ||
| } | ||
|
|
||
|
|
@@ -69,6 +73,12 @@ impl Config { | |
| self | ||
| } | ||
|
|
||
| // Allows for returning a default value for this configuration option | ||
| fn default(mut self, f: fn() -> Option<String>) -> Self { | ||
| self.default = f; | ||
| self | ||
| } | ||
|
|
||
| // Get the appropriate String to use as the Kafka config key. | ||
| fn get_key(&self) -> String { | ||
| self.name.replace("_", ".") | ||
|
|
@@ -105,7 +115,12 @@ fn extract( | |
| Ok(v) => v, | ||
| Err(e) => bail!("Invalid WITH option {}={}: {}", config.name, v, e), | ||
| }, | ||
| None => continue, | ||
| None => match (config.default)() { | ||
| Some(v) => v, | ||
| None => { | ||
| continue; | ||
| } | ||
| }, | ||
| }; | ||
| out.insert(config.get_key(), value); | ||
| } | ||
|
|
@@ -134,7 +149,8 @@ pub fn extract_config( | |
| // The range of values comes from `statistics.interval.ms` in | ||
| // https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md | ||
| ValType::Number(0, 86_400_000), | ||
| ), | ||
| ) | ||
| .default(|| Some(chrono::Duration::seconds(1).num_milliseconds().to_string())), | ||
| Config::new( | ||
| "topic_metadata_refresh_interval_ms", | ||
| // The range of values comes from `topic.metadata.refresh.interval.ms` in | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
These imports look like cruft from whatever your IDE is? No need to import either! The first is in the standard prelude, and the second is a crate root so implicitly in scope.