Skip to content
53 changes: 53 additions & 0 deletions Sources/SignalProducer.swift
Original file line number Diff line number Diff line change
Expand Up @@ -1929,6 +1929,59 @@ extension SignalProducerProtocol {
}
}
}

/// Creates a new `SignalProducer` that will bucket each received value into
/// a group based on the key returned from `grouping`.
///
/// Termination events on the original signal are also forwarded to each
/// producer group.
///
/// - parameters:
/// - grouping: a closure that determines the grouping key for a given value
///
/// - returns: A producer of producers that emits one producer for each group and forwards
/// each value from the original producer to the inner producer corresponding
/// to the group to which the value belongs to (as determined by the key)
public func group<Key: Hashable>(by grouping: @escaping (Value) -> Key) -> SignalProducer<(Key, SignalProducer<Value, Error>), Error> {
return SignalProducer { observer, disposable in
let groups = Atomic<[Key: Signal<Value, Error>.Observer]>([:])
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It doesn't seem necessary to specialise it. Is it a workaround to the compiler's weirdness or something?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm afraid I don't know what exactly you mean.

Copy link
Member

@andersio andersio Dec 30, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I mean if you really need to write SignalProducer<(Key, SignalProducer<Value, Error>), Error> when the type parameters can be inferred.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh that line. Yeah that works without explicit annotations 👍


disposable += self.start { event in
switch event {
case let .value(value):
let key = grouping(value)
let group: Signal<Value, Error>.Observer = groups.modify { groups in
if let group = groups[key] {
return group
} else {
let (signal, innerObserver) = Signal<Value, Error>.pipe()
let producer = SignalProducer(signal).replayLazily(upTo: Int.max)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is replaying values an essential part of group(by:)? This is my primary hesitation. In general, we try to discourage the use of replaying (which is why buffer was removed).

(You could just as easily send the Signal and not send values on the Signal until it's been sent—giving observers a chance to observe the signal.)

If you feel that it is, I'd love to see a larger code sample that demonstrates why.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think so, otherwise consumers could miss events without realizing it. Hence why I erred on the side of buffering. I could be convinced otherwise, but I'm not sure how to best communicate those semantics at the operator level.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think returning Signals instead of SignalProducers communicates the semantics.

But I think seeing an actual example of this operator will make it easier to see whether that's a viable change.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've posted an example how we use the groupBy in a project in the Issue: #197 (comment)

Copy link
Contributor Author

@iv-mexx iv-mexx Feb 16, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If I'm reading this correctly, RxJS is using Hot Observables for the groups (via refCount)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As much as I agree with @mdiep about the semantics, in my experience having buffering in this operator (specifically RxJava's Observable#groupBy) has been valuable .

Copy link
Member

@andersio andersio Feb 22, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps should the buffering be an option?

Copy link
Member

@andersio andersio Mar 28, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@mdiep

(You could just as easily send the Signal and not send values on the Signal until it's been sent—giving observers a chance to observe the signal.)

While this is the ideal case, it seems the buffering is essential, say when using with flatten(.concat), which can delay the establishment of the observation.

Edit: I'd suggest to annotate it with warnings for the potential of indefinite buffering, and perhaps offer a non-buffering variant.

Edit: If we have to find a middle ground, perhaps the emitted inner producers should buffer until & only replay values at the first time it started (future work: a configurable #?). Subsequent starts would be invalid and emit interrupted. This should satisfy the common use cases, e.g. @iv-mexx's example, which do not retain and repeatedly start the inner producers.


// Start the buffering immediately.
producer.start().dispose()
observer.send(value: (key, producer))

groups[key] = innerObserver
return innerObserver
}
}
group.send(value: value)

case let .failed(error):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[Nit] Blank line before the pattern.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed

observer.send(error: error)
groups.value.values.forEach { $0.send(error: error) }

case .completed:
observer.sendCompleted()
groups.value.values.forEach { $0.sendCompleted() }

case .interrupted:
observer.sendInterrupted()
groups.value.values.forEach { $0.sendInterrupted() }
}
}
}
}
}

extension SignalProducerProtocol where Value == Bool {
Expand Down
188 changes: 188 additions & 0 deletions Tests/ReactiveSwiftTests/SignalProducerLiftingSpec.swift
Original file line number Diff line number Diff line change
Expand Up @@ -1926,5 +1926,193 @@ class SignalProducerLiftingSpec: QuickSpec {
expect(latestValues?.1) == 2
}
}

describe("groupBy") {
Copy link
Member

@andersio andersio Dec 30, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be great if there are specialized test cases than one giant one. At least this single test case apparently did not catch the undisposed upstreams, caused by this line.

Say we could use a few test cases on how the inner producers behaves with regard to an outer terminal event, and keep this test case focusing on values.

Copy link
Contributor Author

@iv-mexx iv-mexx Dec 30, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah right, i've split this up into 2 separated cases and extended them to check the lifetime of the inner producers - now the cases catch the disposal issue you've found!

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps one per terminal event, hmm?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure I can do that.
Should we check for all events that only this event is sent (e.g. when sending completed, also check that interrupted was not received), or can we trust that when completed is received, interrupted can not be received?

Copy link
Member

@andersio andersio Dec 30, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think asserting just the specific event is enough. The Signal contract guarantees only one would ever be sent. If multiple terminal events are sent, this would be a fault in the Signal basics and should have been caught somewhere else.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Alright

it("should group events by their key") {
let (signal, observer) = Signal<Int, NoError>.pipe()
let producer = SignalProducer<Int, NoError>(signal)
var evens: [Int] = []
var odds: [Int] = []
let disposable = producer
.group { $0 % 2 == 0 }
.startWithValues { key, group in
if key {
group.startWithValues { evens.append($0)}
} else {
group.startWithValues { odds.append($0)}
}
}

observer.send(value: 1)
expect(evens) == []
expect(odds) == [1]

observer.send(value: 2)
expect(evens) == [2]
expect(odds) == [1]

observer.send(value: 3)
expect(evens) == [2]
expect(odds) == [1, 3]

disposable.dispose()

observer.send(value: 4)
expect(evens) == [2]
expect(odds) == [1, 3]
}

it("should terminate correctly on disposal") {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[Nit] A blank line please. 🙏

let (signal, observer) = Signal<Int, NoError>.pipe()
let producer = SignalProducer<Int, NoError>(signal)
var completed = false
var interrupted = false
var evensInterrupted = false
var oddsInterrupted = false

let disposable = producer
.group { $0 % 2 == 0 }
.start { event in
switch event {
case let .value(key, group):
if key {
group.startWithInterrupted { evensInterrupted = true }
} else {
group.startWithInterrupted { oddsInterrupted = true }
}
case .completed:
completed = true
case .interrupted:
interrupted = true
case .failed:
break
}
}

observer.send(value: 1)
observer.send(value: 2)

disposable.dispose()

expect(completed) == false
expect(interrupted) == true
expect(evensInterrupted) == true
expect(oddsInterrupted) == true
}
}

it("should terminate correctly when receiving an interrupted event") {
let (signal, observer) = Signal<Int, NoError>.pipe()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please wrap test cases in it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

woops

let producer = SignalProducer<Int, NoError>(signal)
var completed = false
var interrupted = false
var evensInterrupted = false
var oddsInterrupted = false

producer
.group { $0 % 2 == 0 }
.start { event in
switch event {
case let .value(key, group):
if key {
group.startWithInterrupted { evensInterrupted = true }
} else {
group.startWithInterrupted { oddsInterrupted = true }
}
case .completed:
completed = true
case .interrupted:
interrupted = true
case .failed:
break
}
}

observer.send(value: 1)
observer.send(value: 2)
observer.sendInterrupted()

expect(completed) == false
expect(interrupted) == true
expect(evensInterrupted) == true
expect(oddsInterrupted) == true
}

it("should terminate correctly receiving a completed event") {
let (signal, observer) = Signal<Int, NoError>.pipe()
let producer = SignalProducer<Int, NoError>(signal)
var completed = false
var interrupted = false
var evenCompleted = false
var oddCompleted = false

producer
.group { $0 % 2 == 0 }
.start { event in
switch event {
case let .value(key, group):
if key {
group.startWithCompleted { evenCompleted = true }
} else {
group.startWithCompleted { oddCompleted = true }
}
case .completed:
completed = true
case .interrupted:
interrupted = true
case .failed:
break
}
}

observer.send(value: 1)
observer.send(value: 2)
observer.sendCompleted()

expect(completed) == true
expect(interrupted) == false
expect(evenCompleted) == true
expect(oddCompleted) == true
}

it("should terminate correctly receiving a failed event") {
let (signal, observer) = Signal<Int, TestError>.pipe()
let producer = SignalProducer<Int, TestError>(signal)
var interrupted = false
var completed = false
var error: TestError? = nil
var evensError: TestError? = nil
var oddsError: TestError? = nil

producer
.group { $0 % 2 == 0 }
.start { event in
switch event {
case let .value(key, group):
if key {
group.startWithFailed { evensError = $0 }
} else {
group.startWithFailed { oddsError = $0 }
}
case .completed:
completed = true
case .interrupted:
interrupted = true
case let .failed(e):
error = e
}
}

observer.send(value: 1)
observer.send(value: 2)

observer.send(error: .error1)

expect(interrupted) == false
expect(completed) == false
expect(error) == .error1
expect(evensError) == .error1
expect(oddsError) == .error1
}
}
}