Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 22 additions & 11 deletions src/adapter/src/flags.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,19 +44,30 @@ pub fn compute_config(config: &SystemVars) -> ComputeParameters {
}

fn parse_yield_spec(s: &str) -> Option<YieldSpec> {
let parts: Vec<_> = s.split(':').collect();
match &parts[..] {
["work", amount] => {
let amount = amount.parse().ok()?;
Some(YieldSpec::ByWork(amount))
}
["time", millis] => {
let millis = millis.parse().ok()?;
let duration = Duration::from_millis(millis);
Some(YieldSpec::ByTime(duration))
let mut after_work = None;
let mut after_time = None;

let options = s.split(',').map(|o| o.trim());
for option in options {
let parts: Vec<_> = option.split(':').map(|p| p.trim()).collect();
match &parts[..] {
["work", amount] => {
let amount = amount.parse().ok()?;
after_work = Some(amount);
}
["time", millis] => {
let millis = millis.parse().ok()?;
let duration = Duration::from_millis(millis);
after_time = Some(duration);
}
_ => return None,
}
_ => None,
}

Some(YieldSpec {
after_work,
after_time,
})
}

/// Return the current storage configuration, derived from the system configuration.
Expand Down
2 changes: 2 additions & 0 deletions src/buf.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ breaking:
- compute-client/src/protocol/response.proto
# reason: does currently not require backward-compatibility
- compute-client/src/service.proto
# reason: does currently not require backward-compatibility
- compute-types/src/dataflows.proto
# reason: Ignore because plans are currently not persisted.
- expr/src/relation.proto
# reason: still under active development
Expand Down
8 changes: 4 additions & 4 deletions src/compute-types/src/dataflows.proto
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

// buf breaking: ignore (does currently not require backward-compatibility)

syntax = "proto3";

import "compute-types/src/plan.proto";
Expand Down Expand Up @@ -66,8 +68,6 @@ message ProtoBuildDesc {
}

message ProtoYieldSpec {
oneof kind {
uint64 by_work = 1;
mz_proto.ProtoDuration by_time = 2;
}
optional uint64 after_work = 1;
optional mz_proto.ProtoDuration after_time = 2;
}
33 changes: 13 additions & 20 deletions src/compute-types/src/dataflows.rs
Original file line number Diff line number Diff line change
Expand Up @@ -643,33 +643,26 @@ impl RustType<ProtoBuildDesc> for BuildDesc<crate::plan::Plan> {

/// Specification of a dataflow operator's yielding behavior.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Serialize, Deserialize, Arbitrary)]
pub enum YieldSpec {
ByWork(usize),
ByTime(Duration),
pub struct YieldSpec {
/// Yield after the given amount of work was performed.
pub after_work: Option<usize>,
/// Yield after the given amount of time has elapsed.
pub after_time: Option<Duration>,
}

impl RustType<ProtoYieldSpec> for YieldSpec {
fn into_proto(&self) -> ProtoYieldSpec {
use proto_yield_spec::Kind;

let kind = match *self {
Self::ByWork(w) => Kind::ByWork(w.into_proto()),
Self::ByTime(t) => Kind::ByTime(t.into_proto()),
};
ProtoYieldSpec { kind: Some(kind) }
ProtoYieldSpec {
after_work: self.after_work.into_proto(),
after_time: self.after_time.into_proto(),
}
}

fn from_proto(proto: ProtoYieldSpec) -> Result<Self, TryFromProtoError> {
use proto_yield_spec::Kind;

let Some(kind) = proto.kind else {
return Err(TryFromProtoError::missing_field("ProtoYieldSpec::kind"));
};
let spec = match kind {
Kind::ByWork(w) => Self::ByWork(w.into_rust()?),
Kind::ByTime(t) => Self::ByTime(t.into_rust()?),
};
Ok(spec)
Ok(Self {
after_work: proto.after_work.into_rust()?,
after_time: proto.after_time.into_rust()?,
})
}
}

Expand Down
31 changes: 23 additions & 8 deletions src/compute/src/render/join/linear_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,10 @@ impl Default for LinearJoinSpec {
fn default() -> Self {
Self {
implementation: LinearJoinImpl::Materialize,
yielding: YieldSpec::ByWork(1_000_000),
yielding: YieldSpec {
after_work: Some(1_000_000),
after_time: None,
},
}
}
}
Expand All @@ -88,18 +91,30 @@ impl LinearJoinSpec {
V2: Data,
{
use LinearJoinImpl::*;
use YieldSpec::*;

match (self.implementation, self.yielding) {
(DifferentialDataflow, _) => {
match (
self.implementation,
self.yielding.after_work,
self.yielding.after_time,
) {
(DifferentialDataflow, _, _) => {
differential_dataflow::operators::JoinCore::join_core(arranged1, arranged2, result)
}
(Materialize, ByWork(limit)) => {
let yield_fn = move |_start, work| work >= limit;
(Materialize, Some(work_limit), Some(time_limit)) => {
let yield_fn =
move |start: Instant, work| work >= work_limit || start.elapsed() >= time_limit;
mz_join_core(arranged1, arranged2, shutdown_token, result, yield_fn)
}
(Materialize, Some(work_limit), None) => {
let yield_fn = move |_start, work| work >= work_limit;
mz_join_core(arranged1, arranged2, shutdown_token, result, yield_fn)
}
(Materialize, None, Some(time_limit)) => {
let yield_fn = move |start: Instant, _work| start.elapsed() >= time_limit;
Comment on lines +103 to +113
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder what the overhead is of checking the option inside the closure. Calling mz_join_core several times might be bad for compile times, but checking the limits in the closure might consume more CPU? Unclear!

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So there are several things we could do:

  1. Have a single yield_fn that always checks both work and time and handles the Options. Good for readability, good for compile times, bad for runtime performance.
  2. Have a separate yield_fn for each Some/None combination, and instantiate a different mz_join_core variant for each of them. Bad for readability, bad for compile times, good for runtime performance.
  3. Have a separate yield_fn for each Some/None combination, but instantiate a single mz_join_core that takes a Box<dyn YFn>. Okay for readability, good for compile times, okay for runtime performance.

(The "X for runtime performance" ratings are just guesses; I don't really know either.)

I went with (2) here because I really want to avoid regressing join performance, but I'm not too happy about readability. I'm not too worried about compile times, though maybe I should be :)

mz_join_core(arranged1, arranged2, shutdown_token, result, yield_fn)
}
(Materialize, ByTime(limit)) => {
let yield_fn = move |start: Instant, _work| start.elapsed() >= limit;
(Materialize, None, None) => {
let yield_fn = |_start, _work| false;
mz_join_core(arranged1, arranged2, shutdown_token, result, yield_fn)
}
}
Expand Down
4 changes: 3 additions & 1 deletion src/sql/src/session/vars.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1311,7 +1311,9 @@ static LINEAR_JOIN_YIELDING: Lazy<ServerVar<String>> = Lazy::new(|| ServerVar {
value: &DEFAULT_LINEAR_JOIN_YIELDING,
description:
"The yielding behavior compute rendering should apply for linear join operators. Either \
'work:<amount>' or 'time:<milliseconds>'.",
'work:<amount>' or 'time:<milliseconds>' or 'work:<amount>,time:<milliseconds>'. Note \
that omitting one of 'work' or 'time' will entirely disable join yielding by time or \
work, respectively, rather than falling back to some default.",
internal: true,
});

Expand Down