Skip to content
28 changes: 17 additions & 11 deletions dist/Bacon.js
Original file line number Diff line number Diff line change
Expand Up @@ -2919,10 +2919,23 @@ function diff(src, start, f) {
return transformP(scan(src, [start, nullMarker], (function (prevTuple, next) { return [next, f(prevTuple[0], next)]; })), composeT(filterT(function (tuple) { return tuple[1] !== nullMarker; }), mapT(function (tuple) { return tuple[1]; })), new Desc(src, "diff", [start, f]));
}

/** @hidden */
function flatScanSeedless(src, f) {
var current;
var isSeeded = false;
return src.flatMapConcat(function (next) {
return (isSeeded ? makeObservable(f(current, next)) : makeObservable(next))
.doAction(function (updated) {
isSeeded = true;
current = updated;
});
}).toProperty();
}
/** @hidden */
function flatScan(src, seed, f) {
var current = seed;
return src.flatMapConcat(function (next) {
// @ts-ignore: TS2722 Cannot invoke an object which is possibly 'undefined'. Cause it's optional!
return makeObservable(f(current, next)).doAction(function (updated) { return current = updated; });
}).toProperty().startWith(seed).withDesc(new Desc(src, "flatScan", [seed, f]));
}
Expand Down Expand Up @@ -4185,17 +4198,10 @@ var EventStream = /** @class */ (function (_super) {
*/
EventStream.prototype.flatMapWithConcurrencyLimit = function (limit, f) { return flatMapWithConcurrencyLimit(this, limit, f); };
EventStream.prototype.flatMapEvent = function (f) { return flatMapEvent(this, f); };
/**
Scans stream with given seed value and accumulator function, resulting to a Property.
Difference to [`scan`](#scan) is that the function `f` can return an [`EventStream`](eventstream.html) or a [`Property`](property.html) instead
of a pure value, meaning that you can use [`flatScan`](#flatscan) for asynchronous updates of state. It serializes
updates so that that the next update will be queued until the previous one has completed.

* @param seed initial value to start with
* @param f transition function from previous state and new value to next state
* @typeparam V2 state and result type
*/
EventStream.prototype.flatScan = function (seed, f) {
if (arguments.length == 1) {
return flatScanSeedless(this, seed);
}
return flatScan(this, seed, f);
};
EventStream.prototype.groupBy = function (keyF, limitF) {
Expand Down Expand Up @@ -5223,7 +5229,7 @@ var $ = {
/**
* Bacon.js version as string
*/
var version = '3.0.5';
var version = '<version>';

exports.$ = $;
exports.Bus = Bus;
Expand Down
2 changes: 1 addition & 1 deletion dist/Bacon.min.js
Original file line number Diff line number Diff line change
@@ -1 +1 @@
undefined
undefined
17 changes: 15 additions & 2 deletions dist/Bacon.noAssert.js
Original file line number Diff line number Diff line change
Expand Up @@ -2558,6 +2558,16 @@
f
]));
}
function flatScanSeedless(src, f) {
var current;
var isSeeded = false;
return src.flatMapConcat(function (next) {
return (isSeeded ? makeObservable(f(current, next)) : makeObservable(next)).doAction(function (updated) {
isSeeded = true;
current = updated;
});
}).toProperty();
}
function flatScan(src, seed, f) {
var current = seed;
return src.flatMapConcat(function (next) {
Expand Down Expand Up @@ -3154,6 +3164,9 @@
return flatMapEvent(this, f);
};
EventStream.prototype.flatScan = function (seed, f) {
if (arguments.length == 1) {
return flatScanSeedless(this, seed);
}
return flatScan(this, seed, f);
};
EventStream.prototype.groupBy = function (keyF, limitF) {
Expand Down Expand Up @@ -3818,7 +3831,7 @@
jQuery.fn.asEventStream = $.asEventStream;
}
};
var version = '3.0.5';
var version = '<version>';
exports.$ = $;
exports.Bus = Bus;
exports.CompositeUnsubscribe = CompositeUnsubscribe;
Expand Down Expand Up @@ -3880,4 +3893,4 @@
exports.zipAsArray = zipAsArray;
exports.zipWith = zipWith;
Object.defineProperty(exports, '__esModule', { value: true });
}));
}));
19 changes: 17 additions & 2 deletions src/flatscan.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,25 @@ import { Observable, Property } from "./observable";
import { Desc } from "./describe";
import { Function2 } from "./types";

/** @hidden */
export function flatScanSeedless<V>(src: Observable<V>, f: Function2<V, V, Observable<V> | V>): Property<V> {
let current: V;
let isSeeded = false;

return src.flatMapConcat(function (next: V) {
return (isSeeded ? makeObservable(f(current, next)) : makeObservable(next))
.doAction(function (updated: V) {
isSeeded = true;
current = updated;
});
}).toProperty().withDesc(new Desc(src, "flatScan", [f]));
}

/** @hidden */
export function flatScan<In, Out>(src: Observable<In>, seed: Out, f: Function2<Out, In, Observable<Out> | Out>): Property<Out> {
let current = seed
let current = seed;
return src.flatMapConcat((next: In) =>
// @ts-ignore: TS2722 Cannot invoke an object which is possibly 'undefined'. Cause it's optional!
makeObservable(f(current, next)).doAction(updated => current = updated)
).toProperty().startWith(seed).withDesc(new Desc(src, "flatScan", [seed, f]))
}
}
9 changes: 8 additions & 1 deletion src/fold.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,18 @@ import "./scan";
import Observable from "./observable";
import { Desc } from "./describe";
import { Accumulator } from "./scan";
import { Property } from "./observable";;
import { Property } from "./observable";

/** @hidden */
export default function fold<In, Out>(src: Observable<In>, seed: Out, f: Accumulator<In, Out>): Property<Out> {
return <any>src.scan(seed, f)
.last()
.withDesc(new Desc(src, "fold", [seed, f]));
}

/** @hidden */
export function foldSeedless<InOut>(src: Observable<InOut>, f: Accumulator<InOut, InOut>): Property<InOut> {
return <any>src.scan(f)
.last()
.withDesc(new Desc(src, "fold", [f]));
}
52 changes: 41 additions & 11 deletions src/observable.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import doLogT from "./dolog";
import doErrorT from "./doerror";
import doActionT from "./doaction";
import doEndT from "./doend";
import { Accumulator, default as scan } from "./scan";
import { Accumulator, scanSeedless, default as scan } from "./scan";
import mapEndT from "./mapend";
import mapErrorT from "./maperror";
import { SpawnerOrObservable, EventSpawner, EventOrValue } from "./flatmap_";
Expand All @@ -40,7 +40,7 @@ import { filter } from "./filter";
import { and, not, or } from "./boolean";
import flatMapFirst from "./flatmapfirst";
import addPropertyInitValueToStream from "./internal/addpropertyinitialvaluetostream";
import fold from "./fold";
import { default as fold, foldSeedless } from "./fold";
import { startWithE, startWithP } from "./startwith";
import takeUntil from "./takeuntil";
import flatMap from "./flatmap";
Expand Down Expand Up @@ -69,7 +69,7 @@ import skipWhile from "./skipwhile";
import { groupBy, GroupTransformer } from "./groupby";
import { slidingWindow } from "./slidingwindow";
import { diff, Differ } from "./diff";
import { flatScan } from "./flatscan";
import { flatScan, flatScanSeedless } from "./flatscan";
import { holdWhen } from "./holdwhen";
import { zip } from "./zip";
import decode from "./decode";
Expand Down Expand Up @@ -407,8 +407,16 @@ Works like [`scan`](#scan) but only emits the final
value, i.e. the value just before the observable ends. Returns a
[`Property`](property.html).
*/
fold<V2>(seed: V2, f: Accumulator<V, V2>): Property<V2> {
return fold(this, seed, f)

fold<V2>(seed: V2, f: Accumulator<V, V2>): Property<V2>

fold(f: Accumulator<V, V>): Property<V>

fold<V2>(seed: V2 | Accumulator<V, V>, f?: Accumulator<V, V2>): Property<V2> {
if (arguments.length === 1) {
return <any>foldSeedless(this, seed as any as Accumulator<V, V>);
}
return fold(this, seed as any as V2, f as any as Accumulator<V, V2>)
}

/**
Expand Down Expand Up @@ -596,8 +604,15 @@ Only applicable for observables with arrays as values.
}
/** A synonym for [scan](#scan).
*/
reduce<V2>(seed: V2, f: Accumulator<V, V2>): Property<V2> {
return fold(this, seed, f)
reduce<V2>(seed: V2, f: Accumulator<V, V2>): Property<V2>

reduce(f: Accumulator<V, V>): Property<V>

reduce<V2>(seed: V2 | Accumulator<V, V>, f?: Accumulator<V, V2>): Property<V2> {
if (arguments.length === 1) {
return <any>foldSeedless(this, seed as any as Accumulator<V, V>);
}
return fold(this, seed as any as V2, f as any as Accumulator<V, V2>)
}

/**
Expand Down Expand Up @@ -639,8 +654,16 @@ identically to EventStream.scan: the `seed` will be the initial value of
seed won't be output as is. Instead, the initial value of `r` will be `f(seed, x)`. This makes sense,
because there can only be 1 initial value for a Property at a time.
*/
scan<V2>(seed: V2, f: Accumulator<V, V2>): Property<V2> {
return scan(this, seed, f)

scan<V2>(seed: V2, f: Accumulator<V, V2>): Property<V2>

scan(f: Accumulator<V, V>): Property<V>

scan<V2>(seed: V2 | Accumulator<V, V>, f?: Accumulator<V, V2>): Property<V2> {
if (arguments.length === 1) {
return <any>scanSeedless(this, seed as any as Accumulator<V, V>);
}
return <any>scan(this, seed as any as V2, f as any as Accumulator<V, V2>)
}
/**
Skips the first n elements from the stream
Expand Down Expand Up @@ -1408,8 +1431,15 @@ export class EventStream<V> extends Observable<V> {
* @param f transition function from previous state and new value to next state
* @typeparam V2 state and result type
*/
flatScan<V2>(seed: V2, f: Function2<V2, V, Observable<V2>>): Property<V2> {
return <any>flatScan(this, seed, f)
flatScan<V2>(seed: V2, f: Function2<V2, V, Observable<V2>>): Property<V2>

flatScan(f: Function2<V, V, Observable<V>>): Property<V>

flatScan<V2>(seed: V2 | Function2<V2, V, Observable<V2>>, f?: Function2<V2, V, Observable<V2>>): Property<V2> {
if (arguments.length == 1) {
return <any>flatScanSeedless(this, seed as any as Function2<V, V, Observable<V>>)
}
return <any>flatScan(this, seed as any as V2, f as any as Function2<V2, V, Observable<V2>>)
}

/**
Expand Down
28 changes: 27 additions & 1 deletion src/scan.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import Observable from "./observable";
import { Property } from "./observable";;
import { Property } from "./observable";
import { Event, hasValue, Initial } from "./event";
import { more, noMore } from "./reply";
import { nop } from "./helpers";
Expand Down Expand Up @@ -58,3 +58,29 @@ export default function scan<In, Out>(src: Observable<In>, seed: Out, f: Accumul
}
return resultProperty = new Property(new Desc(src, "scan", [seed, f]), subscribe)
}

/** @hidden */
export function scanSeedless<V>(src: Observable<V>, f: Accumulator<V, V>): Property<V> {
let acc: V;
let hasAccumulatedFirstValue: Boolean = false;
const subscribe: Subscribe<V> = (sink: EventSink<V>) => {
let unsub = src.subscribeInternal(function(event: Event<V>) {
if (hasValue(event)) {
//console.log("has value: ", hasValue(event), "isInitial:", event.isInitial);
if (!hasAccumulatedFirstValue) {
acc = event.value;
hasAccumulatedFirstValue = true;
return sink(<any>event); // let the initial event pass through
}

acc = f(acc, event.value);
return sink(event.apply(acc));

} else {
return sink(<any>event);
}
});
return unsub;
}
return new Property(new Desc(src, "scan", [f]), subscribe)
}
14 changes: 14 additions & 0 deletions test/flatscan.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,20 @@ describe("EventStream.flatScan", function() {
[0, 1, 3, error(), 6], semiunstable)
);

describe("Without a seed value", () => {
it ("accumulates values with given seed and accumulator function which returns a stream of updated values", () =>
expectPropertyEvents(
() => series(1, [1, 2, error(), 3]).flatScan(addAsync(1)),
[1, 3, error(), 6]
)
);
it("Serializes updates even when they occur while performing previous update", () =>
expectPropertyEvents(
() => series(1, [0, 1, 2, error(), 3]).flatScan(addAsync(5)),
[0, error(), 1, 3, 6], semiunstable)
);
});

return it("yields the seed value immediately", function() {
const outputs: number[] = [];
new Bacon.Bus().flatScan(0, (a, b) => <any>1).onValue(value => { outputs.push(value) });
Expand Down
38 changes: 34 additions & 4 deletions test/fold.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,39 @@ describe("EventStream.fold", function() {
);
describe("has reduce as synonym", () =>
expectPropertyEvents(
() => series(1, [1, 2, error(), 3]).fold(0, add),
() => series(1, [1, 2, error(), 3]).reduce(0, add),
[error(), 6])
);
describe("works with synchronous source", () =>
expectPropertyEvents(
() => fromArray([1, 2, error(), 3]).fold(0, add),
[error(), 6], unstable)
);

describe("Without seed value", function(){
it("folds stream into a single-valued Property, passes through errors", () =>
expectPropertyEvents(
() => series(1, [0, 1, 2, error(), 3]).fold(add),
[error(), 6])
);
it("has reduce as synonym", () =>
expectPropertyEvents(
() => series(1, [1, 2, error(), 3]).reduce(add),
[error(), 6])
);
it("works with synchronous source", () =>
expectPropertyEvents(
() => fromArray([0, 1, 2, error(), 3]).fold(add),
[error(), 6], unstable)
);
it("works with really large chunks too, with { eager: true }", function() {
const count = 50000;
return expectPropertyEvents(
() => series(1, range(1, count, true)).fold((x: number,y: number) => x+1),
[count]);
});
});

return describe("works with really large chunks too, with { eager: true }", function() {
const count = 50000;
return expectPropertyEvents(
Expand All @@ -24,10 +49,15 @@ describe("EventStream.fold", function() {
});
});

describe("Property.fold", () =>
describe("Property.fold", () => {
describe("Folds Property into a single-valued one", () =>
expectPropertyEvents(
() => series(1, [2,3]).toProperty(1).fold(0, add),
() => series(1, [2, 3]).toProperty(1).fold(0, add),
[6])
);
describe("Without seed value folds Property into a single-valued one", () =>
expectPropertyEvents(
() => series(1, [2, 3]).toProperty(1).fold(add),
[6])
)
);
});
Loading