Skip to content

Commit 474bac1

Browse files
Create pact for Distributors (#709)
1 parent 3d66d7f commit 474bac1

File tree

2 files changed

+205
-137
lines changed

2 files changed

+205
-137
lines changed

timely/src/dataflow/channels/pact.rs

Lines changed: 204 additions & 136 deletions
Original file line numberDiff line numberDiff line change
@@ -7,18 +7,14 @@
77
//! The only requirement of a pact is that it not alter the number of `D` records at each time `T`.
88
//! The progress tracking logic assumes that this number is independent of the pact used.
99
10-
use std::{fmt::{self, Debug}, marker::PhantomData};
10+
use std::fmt::Debug;
1111
use std::rc::Rc;
1212

1313
use crate::Accountable;
14-
use crate::container::{ContainerBuilder, DrainContainer, LengthPreservingContainerBuilder, SizableContainer, CapacityContainerBuilder, PushInto};
1514
use crate::communication::allocator::thread::{ThreadPusher, ThreadPuller};
1615
use crate::communication::{Push, Pull};
17-
use crate::dataflow::channels::pushers::Exchange as ExchangePusher;
18-
use crate::dataflow::channels::pushers::exchange::DrainContainerDistributor;
1916
use crate::dataflow::channels::Message;
20-
use crate::logging::{TimelyLogger as Logger, MessagesEvent};
21-
use crate::progress::Timestamp;
17+
use crate::logging::TimelyLogger as Logger;
2218
use crate::worker::AsWorker;
2319

2420
/// A `ParallelizationContract` allocates paired `Push` and `Pull` implementors.
@@ -45,162 +41,234 @@ impl<T: 'static, C: Accountable + 'static> ParallelizationContract<T, C> for Pip
4541
}
4642
}
4743

48-
/// An exchange between multiple observers by data
49-
pub struct ExchangeCore<CB, F> { hash_func: F, phantom: PhantomData<CB> }
50-
51-
/// [ExchangeCore] specialized to vector-based containers.
52-
pub type Exchange<D, F> = ExchangeCore<CapacityContainerBuilder<Vec<D>>, F>;
53-
54-
impl<CB, F> ExchangeCore<CB, F>
55-
where
56-
CB: LengthPreservingContainerBuilder,
57-
CB::Container: DrainContainer,
58-
for<'a> F: FnMut(&<CB::Container as DrainContainer>::Item<'a>)->u64
59-
{
60-
/// Allocates a new `Exchange` pact from a distribution function.
61-
pub fn new_core(func: F) -> ExchangeCore<CB, F> {
62-
ExchangeCore {
63-
hash_func: func,
64-
phantom: PhantomData,
44+
pub use exchange::{ExchangeCore, Exchange};
45+
mod exchange {
46+
47+
use std::{fmt::{self, Debug}, marker::PhantomData};
48+
use std::rc::Rc;
49+
50+
use crate::container::{ContainerBuilder, DrainContainer, LengthPreservingContainerBuilder, SizableContainer, CapacityContainerBuilder, PushInto};
51+
use crate::communication::{Push, Pull};
52+
use crate::dataflow::channels::pushers::Exchange as ExchangePusher;
53+
use crate::dataflow::channels::pushers::exchange::DrainContainerDistributor;
54+
use crate::dataflow::channels::Message;
55+
use crate::logging::TimelyLogger as Logger;
56+
use crate::progress::Timestamp;
57+
use crate::worker::AsWorker;
58+
59+
use super::{ParallelizationContract, LogPusher, LogPuller};
60+
61+
/// An exchange between multiple observers by data
62+
pub struct ExchangeCore<CB, F> { hash_func: F, phantom: PhantomData<CB> }
63+
64+
/// [ExchangeCore] specialized to vector-based containers.
65+
pub type Exchange<D, F> = ExchangeCore<CapacityContainerBuilder<Vec<D>>, F>;
66+
67+
impl<CB, F> ExchangeCore<CB, F>
68+
where
69+
CB: LengthPreservingContainerBuilder,
70+
CB::Container: DrainContainer,
71+
for<'a> F: FnMut(&<CB::Container as DrainContainer>::Item<'a>)->u64
72+
{
73+
/// Allocates a new `Exchange` pact from a distribution function.
74+
pub fn new_core(func: F) -> ExchangeCore<CB, F> {
75+
ExchangeCore {
76+
hash_func: func,
77+
phantom: PhantomData,
78+
}
6579
}
6680
}
67-
}
6881

69-
impl<C, F> ExchangeCore<CapacityContainerBuilder<C>, F>
70-
where
71-
C: SizableContainer + DrainContainer,
72-
for<'a> F: FnMut(&C::Item<'a>)->u64
73-
{
74-
/// Allocates a new `Exchange` pact from a distribution function.
75-
pub fn new(func: F) -> ExchangeCore<CapacityContainerBuilder<C>, F> {
76-
ExchangeCore {
77-
hash_func: func,
78-
phantom: PhantomData,
82+
impl<C, F> ExchangeCore<CapacityContainerBuilder<C>, F>
83+
where
84+
C: SizableContainer + DrainContainer,
85+
for<'a> F: FnMut(&C::Item<'a>)->u64
86+
{
87+
/// Allocates a new `Exchange` pact from a distribution function.
88+
pub fn new(func: F) -> ExchangeCore<CapacityContainerBuilder<C>, F> {
89+
ExchangeCore {
90+
hash_func: func,
91+
phantom: PhantomData,
92+
}
7993
}
8094
}
81-
}
8295

83-
// Exchange uses a `Box<Pushable>` because it cannot know what type of pushable will return from the allocator.
84-
impl<T: Timestamp, CB, H> ParallelizationContract<T, CB::Container> for ExchangeCore<CB, H>
85-
where
86-
CB: ContainerBuilder<Container: DrainContainer> + for<'a> PushInto<<CB::Container as DrainContainer>::Item<'a>>,
87-
CB::Container: Send + crate::dataflow::channels::ContainerBytes,
88-
for<'a> H: FnMut(&<CB::Container as DrainContainer>::Item<'a>) -> u64 + 'static,
89-
{
90-
type Pusher = ExchangePusher<
91-
T,
92-
LogPusher<Box<dyn Push<Message<T, CB::Container>>>>,
93-
DrainContainerDistributor<CB, H>
94-
>;
95-
type Puller = LogPuller<Box<dyn Pull<Message<T, CB::Container>>>>;
96+
// Exchange uses a `Box<Pushable>` because it cannot know what type of pushable will return from the allocator.
97+
impl<T: Timestamp, CB, H> ParallelizationContract<T, CB::Container> for ExchangeCore<CB, H>
98+
where
99+
CB: ContainerBuilder<Container: DrainContainer> + for<'a> PushInto<<CB::Container as DrainContainer>::Item<'a>>,
100+
CB::Container: Send + crate::dataflow::channels::ContainerBytes,
101+
for<'a> H: FnMut(&<CB::Container as DrainContainer>::Item<'a>) -> u64 + 'static,
102+
{
103+
type Pusher = ExchangePusher<
104+
T,
105+
LogPusher<Box<dyn Push<Message<T, CB::Container>>>>,
106+
DrainContainerDistributor<CB, H>
107+
>;
108+
type Puller = LogPuller<Box<dyn Pull<Message<T, CB::Container>>>>;
96109

97-
fn connect<A: AsWorker>(self, allocator: &mut A, identifier: usize, address: Rc<[usize]>, logging: Option<Logger>) -> (Self::Pusher, Self::Puller) {
98-
let (senders, receiver) = allocator.allocate::<Message<T, CB::Container>>(identifier, address);
99-
let senders = senders.into_iter().enumerate().map(|(i,x)| LogPusher::new(x, allocator.index(), i, identifier, logging.clone())).collect::<Vec<_>>();
100-
let distributor = DrainContainerDistributor::new(self.hash_func, allocator.peers());
101-
(ExchangePusher::new(senders, distributor), LogPuller::new(receiver, allocator.index(), identifier, logging.clone()))
110+
fn connect<A: AsWorker>(self, allocator: &mut A, identifier: usize, address: Rc<[usize]>, logging: Option<Logger>) -> (Self::Pusher, Self::Puller) {
111+
let (senders, receiver) = allocator.allocate::<Message<T, CB::Container>>(identifier, address);
112+
let senders = senders.into_iter().enumerate().map(|(i,x)| LogPusher::new(x, allocator.index(), i, identifier, logging.clone())).collect::<Vec<_>>();
113+
let distributor = DrainContainerDistributor::new(self.hash_func, allocator.peers());
114+
(ExchangePusher::new(senders, distributor), LogPuller::new(receiver, allocator.index(), identifier, logging.clone()))
115+
}
102116
}
103-
}
104117

105-
impl<C, F> Debug for ExchangeCore<C, F> {
106-
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
107-
f.debug_struct("Exchange").finish()
118+
impl<C, F> Debug for ExchangeCore<C, F> {
119+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
120+
f.debug_struct("Exchange").finish()
121+
}
108122
}
109123
}
110124

111-
/// Wraps a `Message<T,D>` pusher to provide a `Push<(T, Content<D>)>`.
112-
#[derive(Debug)]
113-
pub struct LogPusher<P> {
114-
pusher: P,
115-
channel: usize,
116-
counter: usize,
117-
source: usize,
118-
target: usize,
119-
logging: Option<Logger>,
120-
}
125+
pub use distributor::DistributorPact;
126+
/// Parallelization contract based on a `Distributor` implementation.
127+
mod distributor {
128+
129+
use std::rc::Rc;
130+
131+
use crate::Accountable;
132+
use crate::communication::{Push, Pull};
133+
use crate::dataflow::channels::pushers::{Exchange, exchange::Distributor};
134+
use crate::dataflow::channels::{ContainerBytes, Message};
135+
use crate::logging::TimelyLogger;
136+
use crate::progress::Timestamp;
137+
use crate::worker::AsWorker;
138+
139+
use super::{ParallelizationContract, LogPusher, LogPuller};
140+
141+
/// Intended to wrap a function from a `usize` to an `impl Distributor`.
142+
///
143+
/// For a `D: Distributor<C>` and an appropriate builder `D::new(peers: usize)`,
144+
/// a `DistributorPact(|peers| D::new(peers))` acts as a pact that will use `D`
145+
/// and distribute containers of type `C`.
146+
pub struct DistributorPact<B>(pub B);
121147

122-
impl<P> LogPusher<P> {
123-
/// Allocates a new pusher.
124-
pub fn new(pusher: P, source: usize, target: usize, channel: usize, logging: Option<Logger>) -> Self {
125-
LogPusher {
126-
pusher,
127-
channel,
128-
counter: 0,
129-
source,
130-
target,
131-
logging,
148+
impl<T, B, C, D> ParallelizationContract<T, C> for DistributorPact<B>
149+
where
150+
T: Timestamp,
151+
B: FnOnce(usize) -> D,
152+
C: Accountable + ContainerBytes + Send + 'static,
153+
D: Distributor<C> + 'static,
154+
{
155+
type Pusher = Exchange<
156+
T,
157+
LogPusher<Box<dyn Push<Message<T, C>>>>,
158+
D
159+
>;
160+
type Puller = LogPuller<Box<dyn Pull<Message<T, C>>>>;
161+
fn connect<A: AsWorker>(self, allocator: &mut A, identifier: usize, address: Rc<[usize]>, logging: Option<TimelyLogger>) -> (Self::Pusher, Self::Puller) {
162+
let (senders, receiver) = allocator.allocate::<Message<T, C>>(identifier, address);
163+
let senders = senders.into_iter().enumerate().map(|(i,x)| LogPusher::new(x, allocator.index(), i, identifier, logging.clone())).collect::<Vec<_>>();
164+
let distributor = (self.0)(allocator.peers());
165+
(Exchange::new(senders, distributor), LogPuller::new(receiver, allocator.index(), identifier, logging.clone()))
132166
}
133167
}
134168
}
135169

136-
impl<T, C: Accountable, P: Push<Message<T, C>>> Push<Message<T, C>> for LogPusher<P> {
137-
#[inline]
138-
fn push(&mut self, pair: &mut Option<Message<T, C>>) {
139-
if let Some(bundle) = pair {
140-
self.counter += 1;
141-
142-
// Stamp the sequence number and source.
143-
// FIXME: Awkward moment/logic.
144-
bundle.seq = self.counter - 1;
145-
bundle.from = self.source;
146-
147-
if let Some(logger) = self.logging.as_ref() {
148-
logger.log(MessagesEvent {
149-
is_send: true,
150-
channel: self.channel,
151-
source: self.source,
152-
target: self.target,
153-
seq_no: self.counter - 1,
154-
record_count: bundle.data.record_count(),
155-
})
170+
pub use push_pull::{LogPusher, LogPuller};
171+
mod push_pull {
172+
173+
use crate::Accountable;
174+
use crate::communication::{Push, Pull};
175+
use crate::dataflow::channels::Message;
176+
use crate::logging::{TimelyLogger as Logger, MessagesEvent};
177+
178+
/// Wraps a `Message<T,D>` pusher to provide a `Push<(T, Content<D>)>`.
179+
#[derive(Debug)]
180+
pub struct LogPusher<P> {
181+
pusher: P,
182+
channel: usize,
183+
counter: usize,
184+
source: usize,
185+
target: usize,
186+
logging: Option<Logger>,
187+
}
188+
189+
impl<P> LogPusher<P> {
190+
/// Allocates a new pusher.
191+
pub fn new(pusher: P, source: usize, target: usize, channel: usize, logging: Option<Logger>) -> Self {
192+
LogPusher {
193+
pusher,
194+
channel,
195+
counter: 0,
196+
source,
197+
target,
198+
logging,
156199
}
157200
}
158-
159-
self.pusher.push(pair);
160201
}
161-
}
162202

163-
/// Wraps a `Message<T,D>` puller to provide a `Pull<(T, Content<D>)>`.
164-
#[derive(Debug)]
165-
pub struct LogPuller<P> {
166-
puller: P,
167-
channel: usize,
168-
index: usize,
169-
logging: Option<Logger>,
170-
}
203+
impl<T, C: Accountable, P: Push<Message<T, C>>> Push<Message<T, C>> for LogPusher<P> {
204+
#[inline]
205+
fn push(&mut self, pair: &mut Option<Message<T, C>>) {
206+
if let Some(bundle) = pair {
207+
self.counter += 1;
208+
209+
// Stamp the sequence number and source.
210+
// FIXME: Awkward moment/logic.
211+
bundle.seq = self.counter - 1;
212+
bundle.from = self.source;
213+
214+
if let Some(logger) = self.logging.as_ref() {
215+
logger.log(MessagesEvent {
216+
is_send: true,
217+
channel: self.channel,
218+
source: self.source,
219+
target: self.target,
220+
seq_no: self.counter - 1,
221+
record_count: bundle.data.record_count(),
222+
})
223+
}
224+
}
171225

172-
impl<P> LogPuller<P> {
173-
/// Allocates a new `Puller`.
174-
pub fn new(puller: P, index: usize, channel: usize, logging: Option<Logger>) -> Self {
175-
LogPuller {
176-
puller,
177-
channel,
178-
index,
179-
logging,
226+
self.pusher.push(pair);
180227
}
181228
}
182-
}
183229

184-
impl<T, C: Accountable, P: Pull<Message<T, C>>> Pull<Message<T, C>> for LogPuller<P> {
185-
#[inline]
186-
fn pull(&mut self) -> &mut Option<Message<T, C>> {
187-
let result = self.puller.pull();
188-
if let Some(bundle) = result {
189-
let channel = self.channel;
190-
let target = self.index;
191-
192-
if let Some(logger) = self.logging.as_ref() {
193-
logger.log(MessagesEvent {
194-
is_send: false,
195-
channel,
196-
source: bundle.from,
197-
target,
198-
seq_no: bundle.seq,
199-
record_count: bundle.data.record_count(),
200-
});
230+
/// Wraps a `Message<T,D>` puller to provide a `Pull<(T, Content<D>)>`.
231+
#[derive(Debug)]
232+
pub struct LogPuller<P> {
233+
puller: P,
234+
channel: usize,
235+
index: usize,
236+
logging: Option<Logger>,
237+
}
238+
239+
impl<P> LogPuller<P> {
240+
/// Allocates a new `Puller`.
241+
pub fn new(puller: P, index: usize, channel: usize, logging: Option<Logger>) -> Self {
242+
LogPuller {
243+
puller,
244+
channel,
245+
index,
246+
logging,
201247
}
202248
}
249+
}
203250

204-
result
251+
impl<T, C: Accountable, P: Pull<Message<T, C>>> Pull<Message<T, C>> for LogPuller<P> {
252+
#[inline]
253+
fn pull(&mut self) -> &mut Option<Message<T, C>> {
254+
let result = self.puller.pull();
255+
if let Some(bundle) = result {
256+
let channel = self.channel;
257+
let target = self.index;
258+
259+
if let Some(logger) = self.logging.as_ref() {
260+
logger.log(MessagesEvent {
261+
is_send: false,
262+
channel,
263+
source: bundle.from,
264+
target,
265+
seq_no: bundle.seq,
266+
record_count: bundle.data.record_count(),
267+
});
268+
}
269+
}
270+
271+
result
272+
}
205273
}
206-
}
274+
}

timely/src/dataflow/channels/pushers/exchange.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ pub trait Distributor<C> {
1919
/// Flush any remaining contents into the `pushers` at time `time`.
2020
fn flush<T: Clone, P: Push<Message<T, C>>>(&mut self, time: &T, pushers: &mut [P]);
2121
/// Optionally release resources, such as memory.
22-
fn relax(&mut self);
22+
fn relax(&mut self) { }
2323
}
2424

2525
/// A distributor creating containers from a drainable container based

0 commit comments

Comments
 (0)