Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 9 additions & 2 deletions src/dataflow/src/source/kafka.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,8 +150,15 @@ impl SourceReader<Vec<u8>> for KafkaSourceReader {
_ => (),
}

let partition_stats =
&statistics.topics[self.topic_name.as_str()].partitions[&part.pid];
let topic_stats = match statistics.topics.get(self.topic_name.as_str()) {
Some(t) => t,
None => continue,
};

let partition_stats = match topic_stats.partitions.get(&part.pid) {
Some(p) => p,
None => continue,
};

logger.log(MaterializedEvent::KafkaConsumerInfo {
consumer_name: statistics.name.to_string(),
Expand Down
17 changes: 15 additions & 2 deletions test/testdrive/avro-sources.td
Original file line number Diff line number Diff line change
Expand Up @@ -299,7 +299,7 @@ $ kafka-ingest format=avro topic=non-dbz-data-varying-partition schema=${non-dbz
# Erroneously adds start_offsets for non-existent partitions.
> CREATE MATERIALIZED SOURCE non_dbz_data_varying_partition_2
FROM KAFKA BROKER '${testdrive.kafka-addr}' TOPIC 'testdrive-non-dbz-data-varying-partition-${testdrive.seed}'
WITH (start_offset=[0,1])
WITH (start_offset=[0,1], statistics_interval_ms = 1000)
FORMAT AVRO USING SCHEMA '${non-dbz-schema}'
ENVELOPE NONE

Expand Down Expand Up @@ -333,9 +333,15 @@ a b
5 6
9 10

# There should be two partitions per consumer
> SELECT count(*) FROM mz_kafka_consumer_statistics GROUP BY consumer_name;
count
-----
2

> CREATE MATERIALIZED SOURCE non_dbz_data_varying_partition_3
FROM KAFKA BROKER '${testdrive.kafka-addr}' TOPIC 'testdrive-non-dbz-data-varying-partition-${testdrive.seed}'
WITH (start_offset=[1,1])
WITH (start_offset=[1,1], statistics_interval_ms = 1000)
FORMAT AVRO USING SCHEMA '${non-dbz-schema}'
ENVELOPE NONE

Expand All @@ -352,6 +358,13 @@ a b
9 10
11 12

# There should be metrics for 3 partitions per consumer
> SELECT count(*) FROM mz_kafka_consumer_statistics GROUP BY consumer_name;
count
-----
3
3

$ set-sql-timeout duration=12.7s

# Source with new-style three-valued "snapshot".
Expand Down