Skip to content
Merged
Show file tree
Hide file tree
Changes from 21 commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
93fb82a
Log basic metrics instead of extended statistics
cirego Mar 17, 2021
dc50905
Merge branch 'main' into chris/system-table-kafka-metrics
cirego Mar 17, 2021
ee80890
Add mz_kafka_statistics built-in log / source
cirego Mar 17, 2021
23c3970
Remove change for debugging
cirego Mar 18, 2021
0388f52
Put consumer in the builtin log name
cirego Mar 18, 2021
8945be4
Tracking kafka consumer statistics now works
cirego Mar 18, 2021
f73c6f6
Fixup test_persistence
cirego Mar 18, 2021
045293d
Merge branch 'main' into chris/system-table-kafka-metrics
cirego Mar 22, 2021
9f34b96
Log per-partition kafka consumer metrics
cirego Mar 22, 2021
f09fe85
Merge branch 'main' into chris/system-table-kafka-metrics
cirego Mar 22, 2021
e6fe16a
Record kafka consumer metrics by default
cirego Mar 22, 2021
dbec829
Merge branch 'main' into chris/system-table-kafka-metrics
cirego Mar 22, 2021
351837c
Merge branch 'chris/system-table-kafka-metrics' into chris/default_st…
cirego Mar 22, 2021
3713dbe
Derive default values for stats object
cirego Mar 22, 2021
1c38647
Merge branch 'main' into chris/system-table-kafka-metrics
cirego Mar 22, 2021
80d07f9
Merge branch 'chris/system-table-kafka-metrics' into chris/default_st…
cirego Mar 22, 2021
77d3c5e
Remove commented out log line
cirego Mar 22, 2021
2b94899
Merge branch 'chris/system-table-kafka-metrics' into chris/default_st…
cirego Mar 23, 2021
73165d1
Merge branch 'main' into chris/default_statistics_interval
cirego Mar 23, 2021
e8afcea
Use chrono instead of hardcoded string
cirego Mar 23, 2021
7fd2ba4
Remove completed TODO :)
cirego Mar 23, 2021
ab36134
Use Option<String> instead of lambda
cirego Mar 23, 2021
50f4712
Merge branch 'main' into chris/default_statistics_interval
cirego Mar 24, 2021
33350b1
Remove unneeded use statements
cirego Mar 24, 2021
1b45481
Bump timeout to see if test passes
cirego Mar 24, 2021
80f3100
Another debugging commit for CI
cirego Mar 24, 2021
9c8fe7c
Merge branch 'main' into chris/default_statistics_interval
cirego Mar 25, 2021
3f94cb1
Update release comment / merge main
cirego Mar 25, 2021
478628d
Remove debuggingm commit
cirego Mar 25, 2021
2736dbc
Add upgrade test for kafka stats interval
cirego Mar 25, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 19 additions & 3 deletions src/sql/src/kafka_util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,11 @@ use std::collections::BTreeMap;
use std::convert;
use std::fs::File;
use std::io::Read;
use std::option::Option;
use std::sync::Mutex;

use anyhow::bail;
use chrono;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These imports look like cruft from whatever your IDE is? No need to import either! The first is in the standard prelude, and the second is a crate root so implicitly in scope.

use log::{debug, error, info, warn};
use rdkafka::client::ClientContext;
use rdkafka::consumer::{BaseConsumer, Consumer, ConsumerContext};
Expand All @@ -36,11 +38,12 @@ enum ValType {

// Describes Kafka cluster configurations users can suppply using `CREATE
// SOURCE...WITH (option_list)`.
// TODO(sploiselle): Support overriding keys, default values.
// TODO(sploiselle): Support overriding keys.
struct Config {
name: &'static str,
val_type: ValType,
transform: fn(String) -> String,
default: fn() -> Option<String>,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Random drive-by, but why a fn -> Option<String> instead of an Option<String>?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's a great question and the answer is that this is vestigial. There's no need to make this a function.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is now an Option<String>.

}

impl Config {
Expand All @@ -49,6 +52,7 @@ impl Config {
name,
val_type,
transform: convert::identity,
default: || -> Option<String> { None },
}
}

Expand All @@ -69,6 +73,12 @@ impl Config {
self
}

// Allows for returning a default value for this configuration option
fn default(mut self, f: fn() -> Option<String>) -> Self {
self.default = f;
self
}

// Get the appropriate String to use as the Kafka config key.
fn get_key(&self) -> String {
self.name.replace("_", ".")
Expand Down Expand Up @@ -105,7 +115,12 @@ fn extract(
Ok(v) => v,
Err(e) => bail!("Invalid WITH option {}={}: {}", config.name, v, e),
},
None => continue,
None => match (config.default)() {
Some(v) => v,
None => {
continue;
}
},
};
out.insert(config.get_key(), value);
}
Expand Down Expand Up @@ -134,7 +149,8 @@ pub fn extract_config(
// The range of values comes from `statistics.interval.ms` in
// https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
ValType::Number(0, 86_400_000),
),
)
.default(|| Some(chrono::Duration::seconds(1).num_milliseconds().to_string())),
Config::new(
"topic_metadata_refresh_interval_ms",
// The range of values comes from `topic.metadata.refresh.interval.ms` in
Expand Down
1 change: 0 additions & 1 deletion test/testdrive/kafka-stats.td
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ $ kafka-create-topic topic=data

> CREATE SOURCE data
FROM KAFKA BROKER '${testdrive.kafka-addr}' TOPIC 'testdrive-data-${testdrive.seed}'
WITH (statistics_interval_ms = 1000)
FORMAT AVRO USING SCHEMA '${schema}'

> CREATE MATERIALIZED VIEW test1 AS
Expand Down