Default to capturing Kafka consumer metrics once per second #6192
Conversation
Instead of trying to create a new Logger class that implements Send + Sync, use crossbeam to send messages from the callback context into the source info context. This will activate the source, triggering it to read messages from the callback and update the system log tables. When a source is dropped, its statistics are removed. (A sketch of the channel approach follows below.)
Fixes #5666
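As a rough illustration of the channel approach described above — a minimal sketch assuming rust-rdkafka's ClientContext trait and crossbeam-channel; StatsForwarder is a hypothetical name, not the PR's actual type:

```rust
use crossbeam_channel::{unbounded, Receiver, Sender};
use rdkafka::client::ClientContext;
use rdkafka::statistics::Statistics;

// Hypothetical type: librdkafka invokes the stats callback on its own
// thread, so rather than logging there, we forward the payload over a
// channel into the source's context.
struct StatsForwarder {
    tx: Sender<Statistics>,
}

impl ClientContext for StatsForwarder {
    // Called by librdkafka every `statistics.interval.ms` milliseconds.
    fn stats(&self, statistics: Statistics) {
        // If the receiver is gone, the source was dropped; ignore the error.
        let _ = self.tx.send(statistics);
    }
}

// Wire-up: the sender lives in the client context, the receiver in the source.
fn setup() -> (StatsForwarder, Receiver<Statistics>) {
    let (tx, rx) = unbounded();
    (StatsForwarder { tx }, rx)
}

// The source side drains the channel when activated.
fn drain(rx: &Receiver<Statistics>) {
    for stats in rx.try_iter() {
        // ...update the system log tables with `stats`...
        let _ = stats;
    }
}
```

Both Sender and Receiver here are Send + Sync (for Send payloads), which is what sidesteps the need for a custom Send + Sync logger type.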
We no longer need the print message (that's what this PR replaces!)
src/sql/src/kafka_util.rs
```rust
name: &'static str,
val_type: ValType,
transform: fn(String) -> String,
default: fn() -> Option<String>,
```
Random drive-by, but why a fn() -> Option<String> instead of an Option<String>?
That's a great question and the answer is that this is vestigial. There's no need to make this a function.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is now an Option<String>.
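For illustration, the resulting simplification is roughly the following (field name taken from the snippet above; the surrounding struct is assumed):

```rust
// Before: the default was produced lazily by a function pointer.
default: fn() -> Option<String>,

// After: the default is stored directly.
default: Option<String>,
```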
src/sql/src/kafka_util.rs
```rust
use std::sync::Mutex;

use anyhow::bail;
use chrono;
```
These imports look like cruft from your IDE? No need to import either of them: the first is in the standard prelude, and the second is a crate root, so it's implicitly in scope.
```rust
)
.set_default(Some(
    chrono::Duration::seconds(1).num_milliseconds().to_string(),
)),
```
(this seems like a rather roundabout way to spell "1000", but leave it if you feel it is clearer as is)
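For reference, the expression does reduce to the literal string — a quick check, assuming chrono 0.4's Duration API:

```rust
assert_eq!(
    chrono::Duration::seconds(1).num_milliseconds().to_string(),
    "1000"
);
```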
I'm definitely in favor of turning this on for existing sources (which, unless I'm mistaken, is the current behavior of this PR).
Yes, this will enable statistics for any sources that didn't explicitly set the value.
Looks like materialized crashed while testdrive was running:
Thank you! That really helps! Any idea why this wasn't crashing when I ran this locally? I can try running this using
No idea, sorry!
Hooray! It's either a race condition when the number of topics changes, or librdkafka/rdkafka does not update the metrics to include the new partition. The stats object only has information for partitions
Was the intent of this to also log the statistics to the regular log? With this change, my logs consist only of rdkafka stats: Somehow in GitHub it doesn't look as bad, but in a shell it's just a wall of text. 😅
@aljoscha, there shouldn't be any statistics printed to the logfile. Can you point me at the branch you're using? Perhaps there is a merge conflict?
It's on
The entry point for that is roughly here:
Oh, okay! I'll file a new GH issue for this. Looks like we need to duplicate some of the logic from our Kafka source codebase to create the config with context and then register a callback (see the sketch below).
Filed MaterializeInc/database-issues#1931 to track this.
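A minimal sketch of that wiring, assuming rust-rdkafka's ClientConfig and ClientContext; the context type, broker address, and group id here are illustrative, not the actual code:

```rust
use rdkafka::client::ClientContext;
use rdkafka::config::ClientConfig;
use rdkafka::consumer::{BaseConsumer, ConsumerContext};
use rdkafka::error::KafkaResult;
use rdkafka::statistics::Statistics;

// Hypothetical context: overriding `stats` keeps librdkafka's default
// implementation from dumping the JSON stats into the regular log.
struct QuietStatsContext;

impl ClientContext for QuietStatsContext {
    fn stats(&self, statistics: Statistics) {
        // Forward `statistics` wherever they belong instead of logging.
        let _ = statistics;
    }
}

impl ConsumerContext for QuietStatsContext {}

fn build_consumer() -> KafkaResult<BaseConsumer<QuietStatsContext>> {
    ClientConfig::new()
        .set("bootstrap.servers", "localhost:9092") // illustrative address
        .set("group.id", "example-group")           // illustrative group id
        .set("statistics.interval.ms", "1000")      // once per second
        .create_with_context(QuietStatsContext)
}
```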

Now that #6131 has landed, and we've run benchmarks showing that the slowdown is minimal, we should default to collecting these metrics for Kafka sources.
One open question: do we want to turn this on for existing sources too?