Skip to content

Commit 6407410

Browse files
committed
adapter: add SystemVar linear_join_yielding
This commit adds a new `SystemVar` called `linear_join_yielding` that can be used to control the yielding behavior of linear joins rendered by the compute layer.
1 parent 1dd5dcd commit 6407410

File tree

2 files changed

+42
-2
lines changed

2 files changed

+42
-2
lines changed

src/adapter/src/flags.rs

Lines changed: 25 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,23 +10,30 @@
1010
use std::time::Duration;
1111

1212
use mz_compute_client::protocol::command::ComputeParameters;
13+
use mz_compute_types::dataflows::YieldSpec;
1314
use mz_orchestrator::scheduling_config::{ServiceSchedulingConfig, ServiceTopologySpreadConfig};
1415
use mz_ore::cast::CastFrom;
1516
use mz_ore::error::ErrorExt;
1617
use mz_persist_client::cfg::{PersistParameters, RetryParameters};
1718
use mz_service::params::GrpcClientParameters;
18-
use mz_sql::session::vars::SystemVars;
19+
use mz_sql::session::vars::{SystemVars, DEFAULT_LINEAR_JOIN_YIELDING};
1920
use mz_storage_types::parameters::{
2021
StorageMaxInflightBytesConfig, StorageParameters, UpsertAutoSpillConfig,
2122
};
2223
use mz_tracing::params::TracingParameters;
2324

2425
/// Return the current compute configuration, derived from the system configuration.
2526
pub fn compute_config(config: &SystemVars) -> ComputeParameters {
27+
let linear_join_yielding = config.linear_join_yielding();
28+
let linear_join_yielding = parse_yield_spec(linear_join_yielding).unwrap_or_else(|| {
29+
tracing::error!("invalid `linear_join_yielding` config: {linear_join_yielding}");
30+
parse_yield_spec(&DEFAULT_LINEAR_JOIN_YIELDING).expect("default is valid")
31+
});
32+
2633
ComputeParameters {
2734
max_result_size: Some(config.max_result_size()),
2835
dataflow_max_inflight_bytes: Some(config.dataflow_max_inflight_bytes()),
29-
linear_join_yielding: None,
36+
linear_join_yielding: Some(linear_join_yielding),
3037
enable_mz_join_core: Some(config.enable_mz_join_core()),
3138
enable_jemalloc_profiling: Some(config.enable_jemalloc_profiling()),
3239
enable_specialized_arrangements: Some(config.enable_specialized_arrangements()),
@@ -36,6 +43,22 @@ pub fn compute_config(config: &SystemVars) -> ComputeParameters {
3643
}
3744
}
3845

46+
fn parse_yield_spec(s: &str) -> Option<YieldSpec> {
47+
let parts: Vec<_> = s.split(':').collect();
48+
match &parts[..] {
49+
["work", amount] => {
50+
let amount = amount.parse().ok()?;
51+
Some(YieldSpec::ByWork(amount))
52+
}
53+
["time", millis] => {
54+
let millis = millis.parse().ok()?;
55+
let duration = Duration::from_millis(millis);
56+
Some(YieldSpec::ByTime(duration))
57+
}
58+
_ => None,
59+
}
60+
}
61+
3962
/// Return the current storage configuration, derived from the system configuration.
4063
pub fn storage_config(config: &SystemVars) -> StorageParameters {
4164
StorageParameters {

src/sql/src/session/vars.rs

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1289,6 +1289,16 @@ const ENABLE_MZ_JOIN_CORE: ServerVar<bool> = ServerVar {
12891289
internal: true,
12901290
};
12911291

1292+
pub static DEFAULT_LINEAR_JOIN_YIELDING: Lazy<String> = Lazy::new(|| "work:1000000".into());
1293+
static LINEAR_JOIN_YIELDING: Lazy<ServerVar<String>> = Lazy::new(|| ServerVar {
1294+
name: UncasedStr::new("linear_join_yielding"),
1295+
value: &DEFAULT_LINEAR_JOIN_YIELDING,
1296+
description:
1297+
"The yielding behavior compute rendering should apply for linear join operators. Either \
1298+
'work:<amount>' or 'time:<milliseconds>'.",
1299+
internal: true,
1300+
});
1301+
12921302
pub const ENABLE_DEFAULT_CONNECTION_VALIDATION: ServerVar<bool> = ServerVar {
12931303
name: UncasedStr::new("enable_default_connection_validation"),
12941304
value: &true,
@@ -2461,6 +2471,7 @@ impl SystemVars {
24612471
.with_var(&KEEP_N_SOURCE_STATUS_HISTORY_ENTRIES)
24622472
.with_var(&KEEP_N_SINK_STATUS_HISTORY_ENTRIES)
24632473
.with_var(&ENABLE_MZ_JOIN_CORE)
2474+
.with_var(&LINEAR_JOIN_YIELDING)
24642475
.with_var(&ENABLE_STORAGE_SHARD_FINALIZATION)
24652476
.with_var(&ENABLE_CONSOLIDATE_AFTER_UNION_NEGATE)
24662477
.with_var(&ENABLE_DEFAULT_CONNECTION_VALIDATION)
@@ -3112,6 +3123,11 @@ impl SystemVars {
31123123
*self.expect_value(&ENABLE_MZ_JOIN_CORE)
31133124
}
31143125

3126+
/// Returns the `linear_join_yielding` configuration parameter.
3127+
pub fn linear_join_yielding(&self) -> &String {
3128+
self.expect_value(&LINEAR_JOIN_YIELDING)
3129+
}
3130+
31153131
/// Returns the `enable_storage_shard_finalization` configuration parameter.
31163132
pub fn enable_storage_shard_finalization(&self) -> bool {
31173133
*self.expect_value(&ENABLE_STORAGE_SHARD_FINALIZATION)
@@ -4777,6 +4793,7 @@ pub fn is_tracing_var(name: &str) -> bool {
47774793
pub fn is_compute_config_var(name: &str) -> bool {
47784794
name == MAX_RESULT_SIZE.name()
47794795
|| name == DATAFLOW_MAX_INFLIGHT_BYTES.name()
4796+
|| name == LINEAR_JOIN_YIELDING.name()
47804797
|| name == ENABLE_MZ_JOIN_CORE.name()
47814798
|| name == ENABLE_JEMALLOC_PROFILING.name()
47824799
|| name == ENABLE_SPECIALIZED_ARRANGEMENTS.name()

0 commit comments

Comments
 (0)