diff --git a/src/adapter/src/flags.rs b/src/adapter/src/flags.rs index ed41e768c8c20..212e69d413023 100644 --- a/src/adapter/src/flags.rs +++ b/src/adapter/src/flags.rs @@ -44,19 +44,30 @@ pub fn compute_config(config: &SystemVars) -> ComputeParameters { } fn parse_yield_spec(s: &str) -> Option { - let parts: Vec<_> = s.split(':').collect(); - match &parts[..] { - ["work", amount] => { - let amount = amount.parse().ok()?; - Some(YieldSpec::ByWork(amount)) - } - ["time", millis] => { - let millis = millis.parse().ok()?; - let duration = Duration::from_millis(millis); - Some(YieldSpec::ByTime(duration)) + let mut after_work = None; + let mut after_time = None; + + let options = s.split(',').map(|o| o.trim()); + for option in options { + let parts: Vec<_> = option.split(':').map(|p| p.trim()).collect(); + match &parts[..] { + ["work", amount] => { + let amount = amount.parse().ok()?; + after_work = Some(amount); + } + ["time", millis] => { + let millis = millis.parse().ok()?; + let duration = Duration::from_millis(millis); + after_time = Some(duration); + } + _ => return None, } - _ => None, } + + Some(YieldSpec { + after_work, + after_time, + }) } /// Return the current storage configuration, derived from the system configuration. diff --git a/src/buf.yaml b/src/buf.yaml index 50b83902fdc54..ad5c14e84772b 100644 --- a/src/buf.yaml +++ b/src/buf.yaml @@ -25,6 +25,8 @@ breaking: - compute-client/src/protocol/response.proto # reason: does currently not require backward-compatibility - compute-client/src/service.proto + # reason: does currently not require backward-compatibility + - compute-types/src/dataflows.proto # reason: Ignore because plans are currently not persisted. - expr/src/relation.proto # reason: still under active development diff --git a/src/compute-types/src/dataflows.proto b/src/compute-types/src/dataflows.proto index 776690c6c8865..b1a84613f96f3 100644 --- a/src/compute-types/src/dataflows.proto +++ b/src/compute-types/src/dataflows.proto @@ -7,6 +7,8 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. +// buf breaking: ignore (does currently not require backward-compatibility) + syntax = "proto3"; import "compute-types/src/plan.proto"; @@ -66,8 +68,6 @@ message ProtoBuildDesc { } message ProtoYieldSpec { - oneof kind { - uint64 by_work = 1; - mz_proto.ProtoDuration by_time = 2; - } + optional uint64 after_work = 1; + optional mz_proto.ProtoDuration after_time = 2; } diff --git a/src/compute-types/src/dataflows.rs b/src/compute-types/src/dataflows.rs index 7a1c748d2c897..c7bc5f072914f 100644 --- a/src/compute-types/src/dataflows.rs +++ b/src/compute-types/src/dataflows.rs @@ -643,33 +643,26 @@ impl RustType for BuildDesc { /// Specification of a dataflow operator's yielding behavior. #[derive(Clone, Copy, Debug, Eq, PartialEq, Serialize, Deserialize, Arbitrary)] -pub enum YieldSpec { - ByWork(usize), - ByTime(Duration), +pub struct YieldSpec { + /// Yield after the given amount of work was performed. + pub after_work: Option, + /// Yield after the given amount of time has elapsed. + pub after_time: Option, } impl RustType for YieldSpec { fn into_proto(&self) -> ProtoYieldSpec { - use proto_yield_spec::Kind; - - let kind = match *self { - Self::ByWork(w) => Kind::ByWork(w.into_proto()), - Self::ByTime(t) => Kind::ByTime(t.into_proto()), - }; - ProtoYieldSpec { kind: Some(kind) } + ProtoYieldSpec { + after_work: self.after_work.into_proto(), + after_time: self.after_time.into_proto(), + } } fn from_proto(proto: ProtoYieldSpec) -> Result { - use proto_yield_spec::Kind; - - let Some(kind) = proto.kind else { - return Err(TryFromProtoError::missing_field("ProtoYieldSpec::kind")); - }; - let spec = match kind { - Kind::ByWork(w) => Self::ByWork(w.into_rust()?), - Kind::ByTime(t) => Self::ByTime(t.into_rust()?), - }; - Ok(spec) + Ok(Self { + after_work: proto.after_work.into_rust()?, + after_time: proto.after_time.into_rust()?, + }) } } diff --git a/src/compute/src/render/join/linear_join.rs b/src/compute/src/render/join/linear_join.rs index 939b54578103d..6a59bc3ae78fb 100644 --- a/src/compute/src/render/join/linear_join.rs +++ b/src/compute/src/render/join/linear_join.rs @@ -61,7 +61,10 @@ impl Default for LinearJoinSpec { fn default() -> Self { Self { implementation: LinearJoinImpl::Materialize, - yielding: YieldSpec::ByWork(1_000_000), + yielding: YieldSpec { + after_work: Some(1_000_000), + after_time: None, + }, } } } @@ -88,18 +91,30 @@ impl LinearJoinSpec { V2: Data, { use LinearJoinImpl::*; - use YieldSpec::*; - match (self.implementation, self.yielding) { - (DifferentialDataflow, _) => { + match ( + self.implementation, + self.yielding.after_work, + self.yielding.after_time, + ) { + (DifferentialDataflow, _, _) => { differential_dataflow::operators::JoinCore::join_core(arranged1, arranged2, result) } - (Materialize, ByWork(limit)) => { - let yield_fn = move |_start, work| work >= limit; + (Materialize, Some(work_limit), Some(time_limit)) => { + let yield_fn = + move |start: Instant, work| work >= work_limit || start.elapsed() >= time_limit; + mz_join_core(arranged1, arranged2, shutdown_token, result, yield_fn) + } + (Materialize, Some(work_limit), None) => { + let yield_fn = move |_start, work| work >= work_limit; + mz_join_core(arranged1, arranged2, shutdown_token, result, yield_fn) + } + (Materialize, None, Some(time_limit)) => { + let yield_fn = move |start: Instant, _work| start.elapsed() >= time_limit; mz_join_core(arranged1, arranged2, shutdown_token, result, yield_fn) } - (Materialize, ByTime(limit)) => { - let yield_fn = move |start: Instant, _work| start.elapsed() >= limit; + (Materialize, None, None) => { + let yield_fn = |_start, _work| false; mz_join_core(arranged1, arranged2, shutdown_token, result, yield_fn) } } diff --git a/src/sql/src/session/vars.rs b/src/sql/src/session/vars.rs index 1c231faaab971..95c7a610b88ed 100644 --- a/src/sql/src/session/vars.rs +++ b/src/sql/src/session/vars.rs @@ -1311,7 +1311,9 @@ static LINEAR_JOIN_YIELDING: Lazy> = Lazy::new(|| ServerVar { value: &DEFAULT_LINEAR_JOIN_YIELDING, description: "The yielding behavior compute rendering should apply for linear join operators. Either \ - 'work:' or 'time:'.", + 'work:' or 'time:' or 'work:,time:'. Note \ + that omitting one of 'work' or 'time' will entirely disable join yielding by time or \ + work, respectively, rather than falling back to some default.", internal: true, });