Skip to content

Commit a34986d

Browse files
authored
Remove IterContainer (#707)
This change removes the IterContainer type and reworks other types and implementations to function independently. Notably, this is a breaking change for users of exotic containers:

* The `Inspect` trait defers to `&C: IntoIterator` to reveal items in containers. Not all currently used containers provide this. Vec does, but Rc/Arc and Column don't and do not plan to in the future. Users should pivot to `inspect_container` instead and internalize the iteration in the closure.
* The `DrainContainer` implementation for Rc/Arc depends on the wrapped type implementing `IntoIterator` for references. This limits where wrapped containers that do not provide this can be used. It's mostly limited to the core Timely layer that doesn't provide element-by-element functionality.

I feel this change is net positive as it defers to the Rust API to iterate existing types, instead of us providing our own infrastructure. It comes at a cost of potentially breaking existing code, which seems acceptable.

Signed-off-by: Moritz Hoffmann <[email protected]>
1 parent e42ac3d commit a34986d

File tree

5 files changed

+58
-80
lines changed

5 files changed

+58
-80
lines changed

container/src/lib.rs

Lines changed: 22 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44

55
use std::collections::VecDeque;
66

7-
/// An type containing a number of records accounted for by progress tracking.
7+
/// A type containing a number of records accounted for by progress tracking.
88
///
99
/// The object stores a number of updates and thus is able to describe its count
1010
/// (`update_count()`) and whether it is empty (`is_empty()`). It is empty if the
@@ -19,23 +19,10 @@ pub trait Accountable {
1919
fn record_count(&self) -> i64;
2020

2121
/// Determine if this contains any updates, corresponding to `update_count() == 0`.
22-
/// It is a correctness error for this to by anything other than `self.record_count() == 0`.
22+
/// It is a correctness error for this to be anything other than `self.record_count() == 0`.
2323
#[inline] fn is_empty(&self) -> bool { self.record_count() == 0 }
2424
}
2525

26-
/// A container that allows iteration morally equivalent to [`IntoIterator`].
27-
///
28-
/// Iterating the container presents items in an implementation-specific order.
29-
/// The container's contents are not changed.
30-
pub trait IterContainer {
31-
/// The type of elements when reading non-destructively from the container.
32-
type ItemRef<'a> where Self: 'a;
33-
/// Iterator type when reading from the container.
34-
type Iter<'a>: Iterator<Item=Self::ItemRef<'a>> where Self: 'a;
35-
/// Returns an iterator that reads the contents of this container.
36-
fn iter(&self) -> Self::Iter<'_>;
37-
}
38-
3926
/// A container that can drain itself.
4027
///
4128
/// Draining the container presents items in an implementation-specific order.
@@ -191,14 +178,6 @@ impl<T> Accountable for Vec<T> {
191178
#[inline] fn is_empty(&self) -> bool { Vec::is_empty(self) }
192179
}
193180

194-
impl<T> IterContainer for Vec<T> {
195-
type ItemRef<'a> = &'a T where T: 'a;
196-
type Iter<'a> = std::slice::Iter<'a, T> where Self: 'a;
197-
#[inline] fn iter(&self) -> Self::Iter<'_> {
198-
self.as_slice().iter()
199-
}
200-
}
201-
202181
impl<T> DrainContainer for Vec<T> {
203182
type Item<'a> = T where T: 'a;
204183
type DrainIter<'a> = std::vec::Drain<'a, T> where Self: 'a;
@@ -246,46 +225,32 @@ impl<T: Clone> PushInto<&&T> for Vec<T> {
246225
}
247226

248227
mod rc {
249-
use std::ops::Deref;
250-
use std::rc::Rc;
251-
252-
use crate::{IterContainer, DrainContainer};
253-
254-
impl<T: crate::Accountable> crate::Accountable for Rc<T> {
255-
#[inline] fn record_count(&self) -> i64 { std::ops::Deref::deref(self).record_count() }
256-
#[inline] fn is_empty(&self) -> bool { std::ops::Deref::deref(self).is_empty() }
257-
}
258-
impl<T: IterContainer> IterContainer for Rc<T> {
259-
type ItemRef<'a> = T::ItemRef<'a> where Self: 'a;
260-
type Iter<'a> = T::Iter<'a> where Self: 'a;
261-
#[inline] fn iter(&self) -> Self::Iter<'_> { self.deref().iter() }
228+
impl<T: crate::Accountable> crate::Accountable for std::rc::Rc<T> {
229+
#[inline] fn record_count(&self) -> i64 { self.as_ref().record_count() }
230+
#[inline] fn is_empty(&self) -> bool { self.as_ref().is_empty() }
262231
}
263-
impl<T: IterContainer> DrainContainer for Rc<T> {
264-
type Item<'a> = T::ItemRef<'a> where Self: 'a;
265-
type DrainIter<'a> = T::Iter<'a> where Self: 'a;
266-
#[inline] fn drain(&mut self) -> Self::DrainIter<'_> { self.iter() }
232+
impl<T> crate::DrainContainer for std::rc::Rc<T>
233+
where
234+
for<'a> &'a T: IntoIterator
235+
{
236+
type Item<'a> = <&'a T as IntoIterator>::Item where Self: 'a;
237+
type DrainIter<'a> = <&'a T as IntoIterator>::IntoIter where Self: 'a;
238+
#[inline] fn drain(&mut self) -> Self::DrainIter<'_> { self.into_iter() }
267239
}
268240
}
269241

270242
mod arc {
271-
use std::ops::Deref;
272-
use std::sync::Arc;
273-
274-
use crate::{IterContainer, DrainContainer};
275-
276-
impl<T: crate::Accountable> crate::Accountable for Arc<T> {
277-
#[inline] fn record_count(&self) -> i64 { std::ops::Deref::deref(self).record_count() }
278-
#[inline] fn is_empty(&self) -> bool { std::ops::Deref::deref(self).is_empty() }
279-
}
280-
impl<T: IterContainer> IterContainer for Arc<T> {
281-
type ItemRef<'a> = T::ItemRef<'a> where Self: 'a;
282-
type Iter<'a> = T::Iter<'a> where Self: 'a;
283-
#[inline] fn iter(&self) -> Self::Iter<'_> { self.deref().iter() }
243+
impl<T: crate::Accountable> crate::Accountable for std::sync::Arc<T> {
244+
#[inline] fn record_count(&self) -> i64 { self.as_ref().record_count() }
245+
#[inline] fn is_empty(&self) -> bool { self.as_ref().is_empty() }
284246
}
285-
impl<T: IterContainer> DrainContainer for Arc<T> {
286-
type Item<'a> = T::ItemRef<'a> where Self: 'a;
287-
type DrainIter<'a> = T::Iter<'a> where Self: 'a;
288-
#[inline] fn drain(&mut self) -> Self::DrainIter<'_> { self.iter() }
247+
impl<T> crate::DrainContainer for std::sync::Arc<T>
248+
where
249+
for<'a> &'a T: IntoIterator
250+
{
251+
type Item<'a> = <&'a T as IntoIterator>::Item where Self: 'a;
252+
type DrainIter<'a> = <&'a T as IntoIterator>::IntoIter where Self: 'a;
253+
#[inline] fn drain(&mut self) -> Self::DrainIter<'_> { self.into_iter() }
289254
}
290255
}
291256

timely/examples/columnar.rs

Lines changed: 18 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,12 @@
22
33
use std::collections::HashMap;
44

5-
use timely::container::{IterContainer, CapacityContainerBuilder};
5+
use columnar::Index;
6+
use timely::Accountable;
7+
use timely::container::CapacityContainerBuilder;
68
use timely::dataflow::channels::pact::{ExchangeCore, Pipeline};
79
use timely::dataflow::InputHandleCore;
8-
use timely::dataflow::operators::{Inspect, Operator, Probe};
10+
use timely::dataflow::operators::{InspectCore, Operator, Probe};
911
use timely::dataflow::ProbeHandle;
1012

1113
// Creates `WordCountContainer` and `WordCountReference` structs,
@@ -44,7 +46,7 @@ fn main() {
4446
move |input, output| {
4547
while let Some((time, data)) = input.next() {
4648
let mut session = output.session(&time);
47-
for wordcount in data.iter().flat_map(|wordcount| {
49+
for wordcount in data.borrow().into_index_iter().flat_map(|wordcount| {
4850
wordcount.text.split_whitespace().map(move |text| WordCountReference { text, diff: wordcount.diff })
4951
}) {
5052
session.give(wordcount);
@@ -73,7 +75,7 @@ fn main() {
7375
if !input.frontier().less_equal(key.time()) {
7476
let mut session = output.session(key);
7577
for batch in val.drain(..) {
76-
for wordcount in batch.iter() {
78+
for wordcount in batch.borrow().into_index_iter() {
7779
let total =
7880
if let Some(count) = counts.get_mut(wordcount.text) {
7981
*count += wordcount.diff;
@@ -94,7 +96,17 @@ fn main() {
9496
},
9597
)
9698
.container::<Container>()
97-
.inspect(|x| println!("seen: {:?}", x))
99+
.inspect_container(|x| {
100+
match x {
101+
Ok((time, data)) => {
102+
println!("seen at: {:?}\t{:?} records", time, data.record_count());
103+
for wc in data.borrow().into_index_iter() {
104+
println!(" {}: {}", wc.text, wc.diff);
105+
}
106+
},
107+
Err(frontier) => println!("frontier advanced to {:?}", frontier),
108+
}
109+
})
98110
.probe_with(&probe);
99111
});
100112

@@ -167,7 +179,7 @@ mod container {
167179

168180
impl<C: columnar::ContainerBytes> Column<C> {
169181
/// Borrows the contents no matter their representation.
170-
#[inline(always)] fn borrow(&self) -> C::Borrowed<'_> {
182+
#[inline(always)] pub fn borrow(&self) -> C::Borrowed<'_> {
171183
match self {
172184
Column::Typed(t) => t.borrow(),
173185
Column::Bytes(b) => <C::Borrowed<'_> as FromBytes>::from_bytes(&mut Indexed::decode(bytemuck::cast_slice(b))),
@@ -180,11 +192,6 @@ mod container {
180192
#[inline] fn record_count(&self) -> i64 { i64::try_from(self.borrow().len()).unwrap() }
181193
#[inline] fn is_empty(&self) -> bool { self.borrow().is_empty() }
182194
}
183-
impl<C: columnar::ContainerBytes> timely::container::IterContainer for Column<C> {
184-
type ItemRef<'a> = C::Ref<'a>;
185-
type Iter<'a> = IterOwn<C::Borrowed<'a>>;
186-
fn iter<'a>(&'a self) -> Self::Iter<'a> { self.borrow().into_index_iter() }
187-
}
188195
impl<C: columnar::ContainerBytes> timely::container::DrainContainer for Column<C> {
189196
type Item<'a> = C::Ref<'a>;
190197
type DrainIter<'a> = IterOwn<C::Borrowed<'a>>;

timely/src/dataflow/operators/core/input.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ pub trait Input : Scope {
7575
/// ```
7676
/// use std::rc::Rc;
7777
/// use timely::*;
78-
/// use timely::dataflow::operators::core::{Input, Inspect};
78+
/// use timely::dataflow::operators::core::{Input, InspectCore};
7979
/// use timely::container::CapacityContainerBuilder;
8080
///
8181
/// // construct and execute a timely dataflow
@@ -84,7 +84,7 @@ pub trait Input : Scope {
8484
/// // add an input and base computation off of it
8585
/// let mut input = worker.dataflow(|scope| {
8686
/// let (input, stream) = scope.new_input_with_builder::<CapacityContainerBuilder<Rc<Vec<_>>>>();
87-
/// stream.inspect(|x| println!("hello {:?}", x));
87+
/// stream.inspect_container(|x| println!("hello {:?}", x));
8888
/// input
8989
/// });
9090
///

timely/src/dataflow/operators/core/inspect.rs

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,15 @@
11
//! Extension trait and implementation for observing and acting on streamed data.
22
33
use crate::Container;
4-
use crate::container::IterContainer;
54
use crate::dataflow::channels::pact::Pipeline;
65
use crate::dataflow::{Scope, StreamCore};
76
use crate::dataflow::operators::generic::Operator;
87

98
/// Methods to inspect records and batches of records on a stream.
10-
pub trait Inspect<G: Scope, C: IterContainer>: InspectCore<G, C> + Sized {
9+
pub trait Inspect<G: Scope, C>: InspectCore<G, C> + Sized
10+
where
11+
for<'a> &'a C: IntoIterator,
12+
{
1113
/// Runs a supplied closure on each observed data element.
1214
///
1315
/// # Examples
@@ -21,10 +23,10 @@ pub trait Inspect<G: Scope, C: IterContainer>: InspectCore<G, C> + Sized {
2123
/// ```
2224
fn inspect<F>(&self, mut func: F) -> Self
2325
where
24-
F: for<'a> FnMut(C::ItemRef<'a>) + 'static,
26+
F: for<'a> FnMut(<&'a C as IntoIterator>::Item) + 'static,
2527
{
2628
self.inspect_batch(move |_, data| {
27-
for datum in data.iter() { func(datum); }
29+
for datum in data.into_iter() { func(datum); }
2830
})
2931
}
3032

@@ -41,10 +43,10 @@ pub trait Inspect<G: Scope, C: IterContainer>: InspectCore<G, C> + Sized {
4143
/// ```
4244
fn inspect_time<F>(&self, mut func: F) -> Self
4345
where
44-
F: for<'a> FnMut(&G::Timestamp, C::ItemRef<'a>) + 'static,
46+
F: for<'a> FnMut(&G::Timestamp, <&'a C as IntoIterator>::Item) + 'static,
4547
{
4648
self.inspect_batch(move |time, data| {
47-
for datum in data.iter() {
49+
for datum in data.into_iter() {
4850
func(time, datum);
4951
}
5052
})
@@ -91,7 +93,10 @@ pub trait Inspect<G: Scope, C: IterContainer>: InspectCore<G, C> + Sized {
9193
fn inspect_core<F>(&self, func: F) -> Self where F: FnMut(Result<(&G::Timestamp, &C), &[G::Timestamp]>)+'static;
9294
}
9395

94-
impl<G: Scope, C: Container + IterContainer> Inspect<G, C> for StreamCore<G, C> {
96+
impl<G: Scope, C: Container> Inspect<G, C> for StreamCore<G, C>
97+
where
98+
for<'a> &'a C: IntoIterator,
99+
{
95100
fn inspect_core<F>(&self, func: F) -> Self where F: FnMut(Result<(&G::Timestamp, &C), &[G::Timestamp]>) + 'static {
96101
self.inspect_container(func)
97102
}

timely/src/dataflow/operators/core/rc.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,13 +12,13 @@ pub trait SharedStream<S: Scope, C> {
1212
///
1313
/// # Examples
1414
/// ```
15-
/// use timely::dataflow::operators::{ToStream, Inspect};
15+
/// use timely::dataflow::operators::{ToStream, InspectCore};
1616
/// use timely::dataflow::operators::rc::SharedStream;
1717
///
1818
/// timely::example(|scope| {
1919
/// (0..10).to_stream(scope)
2020
/// .shared()
21-
/// .inspect(|x| println!("seen: {:?}", x));
21+
/// .inspect_container(|x| println!("seen: {:?}", x));
2222
/// });
2323
/// ```
2424
fn shared(&self) -> StreamCore<S, Rc<C>>;
@@ -43,12 +43,13 @@ mod test {
4343
use crate::dataflow::channels::pact::Pipeline;
4444
use crate::dataflow::operators::capture::Extract;
4545
use crate::dataflow::operators::rc::SharedStream;
46-
use crate::dataflow::operators::{Capture, Concatenate, Operator, ToStream};
46+
use crate::dataflow::operators::{Capture, Concatenate, InspectCore, Operator, ToStream};
4747

4848
#[test]
4949
fn test_shared() {
5050
let output = crate::example(|scope| {
5151
let shared = vec![Ok(0), Err(())].to_stream(scope).container::<Vec<_>>().shared();
52+
let shared = shared.inspect_container(|x| println!("seen: {x:?}"));
5253
scope
5354
.concatenate([
5455
shared.unary(Pipeline, "read shared 1", |_, _| {

0 commit comments

Comments
 (0)