diff --git a/Sources/SignalProducer.swift b/Sources/SignalProducer.swift index 0a09a4183..249fa39c3 100644 --- a/Sources/SignalProducer.swift +++ b/Sources/SignalProducer.swift @@ -1929,6 +1929,59 @@ extension SignalProducerProtocol { } } } + + /// Creates a new `SignalProducer` that will bucket each received value into + /// a group based on the key returned from `grouping`. + /// + /// Termination events on the original signal are also forwarded to each + /// producer group. + /// + /// - parameters: + /// - grouping: a closure that determines the grouping key for a given value + /// + /// - returns: A producer of producers that emits one producer for each group and forwards + /// each value from the original producer to the inner producer corresponding + /// to the group to which the value belongs to (as determined by the key) + public func group(by grouping: @escaping (Value) -> Key) -> SignalProducer<(Key, SignalProducer), Error> { + return SignalProducer { observer, disposable in + let groups = Atomic<[Key: Signal.Observer]>([:]) + + disposable += self.start { event in + switch event { + case let .value(value): + let key = grouping(value) + let group: Signal.Observer = groups.modify { groups in + if let group = groups[key] { + return group + } else { + let (signal, innerObserver) = Signal.pipe() + let producer = SignalProducer(signal).replayLazily(upTo: Int.max) + + // Start the buffering immediately. + producer.start().dispose() + observer.send(value: (key, producer)) + + groups[key] = innerObserver + return innerObserver + } + } + group.send(value: value) + + case let .failed(error): + observer.send(error: error) + groups.value.values.forEach { $0.send(error: error) } + + case .completed: + observer.sendCompleted() + groups.value.values.forEach { $0.sendCompleted() } + + case .interrupted: + observer.sendInterrupted() + groups.value.values.forEach { $0.sendInterrupted() } + } + } + } + } } extension SignalProducerProtocol where Value == Bool { diff --git a/Tests/ReactiveSwiftTests/SignalProducerLiftingSpec.swift b/Tests/ReactiveSwiftTests/SignalProducerLiftingSpec.swift index 8a721afaf..bee10b0f7 100644 --- a/Tests/ReactiveSwiftTests/SignalProducerLiftingSpec.swift +++ b/Tests/ReactiveSwiftTests/SignalProducerLiftingSpec.swift @@ -1926,5 +1926,193 @@ class SignalProducerLiftingSpec: QuickSpec { expect(latestValues?.1) == 2 } } + + describe("groupBy") { + it("should group events by their key") { + let (signal, observer) = Signal.pipe() + let producer = SignalProducer(signal) + var evens: [Int] = [] + var odds: [Int] = [] + let disposable = producer + .group { $0 % 2 == 0 } + .startWithValues { key, group in + if key { + group.startWithValues { evens.append($0)} + } else { + group.startWithValues { odds.append($0)} + } + } + + observer.send(value: 1) + expect(evens) == [] + expect(odds) == [1] + + observer.send(value: 2) + expect(evens) == [2] + expect(odds) == [1] + + observer.send(value: 3) + expect(evens) == [2] + expect(odds) == [1, 3] + + disposable.dispose() + + observer.send(value: 4) + expect(evens) == [2] + expect(odds) == [1, 3] + } + + it("should terminate correctly on disposal") { + let (signal, observer) = Signal.pipe() + let producer = SignalProducer(signal) + var completed = false + var interrupted = false + var evensInterrupted = false + var oddsInterrupted = false + + let disposable = producer + .group { $0 % 2 == 0 } + .start { event in + switch event { + case let .value(key, group): + if key { + group.startWithInterrupted { evensInterrupted = true } + } else { + group.startWithInterrupted { oddsInterrupted = true } + } + case .completed: + completed = true + case .interrupted: + interrupted = true + case .failed: + break + } + } + + observer.send(value: 1) + observer.send(value: 2) + + disposable.dispose() + + expect(completed) == false + expect(interrupted) == true + expect(evensInterrupted) == true + expect(oddsInterrupted) == true + } + } + + it("should terminate correctly when receiving an interrupted event") { + let (signal, observer) = Signal.pipe() + let producer = SignalProducer(signal) + var completed = false + var interrupted = false + var evensInterrupted = false + var oddsInterrupted = false + + producer + .group { $0 % 2 == 0 } + .start { event in + switch event { + case let .value(key, group): + if key { + group.startWithInterrupted { evensInterrupted = true } + } else { + group.startWithInterrupted { oddsInterrupted = true } + } + case .completed: + completed = true + case .interrupted: + interrupted = true + case .failed: + break + } + } + + observer.send(value: 1) + observer.send(value: 2) + observer.sendInterrupted() + + expect(completed) == false + expect(interrupted) == true + expect(evensInterrupted) == true + expect(oddsInterrupted) == true + } + + it("should terminate correctly receiving a completed event") { + let (signal, observer) = Signal.pipe() + let producer = SignalProducer(signal) + var completed = false + var interrupted = false + var evenCompleted = false + var oddCompleted = false + + producer + .group { $0 % 2 == 0 } + .start { event in + switch event { + case let .value(key, group): + if key { + group.startWithCompleted { evenCompleted = true } + } else { + group.startWithCompleted { oddCompleted = true } + } + case .completed: + completed = true + case .interrupted: + interrupted = true + case .failed: + break + } + } + + observer.send(value: 1) + observer.send(value: 2) + observer.sendCompleted() + + expect(completed) == true + expect(interrupted) == false + expect(evenCompleted) == true + expect(oddCompleted) == true + } + + it("should terminate correctly receiving a failed event") { + let (signal, observer) = Signal.pipe() + let producer = SignalProducer(signal) + var interrupted = false + var completed = false + var error: TestError? = nil + var evensError: TestError? = nil + var oddsError: TestError? = nil + + producer + .group { $0 % 2 == 0 } + .start { event in + switch event { + case let .value(key, group): + if key { + group.startWithFailed { evensError = $0 } + } else { + group.startWithFailed { oddsError = $0 } + } + case .completed: + completed = true + case .interrupted: + interrupted = true + case let .failed(e): + error = e + } + } + + observer.send(value: 1) + observer.send(value: 2) + + observer.send(error: .error1) + + expect(interrupted) == false + expect(completed) == false + expect(error) == .error1 + expect(evensError) == .error1 + expect(oddsError) == .error1 + } } }