File tree Expand file tree Collapse file tree 2 files changed +24
-4
lines changed Expand file tree Collapse file tree 2 files changed +24
-4
lines changed Original file line number Diff line number Diff line change @@ -150,8 +150,15 @@ impl SourceReader<Vec<u8>> for KafkaSourceReader {
150150 _ => ( ) ,
151151 }
152152
153- let partition_stats =
154- & statistics. topics [ self . topic_name . as_str ( ) ] . partitions [ & part. pid ] ;
153+ let topic_stats = match statistics. topics . get ( self . topic_name . as_str ( ) ) {
154+ Some ( t) => t,
155+ None => continue ,
156+ } ;
157+
158+ let partition_stats = match topic_stats. partitions . get ( & part. pid ) {
159+ Some ( p) => p,
160+ None => continue ,
161+ } ;
155162
156163 logger. log ( MaterializedEvent :: KafkaConsumerInfo {
157164 consumer_name : statistics. name . to_string ( ) ,
Original file line number Diff line number Diff line change @@ -299,7 +299,7 @@ $ kafka-ingest format=avro topic=non-dbz-data-varying-partition schema=${non-dbz
299299# Erroneously adds start_offsets for non-existent partitions.
300300> CREATE MATERIALIZED SOURCE non_dbz_data_varying_partition_2
301301 FROM KAFKA BROKER '${testdrive.kafka-addr}' TOPIC 'testdrive-non-dbz-data-varying-partition-${testdrive.seed}'
302- WITH (start_offset=[0,1])
302+ WITH (start_offset=[0,1], statistics_interval_ms = 1000 )
303303 FORMAT AVRO USING SCHEMA '${non-dbz-schema}'
304304 ENVELOPE NONE
305305
3333335 6
3343349 10
335335
336+ # There should be two partitions per consumer
337+ > SELECT count(*) FROM mz_kafka_consumer_statistics GROUP BY consumer_name;
338+ count
339+ -----
340+ 2
341+
336342> CREATE MATERIALIZED SOURCE non_dbz_data_varying_partition_3
337343 FROM KAFKA BROKER '${testdrive.kafka-addr}' TOPIC 'testdrive-non-dbz-data-varying-partition-${testdrive.seed}'
338- WITH (start_offset=[1,1])
344+ WITH (start_offset=[1,1], statistics_interval_ms = 1000 )
339345 FORMAT AVRO USING SCHEMA '${non-dbz-schema}'
340346 ENVELOPE NONE
341347
3523589 10
35335911 12
354360
361+ # There should be metrics for 3 partitions per consumer
362+ > SELECT count(*) FROM mz_kafka_consumer_statistics GROUP BY consumer_name;
363+ count
364+ -----
365+ 3
366+ 3
367+
355368$ set-sql-timeout duration=12.7s
356369
357370# Source with new-style three-valued "snapshot".
You can’t perform that action at this time.
0 commit comments