Contributor

@cirego cirego commented Mar 23, 2021

Now that #6131 has landed, and we've run benchmarks showing that the slowdown is minimal, we should default to collecting these metrics for Kafka sources.

One open question: do we want to turn this on for existing sources too?



cirego and others added 20 commits March 17, 2021 12:41
Instead of trying to create a new Logger class that implements Send +
Sync, use crossbeam to send messages from the callback context into the
source info context. This will activate the source, triggering the
source to read messages from the callback and update the system log
tables. When a source is dropped, the statistics are removed.
We no longer need the print message (that's what this PR replaces!)
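
A rough sketch of the channel hand-off described in the commit message above. It uses `std::sync::mpsc` as a stand-in for crossbeam so that it builds without external crates. `Stats`, `CallbackContext`, and `SourceInfo` are hypothetical names for illustration, not the types in this PR.

```rust
use std::collections::HashMap;
use std::sync::mpsc::{channel, Receiver, Sender};
use std::thread;

/// Hypothetical, trimmed-down statistics snapshot.
#[derive(Debug, Clone, PartialEq)]
struct Stats {
    source: String,
    rx_msgs: u64,
}

/// Stand-in for the callback context: it only owns the sending half of the
/// channel, so it needs no shared logger and no locking of its own.
struct CallbackContext {
    tx: Sender<Stats>,
}

impl CallbackContext {
    /// Called from the client's callback thread.
    fn on_stats(&self, stats: Stats) {
        // If the source has been dropped, the receiver is gone; ignore the error.
        let _ = self.tx.send(stats);
    }
}

/// Stand-in for the source side, which owns the receiver and the latest values.
struct SourceInfo {
    rx: Receiver<Stats>,
    latest: HashMap<String, Stats>,
}

impl SourceInfo {
    /// Drain everything the callback has queued; returns how many snapshots were read.
    fn drain(&mut self) -> usize {
        let mut read = 0;
        while let Ok(stats) = self.rx.try_recv() {
            self.latest.insert(stats.source.clone(), stats);
            read += 1;
        }
        read
    }
}

/// Sends three snapshots from another thread, then drains them on this one.
fn demo() -> (usize, u64) {
    let (tx, rx) = channel();
    let ctx = CallbackContext { tx };
    let mut source = SourceInfo { rx, latest: HashMap::new() };

    let callback_thread = thread::spawn(move || {
        for i in 1..=3u64 {
            ctx.on_stats(Stats { source: "kafka-src".to_string(), rx_msgs: i * 10 });
        }
    });
    callback_thread.join().unwrap();

    let read = source.drain();
    (read, source.latest["kafka-src"].rx_msgs)
}
```

Calling `demo()` queues three snapshots on the callback thread and reads them back on the source side. The sketch leaves out the part where the send also activates the source.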
@cirego cirego requested review from elindsey and umanwizard March 23, 2021 22:12
@cirego cirego self-assigned this Mar 23, 2021
Contributor Author

cirego commented Mar 23, 2021

[Image: visualization (2)]

Benchmark results comparing main, this PR (None), and our two previous releases.

name: &'static str,
val_type: ValType,
transform: fn(String) -> String,
default: fn() -> Option<String>,
Contributor

Random drive-by, but why a fn -> Option<String> instead of an Option<String>?

Contributor Author

That's a great question and the answer is that this is vestigial. There's no need to make this a function.

Contributor Author

This is now an Option<String>.

use std::sync::Mutex;

use anyhow::bail;
use chrono;
Contributor

These imports look like cruft from whatever your IDE is? No need to import either! The first is in the standard prelude, and the second is a crate root so implicitly in scope.

)
.set_default(Some(
chrono::Duration::seconds(1).num_milliseconds().to_string(),
)),
Contributor

(this seems like a rather roundabout way to spell "1000", but leave it if you feel it is clearer as is)
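
For reference, a small sketch of what the review threads above converge on: the default is stored as a plain `Option<String>` rather than a `fn() -> Option<String>`, and the one-second default comes out as the string "1000". `Config` here is a hypothetical, trimmed-down stand-in for the struct in the diff. It uses `std::time::Duration` instead of `chrono` to stay dependency-free, and pairing it with the librdkafka property name `statistics.interval.ms` is an assumption.

```rust
use std::time::Duration;

/// Hypothetical, trimmed-down option descriptor (the real struct also has
/// `val_type` and `transform` fields).
struct Config {
    name: &'static str,
    /// A plain value is enough: the default never needs to be computed lazily.
    default: Option<String>,
}

impl Config {
    fn new(name: &'static str) -> Self {
        Config { name, default: None }
    }

    /// Builder-style setter, mirroring the `.set_default(Some(..))` call in the diff.
    fn set_default(mut self, default: Option<String>) -> Self {
        self.default = default;
        self
    }
}

/// Builds the descriptor with a one-second default expressed in milliseconds.
fn statistics_interval_config() -> Config {
    Config::new("statistics.interval.ms")
        .set_default(Some(Duration::from_secs(1).as_millis().to_string()))
}
```

Either spelling of the default yields the same string. The `Duration` form documents the unit, while a literal "1000" is shorter.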

Contributor

benesch commented Mar 24, 2021

I'm definitely in favor of turning this on for existing sources (which, unless I'm mistaken, is the current behavior of this PR).

Contributor Author

cirego commented Mar 24, 2021

> I'm definitely in favor of turning this on for existing sources (which, unless I'm mistaken, is the current behavior of this PR).

Yes, this will enable statistics for any sources that didn't explicitly set the value to 0 at create time.
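
A minimal sketch of that rule, assuming the option arrives as an optional string and that the default is 1000 ms as set in the diff. The function and constant names are made up for illustration.

```rust
use std::num::ParseIntError;

/// Assumed default, matching the one-second value set in the diff.
const DEFAULT_STATISTICS_INTERVAL_MS: u64 = 1000;

/// Returns `Some(interval)` when statistics should be collected, `None` when disabled.
/// - option not set at create time: fall back to the default
/// - option explicitly set to 0: statistics stay off
fn effective_interval_ms(user_value: Option<&str>) -> Result<Option<u64>, ParseIntError> {
    let ms = match user_value {
        Some(raw) => raw.trim().parse::<u64>()?,
        None => DEFAULT_STATISTICS_INTERVAL_MS,
    };
    Ok(if ms == 0 { None } else { Some(ms) })
}
```

So a source created without the option picks up the new default, and only an explicit 0 opts out.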

Contributor

benesch commented Mar 24, 2021

Looks like materialized crashed while testdrive was running:

materialized_1     | 2021-03-24T21:12:47.414966Z ERROR panic: no entry found for key
materialized_1     | thread: timely:work-0
materialized_1     | location: src/dataflow/src/source/kafka.rs:154:26
materialized_1     | version: 0.7.2-dev (33350b13ca7efd117e411371b0a3c7b59202472d)
materialized_1     | backtrace:
materialized_1     |    0: materialized::handle_panic
materialized_1     |              at var/lib/buildkite-agent/builds/buildkite-builders-i-07ebc912e8c6ad411-1/materialize/tests/src/materialized/src/bin/materialized/main.rs:651:21
materialized_1     |    1: core::ops::function::Fn::call
materialized_1     |              at usr/local/lib/rustlib/src/rust/library/core/src/ops/function.rs:70:5
materialized_1     |    2: std::panicking::rust_panic_with_hook
materialized_1     |              at rustc/e1884a8e3c3e813aada8254edfa120e85bf5ffca/library/std/src/panicking.rs:597:17
materialized_1     |    3: std::panicking::begin_panic_handler::{{closure}}
materialized_1     |              at rustc/e1884a8e3c3e813aada8254edfa120e85bf5ffca/library/std/src/panicking.rs:499:13
materialized_1     |    4: std::sys_common::backtrace::__rust_end_short_backtrace
materialized_1     |              at rustc/e1884a8e3c3e813aada8254edfa120e85bf5ffca/library/std/src/sys_common/backtrace.rs:141:18
materialized_1     |    5: rust_begin_unwind
materialized_1     |              at rustc/e1884a8e3c3e813aada8254edfa120e85bf5ffca/library/std/src/panicking.rs:495:5
materialized_1     |    6: core::panicking::panic_fmt
materialized_1     |              at rustc/e1884a8e3c3e813aada8254edfa120e85bf5ffca/library/core/src/panicking.rs:92:14
materialized_1     |    7: core::option::expect_failed
materialized_1     |              at rustc/e1884a8e3c3e813aada8254edfa120e85bf5ffca/library/core/src/option.rs:1260:5
materialized_1     |    8: core::option::Option<T>::expect
materialized_1     |              at usr/local/lib/rustlib/src/rust/library/core/src/option.rs:349:21
materialized_1     |       <std::collections::hash::map::HashMap<K,V,S> as core::ops::index::Index<&Q>>::index
materialized_1     |              at usr/local/lib/rustlib/src/rust/library/std/src/collections/hash/map.rs:1096:9
materialized_1     |       <dataflow::source::kafka::KafkaSourceReader as dataflow::source::SourceReader<alloc::vec::Vec<u8>>>::get_next_message
materialized_1     |              at var/lib/buildkite-agent/builds/buildkite-builders-i-07ebc912e8c6ad411-1/materialize/tests/src/dataflow/src/source/kafka.rs:154:26
materialized_1     |    9: dataflow::source::create_source::{{closure}}::{{closure}}
materialized_1     |              at var/lib/buildkite-agent/builds/buildkite-builders-i-07ebc912e8c6ad411-1/materialize/tests/src/dataflow/src/source/mod.rs:1410:27
materialized_1     |       dataflow::source::util::source::{{closure}}::{{closure}}
materialized_1     |              at var/lib/buildkite-agent/builds/buildkite-builders-i-07ebc912e8c6ad411-1/materialize/tests/src/dataflow/src/source/util.rs:79:45
materialized_1     |       timely::dataflow::operators::generic::operator::source::{{closure}}::{{closure}}
materialized_1     |              at cargo/git/checkouts/timely-dataflow-4c0cc365061cd263/ac0a326/timely/src/dataflow/operators/generic/operator.rs:576:13
materialized_1     |       timely::dataflow::operators::generic::builder_rc::OperatorBuilder<G>::build::{{closure}}::{{closure}}
materialized_1     |              at cargo/git/checkouts/timely-dataflow-4c0cc365061cd263/ac0a326/timely/src/dataflow/operators/generic/builder_rc.rs:123:31
materialized_1     |       timely::dataflow::operators::generic::builder_rc::OperatorBuilder<G>::build_reschedule::{{closure}}
materialized_1     |              at cargo/git/checkouts/timely-dataflow-4c0cc365061cd263/ac0a326/timely/src/dataflow/operators/generic/builder_rc.rs:163:26
materialized_1     |       <timely::dataflow::operators::generic::builder_raw::OperatorCore<T,L> as timely::scheduling::Schedule>::schedule
materialized_1     |              at cargo/git/checkouts/timely-dataflow-4c0cc365061cd263/ac0a326/timely/src/dataflow/operators/generic/builder_raw.rs:203:9
materialized_1     |   10: timely::progress::subgraph::PerOperatorState<T>::schedule
materialized_1     |              at cargo/git/checkouts/timely-dataflow-4c0cc365061cd263/ac0a326/timely/src/progress/subgraph.rs:656:30
materialized_1     |   11: timely::progress::subgraph::Subgraph<TOuter,TInner>::activate_child
materialized_1     |              at cargo/git/checkouts/timely-dataflow-4c0cc365061cd263/ac0a326/timely/src/progress/subgraph.rs:338:26
materialized_1     |       <timely::progress::subgraph::Subgraph<TOuter,TInner> as timely::scheduling::Schedule>::schedule
materialized_1     |              at cargo/git/checkouts/timely-dataflow-4c0cc365061cd263/ac0a326/timely/src/progress/subgraph.rs:304:17
materialized_1     |   12: timely::progress::subgraph::PerOperatorState<T>::schedule
materialized_1     |              at cargo/git/checkouts/timely-dataflow-4c0cc365061cd263/ac0a326/timely/src/progress/subgraph.rs:656:30
materialized_1     |   13: timely::progress::subgraph::Subgraph<TOuter,TInner>::activate_child
materialized_1     |              at cargo/git/checkouts/timely-dataflow-4c0cc365061cd263/ac0a326/timely/src/progress/subgraph.rs:338:26
materialized_1     |       <timely::progress::subgraph::Subgraph<TOuter,TInner> as timely::scheduling::Schedule>::schedule
materialized_1     |              at cargo/git/checkouts/timely-dataflow-4c0cc365061cd263/ac0a326/timely/src/progress/subgraph.rs:304:17
materialized_1     |   14: timely::worker::Wrapper::step::{{closure}}
materialized_1     |              at cargo/git/checkouts/timely-dataflow-4c0cc365061cd263/ac0a326/timely/src/worker.rs:747:57
materialized_1     |       core::option::Option<T>::map
materialized_1     |              at usr/local/lib/rustlib/src/rust/library/core/src/option.rs:453:29
materialized_1     |       timely::worker::Wrapper::step
materialized_1     |              at cargo/git/checkouts/timely-dataflow-4c0cc365061cd263/ac0a326/timely/src/worker.rs:747:26
materialized_1     |   15: timely::worker::Worker<A>::step_or_park
materialized_1     |              at cargo/git/checkouts/timely-dataflow-4c0cc365061cd263/ac0a326/timely/src/worker.rs:395:38
materialized_1     |   16: dataflow::server::Worker<A>::run
materialized_1     |              at var/lib/buildkite-agent/builds/buildkite-builders-i-07ebc912e8c6ad411-1/materialize/tests/src/dataflow/src/server.rs:424:13
materialized_1     |   17: dataflow::server::serve::{{closure}}
materialized_1     |              at var/lib/buildkite-agent/builds/buildkite-builders-i-07ebc912e8c6ad411-1/materialize/tests/src/dataflow/src/server.rs:206:13
materialized_1     |       timely::execute::execute::{{closure}}
materialized_1     |              at cargo/git/checkouts/timely-dataflow-4c0cc365061cd263/ac0a326/timely/src/execute.rs:285:22
materialized_1     |       timely_communication::initialize::initialize_from::{{closure}}
materialized_1     |              at cargo/git/checkouts/timely-dataflow-4c0cc365061cd263/ac0a326/communication/src/initialize.rs:285:33
materialized_1     |       std::sys_common::backtrace::__rust_begin_short_backtrace
materialized_1     |              at usr/local/lib/rustlib/src/rust/library/std/src/sys_common/backtrace.rs:125:18
materialized_1     |   18: std::thread::Builder::spawn_unchecked::{{closure}}::{{closure}}
materialized_1     |              at usr/local/lib/rustlib/src/rust/library/std/src/thread/mod.rs:474:17
materialized_1     |       <std::panic::AssertUnwindSafe<F> as core::ops::function::FnOnce<()>>::call_once
materialized_1     |              at usr/local/lib/rustlib/src/rust/library/std/src/panic.rs:322:9
materialized_1     |       std::panicking::try::do_call
materialized_1     |              at usr/local/lib/rustlib/src/rust/library/std/src/panicking.rs:381:40
materialized_1     |       std::panicking::try
materialized_1     |              at usr/local/lib/rustlib/src/rust/library/std/src/panicking.rs:345:19
materialized_1     |       std::panic::catch_unwind
materialized_1     |              at usr/local/lib/rustlib/src/rust/library/std/src/panic.rs:396:14
materialized_1     |       std::thread::Builder::spawn_unchecked::{{closure}}
materialized_1     |              at usr/local/lib/rustlib/src/rust/library/std/src/thread/mod.rs:473:30
materialized_1     |       core::ops::function::FnOnce::call_once{{vtable.shim}}
materialized_1     |              at usr/local/lib/rustlib/src/rust/library/core/src/ops/function.rs:227:5
materialized_1     |   19: <alloc::boxed::Box<F,A> as core::ops::function::FnOnce<Args>>::call_once
materialized_1     |              at rustc/e1884a8e3c3e813aada8254edfa120e85bf5ffca/library/alloc/src/boxed.rs:1307:9
materialized_1     |       <alloc::boxed::Box<F,A> as core::ops::function::FnOnce<Args>>::call_once
materialized_1     |              at rustc/e1884a8e3c3e813aada8254edfa120e85bf5ffca/library/alloc/src/boxed.rs:1307:9
materialized_1     |       std::sys::unix::thread::Thread::new::thread_start
materialized_1     |              at rustc/e1884a8e3c3e813aada8254edfa120e85bf5ffca/library/std/src/sys/unix/thread.rs:71:17
materialized_1     |   20: start_thread
materialized_1     |   21: __clone
materialized_1     |     
materialized_1     | materialized encountered an internal error and crashed.
materialized_1     | 
materialized_1     | We rely on bug reports to diagnose and fix these errors. Please
materialized_1     | copy and paste the above details and file a report at:
materialized_1     | 
materialized_1     |     https://materialize.com/s/bug
materialized_1     | 

Contributor Author

cirego commented Mar 24, 2021

> Looks like materialized crashed while testdrive was running:

Thank you! That really helps!

Any idea why this wasn't crashing when I ran this locally? I can try running this using ci-builder.

Contributor

benesch commented Mar 24, 2021

No idea, sorry!

Contributor Author

cirego commented Mar 24, 2021

Hooray! It's a race condition when the number of partitions changes, and/or librdkafka / rdkafka has not yet updated the metrics to include the new partition.

The stats object only has information for partitions 0 and -1, but we are looking for partition 1.

materialized_1     | 2021-03-24T21:56:58.875426Z  INFO dataflow::source::kafka: Topic: testdrive-non-dbz-data-varying-partition-801619974, Partition: 1                                                                                       
materialized_1     | 2021-03-24T21:56:58.900070Z  INFO librdkafka: librdkafka: PARTCNT [thrd:main]: Topic testdrive-non-dbz-data-varying-partition-801619974 partition count changed from 1 to 2                                              
materialized_1     | 2021-03-24T21:56:59.897869Z ERROR panic: no entry found for key 
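
To illustrate the failure mode: indexing a `HashMap` with `map[&key]` panics with "no entry found for key" when the key is absent, which matches the `Index::index` frame in the backtrace. The sketch below uses `get` instead, so a partition that is not in the stats yet is simply skipped. `PartitionStats` and `partition_lag` are hypothetical names, not the code in `kafka.rs`.

```rust
use std::collections::HashMap;

/// Hypothetical, trimmed-down per-partition statistics.
#[derive(Debug, Clone, PartialEq)]
struct PartitionStats {
    partition: i32,
    consumer_lag: i64,
}

/// Returns the lag for `partition`, or `None` if the stats object does not
/// know about that partition yet (for example, right after the partition
/// count changed). `stats[&partition]` would panic in that case.
fn partition_lag(stats: &HashMap<i32, PartitionStats>, partition: i32) -> Option<i64> {
    stats.get(&partition).map(|p| p.consumer_lag)
}

/// Mirrors the situation described above: only partitions -1 and 0 are present.
fn sample_stats() -> HashMap<i32, PartitionStats> {
    let mut stats = HashMap::new();
    stats.insert(-1, PartitionStats { partition: -1, consumer_lag: -1 });
    stats.insert(0, PartitionStats { partition: 0, consumer_lag: 5 });
    stats
}
```

With `sample_stats()`, looking up partition 0 returns its lag and looking up partition 1 returns `None` instead of crashing the worker thread.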

@cirego cirego merged commit cf0b566 into MaterializeInc:main Mar 25, 2021
@cirego cirego deleted the chris/default_statistics_interval branch March 25, 2021 23:15
@aljoscha
Contributor

Was the intent of this to also log the statistics to the regular log?

With this change, my logs only consist of rdkafka stats:

2021-03-30T16:48:27.061200Z  INFO rdkafka::client: Client stats: Statistics { name: "rdkafka#producer-5", client_id: "rdkafka", client_type: "producer", ts: 1617122907060655, time: 1617122907, replyq: 0, msg_cnt: 0, msg_size: 0, msg_max: 10000000, msg_size_max: 17179869184, tx: 13, tx_bytes: 602710, rx: 11, rx_bytes: 1367, txmsgs: 35193, txmsg_bytes: 316737, rxmsgs: 0, rxmsg_bytes: 0, simple_cnt: 0, metadata_cache_cnt: 1, brokers: {"localhost:9092/0": Broker { name: "localhost:9092/0", nodeid: 0, nodename: "localhost:9092", source: "configured", state: "UP", stateage: 10017662, outbuf_cnt: 0, outbuf_msg_cnt: 0, waitresp_cnt: 0, waitresp_msg_cnt: 0, tx: 13, txbytes: 602710, txerrs: 0, txretries: 0, req_timeouts: 0, rx: 11, rxbytes: 1367, rxerrs: 0, rxcorriderrs: 0, rxpartial: 0, req: {"TxnOffsetCommit": 0, "Metadata": 2, "BeginQuorumEpochRequest": 0, "IncrementalAlterConfigsRequest": 0, "UpdateFeaturesRequest": 0, "EnvelopeRequest": 0, "AddOffsetsToTxn": 0, "Offset": 0, "OffsetDeleteRequest": 0, "ApiVersion": 1, "InitProducerId": 1, "SaslAuthenticate": 0, "DescribeClientQuotasRequest": 0, "AlterPartitionReassignmentsRequest": 0, "AddPartitionsToTxn": 0, "SaslHandshake": 0, "EndTxn": 0, "Produce": 7, "ListPartitionReassignmentsRequest": 0, "AlterClientQuotasRequest": 0, "FindCoordinator": 0, "DescribeUserScramCredentialsRequest": 0, "ElectLeadersRequest": 0, "AlterUserScramCredentialsRequest": 0, "VoteRequest": 0, "EndQuorumEpochRequest": 0, "DescribeQuorumRequest": 0, "AlterIsrRequest": 0}, zbuf_grow: 0, buf_grow: 0, wakeups: Some(32), connects: Some(1), disconnects: Some(0), int_latency: Some(Window { min: 0, max: 0, avg: 0, sum: 0, cnt: 0, stddev: 0, hdrsize: 14448, p50: 0, p75: 0, p90: 0, p95: 0, p99: 0, p99_99: 0, outofrange: 0 }), outbuf_latency: Some(Window { min: 0, max: 0, avg: 0, sum: 0, cnt: 0, stddev: 0, hdrsize: 11376, p50: 0, p75: 0, p90: 0, p95: 0, p99: 0, p99_99: 0, outofrange: 0 }), rtt: Some(Window { min: 0, max: 0, avg: 0, sum: 0, cnt: 0, stddev: 
0, hdrsize: 13424, p50: 0, p75: 0, p90: 0, p95: 0, p99: 0, p99_99: 0, outofrange: 0 }), throttle: Some(Window { min: 0, max: 0, avg: 0, sum: 0, cnt: 0, stddev: 0, hdrsize: 17520, p50: 0, p75: 0, p90: 0, p95: 0, p99: 0, p99_99: 0, outofrange: 0 }), toppars: {"output-sink-1-u2-1617122896-15753450948680609202-0": TopicPartition { topic: "output-sink-1-u2-1617122896-15753450948680609202", partition: 0 }} }}, topics: {"output-sink-1-u2-1617122896-15753450948680609202": Topic { topic: "output-sink-1-u2-1617122896-15753450948680609202", metadata_age: 5015, batchsize: Window { min: 0, max: 0, avg: 0, sum: 0, cnt: 0, stddev: 0, hdrsize: 14448, p50: 0, p75: 0, p90: 0, p95: 0, p99: 0, p99_99: 0, outofrange: 0 }, batchcnt: Window { min: 0, max: 0, avg: 0, sum: 0, cnt: 0, stddev: 0, hdrsize: 8304, p50: 0, p75: 0, p90: 0, p95: 0, p99: 0, p99_99: 0, outofrange: 0 }, partitions: {-1: Partition { partition: -1, broker: -1, leader: -1, desired: false, unknown: false, msgq_cnt: 0, msgq_bytes: 0, xmit_msgq_cnt: 0, xmit_msgq_bytes: 0, fetchq_cnt: 0, fetchq_size: 0, fetch_state: "none", query_offset: -1001, next_offset: 0, app_offset: -1001, stored_offset: -1001, committed_offset: -1001, eof_offset: -1001, lo_offset: -1001, hi_offset: -1001, ls_offset: -1001, consumer_lag: -1, txmsgs: 0, txbytes: 0, rxmsgs: 0, rxbytes: 0, msgs: 19997, rx_ver_drops: 0, msgs_inflight: 0, next_ack_seq: 0, next_err_seq: 0, acked_msgid: 0 }, 0: Partition { partition: 0, broker: 0, leader: 0, desired: false, unknown: false, msgq_cnt: 0, msgq_bytes: 0, xmit_msgq_cnt: 0, xmit_msgq_bytes: 0, fetchq_cnt: 0, fetchq_size: 0, fetch_state: "none", query_offset: -1001, next_offset: 0, app_offset: -1001, stored_offset: -1001, committed_offset: -1001, eof_offset: -1001, lo_offset: -1001, hi_offset: -1001, ls_offset: -1001, consumer_lag: -1, txmsgs: 35193, txbytes: 316737, rxmsgs: 0, rxbytes: 0, msgs: 35193, rx_ver_drops: 0, msgs_inflight: 0, next_ack_seq: 35193, next_err_seq: 35193, acked_msgid: 35193 }} }}, cgrp: None, 
eos: Some(ExactlyOnceSemantics { idemp_state: "Assigned", idemp_stateage: 10016, txn_state: "Init", txn_stateage: 1617122907060, txn_may_enq: false, producer_id: 86, producer_epoch: 0, epoch_cnt: 1 }) }
2021-03-30T16:48:28.049806Z  INFO rdkafka::client: Client stats: Statistics { name: "rdkafka#producer-6", client_id: "rdkafka", client_type: "producer", ts: 1617122908049419, time: 1617122908, replyq: 0, msg_cnt: 0, msg_size: 0, msg_max: 10000000, msg_size_max: 17179869184, tx: 3, tx_bytes: 93, rx: 3, rx_bytes: 515, txmsgs: 0, txmsg_bytes: 0, rxmsgs: 0, rxmsg_bytes: 0, simple_cnt: 0, metadata_cache_cnt: 0, brokers: {"localhost:9092/0": Broker { name: "localhost:9092/0", nodeid: 0, nodename: "localhost:9092", source: "configured", state: "UP", stateage: 11006426, outbuf_cnt: 0, outbuf_msg_cnt: 0, waitresp_cnt: 0, waitresp_msg_cnt: 0, tx: 3, txbytes: 93, txerrs: 0, txretries: 0, req_timeouts: 0, rx: 3, rxbytes: 515, rxerrs: 0, rxcorriderrs: 0, rxpartial: 0, req: {"OffsetDeleteRequest": 0, "VoteRequest": 0, "AlterIsrRequest": 0, "EndTxn": 0, "AlterPartitionReassignmentsRequest": 0, "ApiVersion": 1, "AddOffsetsToTxn": 0, "FindCoordinator": 0, "EndQuorumEpochRequest": 0, "IncrementalAlterConfigsRequest": 0, "ListPartitionReassignmentsRequest": 0, "TxnOffsetCommit": 0, "ElectLeadersRequest": 0, "InitProducerId": 1, "AddPartitionsToTxn": 0, "Produce": 0, "AlterClientQuotasRequest": 0, "UpdateFeaturesRequest": 0, "EnvelopeRequest": 0, "DescribeClientQuotasRequest": 0, "Offset": 0, "Metadata": 1, "DescribeQuorumRequest": 0, "SaslAuthenticate": 0, "DescribeUserScramCredentialsRequest": 0, "AlterUserScramCredentialsRequest": 0, "BeginQuorumEpochRequest": 0, "SaslHandshake": 0}, zbuf_grow: 0, buf_grow: 0, wakeups: Some(10), connects: Some(1), disconnects: Some(0), int_latency: Some(Window { min: 0, max: 0, avg: 0, sum: 0, cnt: 0, stddev: 0, hdrsize: 11376, p50: 0, p75: 0, p90: 0, p95: 0, p99: 0, p99_99: 0, outofrange: 0 }), outbuf_latency: Some(Window { min: 0, max: 0, avg: 0, sum: 0, cnt: 0, stddev: 0, hdrsize: 11376, p50: 0, p75: 0, p90: 0, p95: 0, p99: 0, p99_99: 0, outofrange: 0 }), rtt: Some(Window { min: 0, max: 0, avg: 0, sum: 0, cnt: 0, stddev: 0, hdrsize: 13424, p50: 
0, p75: 0, p90: 0, p95: 0, p99: 0, p99_99: 0, outofrange: 0 }), throttle: Some(Window { min: 0, max: 0, avg: 0, sum: 0, cnt: 0, stddev: 0, hdrsize: 17520, p50: 0, p75: 0, p90: 0, p95: 0, p99: 0, p99_99: 0, outofrange: 0 }), toppars: {} }}, topics: {}, cgrp: None, eos: Some(ExactlyOnceSemantics { idemp_state: "Assigned", idemp_stateage: 11005, txn_state: "Init", txn_stateage: 1617122908049, txn_may_enq: false, producer_id: 85, producer_epoch: 0, epoch_cnt: 1 }) }
2021-03-30T16:48:28.065518Z  INFO rdkafka::client: Client stats: Statistics { name: "rdkafka#producer-5", client_id: "rdkafka", client_type: "producer", ts: 1617122908062952, time: 1617122908, replyq: 0, msg_cnt: 0, msg_size: 0, msg_max: 10000000, msg_size_max: 17179869184, tx: 16, tx_bytes: 772986, rx: 14, rx_bytes: 1667, txmsgs: 45193, txmsg_bytes: 406737, rxmsgs: 0, rxmsg_bytes: 0, simple_cnt: 0, metadata_cache_cnt: 1, brokers: {"localhost:9092/0": Broker { name: "localhost:9092/0", nodeid: 0, nodename: "localhost:9092", source: "configured", state: "UP", stateage: 11019959, outbuf_cnt: 0, outbuf_msg_cnt: 0, waitresp_cnt: 0, waitresp_msg_cnt: 0, tx: 16, txbytes: 772986, txerrs: 0, txretries: 0, req_timeouts: 0, rx: 14, rxbytes: 1667, rxerrs: 0, rxcorriderrs: 0, rxpartial: 0, req: {"ApiVersion": 1, "VoteRequest": 0, "OffsetDeleteRequest": 0, "Produce": 10, "BeginQuorumEpochRequest": 0, "EndQuorumEpochRequest": 0, "SaslHandshake": 0, "ListPartitionReassignmentsRequest": 0, "AlterIsrRequest": 0, "ElectLeadersRequest": 0, "AlterClientQuotasRequest": 0, "DescribeClientQuotasRequest": 0, "AddPartitionsToTxn": 0, "DescribeUserScramCredentialsRequest": 0, "DescribeQuorumRequest": 0, "UpdateFeaturesRequest": 0, "TxnOffsetCommit": 0, "Metadata": 2, "EnvelopeRequest": 0, "AlterPartitionReassignmentsRequest": 0, "EndTxn": 0, "SaslAuthenticate": 0, "Offset": 0, "AddOffsetsToTxn": 0, "FindCoordinator": 0, "InitProducerId": 1, "IncrementalAlterConfigsRequest": 0, "AlterUserScramCredentialsRequest": 0}, zbuf_grow: 0, buf_grow: 0, wakeups: Some(39), connects: Some(1), disconnects: Some(0), int_latency: Some(Window { min: 15, max: 11071, avg: 5657, sum: 56579522, cnt: 10000, stddev: 2913, hdrsize: 14448, p50: 5759, p75: 8191, p90: 9535, p95: 9983, p99: 10751, p99_99: 11071, outofrange: 0 }), outbuf_latency: Some(Window { min: 62, max: 94, avg: 75, sum: 227, cnt: 3, stddev: 13, hdrsize: 11376, p50: 71, p75: 71, p90: 94, p95: 94, p99: 94, p99_99: 94, outofrange: 0 }), rtt: 
Some(Window { min: 1004, max: 1182, avg: 1065, sum: 3197, cnt: 3, stddev: 81, hdrsize: 13424, p50: 1011, p75: 1011, p90: 1183, p95: 1183, p99: 1183, p99_99: 1183, outofrange: 0 }), throttle: Some(Window { min: 0, max: 0, avg: 0, sum: 0, cnt: 3, stddev: 0, hdrsize: 17520, p50: 0, p75: 0, p90: 0, p95: 0, p99: 0, p99_99: 0, outofrange: 0 }), toppars: {"output-sink-1-u2-1617122896-15753450948680609202-0": TopicPartition { topic: "output-sink-1-u2-1617122896-15753450948680609202", partition: 0 }} }}, topics: {"output-sink-1-u2-1617122896-15753450948680609202": Topic { topic: "output-sink-1-u2-1617122896-15753450948680609202", metadata_age: 6017, batchsize: Window { min: 49093, max: 62166, avg: 56663, sum: 169991, cnt: 3, stddev: 5539, hdrsize: 14448, p50: 58879, p75: 58879, p90: 62207, p95: 62207, p99: 62207, p99_99: 62207, outofrange: 0 }, batchcnt: Window { min: 2888, max: 3657, avg: 3333, sum: 10000, cnt: 3, stddev: 324, hdrsize: 8304, p50: 3455, p75: 3455, p90: 3663, p95: 3663, p99: 3663, p99_99: 3663, outofrange: 0 }, partitions: {-1: Partition { partition: -1, broker: -1, leader: -1, desired: false, unknown: false, msgq_cnt: 0, msgq_bytes: 0, xmit_msgq_cnt: 0, xmit_msgq_bytes: 0, fetchq_cnt: 0, fetchq_size: 0, fetch_state: "none", query_offset: -1001, next_offset: 0, app_offset: -1001, stored_offset: -1001, committed_offset: -1001, eof_offset: -1001, lo_offset: -1001, hi_offset: -1001, ls_offset: -1001, consumer_lag: -1, txmsgs: 0, txbytes: 0, rxmsgs: 0, rxbytes: 0, msgs: 19997, rx_ver_drops: 0, msgs_inflight: 0, next_ack_seq: 0, next_err_seq: 0, acked_msgid: 0 }, 0: Partition { partition: 0, broker: 0, leader: 0, desired: false, unknown: false, msgq_cnt: 0, msgq_bytes: 0, xmit_msgq_cnt: 0, xmit_msgq_bytes: 0, fetchq_cnt: 0, fetchq_size: 0, fetch_state: "none", query_offset: -1001, next_offset: 0, app_offset: -1001, stored_offset: -1001, committed_offset: -1001, eof_offset: -1001, lo_offset: -1001, hi_offset: -1001, ls_offset: -1001, consumer_lag: -1, txmsgs: 
45193, txbytes: 406737, rxmsgs: 0, rxbytes: 0, msgs: 45193, rx_ver_drops: 0, msgs_inflight: 0, next_ack_seq: 45193, next_err_seq: 45193, acked_msgid: 45193 }} }}, cgrp: None, eos: Some(ExactlyOnceSemantics { idemp_state: "Assigned", idemp_stateage: 11019, txn_state: "Init", txn_stateage: 1617122908062, txn_may_enq: false, producer_id: 86, producer_epoch: 0, epoch_cnt: 1 }) }

Somehow on GitHub it doesn't look as bad, but in a shell it's just a wall of text. 😅

@cirego
Contributor Author

cirego commented Mar 30, 2021

Was the intent of this to also log the statistics to the regular log?

With this change, my logs only consist of rdkafka stats:

2021-03-30T16:48:27.061200Z  INFO rdkafka::client: Client stats: Statistics { name: "rdkafka#producer-5", client_id: "rdkafka", client_type: "producer", ts: 1617122907060655, time: 1617122907, replyq: 0, msg_cnt: 0, msg_size: 0, msg_max: 10000000, msg_size_max: 17179869184, tx: 13, tx_bytes: 602710, rx: 11, rx_bytes: 1367, txmsgs: 35193, txmsg_bytes: 316737, rxmsgs: 0, rxmsg_bytes: 0, simple_cnt: 0, metadata_cache_cnt: 1, brokers: {"localhost:9092/0": Broker { name: "localhost:9092/0", nodeid: 0, nodename: "localhost:9092", source: "configured", state: "UP", stateage: 10017662, outbuf_cnt: 0, outbuf_msg_cnt: 0, waitresp_cnt: 0, waitresp_msg_cnt: 0, tx: 13, txbytes: 602710, txerrs: 0, txretries: 0, req_timeouts: 0, rx: 11, rxbytes: 1367, rxerrs: 0, rxcorriderrs: 0, rxpartial: 0, req: {"TxnOffsetCommit": 0, "Metadata": 2, "BeginQuorumEpochRequest": 0, "IncrementalAlterConfigsRequest": 0, "UpdateFeaturesRequest": 0, "EnvelopeRequest": 0, "AddOffsetsToTxn": 0, "Offset": 0, "OffsetDeleteRequest": 0, "ApiVersion": 1, "InitProducerId": 1, "SaslAuthenticate": 0, "DescribeClientQuotasRequest": 0, "AlterPartitionReassignmentsRequest": 0, "AddPartitionsToTxn": 0, "SaslHandshake": 0, "EndTxn": 0, "Produce": 7, "ListPartitionReassignmentsRequest": 0, "AlterClientQuotasRequest": 0, "FindCoordinator": 0, "DescribeUserScramCredentialsRequest": 0, "ElectLeadersRequest": 0, "AlterUserScramCredentialsRequest": 0, "VoteRequest": 0, "EndQuorumEpochRequest": 0, "DescribeQuorumRequest": 0, "AlterIsrRequest": 0}, zbuf_grow: 0, buf_grow: 0, wakeups: Some(32), connects: Some(1), disconnects: Some(0), int_latency: Some(Window { min: 0, max: 0, avg: 0, sum: 0, cnt: 0, stddev: 0, hdrsize: 14448, p50: 0, p75: 0, p90: 0, p95: 0, p99: 0, p99_99: 0, outofrange: 0 }), outbuf_latency: Some(Window { min: 0, max: 0, avg: 0, sum: 0, cnt: 0, stddev: 0, hdrsize: 11376, p50: 0, p75: 0, p90: 0, p95: 0, p99: 0, p99_99: 0, outofrange: 0 }), rtt: Some(Window { min: 0, max: 0, avg: 0, sum: 0, cnt: 0, stddev: 
0, hdrsize: 13424, p50: 0, p75: 0, p90: 0, p95: 0, p99: 0, p99_99: 0, outofrange: 0 }), throttle: Some(Window { min: 0, max: 0, avg: 0, sum: 0, cnt: 0, stddev: 0, hdrsize: 17520, p50: 0, p75: 0, p90: 0, p95: 0, p99: 0, p99_99: 0, outofrange: 0 }), toppars: {"output-sink-1-u2-1617122896-15753450948680609202-0": TopicPartition { topic: "output-sink-1-u2-1617122896-15753450948680609202", partition: 0 }} }}, topics: {"output-sink-1-u2-1617122896-15753450948680609202": Topic { topic: "output-sink-1-u2-1617122896-15753450948680609202", metadata_age: 5015, batchsize: Window { min: 0, max: 0, avg: 0, sum: 0, cnt: 0, stddev: 0, hdrsize: 14448, p50: 0, p75: 0, p90: 0, p95: 0, p99: 0, p99_99: 0, outofrange: 0 }, batchcnt: Window { min: 0, max: 0, avg: 0, sum: 0, cnt: 0, stddev: 0, hdrsize: 8304, p50: 0, p75: 0, p90: 0, p95: 0, p99: 0, p99_99: 0, outofrange: 0 }, partitions: {-1: Partition { partition: -1, broker: -1, leader: -1, desired: false, unknown: false, msgq_cnt: 0, msgq_bytes: 0, xmit_msgq_cnt: 0, xmit_msgq_bytes: 0, fetchq_cnt: 0, fetchq_size: 0, fetch_state: "none", query_offset: -1001, next_offset: 0, app_offset: -1001, stored_offset: -1001, committed_offset: -1001, eof_offset: -1001, lo_offset: -1001, hi_offset: -1001, ls_offset: -1001, consumer_lag: -1, txmsgs: 0, txbytes: 0, rxmsgs: 0, rxbytes: 0, msgs: 19997, rx_ver_drops: 0, msgs_inflight: 0, next_ack_seq: 0, next_err_seq: 0, acked_msgid: 0 }, 0: Partition { partition: 0, broker: 0, leader: 0, desired: false, unknown: false, msgq_cnt: 0, msgq_bytes: 0, xmit_msgq_cnt: 0, xmit_msgq_bytes: 0, fetchq_cnt: 0, fetchq_size: 0, fetch_state: "none", query_offset: -1001, next_offset: 0, app_offset: -1001, stored_offset: -1001, committed_offset: -1001, eof_offset: -1001, lo_offset: -1001, hi_offset: -1001, ls_offset: -1001, consumer_lag: -1, txmsgs: 35193, txbytes: 316737, rxmsgs: 0, rxbytes: 0, msgs: 35193, rx_ver_drops: 0, msgs_inflight: 0, next_ack_seq: 35193, next_err_seq: 35193, acked_msgid: 35193 }} }}, cgrp: None, 
eos: Some(ExactlyOnceSemantics { idemp_state: "Assigned", idemp_stateage: 10016, txn_state: "Init", txn_stateage: 1617122907060, txn_may_enq: false, producer_id: 86, producer_epoch: 0, epoch_cnt: 1 }) }
2021-03-30T16:48:28.049806Z  INFO rdkafka::client: Client stats: Statistics { name: "rdkafka#producer-6", client_id: "rdkafka", client_type: "producer", ts: 1617122908049419, time: 1617122908, replyq: 0, msg_cnt: 0, msg_size: 0, msg_max: 10000000, msg_size_max: 17179869184, tx: 3, tx_bytes: 93, rx: 3, rx_bytes: 515, txmsgs: 0, txmsg_bytes: 0, rxmsgs: 0, rxmsg_bytes: 0, simple_cnt: 0, metadata_cache_cnt: 0, brokers: {"localhost:9092/0": Broker { name: "localhost:9092/0", nodeid: 0, nodename: "localhost:9092", source: "configured", state: "UP", stateage: 11006426, outbuf_cnt: 0, outbuf_msg_cnt: 0, waitresp_cnt: 0, waitresp_msg_cnt: 0, tx: 3, txbytes: 93, txerrs: 0, txretries: 0, req_timeouts: 0, rx: 3, rxbytes: 515, rxerrs: 0, rxcorriderrs: 0, rxpartial: 0, req: {"OffsetDeleteRequest": 0, "VoteRequest": 0, "AlterIsrRequest": 0, "EndTxn": 0, "AlterPartitionReassignmentsRequest": 0, "ApiVersion": 1, "AddOffsetsToTxn": 0, "FindCoordinator": 0, "EndQuorumEpochRequest": 0, "IncrementalAlterConfigsRequest": 0, "ListPartitionReassignmentsRequest": 0, "TxnOffsetCommit": 0, "ElectLeadersRequest": 0, "InitProducerId": 1, "AddPartitionsToTxn": 0, "Produce": 0, "AlterClientQuotasRequest": 0, "UpdateFeaturesRequest": 0, "EnvelopeRequest": 0, "DescribeClientQuotasRequest": 0, "Offset": 0, "Metadata": 1, "DescribeQuorumRequest": 0, "SaslAuthenticate": 0, "DescribeUserScramCredentialsRequest": 0, "AlterUserScramCredentialsRequest": 0, "BeginQuorumEpochRequest": 0, "SaslHandshake": 0}, zbuf_grow: 0, buf_grow: 0, wakeups: Some(10), connects: Some(1), disconnects: Some(0), int_latency: Some(Window { min: 0, max: 0, avg: 0, sum: 0, cnt: 0, stddev: 0, hdrsize: 11376, p50: 0, p75: 0, p90: 0, p95: 0, p99: 0, p99_99: 0, outofrange: 0 }), outbuf_latency: Some(Window { min: 0, max: 0, avg: 0, sum: 0, cnt: 0, stddev: 0, hdrsize: 11376, p50: 0, p75: 0, p90: 0, p95: 0, p99: 0, p99_99: 0, outofrange: 0 }), rtt: Some(Window { min: 0, max: 0, avg: 0, sum: 0, cnt: 0, stddev: 0, hdrsize: 13424, p50: 
0, p75: 0, p90: 0, p95: 0, p99: 0, p99_99: 0, outofrange: 0 }), throttle: Some(Window { min: 0, max: 0, avg: 0, sum: 0, cnt: 0, stddev: 0, hdrsize: 17520, p50: 0, p75: 0, p90: 0, p95: 0, p99: 0, p99_99: 0, outofrange: 0 }), toppars: {} }}, topics: {}, cgrp: None, eos: Some(ExactlyOnceSemantics { idemp_state: "Assigned", idemp_stateage: 11005, txn_state: "Init", txn_stateage: 1617122908049, txn_may_enq: false, producer_id: 85, producer_epoch: 0, epoch_cnt: 1 }) }
2021-03-30T16:48:28.065518Z  INFO rdkafka::client: Client stats: Statistics { name: "rdkafka#producer-5", client_id: "rdkafka", client_type: "producer", ts: 1617122908062952, time: 1617122908, replyq: 0, msg_cnt: 0, msg_size: 0, msg_max: 10000000, msg_size_max: 17179869184, tx: 16, tx_bytes: 772986, rx: 14, rx_bytes: 1667, txmsgs: 45193, txmsg_bytes: 406737, rxmsgs: 0, rxmsg_bytes: 0, simple_cnt: 0, metadata_cache_cnt: 1, brokers: {"localhost:9092/0": Broker { name: "localhost:9092/0", nodeid: 0, nodename: "localhost:9092", source: "configured", state: "UP", stateage: 11019959, outbuf_cnt: 0, outbuf_msg_cnt: 0, waitresp_cnt: 0, waitresp_msg_cnt: 0, tx: 16, txbytes: 772986, txerrs: 0, txretries: 0, req_timeouts: 0, rx: 14, rxbytes: 1667, rxerrs: 0, rxcorriderrs: 0, rxpartial: 0, req: {"ApiVersion": 1, "VoteRequest": 0, "OffsetDeleteRequest": 0, "Produce": 10, "BeginQuorumEpochRequest": 0, "EndQuorumEpochRequest": 0, "SaslHandshake": 0, "ListPartitionReassignmentsRequest": 0, "AlterIsrRequest": 0, "ElectLeadersRequest": 0, "AlterClientQuotasRequest": 0, "DescribeClientQuotasRequest": 0, "AddPartitionsToTxn": 0, "DescribeUserScramCredentialsRequest": 0, "DescribeQuorumRequest": 0, "UpdateFeaturesRequest": 0, "TxnOffsetCommit": 0, "Metadata": 2, "EnvelopeRequest": 0, "AlterPartitionReassignmentsRequest": 0, "EndTxn": 0, "SaslAuthenticate": 0, "Offset": 0, "AddOffsetsToTxn": 0, "FindCoordinator": 0, "InitProducerId": 1, "IncrementalAlterConfigsRequest": 0, "AlterUserScramCredentialsRequest": 0}, zbuf_grow: 0, buf_grow: 0, wakeups: Some(39), connects: Some(1), disconnects: Some(0), int_latency: Some(Window { min: 15, max: 11071, avg: 5657, sum: 56579522, cnt: 10000, stddev: 2913, hdrsize: 14448, p50: 5759, p75: 8191, p90: 9535, p95: 9983, p99: 10751, p99_99: 11071, outofrange: 0 }), outbuf_latency: Some(Window { min: 62, max: 94, avg: 75, sum: 227, cnt: 3, stddev: 13, hdrsize: 11376, p50: 71, p75: 71, p90: 94, p95: 94, p99: 94, p99_99: 94, outofrange: 0 }), rtt: 
Some(Window { min: 1004, max: 1182, avg: 1065, sum: 3197, cnt: 3, stddev: 81, hdrsize: 13424, p50: 1011, p75: 1011, p90: 1183, p95: 1183, p99: 1183, p99_99: 1183, outofrange: 0 }), throttle: Some(Window { min: 0, max: 0, avg: 0, sum: 0, cnt: 3, stddev: 0, hdrsize: 17520, p50: 0, p75: 0, p90: 0, p95: 0, p99: 0, p99_99: 0, outofrange: 0 }), toppars: {"output-sink-1-u2-1617122896-15753450948680609202-0": TopicPartition { topic: "output-sink-1-u2-1617122896-15753450948680609202", partition: 0 }} }}, topics: {"output-sink-1-u2-1617122896-15753450948680609202": Topic { topic: "output-sink-1-u2-1617122896-15753450948680609202", metadata_age: 6017, batchsize: Window { min: 49093, max: 62166, avg: 56663, sum: 169991, cnt: 3, stddev: 5539, hdrsize: 14448, p50: 58879, p75: 58879, p90: 62207, p95: 62207, p99: 62207, p99_99: 62207, outofrange: 0 }, batchcnt: Window { min: 2888, max: 3657, avg: 3333, sum: 10000, cnt: 3, stddev: 324, hdrsize: 8304, p50: 3455, p75: 3455, p90: 3663, p95: 3663, p99: 3663, p99_99: 3663, outofrange: 0 }, partitions: {-1: Partition { partition: -1, broker: -1, leader: -1, desired: false, unknown: false, msgq_cnt: 0, msgq_bytes: 0, xmit_msgq_cnt: 0, xmit_msgq_bytes: 0, fetchq_cnt: 0, fetchq_size: 0, fetch_state: "none", query_offset: -1001, next_offset: 0, app_offset: -1001, stored_offset: -1001, committed_offset: -1001, eof_offset: -1001, lo_offset: -1001, hi_offset: -1001, ls_offset: -1001, consumer_lag: -1, txmsgs: 0, txbytes: 0, rxmsgs: 0, rxbytes: 0, msgs: 19997, rx_ver_drops: 0, msgs_inflight: 0, next_ack_seq: 0, next_err_seq: 0, acked_msgid: 0 }, 0: Partition { partition: 0, broker: 0, leader: 0, desired: false, unknown: false, msgq_cnt: 0, msgq_bytes: 0, xmit_msgq_cnt: 0, xmit_msgq_bytes: 0, fetchq_cnt: 0, fetchq_size: 0, fetch_state: "none", query_offset: -1001, next_offset: 0, app_offset: -1001, stored_offset: -1001, committed_offset: -1001, eof_offset: -1001, lo_offset: -1001, hi_offset: -1001, ls_offset: -1001, consumer_lag: -1, txmsgs: 
45193, txbytes: 406737, rxmsgs: 0, rxbytes: 0, msgs: 45193, rx_ver_drops: 0, msgs_inflight: 0, next_ack_seq: 45193, next_err_seq: 45193, acked_msgid: 45193 }} }}, cgrp: None, eos: Some(ExactlyOnceSemantics { idemp_state: "Assigned", idemp_stateage: 11019, txn_state: "Init", txn_stateage: 1617122908062, txn_may_enq: false, producer_id: 86, producer_epoch: 0, epoch_cnt: 1 }) }

Somehow on GitHub it doesn't look as bad, but in a shell it's just a wall of text.

@aljoscha, there shouldn't be any statistics printed to the logfile. Can you point me at the branch you're using? Perhaps there is a merge conflict?

@aljoscha
Contributor

It's on main. But it only happens when you have a Kafka Sink (with consistency), because that internally uses a Kafka Consumer to fetch the latest timestamp from the output consistency topic.

The entry point for that is roughly here:

async fn get_latest_ts(

@cirego
Contributor Author

cirego commented Mar 30, 2021

It's on main. But it only happens when you have a Kafka Sink (with consistency), because that internally uses a Kafka Consumer to fetch the latest timestamp from the output consistency topic.

The entry point for that is roughly here:

async fn get_latest_ts(

Oh, okay! I'll file a new GH issue for this. Looks like we need to duplicate some of the logic from our Kafka source codebase to create the config with a context and then register a callback.
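
For anyone following along, here is a rough sketch of the pattern described above, not the actual fix. Judging by the `rdkafka::client` target on the log lines, the noise comes from the client's default statistics handling, so the idea is to give the sink's client a context whose statistics callback forwards the payload somewhere else instead of logging it. Everything in this snippet is a hypothetical stand-in: `Stats`, `Context`, `DefaultContext`, `ForwardingContext`, and `forwarding_context` are made-up names, not the rdkafka API, and `std::sync::mpsc` is swapped in for crossbeam so the example builds without extra crates.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};

/// Hypothetical stand-in for the statistics payload a client emits.
#[derive(Debug, Clone, PartialEq)]
struct Stats {
    name: String,
    txmsgs: u64,
}

/// Hypothetical stand-in for a client context trait whose default
/// `stats` implementation dumps the whole payload to the log.
trait Context {
    fn stats(&self, stats: Stats) {
        println!("Client stats: {:?}", stats);
    }
}

/// Uses the default behavior, which is where the log noise comes from.
struct DefaultContext;
impl Context for DefaultContext {}

/// Overrides the callback and forwards statistics over a channel.
struct ForwardingContext {
    tx: Sender<Stats>,
}

impl Context for ForwardingContext {
    fn stats(&self, stats: Stats) {
        // Ignore send errors: the receiver disappears when its owner is dropped.
        let _ = self.tx.send(stats);
    }
}

/// Builds a forwarding context together with the receiving end of its channel.
fn forwarding_context() -> (ForwardingContext, Receiver<Stats>) {
    let (tx, rx) = channel();
    (ForwardingContext { tx }, rx)
}

fn main() {
    let sample = Stats { name: "rdkafka#producer-5".to_string(), txmsgs: 45193 };

    // Default context: the payload ends up in the output.
    DefaultContext.stats(sample.clone());

    // Forwarding context: nothing is printed, the payload lands on the channel.
    let (ctx, rx) = forwarding_context();
    ctx.stats(sample.clone());
    let received: Vec<Stats> = rx.try_iter().collect();
    assert_eq!(received, vec![sample]);
}
```

The channel keeps the callback cheap and lets whoever owns the receiver decide what to do with the numbers, for example updating the system log tables the way the source side does.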

@cirego
Contributor Author

cirego commented Mar 30, 2021

Filed MaterializeInc/database-issues#1931 to track this.
