Skip to content

Commit 67c9dcb

Browse files
committed
Allow join yielding by work and time simultaneously
This commit extends the `YieldSpec` type and the syntax allowed for the `linear_join_yielding` system var to enable specifying yield strategies that consider both the performed work and the elapsed time. This will allow us to make sure that join operators (a) don't keep around huge amounts of output records and (b) don't regress interactivity.
1 parent 1b769ef commit 67c9dcb

File tree

6 files changed

+67
-44
lines changed

6 files changed

+67
-44
lines changed

src/adapter/src/flags.rs

Lines changed: 22 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -44,19 +44,30 @@ pub fn compute_config(config: &SystemVars) -> ComputeParameters {
4444
}
4545

4646
fn parse_yield_spec(s: &str) -> Option<YieldSpec> {
47-
let parts: Vec<_> = s.split(':').collect();
48-
match &parts[..] {
49-
["work", amount] => {
50-
let amount = amount.parse().ok()?;
51-
Some(YieldSpec::ByWork(amount))
52-
}
53-
["time", millis] => {
54-
let millis = millis.parse().ok()?;
55-
let duration = Duration::from_millis(millis);
56-
Some(YieldSpec::ByTime(duration))
47+
let mut after_work = None;
48+
let mut after_time = None;
49+
50+
let options = s.split(',').map(|o| o.trim());
51+
for option in options {
52+
let parts: Vec<_> = option.split(':').map(|p| p.trim()).collect();
53+
match &parts[..] {
54+
["work", amount] => {
55+
let amount = amount.parse().ok()?;
56+
after_work = Some(amount);
57+
}
58+
["time", millis] => {
59+
let millis = millis.parse().ok()?;
60+
let duration = Duration::from_millis(millis);
61+
after_time = Some(duration);
62+
}
63+
_ => return None,
5764
}
58-
_ => None,
5965
}
66+
67+
Some(YieldSpec {
68+
after_work,
69+
after_time,
70+
})
6071
}
6172

6273
/// Return the current storage configuration, derived from the system configuration.

src/buf.yaml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,8 @@ breaking:
2525
- compute-client/src/protocol/response.proto
2626
# reason: does currently not require backward-compatibility
2727
- compute-client/src/service.proto
28+
# reason: does currently not require backward-compatibility
29+
- compute-types/src/dataflows.proto
2830
# reason: Ignore because plans are currently not persisted.
2931
- expr/src/relation.proto
3032
# reason: still under active development

src/compute-types/src/dataflows.proto

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@
77
// the Business Source License, use of this software will be governed
88
// by the Apache License, Version 2.0.
99

10+
// buf breaking: ignore (does currently not require backward-compatibility)
11+
1012
syntax = "proto3";
1113

1214
import "compute-types/src/plan.proto";
@@ -66,8 +68,6 @@ message ProtoBuildDesc {
6668
}
6769

6870
message ProtoYieldSpec {
69-
oneof kind {
70-
uint64 by_work = 1;
71-
mz_proto.ProtoDuration by_time = 2;
72-
}
71+
optional uint64 after_work = 1;
72+
optional mz_proto.ProtoDuration after_time = 2;
7373
}

src/compute-types/src/dataflows.rs

Lines changed: 13 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -643,33 +643,26 @@ impl RustType<ProtoBuildDesc> for BuildDesc<crate::plan::Plan> {
643643

644644
/// Specification of a dataflow operator's yielding behavior.
645645
#[derive(Clone, Copy, Debug, Eq, PartialEq, Serialize, Deserialize, Arbitrary)]
646-
pub enum YieldSpec {
647-
ByWork(usize),
648-
ByTime(Duration),
646+
pub struct YieldSpec {
647+
/// Yield after the given amount of work was performed.
648+
pub after_work: Option<usize>,
649+
/// Yield after the given amount of time has elapsed.
650+
pub after_time: Option<Duration>,
649651
}
650652

651653
impl RustType<ProtoYieldSpec> for YieldSpec {
652654
fn into_proto(&self) -> ProtoYieldSpec {
653-
use proto_yield_spec::Kind;
654-
655-
let kind = match *self {
656-
Self::ByWork(w) => Kind::ByWork(w.into_proto()),
657-
Self::ByTime(t) => Kind::ByTime(t.into_proto()),
658-
};
659-
ProtoYieldSpec { kind: Some(kind) }
655+
ProtoYieldSpec {
656+
after_work: self.after_work.into_proto(),
657+
after_time: self.after_time.into_proto(),
658+
}
660659
}
661660

662661
fn from_proto(proto: ProtoYieldSpec) -> Result<Self, TryFromProtoError> {
663-
use proto_yield_spec::Kind;
664-
665-
let Some(kind) = proto.kind else {
666-
return Err(TryFromProtoError::missing_field("ProtoYieldSpec::kind"));
667-
};
668-
let spec = match kind {
669-
Kind::ByWork(w) => Self::ByWork(w.into_rust()?),
670-
Kind::ByTime(t) => Self::ByTime(t.into_rust()?),
671-
};
672-
Ok(spec)
662+
Ok(Self {
663+
after_work: proto.after_work.into_rust()?,
664+
after_time: proto.after_time.into_rust()?,
665+
})
673666
}
674667
}
675668

src/compute/src/render/join/linear_join.rs

Lines changed: 23 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,10 @@ impl Default for LinearJoinSpec {
6161
fn default() -> Self {
6262
Self {
6363
implementation: LinearJoinImpl::Materialize,
64-
yielding: YieldSpec::ByWork(1_000_000),
64+
yielding: YieldSpec {
65+
after_work: Some(1_000_000),
66+
after_time: None,
67+
},
6568
}
6669
}
6770
}
@@ -88,18 +91,30 @@ impl LinearJoinSpec {
8891
V2: Data,
8992
{
9093
use LinearJoinImpl::*;
91-
use YieldSpec::*;
9294

93-
match (self.implementation, self.yielding) {
94-
(DifferentialDataflow, _) => {
95+
match (
96+
self.implementation,
97+
self.yielding.after_work,
98+
self.yielding.after_time,
99+
) {
100+
(DifferentialDataflow, _, _) => {
95101
differential_dataflow::operators::JoinCore::join_core(arranged1, arranged2, result)
96102
}
97-
(Materialize, ByWork(limit)) => {
98-
let yield_fn = move |_start, work| work >= limit;
103+
(Materialize, Some(work_limit), Some(time_limit)) => {
104+
let yield_fn =
105+
move |start: Instant, work| work >= work_limit || start.elapsed() >= time_limit;
106+
mz_join_core(arranged1, arranged2, shutdown_token, result, yield_fn)
107+
}
108+
(Materialize, Some(work_limit), None) => {
109+
let yield_fn = move |_start, work| work >= work_limit;
110+
mz_join_core(arranged1, arranged2, shutdown_token, result, yield_fn)
111+
}
112+
(Materialize, None, Some(time_limit)) => {
113+
let yield_fn = move |start: Instant, _work| start.elapsed() >= time_limit;
99114
mz_join_core(arranged1, arranged2, shutdown_token, result, yield_fn)
100115
}
101-
(Materialize, ByTime(limit)) => {
102-
let yield_fn = move |start: Instant, _work| start.elapsed() >= limit;
116+
(Materialize, None, None) => {
117+
let yield_fn = |_start, _work| false;
103118
mz_join_core(arranged1, arranged2, shutdown_token, result, yield_fn)
104119
}
105120
}

src/sql/src/session/vars.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1311,7 +1311,9 @@ static LINEAR_JOIN_YIELDING: Lazy<ServerVar<String>> = Lazy::new(|| ServerVar {
13111311
value: &DEFAULT_LINEAR_JOIN_YIELDING,
13121312
description:
13131313
"The yielding behavior compute rendering should apply for linear join operators. Either \
1314-
'work:<amount>' or 'time:<milliseconds>'.",
1314+
'work:<amount>' or 'time:<milliseconds>' or 'work:<amount>,time:<milliseconds>'. Note \
1315+
that omitting one of 'work' or 'time' will entirely disable join yielding by time or \
1316+
work, respectively, rather than falling back to some default.",
13151317
internal: true,
13161318
});
13171319

0 commit comments

Comments
 (0)