Skip to content

Commit c47169c

Browse files
committed
Remove constraints from container builders
Removes the constraints from container builder's container type. Signed-off-by: Moritz Hoffmann <[email protected]>
1 parent a34986d commit c47169c

File tree

13 files changed

+41
-37
lines changed

13 files changed

+41
-37
lines changed

container/src/lib.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -85,11 +85,11 @@ pub trait PushInto<T> {
8585
///
8686
/// The trait does not prescribe any specific ordering guarantees, and each implementation can
8787
/// decide to represent a push order for `extract` and `finish`, or not.
88-
pub trait ContainerBuilder: Default + 'static {
88+
pub trait ContainerBuilder: Default {
8989
/// The container type we're building.
9090
// The container is `Clone` because `Tee` requires it, otherwise we need to repeat it
9191
// all over Timely. `'static` because we don't want lifetimes everywhere.
92-
type Container: Accountable + Default + Clone + 'static;
92+
type Container;
9393
/// Extract assembled containers, potentially leaving unfinished data behind. Can
9494
/// be called repeatedly, for example while the caller can send data.
9595
///

logging/src/lib.rs

Lines changed: 17 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ impl Registry {
3434
/// need to do with containers they receive and what properties to uphold.
3535
///
3636
/// Passing a `&mut None` container to an action indicates a flush.
37-
pub fn insert<CB: ContainerBuilder, F: FnMut(&Duration, &mut Option<CB::Container>)+'static>(
37+
pub fn insert<CB: ContainerBuilder<Container: Default> + 'static, F: FnMut(&Duration, &mut Option<CB::Container>)+'static>(
3838
&mut self,
3939
name: &str,
4040
action: F) -> Option<Box<dyn Any>>
@@ -44,7 +44,7 @@ impl Registry {
4444
}
4545

4646
/// Binds a log name to a logger.
47-
pub fn insert_logger<CB: ContainerBuilder>(&mut self, name: &str, logger: Logger<CB>) -> Option<Box<dyn Any>> {
47+
pub fn insert_logger<CB: ContainerBuilder<Container: Default> + 'static>(&mut self, name: &str, logger: Logger<CB>) -> Option<Box<dyn Any>> {
4848
self.map.insert(name.to_owned(), (Box::new(logger.clone()), Box::new(logger))).map(|x| x.0)
4949
}
5050

@@ -59,7 +59,7 @@ impl Registry {
5959
}
6060

6161
/// Retrieves a shared logger, if one has been inserted.
62-
pub fn get<CB: ContainerBuilder>(&self, name: &str) -> Option<Logger<CB>> {
62+
pub fn get<CB: ContainerBuilder<Container: Default> + 'static>(&self, name: &str) -> Option<Logger<CB>> {
6363
self.map
6464
.get(name)
6565
.and_then(|entry| entry.0.downcast_ref::<Logger<CB>>())
@@ -89,27 +89,27 @@ impl Flush for Registry {
8989
}
9090

9191
/// A buffering logger.
92-
pub struct Logger<CB: ContainerBuilder> {
92+
pub struct Logger<CB: ContainerBuilder<Container: Default>> {
9393
inner: Rc<RefCell<LoggerInner<CB, dyn FnMut(&Duration, &mut Option<CB::Container>)>>>,
9494
}
9595

96-
impl<CB: ContainerBuilder> Clone for Logger<CB> {
96+
impl<CB: ContainerBuilder<Container: Default>> Clone for Logger<CB> {
9797
fn clone(&self) -> Self {
9898
Self {
9999
inner: Rc::clone(&self.inner)
100100
}
101101
}
102102
}
103103

104-
impl<CB: ContainerBuilder + Debug> Debug for Logger<CB> {
104+
impl<CB: ContainerBuilder<Container: Default> + Debug> Debug for Logger<CB> {
105105
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
106106
f.debug_struct("Logger")
107107
.field("inner", &self.inner)
108108
.finish()
109109
}
110110
}
111111

112-
struct LoggerInner<CB: ContainerBuilder, A: ?Sized + FnMut(&Duration, &mut Option<CB::Container>)> {
112+
struct LoggerInner<CB: ContainerBuilder<Container: Default>, A: ?Sized + FnMut(&Duration, &mut Option<CB::Container>)> {
113113
/// common instant used for all loggers.
114114
time: Instant,
115115
/// offset to allow re-calibration.
@@ -120,7 +120,7 @@ struct LoggerInner<CB: ContainerBuilder, A: ?Sized + FnMut(&Duration, &mut Optio
120120
action: A,
121121
}
122122

123-
impl<CB: ContainerBuilder> Logger<CB> {
123+
impl<CB: ContainerBuilder<Container: Default>> Logger<CB> {
124124
/// Allocates a new shareable logger bound to a write destination.
125125
pub fn new<F>(time: Instant, offset: Duration, action: F) -> Self
126126
where
@@ -185,12 +185,12 @@ impl<CB: ContainerBuilder> Logger<CB> {
185185
///
186186
/// Construct a `TypedLogger` with [`Logger::into_typed`] or by calling `into` on a `Logger`.
187187
#[derive(Debug)]
188-
pub struct TypedLogger<CB: ContainerBuilder, T> {
188+
pub struct TypedLogger<CB: ContainerBuilder<Container: Default>, T> {
189189
inner: Logger<CB>,
190190
_marker: PhantomData<T>,
191191
}
192192

193-
impl<CB: ContainerBuilder, T> TypedLogger<CB, T> {
193+
impl<CB: ContainerBuilder<Container: Default>, T> TypedLogger<CB, T> {
194194
/// Logs an event. Equivalent to [`Logger::log`], with the exception that it converts the
195195
/// event to `T` before logging.
196196
pub fn log<S: Into<T>>(&self, event: S)
@@ -211,7 +211,7 @@ impl<CB: ContainerBuilder, T> TypedLogger<CB, T> {
211211
}
212212
}
213213

214-
impl<CB: ContainerBuilder, T> Clone for TypedLogger<CB, T> {
214+
impl<CB: ContainerBuilder<Container: Default>, T> Clone for TypedLogger<CB, T> {
215215
fn clone(&self) -> Self {
216216
Self {
217217
inner: self.inner.clone(),
@@ -220,7 +220,7 @@ impl<CB: ContainerBuilder, T> Clone for TypedLogger<CB, T> {
220220
}
221221
}
222222

223-
impl<CB: ContainerBuilder, T> From<Logger<CB>> for TypedLogger<CB, T> {
223+
impl<CB: ContainerBuilder<Container: Default>, T> From<Logger<CB>> for TypedLogger<CB, T> {
224224
fn from(inner: Logger<CB>) -> Self {
225225
TypedLogger {
226226
inner,
@@ -229,14 +229,14 @@ impl<CB: ContainerBuilder, T> From<Logger<CB>> for TypedLogger<CB, T> {
229229
}
230230
}
231231

232-
impl<CB: ContainerBuilder, T> std::ops::Deref for TypedLogger<CB, T> {
232+
impl<CB: ContainerBuilder<Container: Default>, T> std::ops::Deref for TypedLogger<CB, T> {
233233
type Target = Logger<CB>;
234234
fn deref(&self) -> &Self::Target {
235235
&self.inner
236236
}
237237
}
238238

239-
impl<CB: ContainerBuilder, A: ?Sized + FnMut(&Duration, &mut Option<CB::Container>)> LoggerInner<CB, A> {
239+
impl<CB: ContainerBuilder<Container: Default>, A: ?Sized + FnMut(&Duration, &mut Option<CB::Container>)> LoggerInner<CB, A> {
240240
/// Push a container with a time at an action.
241241
#[inline]
242242
fn push(action: &mut A, time: &Duration, container: &mut CB::Container) {
@@ -272,15 +272,15 @@ impl<CB: ContainerBuilder, A: ?Sized + FnMut(&Duration, &mut Option<CB::Containe
272272
}
273273

274274
/// Flush on the *last* drop of a logger.
275-
impl<CB: ContainerBuilder, A: ?Sized + FnMut(&Duration, &mut Option<CB::Container>)> Drop for LoggerInner<CB, A> {
275+
impl<CB: ContainerBuilder<Container: Default>, A: ?Sized + FnMut(&Duration, &mut Option<CB::Container>)> Drop for LoggerInner<CB, A> {
276276
fn drop(&mut self) {
277277
self.flush();
278278
}
279279
}
280280

281281
impl<CB, A: ?Sized + FnMut(&Duration, &mut Option<CB::Container>)> Debug for LoggerInner<CB, A>
282282
where
283-
CB: ContainerBuilder + Debug,
283+
CB: ContainerBuilder<Container: Default> + Debug,
284284
{
285285
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
286286
f.debug_struct("LoggerInner")
@@ -298,7 +298,7 @@ trait Flush {
298298
fn flush(&self);
299299
}
300300

301-
impl<CB: ContainerBuilder> Flush for Logger<CB> {
301+
impl<CB: ContainerBuilder<Container: Default>> Flush for Logger<CB> {
302302
fn flush(&self) {
303303
self.inner.borrow_mut().flush()
304304
}

timely/src/dataflow/channels/pushers/buffer.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,11 @@
22
//! with the performance of batched sends.
33
44
use crate::communication::Push;
5-
use crate::container::{ContainerBuilder, CapacityContainerBuilder, PushInto};
5+
use crate::container::{CapacityContainerBuilder, PushInto};
66
use crate::dataflow::channels::Message;
77
use crate::dataflow::operators::Capability;
88
use crate::progress::Timestamp;
9-
use crate::{Container, Accountable};
9+
use crate::{Container, ContainerBuilder, Accountable};
1010

1111
/// Buffers data sent at the same time, for efficient communication.
1212
///

timely/src/dataflow/channels/pushers/exchange.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
11
//! The exchange pattern distributes pushed data between many target pushees.
22
3+
use crate::ContainerBuilder;
34
use crate::communication::Push;
4-
use crate::container::{ContainerBuilder, DrainContainer, PushInto};
5+
use crate::container::{DrainContainer, PushInto};
56
use crate::dataflow::channels::Message;
67

78
/// Distribute containers to several pushers.

timely/src/dataflow/operators/core/input.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,14 +3,14 @@
33
use std::rc::Rc;
44
use std::cell::RefCell;
55

6-
use crate::container::{CapacityContainerBuilder, ContainerBuilder, PushInto};
6+
use crate::container::{CapacityContainerBuilder, PushInto};
77

88
use crate::scheduling::{Schedule, Activator};
99

1010
use crate::progress::{Operate, operate::SharedProgress, Timestamp, ChangeBatch};
1111
use crate::progress::Source;
1212
use crate::progress::operate::Connectivity;
13-
use crate::{Accountable, Container};
13+
use crate::{Accountable, Container, ContainerBuilder};
1414
use crate::communication::Push;
1515
use crate::dataflow::{Scope, ScopeParent, StreamCore};
1616
use crate::dataflow::channels::pushers::{Tee, Counter};

timely/src/dataflow/operators/core/partition.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,11 @@
11
//! Partition a stream of records into multiple streams.
22
3-
use crate::container::{DrainContainer, ContainerBuilder, PushInto};
3+
use crate::container::{DrainContainer, PushInto};
44
use crate::dataflow::channels::pact::Pipeline;
55
use crate::dataflow::operators::generic::builder_rc::OperatorBuilder;
66
use crate::dataflow::operators::InputCapability;
77
use crate::dataflow::{Scope, StreamCore};
8-
use crate::Container;
8+
use crate::{Container, ContainerBuilder};
99

1010
/// Partition a stream of records into multiple streams.
1111
pub trait Partition<G: Scope, C: DrainContainer> {

timely/src/dataflow/operators/core/to_stream.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
//! Conversion to the `StreamCore` type from iterators.
22
3-
use crate::container::{CapacityContainerBuilder, ContainerBuilder, SizableContainer, PushInto};
4-
use crate::Container;
3+
use crate::container::{CapacityContainerBuilder, SizableContainer, PushInto};
4+
use crate::{Container, ContainerBuilder};
55
use crate::dataflow::operators::generic::operator::source;
66
use crate::dataflow::{StreamCore, Scope};
77

timely/src/dataflow/operators/core/unordered_input.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,8 @@
33
use std::rc::Rc;
44
use std::cell::RefCell;
55

6-
use crate::Container;
7-
use crate::container::{ContainerBuilder, CapacityContainerBuilder};
6+
use crate::{Container, ContainerBuilder};
7+
use crate::container::{CapacityContainerBuilder};
88

99
use crate::scheduling::{Schedule, ActivateOnDrop};
1010

timely/src/dataflow/operators/generic/builder_rc.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,7 @@ use crate::progress::{ChangeBatch, Timestamp};
88
use crate::progress::operate::SharedProgress;
99
use crate::progress::frontier::{Antichain, MutableAntichain};
1010

11-
use crate::Container;
12-
use crate::container::ContainerBuilder;
11+
use crate::{Container, ContainerBuilder};
1312
use crate::dataflow::{Scope, StreamCore};
1413
use crate::dataflow::channels::pushers::Tee;
1514
use crate::dataflow::channels::pushers::Counter as PushCounter;

timely/src/dataflow/operators/generic/handles.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,8 @@ use crate::dataflow::channels::pushers::Counter as PushCounter;
1515
use crate::dataflow::channels::pushers::buffer::{Buffer, Session};
1616
use crate::dataflow::channels::Message;
1717
use crate::communication::{Push, Pull};
18-
use crate::{Container, Accountable};
19-
use crate::container::{ContainerBuilder, CapacityContainerBuilder};
18+
use crate::{Accountable, Container, ContainerBuilder};
19+
use crate::container::CapacityContainerBuilder;
2020

2121
use crate::dataflow::operators::InputCapability;
2222
use crate::dataflow::operators::capability::CapabilityTrait;

0 commit comments

Comments
 (0)