-
Notifications
You must be signed in to change notification settings - Fork 433
Port groupBy
from REX
#200
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
9b8175b
918e119
a575411
679f800
dbaa6cf
0f0f48b
5bea41e
d53d396
2587a6c
f4ae595
56ee8f6
1d52724
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -1929,6 +1929,59 @@ extension SignalProducerProtocol { | |
} | ||
} | ||
} | ||
|
||
/// Creates a new `SignalProducer` that will bucket each received value into | ||
/// a group based on the key returned from `grouping`. | ||
/// | ||
/// Termination events on the original signal are also forwarded to each | ||
/// producer group. | ||
/// | ||
/// - parameters: | ||
/// - grouping: a closure that determines the grouping key for a given value | ||
/// | ||
/// - returns: A producer of producers that emits one producer for each group and forwards | ||
/// each value from the original producer to the inner producer corresponding | ||
/// to the group to which the value belongs to (as determined by the key) | ||
public func group<Key: Hashable>(by grouping: @escaping (Value) -> Key) -> SignalProducer<(Key, SignalProducer<Value, Error>), Error> { | ||
return SignalProducer { observer, disposable in | ||
let groups = Atomic<[Key: Signal<Value, Error>.Observer]>([:]) | ||
|
||
disposable += self.start { event in | ||
switch event { | ||
case let .value(value): | ||
let key = grouping(value) | ||
let group: Signal<Value, Error>.Observer = groups.modify { groups in | ||
if let group = groups[key] { | ||
return group | ||
} else { | ||
let (signal, innerObserver) = Signal<Value, Error>.pipe() | ||
let producer = SignalProducer(signal).replayLazily(upTo: Int.max) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is replaying values an essential part of (You could just as easily send the If you feel that it is, I'd love to see a larger code sample that demonstrates why. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think so, otherwise consumers could miss events without realizing it. Hence why I erred on the side of buffering. I could be convinced otherwise, but I'm not sure how to best communicate those semantics at the operator level. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think returning But I think seeing an actual example of this operator will make it easier to see whether that's a viable change. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I've posted an example how we use the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If I'm reading this correctly, RxJS is using Hot Observables for the groups (via There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. As much as I agree with @mdiep about the semantics, in my experience having buffering in this operator (specifically There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Perhaps should the buffering be an option? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
While this is the ideal case, it seems the buffering is essential, say when using with Edit: I'd suggest to annotate it with warnings for the potential of indefinite buffering, and perhaps offer a non-buffering variant. Edit: If we have to find a middle ground, perhaps the emitted inner producers should buffer until & only replay values at the first time it started (future work: a configurable #?). Subsequent starts would be invalid and emit |
||
|
||
// Start the buffering immediately. | ||
producer.start().dispose() | ||
observer.send(value: (key, producer)) | ||
|
||
groups[key] = innerObserver | ||
return innerObserver | ||
} | ||
} | ||
group.send(value: value) | ||
|
||
case let .failed(error): | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. [Nit] Blank line before the pattern. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. fixed |
||
observer.send(error: error) | ||
groups.value.values.forEach { $0.send(error: error) } | ||
|
||
case .completed: | ||
observer.sendCompleted() | ||
groups.value.values.forEach { $0.sendCompleted() } | ||
|
||
case .interrupted: | ||
observer.sendInterrupted() | ||
groups.value.values.forEach { $0.sendInterrupted() } | ||
} | ||
} | ||
} | ||
} | ||
} | ||
|
||
extension SignalProducerProtocol where Value == Bool { | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -1926,5 +1926,193 @@ class SignalProducerLiftingSpec: QuickSpec { | |
expect(latestValues?.1) == 2 | ||
} | ||
} | ||
|
||
describe("groupBy") { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It would be great if there are specialized test cases than one giant one. At least this single test case apparently did not catch the undisposed upstreams, caused by this line. Say we could use a few test cases on how the inner producers behaves with regard to an outer terminal event, and keep this test case focusing on values. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. yeah right, i've split this up into 2 separated cases and extended them to check the lifetime of the inner producers - now the cases catch the disposal issue you've found! There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Perhaps one per terminal event, hmm? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Sure I can do that. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think asserting just the specific event is enough. The There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Alright |
||
it("should group events by their key") { | ||
let (signal, observer) = Signal<Int, NoError>.pipe() | ||
let producer = SignalProducer<Int, NoError>(signal) | ||
var evens: [Int] = [] | ||
var odds: [Int] = [] | ||
let disposable = producer | ||
.group { $0 % 2 == 0 } | ||
.startWithValues { key, group in | ||
if key { | ||
group.startWithValues { evens.append($0)} | ||
} else { | ||
group.startWithValues { odds.append($0)} | ||
} | ||
} | ||
|
||
observer.send(value: 1) | ||
expect(evens) == [] | ||
expect(odds) == [1] | ||
|
||
observer.send(value: 2) | ||
expect(evens) == [2] | ||
expect(odds) == [1] | ||
|
||
observer.send(value: 3) | ||
expect(evens) == [2] | ||
expect(odds) == [1, 3] | ||
|
||
disposable.dispose() | ||
|
||
observer.send(value: 4) | ||
expect(evens) == [2] | ||
expect(odds) == [1, 3] | ||
} | ||
|
||
it("should terminate correctly on disposal") { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. [Nit] A blank line please. 🙏 |
||
let (signal, observer) = Signal<Int, NoError>.pipe() | ||
let producer = SignalProducer<Int, NoError>(signal) | ||
var completed = false | ||
var interrupted = false | ||
var evensInterrupted = false | ||
var oddsInterrupted = false | ||
|
||
let disposable = producer | ||
.group { $0 % 2 == 0 } | ||
.start { event in | ||
switch event { | ||
case let .value(key, group): | ||
if key { | ||
group.startWithInterrupted { evensInterrupted = true } | ||
} else { | ||
group.startWithInterrupted { oddsInterrupted = true } | ||
} | ||
case .completed: | ||
completed = true | ||
case .interrupted: | ||
interrupted = true | ||
case .failed: | ||
break | ||
} | ||
} | ||
|
||
observer.send(value: 1) | ||
observer.send(value: 2) | ||
|
||
disposable.dispose() | ||
|
||
expect(completed) == false | ||
expect(interrupted) == true | ||
expect(evensInterrupted) == true | ||
expect(oddsInterrupted) == true | ||
} | ||
} | ||
|
||
it("should terminate correctly when receiving an interrupted event") { | ||
let (signal, observer) = Signal<Int, NoError>.pipe() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Please wrap test cases in There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. woops |
||
let producer = SignalProducer<Int, NoError>(signal) | ||
var completed = false | ||
var interrupted = false | ||
var evensInterrupted = false | ||
var oddsInterrupted = false | ||
|
||
producer | ||
.group { $0 % 2 == 0 } | ||
.start { event in | ||
switch event { | ||
case let .value(key, group): | ||
if key { | ||
group.startWithInterrupted { evensInterrupted = true } | ||
} else { | ||
group.startWithInterrupted { oddsInterrupted = true } | ||
} | ||
case .completed: | ||
completed = true | ||
case .interrupted: | ||
interrupted = true | ||
case .failed: | ||
break | ||
} | ||
} | ||
|
||
observer.send(value: 1) | ||
observer.send(value: 2) | ||
observer.sendInterrupted() | ||
|
||
expect(completed) == false | ||
expect(interrupted) == true | ||
expect(evensInterrupted) == true | ||
expect(oddsInterrupted) == true | ||
} | ||
|
||
it("should terminate correctly receiving a completed event") { | ||
let (signal, observer) = Signal<Int, NoError>.pipe() | ||
let producer = SignalProducer<Int, NoError>(signal) | ||
var completed = false | ||
var interrupted = false | ||
var evenCompleted = false | ||
var oddCompleted = false | ||
|
||
producer | ||
.group { $0 % 2 == 0 } | ||
.start { event in | ||
switch event { | ||
case let .value(key, group): | ||
if key { | ||
group.startWithCompleted { evenCompleted = true } | ||
} else { | ||
group.startWithCompleted { oddCompleted = true } | ||
} | ||
case .completed: | ||
completed = true | ||
case .interrupted: | ||
interrupted = true | ||
case .failed: | ||
break | ||
} | ||
} | ||
|
||
observer.send(value: 1) | ||
observer.send(value: 2) | ||
observer.sendCompleted() | ||
|
||
expect(completed) == true | ||
expect(interrupted) == false | ||
expect(evenCompleted) == true | ||
expect(oddCompleted) == true | ||
} | ||
|
||
it("should terminate correctly receiving a failed event") { | ||
let (signal, observer) = Signal<Int, TestError>.pipe() | ||
let producer = SignalProducer<Int, TestError>(signal) | ||
var interrupted = false | ||
var completed = false | ||
var error: TestError? = nil | ||
var evensError: TestError? = nil | ||
var oddsError: TestError? = nil | ||
|
||
producer | ||
.group { $0 % 2 == 0 } | ||
.start { event in | ||
switch event { | ||
case let .value(key, group): | ||
if key { | ||
group.startWithFailed { evensError = $0 } | ||
} else { | ||
group.startWithFailed { oddsError = $0 } | ||
} | ||
case .completed: | ||
completed = true | ||
case .interrupted: | ||
interrupted = true | ||
case let .failed(e): | ||
error = e | ||
} | ||
} | ||
|
||
observer.send(value: 1) | ||
observer.send(value: 2) | ||
|
||
observer.send(error: .error1) | ||
|
||
expect(interrupted) == false | ||
expect(completed) == false | ||
expect(error) == .error1 | ||
expect(evensError) == .error1 | ||
expect(oddsError) == .error1 | ||
} | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It doesn't seem necessary to specialise it. Is it a workaround to the compiler's weirdness or something?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm afraid I don't know what exactly you mean.
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I mean if you really need to write
SignalProducer<(Key, SignalProducer<Value, Error>), Error>
when the type parameters can be inferred.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh that line. Yeah that works without explicit annotations 👍