Skip to content

Commit e42ac3d

Browse files
authored
Replace ExchangeCore with DistributePact (#711)
This removes a bunch of now redundant code. Signed-off-by: Moritz Hoffmann <[email protected]>
1 parent 474bac1 commit e42ac3d

File tree

1 file changed

+10
-52
lines changed
  • timely/src/dataflow/channels

1 file changed

+10
-52
lines changed

timely/src/dataflow/channels/pact.rs

Lines changed: 10 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -44,22 +44,14 @@ impl<T: 'static, C: Accountable + 'static> ParallelizationContract<T, C> for Pip
4444
pub use exchange::{ExchangeCore, Exchange};
4545
mod exchange {
4646

47-
use std::{fmt::{self, Debug}, marker::PhantomData};
48-
use std::rc::Rc;
49-
50-
use crate::container::{ContainerBuilder, DrainContainer, LengthPreservingContainerBuilder, SizableContainer, CapacityContainerBuilder, PushInto};
51-
use crate::communication::{Push, Pull};
52-
use crate::dataflow::channels::pushers::Exchange as ExchangePusher;
47+
use crate::Container;
48+
use crate::container::{DrainContainer, LengthPreservingContainerBuilder, SizableContainer, CapacityContainerBuilder};
5349
use crate::dataflow::channels::pushers::exchange::DrainContainerDistributor;
54-
use crate::dataflow::channels::Message;
55-
use crate::logging::TimelyLogger as Logger;
56-
use crate::progress::Timestamp;
57-
use crate::worker::AsWorker;
5850

59-
use super::{ParallelizationContract, LogPusher, LogPuller};
51+
use super::DistributorPact;
6052

6153
/// An exchange between multiple observers by data
62-
pub struct ExchangeCore<CB, F> { hash_func: F, phantom: PhantomData<CB> }
54+
pub type ExchangeCore<CB, F> = DistributorPact<Box<dyn FnOnce(usize) -> DrainContainerDistributor<CB, F>>>;
6355

6456
/// [ExchangeCore] specialized to vector-based containers.
6557
pub type Exchange<D, F> = ExchangeCore<CapacityContainerBuilder<Vec<D>>, F>;
@@ -68,56 +60,22 @@ mod exchange {
6860
where
6961
CB: LengthPreservingContainerBuilder,
7062
CB::Container: DrainContainer,
71-
for<'a> F: FnMut(&<CB::Container as DrainContainer>::Item<'a>)->u64
63+
for<'a> F: FnMut(&<CB::Container as DrainContainer>::Item<'a>)->u64 + 'static
7264
{
7365
/// Allocates a new `Exchange` pact from a distribution function.
7466
pub fn new_core(func: F) -> ExchangeCore<CB, F> {
75-
ExchangeCore {
76-
hash_func: func,
77-
phantom: PhantomData,
78-
}
67+
DistributorPact(Box::new(move |peers| DrainContainerDistributor::new(func, peers)))
7968
}
8069
}
8170

8271
impl<C, F> ExchangeCore<CapacityContainerBuilder<C>, F>
8372
where
84-
C: SizableContainer + DrainContainer,
85-
for<'a> F: FnMut(&C::Item<'a>)->u64
73+
C: Container + SizableContainer + DrainContainer,
74+
for<'a> F: FnMut(&C::Item<'a>)->u64 + 'static
8675
{
8776
/// Allocates a new `Exchange` pact from a distribution function.
8877
pub fn new(func: F) -> ExchangeCore<CapacityContainerBuilder<C>, F> {
89-
ExchangeCore {
90-
hash_func: func,
91-
phantom: PhantomData,
92-
}
93-
}
94-
}
95-
96-
// Exchange uses a `Box<Pushable>` because it cannot know what type of pushable will return from the allocator.
97-
impl<T: Timestamp, CB, H> ParallelizationContract<T, CB::Container> for ExchangeCore<CB, H>
98-
where
99-
CB: ContainerBuilder<Container: DrainContainer> + for<'a> PushInto<<CB::Container as DrainContainer>::Item<'a>>,
100-
CB::Container: Send + crate::dataflow::channels::ContainerBytes,
101-
for<'a> H: FnMut(&<CB::Container as DrainContainer>::Item<'a>) -> u64 + 'static,
102-
{
103-
type Pusher = ExchangePusher<
104-
T,
105-
LogPusher<Box<dyn Push<Message<T, CB::Container>>>>,
106-
DrainContainerDistributor<CB, H>
107-
>;
108-
type Puller = LogPuller<Box<dyn Pull<Message<T, CB::Container>>>>;
109-
110-
fn connect<A: AsWorker>(self, allocator: &mut A, identifier: usize, address: Rc<[usize]>, logging: Option<Logger>) -> (Self::Pusher, Self::Puller) {
111-
let (senders, receiver) = allocator.allocate::<Message<T, CB::Container>>(identifier, address);
112-
let senders = senders.into_iter().enumerate().map(|(i,x)| LogPusher::new(x, allocator.index(), i, identifier, logging.clone())).collect::<Vec<_>>();
113-
let distributor = DrainContainerDistributor::new(self.hash_func, allocator.peers());
114-
(ExchangePusher::new(senders, distributor), LogPuller::new(receiver, allocator.index(), identifier, logging.clone()))
115-
}
116-
}
117-
118-
impl<C, F> Debug for ExchangeCore<C, F> {
119-
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
120-
f.debug_struct("Exchange").finish()
78+
DistributorPact(Box::new(move |peers| DrainContainerDistributor::new(func, peers)))
12179
}
12280
}
12381
}
@@ -271,4 +229,4 @@ mod push_pull {
271229
result
272230
}
273231
}
274-
}
232+
}

0 commit comments

Comments
 (0)