Skip to content

Commit 53fe356

Browse files
committed
compute: controllable mz_join_core yielding
This commit provides `mz_join_core` with a yield functions, inspired by `persist_source` and DD's half join operator, that allows the caller to control the yield behavior by time or by amount of work performed.
1 parent f45bf4c commit 53fe356

File tree

2 files changed

+38
-25
lines changed

2 files changed

+38
-25
lines changed

src/compute/src/render/join/linear_join.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,9 @@ impl LinearJoinImpl {
6767
Self::DifferentialDataflow => {
6868
differential_dataflow::operators::JoinCore::join_core(arranged1, arranged2, result)
6969
}
70-
Self::Materialize => mz_join_core(arranged1, arranged2, result),
70+
Self::Materialize => mz_join_core(arranged1, arranged2, result, |_start, work| {
71+
work >= 1_000_000
72+
}),
7173
}
7274
}
7375
}

src/compute/src/render/join/mz_join_core.rs

Lines changed: 35 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
3939
use std::cmp::Ordering;
4040
use std::collections::VecDeque;
41+
use std::time::Instant;
4142

4243
use differential_dataflow::consolidation::consolidate_updates;
4344
use differential_dataflow::difference::Multiply;
@@ -61,10 +62,11 @@ use timely::PartialOrder;
6162
/// Each matching pair of records `(key, val1)` and `(key, val2)` are subjected to the `result` function,
6263
/// which produces something implementing `IntoIterator`, where the output collection will have an entry for
6364
/// every value returned by the iterator.
64-
pub(super) fn mz_join_core<G, Tr1, Tr2, L, I, K, V1, V2>(
65+
pub(super) fn mz_join_core<G, Tr1, Tr2, L, I, K, V1, V2, YFn>(
6566
arranged1: &Arranged<G, Tr1>,
6667
arranged2: &Arranged<G, Tr2>,
6768
mut result: L,
69+
yield_fn: YFn,
6870
) -> Collection<G, I::Item, Diff>
6971
where
7072
G: Scope,
@@ -77,6 +79,7 @@ where
7779
K: Data,
7880
V1: Data,
7981
V2: Data,
82+
YFn: Fn(Instant, usize) -> bool + 'static,
8083
{
8184
let mut trace1 = arranged1.trace.clone();
8285
let mut trace2 = arranged2.trace.clone();
@@ -278,24 +281,30 @@ where
278281
// input must scan all batches from the other input).
279282

280283
// Perform some amount of outstanding work.
281-
let mut fuel = 1_000_000;
282-
while !todo1.is_empty() && fuel > 0 {
283-
todo1
284-
.front_mut()
285-
.unwrap()
286-
.work(output, &mut result, &mut fuel);
284+
let start_time = Instant::now();
285+
let mut work = 0;
286+
while !todo1.is_empty() && !yield_fn(start_time, work) {
287+
todo1.front_mut().unwrap().work(
288+
output,
289+
&mut result,
290+
|w| yield_fn(start_time, w),
291+
&mut work,
292+
);
287293
if !todo1.front().unwrap().work_remains() {
288294
todo1.pop_front();
289295
}
290296
}
291297

292298
// Perform some amount of outstanding work.
293-
let mut fuel = 1_000_000;
294-
while !todo2.is_empty() && fuel > 0 {
295-
todo2
296-
.front_mut()
297-
.unwrap()
298-
.work(output, &mut result, &mut fuel);
299+
let start_time = Instant::now();
300+
let mut work = 0;
301+
while !todo2.is_empty() && !yield_fn(start_time, work) {
302+
todo2.front_mut().unwrap().work(
303+
output,
304+
&mut result,
305+
|w| yield_fn(start_time, w),
306+
&mut work,
307+
);
299308
if !todo2.front().unwrap().work_remains() {
300309
todo2.pop_front();
301310
}
@@ -405,14 +414,16 @@ where
405414
}
406415

407416
/// Process keys until at least `fuel` output tuples produced, or the work is exhausted.
408-
fn work<L, I>(
417+
fn work<L, I, YFn>(
409418
&mut self,
410419
output: &mut OutputHandle<T, (D, T, Diff), Tee<T, (D, T, Diff)>>,
411420
mut result: L,
412-
fuel: &mut usize,
421+
yield_fn: YFn,
422+
work: &mut usize,
413423
) where
414424
I: IntoIterator<Item = D>,
415425
L: FnMut(&C1::Key, &C1::Val, &C2::Val) -> I,
426+
YFn: Fn(usize) -> bool,
416427
{
417428
let meet = self.capability.time();
418429

@@ -443,7 +454,7 @@ where
443454
Ordering::Less => cursor1.seek_key(storage1, cursor2.key(storage2)),
444455
Ordering::Greater => cursor2.seek_key(storage2, cursor1.key(storage1)),
445456
Ordering::Equal => {
446-
// Populate `temp` with the results, as long as fuel remains.
457+
// Populate `temp` with the results, until we should yield.
447458
let key = cursor2.key(storage2);
448459
while let Some(val1) = cursor1.get_val(storage1) {
449460
while let Some(val2) = cursor2.get_val(storage2) {
@@ -463,14 +474,14 @@ where
463474
cursor1.step_val(storage1);
464475
cursor2.rewind_vals(storage2);
465476

466-
*fuel = fuel.saturating_sub(temp.len());
477+
*work.saturating_add(temp.len());
467478

468-
if *fuel == 0 {
469-
// The fuel is exhausted, so we should yield. Returning here is only
470-
// allowed because we leave the cursors in a state that will let us
471-
// pick up the work correctly on the next invocation.
472-
*fuel += flush(temp, &mut session);
473-
if *fuel == 0 {
479+
if yield_fn(*work) {
480+
// Returning here is only allowed because we leave the cursors in a
481+
// state that will let us pick up the work correctly on the next
482+
// invocation.
483+
*work -= flush(temp, &mut session);
484+
if yield_fn(*work) {
474485
return;
475486
}
476487
}
@@ -483,7 +494,7 @@ where
483494
}
484495

485496
if !temp.is_empty() {
486-
*fuel += flush(temp, &mut session);
497+
*work -= flush(temp, &mut session);
487498
}
488499

489500
// We only get here after having iterated through all keys.

0 commit comments

Comments
 (0)