Skip to content

Commit c676389

Browse files
Pivot container builders above capability checking (#715)
* Pivot container builders above capability checking * Remove pushers::Buffer * Respond to feedback * Rename to OutputBuilderSession * Improve partition.rs
1 parent 729f75b commit c676389

File tree

23 files changed

+288
-487
lines changed

23 files changed

+288
-487
lines changed

timely/examples/unordered_input.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
use timely::dataflow::operators::*;
22
use timely::Config;
3-
// use timely::progress::timestamp::RootTimestamp;
43

54
fn main() {
65
timely::execute(Config::thread(), |worker| {
@@ -11,7 +10,7 @@ fn main() {
1110
});
1211

1312
for round in 0..10 {
14-
input.session(cap.clone()).give(round);
13+
input.activate().session(&cap).give(round);
1514
cap = cap.delayed(&(round + 1));
1615
worker.step();
1716
}

timely/src/dataflow/channels/pushers/buffer.rs

Lines changed: 0 additions & 255 deletions
This file was deleted.

timely/src/dataflow/channels/pushers/counter.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,4 +42,12 @@ impl<T, P> Counter<T, P> where T : Ord+Clone+'static {
4242
pub fn produced(&self) -> &Rc<RefCell<ChangeBatch<T>>> {
4343
&self.produced
4444
}
45+
/// Ships a time and a container.
46+
///
47+
/// This is not a validated capability, and this method should not be used without great care.
48+
/// Ideally, users would not have direct access to a `Counter`, and preventing this is the way
49+
/// to uphold invariants.
50+
#[inline] pub fn give<C: crate::Container>(&mut self, time: T, container: &mut C) where P: Push<Message<T, C>> {
51+
if !container.is_empty() { Message::push_at(container, time, &mut self.pushee); }
52+
}
4553
}

timely/src/dataflow/channels/pushers/exchange.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -141,7 +141,7 @@ where
141141
}
142142
self.distributor.relax();
143143
for index in 0..self.pushers.len() {
144-
self.pushers[index].push(&mut None);
144+
self.pushers[index].done();
145145
}
146146
}
147147
}

timely/src/dataflow/channels/pushers/mod.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,4 +5,9 @@ pub use self::counter::Counter;
55
pub mod tee;
66
pub mod exchange;
77
pub mod counter;
8-
pub mod buffer;
8+
pub mod progress;
9+
10+
/// An output pusher which validates capabilities, records progress, and tees output.
11+
pub type Output<T, C> = progress::Progress<T, counter::Counter<T, tee::Tee<T, C>>>;
12+
/// An output session that will flush the output when dropped.
13+
pub type OutputSession<'a, T, C> = progress::ProgressSession<'a, T, C, counter::Counter<T, tee::Tee<T, C>>>;
Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
//! A wrapper that allows containers to be sent by validating capabilities.
2+
3+
use std::rc::Rc;
4+
use std::cell::RefCell;
5+
6+
use crate::progress::{ChangeBatch, Timestamp};
7+
use crate::dataflow::channels::Message;
8+
use crate::dataflow::operators::CapabilityTrait;
9+
use crate::communication::Push;
10+
use crate::Container;
11+
12+
/// A wrapper that allows containers to be sent by validating capabilities.
13+
#[derive(Debug)]
14+
pub struct Progress<T, P> {
15+
pushee: P,
16+
internal: Rc<RefCell<ChangeBatch<T>>>,
17+
port: usize,
18+
}
19+
20+
impl<T: Timestamp, P> Progress<T, P> {
21+
/// Ships a container using a provided capability.
22+
///
23+
/// On return, the container may hold undefined contents and should be cleared before it is reused.
24+
#[inline] pub fn give<C: Container, CT: CapabilityTrait<T>>(&mut self, capability: &CT, container: &mut C) where P: Push<Message<T, C>> {
25+
debug_assert!(self.valid(capability), "Attempted to open output session with invalid capability");
26+
if !container.is_empty() { Message::push_at(container, capability.time().clone(), &mut self.pushee); }
27+
}
28+
/// Activates a `Progress` into a `ProgressSession` which will flush when dropped.
29+
pub fn activate<'a, C>(&'a mut self) -> ProgressSession<'a, T, C, P> where P: Push<Message<T, C>> {
30+
ProgressSession {
31+
borrow: self,
32+
marker: std::marker::PhantomData,
33+
}
34+
}
35+
/// Determines if the capability is valid for this output.
36+
pub fn valid<CT: CapabilityTrait<T>>(&self, capability: &CT) -> bool {
37+
capability.valid_for_output(&self.internal, self.port)
38+
}
39+
}
40+
41+
impl<T, P> Progress<T, P> where T : Ord+Clone+'static {
42+
/// Allocates a new `Progress` from a pushee and capability validation information.
43+
pub fn new(pushee: P, internal: Rc<RefCell<ChangeBatch<T>>>, port: usize) -> Self {
44+
Self { pushee, internal, port }
45+
}
46+
}
47+
48+
/// A session that provides access to a `Progress` but will flush when dropped.
49+
///
50+
/// The type of the container `C` must be known, as long as the flushing action requires a specific `Push` implementation.
51+
pub struct ProgressSession<'a, T: Timestamp, C, P: Push<Message<T, C>>> {
52+
borrow: &'a mut Progress<T, P>,
53+
marker: std::marker::PhantomData<C>,
54+
}
55+
56+
impl<'a, T: Timestamp, C, P: Push<Message<T, C>>> std::ops::Deref for ProgressSession<'a, T, C, P> {
57+
type Target = Progress<T, P>;
58+
fn deref(&self) -> &Self::Target { self.borrow }
59+
}
60+
61+
impl<'a, T: Timestamp, C, P: Push<Message<T, C>>> std::ops::DerefMut for ProgressSession<'a, T, C, P> {
62+
fn deref_mut(&mut self) -> &mut Self::Target { self.borrow }
63+
}
64+
65+
impl<'a, T: Timestamp, C, P: Push<Message<T, C>>> Drop for ProgressSession<'a, T, C, P> {
66+
fn drop(&mut self) { self.borrow.pushee.done(); }
67+
}

timely/src/dataflow/channels/pushers/tee.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ impl<T: Data, C: Container> Push<Message<T, C>> for Tee<T, C> {
2929
}
3030
else {
3131
for index in 1..pushers.len() {
32-
pushers[index-1].push(&mut None);
32+
pushers[index-1].done();
3333
}
3434
}
3535
if !pushers.is_empty() {

0 commit comments

Comments
 (0)