Commit cf0b566

Default to capturing Kafka consumer metrics once per second (#6192)
This will take effect for all Kafka sources that have not already defined statistics_interval_ms.
1 parent 940344d commit cf0b566
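
In practice this means a source created without the option now behaves as if statistics_interval_ms = 1000 had been written, while an explicit value still wins — including 0, which disables the statistics (the compat test below creates a source with statistics_interval_ms = 0 for exactly that purpose). A minimal sketch of the resulting rule, using a hypothetical helper rather than anything from the commit itself:

fn effective_interval_ms(explicit: Option<u64>) -> u64 {
    // Hypothetical helper: an explicit WITH option wins; otherwise the new
    // one-second (1_000 ms) default applies.
    explicit.unwrap_or(1_000)
}

fn main() {
    assert_eq!(effective_interval_ms(None), 1_000); // option omitted: new default
    assert_eq!(effective_interval_ms(Some(5_000)), 5_000); // user override
    assert_eq!(effective_interval_ms(Some(0)), 0); // 0 disables the metrics
}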

5 files changed: +86 −9 lines

doc/user/content/release-notes.md

Lines changed: 1 addition & 1 deletion

@@ -49,7 +49,7 @@ Wrap your release notes at the 80 character mark.
 {{% version-header v0.7.2 %}}
 
 - Record Kafka Consumer metrics in the `mz_kafka_consumer_statistics` system
-  table.
+  table. Enabled by default for all Kafka sources.
 
 - Add the [`jsonb_object_agg`](/sql/functions/jsonb_object_agg) function to
   aggregate rows into a JSON object.

src/sql/src/kafka_util.rs

Lines changed: 19 additions & 3 deletions

@@ -36,11 +36,12 @@ enum ValType {
 
 // Describes Kafka cluster configurations users can supply using `CREATE
 // SOURCE...WITH (option_list)`.
-// TODO(sploiselle): Support overriding keys, default values.
+// TODO(sploiselle): Support overriding keys.
 struct Config {
     name: &'static str,
     val_type: ValType,
     transform: fn(String) -> String,
+    default: Option<String>,
 }
 
 impl Config {
@@ -49,6 +50,7 @@ impl Config {
             name,
             val_type,
             transform: convert::identity,
+            default: None,
         }
     }
 
@@ -69,6 +71,12 @@ impl Config {
         self
     }
 
+    // Allows for returning a default value for this configuration option
+    fn set_default(mut self, d: Option<String>) -> Self {
+        self.default = d;
+        self
+    }
+
     // Get the appropriate String to use as the Kafka config key.
     fn get_key(&self) -> String {
         self.name.replace("_", ".")
@@ -105,7 +113,12 @@ fn extract(
                 Ok(v) => v,
                 Err(e) => bail!("Invalid WITH option {}={}: {}", config.name, v, e),
             },
-            None => continue,
+            None => match &config.default {
+                Some(v) => v.to_string(),
+                None => {
+                    continue;
+                }
+            },
         };
         out.insert(config.get_key(), value);
     }
@@ -134,7 +147,10 @@ pub fn extract_config(
            // The range of values comes from `statistics.interval.ms` in
            // https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
            ValType::Number(0, 86_400_000),
-        ),
+        )
+        .set_default(Some(
+            chrono::Duration::seconds(1).num_milliseconds().to_string(),
+        )),
        Config::new(
            "topic_metadata_refresh_interval_ms",
            // The range of values comes from `topic.metadata.refresh.interval.ms` in
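
To see the new fallback end to end, here is a self-contained sketch of the behavior `extract` now implements, with simplified, hypothetical types (the real `Config` also carries `val_type` and `transform` for validation): a user-supplied value takes precedence, an omitted option falls back to its registered default, and options with neither are skipped.

use std::collections::BTreeMap;

// Simplified stand-in for the real Config: just a name and an optional default.
struct Config {
    name: &'static str,
    default: Option<String>,
}

fn extract(
    configs: &[Config],
    with_options: &BTreeMap<String, String>,
) -> BTreeMap<String, String> {
    let mut out = BTreeMap::new();
    for config in configs {
        let value = match with_options.get(config.name) {
            // A user-supplied value takes precedence over any default.
            Some(v) => v.clone(),
            // No user value: fall back to the default, or skip the option.
            None => match &config.default {
                Some(d) => d.clone(),
                None => continue,
            },
        };
        // Kafka config keys use dots where the WITH options use underscores.
        out.insert(config.name.replace('_', "."), value);
    }
    out
}

fn main() {
    let configs = [Config {
        name: "statistics_interval_ms",
        default: Some("1000".to_string()),
    }];
    // No WITH options supplied, so the one-second default is emitted.
    let out = extract(&configs, &BTreeMap::new());
    assert_eq!(out.get("statistics.interval.ms"), Some(&"1000".to_string()));
}

This precedence is what the catalog-compat test below relies on when it passes statistics_interval_ms = 0 to keep a source out of mz_kafka_consumer_statistics.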

test/catalog-compat/catcompatck/catcompatck

Lines changed: 44 additions & 0 deletions

@@ -135,6 +135,42 @@ testdrive --no-reset <<'EOF'
 > CREATE MATERIALIZED VIEW v AS WITH u AS (SELECT * FROM real_time_src) SELECT * from u;
 EOF
 
+say "creating Kafka source with metrics disabled"
+testdrive --no-reset <<'EOF'
+# Copy schema from above; the golden010 catalog doesn't support statistics_interval_ms
+$ set schema={
+    "type": "record",
+    "name": "envelope",
+    "fields": [
+      {
+        "name": "before",
+        "type": [
+          {
+            "name": "row",
+            "type": "record",
+            "fields": [
+              {"name": "a", "type": "long"},
+              {"name": "b", "type": "long"}
+            ]
+          },
+          "null"
+        ]
+      },
+      { "name": "after", "type": ["row", "null"] }
+    ]
+  }
+
+> CREATE SOURCE real_time_src_no_stats
+  FROM KAFKA BROKER '${testdrive.kafka-addr}' TOPIC 'testdrive-real-time-${testdrive.seed}'
+  WITH (statistics_interval_ms = 0)
+  FORMAT AVRO USING SCHEMA '${schema}'
+  ENVELOPE DEBEZIUM
+
+> CREATE MATERIALIZED VIEW real_time_no_stats AS
+  SELECT *, concat(a::text, CAST(b AS text)) AS c
+  FROM real_time_src_no_stats
+EOF
+
 say "killing materialized-golden071"
 kill_materialized
 
@@ -164,4 +200,12 @@ a b
 2 1
 3 1
 1 2
+
+# Kafka metrics for real_time_src should be enabled now.
+# Count should be 2 because there are two materialized views on real_time_src.
+# If real_time_src_no_stats were also emitting stats, there would be 3 rows.
+> SELECT count(*) FROM mz_kafka_consumer_statistics;
+count
+-----
+2
 EOF

test/testdrive/avro-sources.td

Lines changed: 22 additions & 4 deletions

@@ -299,7 +299,7 @@ $ kafka-ingest format=avro topic=non-dbz-data-varying-partition schema=${non-dbz
 # Erroneously adds start_offsets for non-existent partitions.
 > CREATE MATERIALIZED SOURCE non_dbz_data_varying_partition_2
   FROM KAFKA BROKER '${testdrive.kafka-addr}' TOPIC 'testdrive-non-dbz-data-varying-partition-${testdrive.seed}'
-  WITH (start_offset=[0,1], statistics_interval_ms = 1000)
+  WITH (start_offset=[0,1])
   FORMAT AVRO USING SCHEMA '${non-dbz-schema}'
   ENVELOPE NONE
 
@@ -333,15 +333,24 @@ a b
 5 6
 9 10
 
-# There should be two partitions per consumer
+# There should be two partitions for the last created source / consumer (non_dbz_data_varying_partition_2)
 > SELECT count(*) FROM mz_kafka_consumer_statistics GROUP BY consumer_name;
 count
 -----
+1
+1
+1
+1
+2
+2
+2
+2
+2
 2
 
 > CREATE MATERIALIZED SOURCE non_dbz_data_varying_partition_3
   FROM KAFKA BROKER '${testdrive.kafka-addr}' TOPIC 'testdrive-non-dbz-data-varying-partition-${testdrive.seed}'
-  WITH (start_offset=[1,1], statistics_interval_ms = 1000)
+  WITH (start_offset=[1,1])
   FORMAT AVRO USING SCHEMA '${non-dbz-schema}'
   ENVELOPE NONE
 
@@ -358,10 +367,19 @@ a b
 9 10
 11 12
 
-# There should be metrics for 3 partitions per consumer
+# There should be three partitions for the last three sources / consumers (non_dbz_data_varying_partition_[123])
 > SELECT count(*) FROM mz_kafka_consumer_statistics GROUP BY consumer_name;
 count
 -----
+1
+1
+1
+1
+2
+2
+2
+2
+3
 3
 3
 

test/testdrive/kafka-stats.td

Lines changed: 0 additions & 1 deletion

@@ -22,7 +22,6 @@ $ kafka-create-topic topic=data
 
 > CREATE SOURCE data
   FROM KAFKA BROKER '${testdrive.kafka-addr}' TOPIC 'testdrive-data-${testdrive.seed}'
-  WITH (statistics_interval_ms = 1000)
   FORMAT AVRO USING SCHEMA '${schema}'
 
 > CREATE MATERIALIZED VIEW test1 AS
