diff --git a/packages/d2mini/src/graph.ts b/packages/d2mini/src/graph.ts index 60410f8..b07c2e8 100644 --- a/packages/d2mini/src/graph.ts +++ b/packages/d2mini/src/graph.ts @@ -1,4 +1,4 @@ -import { MultiSet, MultiSetArray } from './multiset.js' +import { MultiSet, MultiSetArray, IMultiSet } from './multiset.js' import { IOperator, IDifferenceStreamReader, @@ -9,13 +9,13 @@ import { * A read handle to a dataflow edge that receives data from a writer. */ export class DifferenceStreamReader implements IDifferenceStreamReader { - #queue: MultiSet[] + #queue: IMultiSet[] - constructor(queue: MultiSet[]) { + constructor(queue: IMultiSet[]) { this.#queue = queue } - drain(): MultiSet[] { + drain(): IMultiSet[] { const out = [...this.#queue].reverse() this.#queue.length = 0 return out @@ -30,20 +30,20 @@ export class DifferenceStreamReader implements IDifferenceStreamReader { * A write handle to a dataflow edge that is allowed to publish data. */ export class DifferenceStreamWriter implements IDifferenceStreamWriter { - #queues: MultiSet[][] = [] + #queues: IMultiSet[][] = [] - sendData(collection: MultiSet | MultiSetArray): void { - if (!(collection instanceof MultiSet)) { + sendData(collection: IMultiSet | MultiSetArray): void { + if (!(collection instanceof MultiSet) && !('getInner' in collection)) { collection = new MultiSet(collection) } for (const q of this.#queues) { - q.unshift(collection) + q.unshift(collection as IMultiSet) } } newReader(): DifferenceStreamReader { - const q: MultiSet[] = [] + const q: IMultiSet[] = [] this.#queues.push(q) return new DifferenceStreamReader(q) } @@ -88,8 +88,8 @@ export abstract class UnaryOperator extends Operator< super(id, [inputA], output) } - inputMessages(): MultiSet[] { - return this.inputs[0].drain() as MultiSet[] + inputMessages(): IMultiSet[] { + return this.inputs[0].drain() as IMultiSet[] } } @@ -107,11 +107,11 @@ export abstract class BinaryOperator extends Operator { super(id, [inputA, inputB], output) } - inputAMessages(): MultiSet[] { + inputAMessages(): IMultiSet[] { return this.inputs[0].drain() } - inputBMessages(): MultiSet[] { + inputBMessages(): IMultiSet[] { return this.inputs[1].drain() } } @@ -120,7 +120,7 @@ export abstract class BinaryOperator extends Operator { * Base class for operators that process a single input stream */ export abstract class LinearUnaryOperator extends UnaryOperator { - abstract inner(collection: MultiSet): MultiSet + abstract inner(collection: IMultiSet): IMultiSet run(): void { for (const message of this.inputMessages()) { diff --git a/packages/d2mini/src/indexes.ts b/packages/d2mini/src/indexes.ts index 1503881..1cb2a39 100644 --- a/packages/d2mini/src/indexes.ts +++ b/packages/d2mini/src/indexes.ts @@ -124,4 +124,32 @@ export class Index { return new MultiSet(result) } + + *lazyJoin(other: Index): Generator<[[K, [V, V2]], number], void, unknown> { + if (this.size <= other.size) { + for (const [key, valueMap] of this.entries()) { + if (!other.has(key)) continue + const otherValues = other.get(key) + for (const [val1, mul1] of valueMap.values()) { + for (const [val2, mul2] of otherValues) { + if (mul1 !== 0 && mul2 !== 0) { + yield [[key, [val1, val2]], mul1 * mul2] + } + } + } + } + } else { + for (const [key, otherValueMap] of other.entries()) { + if (!this.has(key)) continue + const values = this.get(key) + for (const [val2, mul2] of otherValueMap.values()) { + for (const [val1, mul1] of values) { + if (mul1 !== 0 && mul2 !== 0) { + yield [[key, [val1, val2]], mul1 * mul2] + } + } + } + } + } + } } diff --git a/packages/d2mini/src/multiset.ts b/packages/d2mini/src/multiset.ts index f708bc5..fe03ff0 100644 --- a/packages/d2mini/src/multiset.ts +++ b/packages/d2mini/src/multiset.ts @@ -3,10 +3,67 @@ import { DefaultMap, chunkedArrayPush, hash } from './utils.js' export type MultiSetArray = [T, number][] export type KeyedData = [key: string, value: T] +/** + * Common interface for MultiSet implementations + */ +export interface IMultiSet { + /** + * Apply a function to all records in the collection. + */ + map(f: (data: T) => U): IMultiSet + + /** + * Filter out records for which a function f(record) evaluates to False. + */ + filter(f: (data: T) => boolean): IMultiSet + + /** + * Negate all multiplicities in the collection. + */ + negate(): IMultiSet + + /** + * Concatenate two collections together. + */ + concat(other: IMultiSet): IMultiSet + + /** + * Produce as output a collection that is logically equivalent to the input + * but which combines identical instances of the same record into one + * (record, multiplicity) pair. + */ + consolidate(): IMultiSet + + /** + * Extend this collection with data from another collection + */ + extend(other: IMultiSet | MultiSetArray): void + + /** + * Get an iterator over the elements + */ + [Symbol.iterator](): Iterator<[T, number]> + + /** + * Get all entries as an array + */ + getInner(): MultiSetArray + + /** + * String representation + */ + toString(indent?: boolean): string + + /** + * JSON representation + */ + toJSON(): string +} + /** * A multiset of data. */ -export class MultiSet { +export class MultiSet implements IMultiSet { #inner: MultiSetArray constructor(data: MultiSetArray = []) { @@ -25,10 +82,19 @@ export class MultiSet { return new MultiSet(JSON.parse(json)) } + /** + * Get an iterator over the elements + */ + *[Symbol.iterator](): Iterator<[T, number]> { + for (const entry of this.#inner) { + yield entry + } + } + /** * Apply a function to all records in the collection. */ - map(f: (data: T) => U): MultiSet { + map(f: (data: T) => U): IMultiSet { return new MultiSet( this.#inner.map(([data, multiplicity]) => [f(data), multiplicity]), ) @@ -37,14 +103,14 @@ export class MultiSet { /** * Filter out records for which a function f(record) evaluates to False. */ - filter(f: (data: T) => boolean): MultiSet { + filter(f: (data: T) => boolean): IMultiSet { return new MultiSet(this.#inner.filter(([data, _]) => f(data))) } /** * Negate all multiplicities in the collection. */ - negate(): MultiSet { + negate(): IMultiSet { return new MultiSet( this.#inner.map(([data, multiplicity]) => [data, -multiplicity]), ) @@ -53,7 +119,7 @@ export class MultiSet { /** * Concatenate two collections together. */ - concat(other: MultiSet): MultiSet { + concat(other: IMultiSet): IMultiSet { const out: MultiSetArray = [] chunkedArrayPush(out, this.#inner) chunkedArrayPush(out, other.getInner()) @@ -65,7 +131,7 @@ export class MultiSet { * but which combines identical instances of the same record into one * (record, multiplicity) pair. */ - consolidate(): MultiSet { + consolidate(): IMultiSet { const consolidated = new DefaultMap(() => 0) const values = new Map() @@ -104,8 +170,8 @@ export class MultiSet { return new MultiSet(result) } - extend(other: MultiSet | MultiSetArray): void { - const otherArray = other instanceof MultiSet ? other.getInner() : other + extend(other: IMultiSet | MultiSetArray): void { + const otherArray = other instanceof MultiSet || 'getInner' in other ? other.getInner() : other chunkedArrayPush(this.#inner, otherArray) } @@ -113,3 +179,159 @@ export class MultiSet { return this.#inner } } + +/** + * A lazy multiset that uses generators to compute results on-demand + */ +export class LazyMultiSet implements IMultiSet { + #generator: () => Generator<[T, number], void, unknown> + + constructor(generator: () => Generator<[T, number], void, unknown>) { + this.#generator = generator + } + + toString(indent = false): string { + const data = Array.from(this) + return `LazyMultiSet(${JSON.stringify(data, null, indent ? 2 : undefined)})` + } + + toJSON(): string { + return JSON.stringify(Array.from(this)) + } + + /** + * Get an iterator over the elements + */ + *[Symbol.iterator](): Iterator<[T, number]> { + yield* this.#generator() + } + + /** + * Apply a function to all records in the collection. + */ + map(f: (data: T) => U): IMultiSet { + const sourceGenerator = this.#generator + return new LazyMultiSet(function* () { + for (const [data, multiplicity] of sourceGenerator()) { + yield [f(data), multiplicity] + } + }) + } + + /** + * Filter out records for which a function f(record) evaluates to False. + */ + filter(f: (data: T) => boolean): IMultiSet { + const sourceGenerator = this.#generator + return new LazyMultiSet(function* () { + for (const [data, multiplicity] of sourceGenerator()) { + if (f(data)) { + yield [data, multiplicity] + } + } + }) + } + + /** + * Negate all multiplicities in the collection. + */ + negate(): IMultiSet { + const sourceGenerator = this.#generator + return new LazyMultiSet(function* () { + for (const [data, multiplicity] of sourceGenerator()) { + yield [data, -multiplicity] + } + }) + } + + /** + * Concatenate two collections together. + */ + concat(_other: IMultiSet): IMultiSet { + const sourceGenerator = this.#generator + return new LazyMultiSet(function* () { + yield* sourceGenerator() + yield* _other + }) + } + + /** + * Produce as output a collection that is logically equivalent to the input + * but which combines identical instances of the same record into one + * (record, multiplicity) pair. + */ + consolidate(): IMultiSet { + // For consolidation, we need to materialize the data + // since we need to group by key + const consolidated = new DefaultMap(() => 0) + const values = new Map() + + let hasString = false + let hasNumber = false + let hasOther = false + + // First pass to determine data types + const allData: [T, number][] = [] + for (const [data, multiplicity] of this) { + allData.push([data, multiplicity]) + if (typeof data === 'string') { + hasString = true + } else if (typeof data === 'number') { + hasNumber = true + } else { + hasOther = true + } + } + + const requireJson = hasOther || (hasString && hasNumber) + + for (const [data, multiplicity] of allData) { + const key = requireJson ? hash(data) : (data as string | number) + if (requireJson && !values.has(key as string)) { + values.set(key as string, data) + } + consolidated.update(key, (count) => count + multiplicity) + } + + return new LazyMultiSet(function* () { + for (const [key, multiplicity] of consolidated.entries()) { + if (multiplicity !== 0) { + const parsedKey = requireJson ? values.get(key as string) : key + yield [parsedKey as T, multiplicity] + } + } + }) + } + + extend(_other: IMultiSet | MultiSetArray): void { + // For lazy multisets, extend creates a new generator that yields both + // Since we can't modify the generator in place, we'll throw an error for now + // This method is mainly used internally and we may need to reconsider its API + throw new Error('extend() is not supported on LazyMultiSet. Use concat() instead.') + } + + /** + * Get all entries as an array (materializes the lazy evaluation) + */ + getInner(): MultiSetArray { + return Array.from(this) + } + + /** + * Create a LazyMultiSet from a regular array + */ + static fromArray(data: MultiSetArray): LazyMultiSet { + return new LazyMultiSet(function* () { + yield* data + }) + } + + /** + * Create a LazyMultiSet from another IMultiSet + */ + static from(source: IMultiSet): LazyMultiSet { + return new LazyMultiSet(function* () { + yield* source + }) + } +} diff --git a/packages/d2mini/src/operators/consolidate.ts b/packages/d2mini/src/operators/consolidate.ts index 8333f85..111f1f5 100644 --- a/packages/d2mini/src/operators/consolidate.ts +++ b/packages/d2mini/src/operators/consolidate.ts @@ -1,7 +1,7 @@ import { IStreamBuilder, PipedOperator } from '../types.js' import { DifferenceStreamWriter, UnaryOperator } from '../graph.js' import { StreamBuilder } from '../d2.js' -import { MultiSet } from '../multiset.js' +import { LazyMultiSet } from '../multiset.js' /** * Operator that consolidates collections @@ -9,23 +9,36 @@ import { MultiSet } from '../multiset.js' export class ConsolidateOperator extends UnaryOperator { run(): void { const messages = this.inputMessages() - if (messages.length === 0) { - return - } - // Combine all messages into a single MultiSet - const combined = new MultiSet() - for (const message of messages) { - combined.extend(message) + // Create generator that yields all items from all messages then consolidates + function* generateConsolidatedResults() { + const lazyResults = new LazyMultiSet(function* () { + for (const message of messages) { + for (const item of message) { + yield item + } + } + }).consolidate() + + yield* lazyResults } - // Consolidate the combined MultiSet - const consolidated = combined.consolidate() + // Peek to see if there are any results after consolidation + const generator = generateConsolidatedResults() + const firstResult = generator.next() + + if (!firstResult.done) { + // We have at least one result, create lazy set that includes the first result and the rest + const lazyResults = new LazyMultiSet(function* () { + // Yield the first result we already got + yield firstResult.value + // Yield the rest of the results + yield* generator + }) - // Only send if there are results - if (consolidated.getInner().length > 0) { - this.output.sendData(consolidated) + this.output.sendData(lazyResults) } + // If no results after consolidation, don't send anything } } diff --git a/packages/d2mini/src/operators/distinct.ts b/packages/d2mini/src/operators/distinct.ts index 60bd54d..90c4fa8 100644 --- a/packages/d2mini/src/operators/distinct.ts +++ b/packages/d2mini/src/operators/distinct.ts @@ -6,7 +6,7 @@ import { } from '../graph.js' import { StreamBuilder } from '../d2.js' import { hash } from '../utils.js' -import { MultiSet } from '../multiset.js' +import { LazyMultiSet } from '../multiset.js' type HashedValue = string type Multiplicity = number @@ -34,7 +34,7 @@ export class DistinctOperator extends UnaryOperator { // Compute the new multiplicity for each value for (const message of this.inputMessages()) { - for (const [value, diff] of message.getInner()) { + for (const [value, diff] of message) { const hashedValue = hash(this.#by(value)) const oldMultiplicity = @@ -47,35 +47,39 @@ export class DistinctOperator extends UnaryOperator { } } - const result: Array<[T, number]> = [] - - // Check which values became visible or disappeared - for (const [ - hashedValue, - [newMultiplicity, value], - ] of updatedValues.entries()) { + // Pre-compute state changes to determine what will be yielded + const stateChanges = new Map() + + for (const [hashedValue, [newMultiplicity, value]] of updatedValues.entries()) { const oldMultiplicity = this.#values.get(hashedValue) ?? 0 + stateChanges.set(hashedValue, { oldMultiplicity, newMultiplicity, value }) + } + // Update state immediately + for (const [hashedValue, { newMultiplicity }] of stateChanges.entries()) { if (newMultiplicity === 0) { this.#values.delete(hashedValue) } else { this.#values.set(hashedValue, newMultiplicity) } + } - if (oldMultiplicity <= 0 && newMultiplicity > 0) { - // The value wasn't present in the stream - // but with this change it is now present in the stream - result.push([value, 1]) - } else if (oldMultiplicity > 0 && newMultiplicity <= 0) { - // The value was present in the stream - // but with this change it is no longer present in the stream - result.push([value, -1]) + // Create lazy generator that yields results without intermediate array + const lazyResults = new LazyMultiSet(function* () { + for (const [, { oldMultiplicity, newMultiplicity, value }] of stateChanges.entries()) { + if (oldMultiplicity <= 0 && newMultiplicity > 0) { + // The value wasn't present in the stream + // but with this change it is now present in the stream + yield [value, 1] + } else if (oldMultiplicity > 0 && newMultiplicity <= 0) { + // The value was present in the stream + // but with this change it is no longer present in the stream + yield [value, -1] + } } - } + }) - if (result.length > 0) { - this.output.sendData(new MultiSet(result)) - } + this.output.sendData(lazyResults) } } diff --git a/packages/d2mini/src/operators/filter.ts b/packages/d2mini/src/operators/filter.ts index b91ac03..c419378 100644 --- a/packages/d2mini/src/operators/filter.ts +++ b/packages/d2mini/src/operators/filter.ts @@ -2,7 +2,7 @@ import { IStreamBuilder, PipedOperator } from '../types.js' import { DifferenceStreamReader, DifferenceStreamWriter } from '../graph.js' import { StreamBuilder } from '../d2.js' import { LinearUnaryOperator } from '../graph.js' -import { MultiSet } from '../multiset.js' +import { IMultiSet, LazyMultiSet } from '../multiset.js' /** * Operator that filters elements from the input stream @@ -20,8 +20,9 @@ export class FilterOperator extends LinearUnaryOperator { this.#f = f } - inner(collection: MultiSet): MultiSet { - return collection.filter(this.#f) + inner(collection: IMultiSet): IMultiSet { + // Use LazyMultiSet for lazy evaluation + return LazyMultiSet.from(collection).filter(this.#f) } } diff --git a/packages/d2mini/src/operators/join.ts b/packages/d2mini/src/operators/join.ts index 2b13049..c29102c 100644 --- a/packages/d2mini/src/operators/join.ts +++ b/packages/d2mini/src/operators/join.ts @@ -5,7 +5,7 @@ import { BinaryOperator, } from '../graph.js' import { StreamBuilder } from '../d2.js' -import { MultiSet } from '../multiset.js' +import { MultiSet, LazyMultiSet } from '../multiset.js' import { Index } from '../indexes.js' import { negate } from './negate.js' import { map } from './map.js' @@ -42,7 +42,7 @@ export class JoinOperator extends BinaryOperator< const messagesA = this.inputAMessages() for (const message of messagesA) { const multiSetMessage = message as unknown as MultiSet<[K, V1]> - for (const [item, multiplicity] of multiSetMessage.getInner()) { + for (const [item, multiplicity] of multiSetMessage) { const [key, value] = item deltaA.addValue(key, [value, multiplicity]) } @@ -52,31 +52,31 @@ export class JoinOperator extends BinaryOperator< const messagesB = this.inputBMessages() for (const message of messagesB) { const multiSetMessage = message as unknown as MultiSet<[K, V2]> - for (const [item, multiplicity] of multiSetMessage.getInner()) { + for (const [item, multiplicity] of multiSetMessage) { const [key, value] = item deltaB.addValue(key, [value, multiplicity]) } } - // Process results - const results = new MultiSet<[K, [V1, V2]]>() + const self = this - // Join deltaA with existing indexB - results.extend(deltaA.join(this.#indexB)) + const couldHaveResults = (deltaA.size > 0 && self.#indexB.size > 0) || + (self.#indexA.size > 0 && deltaB.size > 0) || + (deltaA.size > 0 && deltaB.size > 0) - // Append deltaA to indexA - this.#indexA.append(deltaA) + if (couldHaveResults) { + const lazyResults = new LazyMultiSet(function* () { + yield* deltaA.lazyJoin(self.#indexB) + self.#indexA.append(deltaA) + yield* self.#indexA.lazyJoin(deltaB) + self.#indexB.append(deltaB) + }) - // Join existing indexA with deltaB - results.extend(this.#indexA.join(deltaB)) - - // Send results - if (results.getInner().length > 0) { - this.output.sendData(results) + this.output.sendData(lazyResults) + } else { + this.#indexA.append(deltaA) + this.#indexB.append(deltaB) } - - // Append deltaB to indexB - this.#indexB.append(deltaB) } } diff --git a/packages/d2mini/src/operators/map.ts b/packages/d2mini/src/operators/map.ts index a4eb921..46a3151 100644 --- a/packages/d2mini/src/operators/map.ts +++ b/packages/d2mini/src/operators/map.ts @@ -2,7 +2,7 @@ import { IStreamBuilder, PipedOperator } from '../types.js' import { DifferenceStreamReader, DifferenceStreamWriter } from '../graph.js' import { StreamBuilder } from '../d2.js' import { LinearUnaryOperator } from '../graph.js' -import { MultiSet } from '../multiset.js' +import { IMultiSet, LazyMultiSet } from '../multiset.js' /** * Operator that applies a function to each element in the input stream @@ -20,8 +20,9 @@ export class MapOperator extends LinearUnaryOperator { this.#f = f } - inner(collection: MultiSet): MultiSet { - return collection.map(this.#f) + inner(collection: IMultiSet): IMultiSet { + // Use LazyMultiSet for lazy evaluation + return LazyMultiSet.from(collection).map(this.#f) } } diff --git a/packages/d2mini/src/operators/negate.ts b/packages/d2mini/src/operators/negate.ts index e59d635..f84450d 100644 --- a/packages/d2mini/src/operators/negate.ts +++ b/packages/d2mini/src/operators/negate.ts @@ -2,14 +2,15 @@ import { IStreamBuilder, PipedOperator } from '../types.js' import { DifferenceStreamWriter } from '../graph.js' import { StreamBuilder } from '../d2.js' import { LinearUnaryOperator } from '../graph.js' -import { MultiSet } from '../multiset.js' +import { IMultiSet, LazyMultiSet } from '../multiset.js' /** - * Operator that negates the multiplicities in the input stream + * Operator that negates all multiplicities in the input stream */ export class NegateOperator extends LinearUnaryOperator { - inner(collection: MultiSet): MultiSet { - return collection.negate() + inner(collection: IMultiSet): IMultiSet { + // Use LazyMultiSet for lazy evaluation + return LazyMultiSet.from(collection).negate() } } diff --git a/packages/d2mini/src/operators/output.ts b/packages/d2mini/src/operators/output.ts index eb416bf..467068d 100644 --- a/packages/d2mini/src/operators/output.ts +++ b/packages/d2mini/src/operators/output.ts @@ -5,19 +5,19 @@ import { UnaryOperator, } from '../graph.js' import { StreamBuilder } from '../d2.js' -import { MultiSet } from '../multiset.js' +import { IMultiSet } from '../multiset.js' /** * Operator that outputs the messages in the stream */ export class OutputOperator extends UnaryOperator { - #fn: (data: MultiSet) => void + #fn: (data: IMultiSet) => void constructor( id: number, inputA: DifferenceStreamReader, output: DifferenceStreamWriter, - fn: (data: MultiSet) => void, + fn: (data: IMultiSet) => void, ) { super(id, inputA, output) this.#fn = fn @@ -36,7 +36,7 @@ export class OutputOperator extends UnaryOperator { * @param fn - The function to call with each message */ export function output( - fn: (data: MultiSet) => void, + fn: (data: IMultiSet) => void, ): PipedOperator { return (stream: IStreamBuilder): IStreamBuilder => { const output = new StreamBuilder( diff --git a/packages/d2mini/src/operators/reduce.ts b/packages/d2mini/src/operators/reduce.ts index ae0bd2f..be743e1 100644 --- a/packages/d2mini/src/operators/reduce.ts +++ b/packages/d2mini/src/operators/reduce.ts @@ -5,7 +5,7 @@ import { UnaryOperator, } from '../graph.js' import { StreamBuilder } from '../d2.js' -import { MultiSet } from '../multiset.js' +import { LazyMultiSet } from '../multiset.js' import { Index } from '../indexes.js' import { hash } from '../utils.js' @@ -31,15 +31,21 @@ export class ReduceOperator extends UnaryOperator<[K, V1], [K, V2]> { // Collect all input messages and update the index const keysTodo = new Set() for (const message of this.inputMessages()) { - for (const [item, multiplicity] of message.getInner()) { + for (const [item, multiplicity] of message) { const [key, value] = item this.#index.addValue(key, [value, multiplicity]) keysTodo.add(key) } } - // For each key, compute the reduction and delta - const result: [[K, V2], number][] = [] + // Pre-compute all changes and state updates for each key + const allChanges = new Map, + oldOutputMap: Map, + commonKeys: Set + }>() + + // First pass: compute all the output maps for each key for (const key of keysTodo) { const curr = this.#index.get(key) const currOut = this.#indexOut.get(key) @@ -77,48 +83,86 @@ export class ReduceOperator extends UnaryOperator<[K, V1], [K, V2]> { const commonKeys = new Set() - // First, emit removals for old values that are no longer present + // Identify common keys between old and new outputs + for (const [valueKey] of oldOutputMap) { + if (newOutputMap.has(valueKey)) { + commonKeys.add(valueKey) + } + } + for (const [valueKey] of newOutputMap) { + if (oldOutputMap.has(valueKey)) { + commonKeys.add(valueKey) + } + } + + allChanges.set(key, { newOutputMap, oldOutputMap, commonKeys }) + } + + // Second pass: apply all state updates + for (const [key, { newOutputMap, oldOutputMap, commonKeys }] of allChanges) { + // Apply removals to state for (const [valueKey, { value, multiplicity }] of oldOutputMap) { const newEntry = newOutputMap.get(valueKey) if (!newEntry) { - // Remove the old value entirely - result.push([[key, value], -multiplicity]) this.#indexOut.addValue(key, [value, -multiplicity]) - } else { - commonKeys.add(valueKey) } } - // Then, emit additions for new values that are not present in old + // Apply additions to state for (const [valueKey, { value, multiplicity }] of newOutputMap) { const oldEntry = oldOutputMap.get(valueKey) if (!oldEntry) { - // Add the new value only if it has non-zero multiplicity if (multiplicity !== 0) { - result.push([[key, value], multiplicity]) this.#indexOut.addValue(key, [value, multiplicity]) } - } else { - commonKeys.add(valueKey) } } - // Then, emit multiplicity changes for values that were present and are still present + // Apply multiplicity changes to state for (const valueKey of commonKeys) { const newEntry = newOutputMap.get(valueKey) const oldEntry = oldOutputMap.get(valueKey) const delta = newEntry!.multiplicity - oldEntry!.multiplicity - // Only emit actual changes, i.e. non-zero deltas if (delta !== 0) { - result.push([[key, newEntry!.value], delta]) this.#indexOut.addValue(key, [newEntry!.value, delta]) } } } - if (result.length > 0) { - this.output.sendData(new MultiSet(result)) - } + // Create lazy generator that yields results without intermediate array + const lazyResults = new LazyMultiSet(function* (): Generator<[[K, V2], number], void, unknown> { + for (const [key, { newOutputMap, oldOutputMap, commonKeys }] of allChanges) { + // Yield removals for old values that are no longer present + for (const [valueKey, { value, multiplicity }] of oldOutputMap) { + const newEntry = newOutputMap.get(valueKey) + if (!newEntry) { + yield [[key, value], -multiplicity] as [[K, V2], number] + } + } + + // Yield additions for new values that are not present in old + for (const [valueKey, { value, multiplicity }] of newOutputMap) { + const oldEntry = oldOutputMap.get(valueKey) + if (!oldEntry) { + if (multiplicity !== 0) { + yield [[key, value], multiplicity] as [[K, V2], number] + } + } + } + + // Yield multiplicity changes for values that were present and are still present + for (const valueKey of commonKeys) { + const newEntry = newOutputMap.get(valueKey) + const oldEntry = oldOutputMap.get(valueKey) + const delta = newEntry!.multiplicity - oldEntry!.multiplicity + if (delta !== 0) { + yield [[key, newEntry!.value], delta] as [[K, V2], number] + } + } + } + }) + + this.output.sendData(lazyResults) } } diff --git a/packages/d2mini/src/operators/topK.ts b/packages/d2mini/src/operators/topK.ts index 941eede..c891027 100644 --- a/packages/d2mini/src/operators/topK.ts +++ b/packages/d2mini/src/operators/topK.ts @@ -1,7 +1,7 @@ import { IStreamBuilder, PipedOperator } from '../types' import { KeyValue } from '../types.js' import { reduce } from './reduce.js' -import { MultiSet } from '../multiset.js' +import { LazyMultiSet } from '../multiset.js' interface TopKOptions { limit?: number @@ -34,9 +34,8 @@ export function topK< const reduced = stream.pipe( reduce((values) => { // `values` is a list of tuples, first element is the value, second is the multiplicity - const consolidated = new MultiSet(values).consolidate() - const sortedValues = consolidated - .getInner() + const consolidated = LazyMultiSet.fromArray(values).consolidate() + const sortedValues = Array.from(consolidated) .sort((a, b) => comparator(a[0] as V1, b[0] as V1)) return sortedValues.slice(offset, offset + limit) }), @@ -74,10 +73,9 @@ export function topKWithIndex< const reduced = stream.pipe( reduce((values) => { // `values` is a list of tuples, first element is the value, second is the multiplicity - const consolidated = new MultiSet(values).consolidate() + const consolidated = LazyMultiSet.fromArray(values).consolidate() let i = offset - const sortedValues = consolidated - .getInner() + const sortedValues = Array.from(consolidated) .sort((a, b) => comparator(a[0] as V1, b[0] as V1)) .slice(offset, offset + limit) .map(([value, multiplicity]): [[V1, number], number] => [ diff --git a/packages/d2mini/src/operators/topKWithFractionalIndex.ts b/packages/d2mini/src/operators/topKWithFractionalIndex.ts index 7c49f86..bcc8efe 100644 --- a/packages/d2mini/src/operators/topKWithFractionalIndex.ts +++ b/packages/d2mini/src/operators/topKWithFractionalIndex.ts @@ -5,7 +5,7 @@ import { UnaryOperator, } from '../graph.js' import { StreamBuilder } from '../d2.js' -import { MultiSet } from '../multiset.js' +import { LazyMultiSet } from '../multiset.js' import { Index } from '../indexes.js' import { generateKeyBetween } from 'fractional-indexing' import { binarySearch, hash } from '../utils.js' @@ -209,17 +209,36 @@ export class TopKWithFractionalIndexOperator extends UnaryOperator< } run(): void { - const result: Array<[[K, [V1, string]], number]> = [] - for (const message of this.inputMessages()) { - for (const [item, multiplicity] of message.getInner()) { - const [key, value] = item - this.processElement(key, value, multiplicity, result) + const self = this + + // Create generator that processes messages on-demand + function* generateResults(): Generator<[[K, [V1, string]], number], void, unknown> { + for (const message of self.inputMessages()) { + for (const [item, multiplicity] of message) { + const [key, value] = item + + // Yield results directly from processElementLazy without intermediate array + yield* self.processElementLazy(key, value, multiplicity) + } } } - if (result.length > 0) { - this.output.sendData(new MultiSet(result)) + // Peek into generator to see if there are any results before sending + const generator = generateResults() + const firstResult = generator.next() + + if (!firstResult.done) { + // We have at least one result, create lazy set that includes the first result and the rest + const lazyResults = new LazyMultiSet(function* (): Generator<[[K, [V1, string]], number], void, unknown> { + // Yield the first result we already got + yield firstResult.value + // Yield the rest of the results + yield* generator + }) + + this.output.sendData(lazyResults) } + // If no results, don't send anything } processElement( @@ -261,6 +280,37 @@ export class TopKWithFractionalIndexOperator extends UnaryOperator< return } + + *processElementLazy( + key: K, + value: V1, + multiplicity: number, + ): Generator<[[K, [V1, string]], number], void, unknown> { + const oldMultiplicity = this.#index.getMultiplicity(key, value) + this.#index.addValue(key, [value, multiplicity]) + const newMultiplicity = this.#index.getMultiplicity(key, value) + + let res: TopKChanges> = { moveIn: null, moveOut: null } + if (oldMultiplicity <= 0 && newMultiplicity > 0) { + const taggedValue = tagValue(value) + res = this.#topK.insert(taggedValue) + } else if (oldMultiplicity > 0 && newMultiplicity <= 0) { + const taggedValue = tagValue(value) + res = this.#topK.delete(taggedValue) + } + + if (res.moveIn) { + const valueWithoutHash = mapValue(res.moveIn, untagValue) + yield [[key, valueWithoutHash], 1] + } + + if (res.moveOut) { + const valueWithoutHash = mapValue(res.moveOut, untagValue) + yield [[key, valueWithoutHash], -1] + } + } + + } /** diff --git a/packages/d2mini/src/types.ts b/packages/d2mini/src/types.ts index 16247fc..f6319d8 100644 --- a/packages/d2mini/src/types.ts +++ b/packages/d2mini/src/types.ts @@ -1,4 +1,4 @@ -import type { MultiSet, MultiSetArray } from './multiset.js' +import type { MultiSetArray, IMultiSet } from './multiset.js' import type { DifferenceStreamWriter, DifferenceStreamReader } from './graph.js' export type KeyValue = [K, V] @@ -9,12 +9,12 @@ export interface IOperator<_T> { } export interface IDifferenceStreamReader { - drain(): MultiSet[] + drain(): IMultiSet[] isEmpty(): boolean } export interface IDifferenceStreamWriter { - sendData(collection: MultiSet | MultiSetArray): void + sendData(collection: IMultiSet | MultiSetArray): void newReader(): IDifferenceStreamReader } diff --git a/packages/d2mini/tests/multiset.test.ts b/packages/d2mini/tests/multiset.test.ts index 9f55f72..4c172eb 100644 --- a/packages/d2mini/tests/multiset.test.ts +++ b/packages/d2mini/tests/multiset.test.ts @@ -1,5 +1,5 @@ import { describe, it, expect, beforeEach } from 'vitest' -import { MultiSet } from '../src/multiset.js' +import { MultiSet, LazyMultiSet } from '../src/multiset.js' describe('MultiSet', () => { describe('basic operations', () => { @@ -108,3 +108,101 @@ describe('MultiSet', () => { ]) }) }) + +describe('LazyMultiSet', () => { + describe('basic operations', () => { + let a: LazyMultiSet<[string, string | string[]]> + let b: LazyMultiSet<[string, string | string[]]> + + beforeEach(() => { + a = LazyMultiSet.fromArray([ + [['apple', '$5'], 2], + [['banana', '$2'], 1], + ]) + b = LazyMultiSet.fromArray([ + [['apple', '$3'], 1], + [['apple', ['granny smith', '$2']], 1], + [['kiwi', '$2'], 1], + ]) + }) + + it('should concatenate two lazy multisets', () => { + const concat = a.concat(b) + expect(concat.getInner()).toEqual([ + [['apple', '$5'], 2], + [['banana', '$2'], 1], + [['apple', '$3'], 1], + [['apple', ['granny smith', '$2']], 1], + [['kiwi', '$2'], 1], + ]) + }) + + it('should filter elements lazily', () => { + const filtered = a.filter((data) => data[0] !== 'apple') + expect(filtered.getInner()).toEqual([[['banana', '$2'], 1]]) + }) + + it('should map elements lazily', () => { + const mapped = a.map((data) => [data[1], data[0]]) + expect(mapped.getInner()).toEqual([ + [['$5', 'apple'], 2], + [['$2', 'banana'], 1], + ]) + }) + + it('should be iterable', () => { + const result: [[string, string | string[]], number][] = [] + for (const entry of a) { + result.push(entry) + } + expect(result).toEqual([ + [['apple', '$5'], 2], + [['banana', '$2'], 1], + ]) + }) + + it('should negate multiplicities', () => { + const negated = a.negate() + expect(negated.getInner()).toEqual([ + [['apple', '$5'], -2], + [['banana', '$2'], -1], + ]) + }) + }) + + it('should consolidate correctly', () => { + const lazySet = LazyMultiSet.fromArray([ + ['a', 1], + ['a', 2], + ['b', 3], + ['b', 1], + ['c', 1], + ]) + + const consolidated = lazySet.consolidate() + expect(consolidated.getInner()).toEqual([ + ['a', 3], + ['b', 4], + ['c', 1], + ]) + }) + + it('should work with chained operations', () => { + const lazySet = LazyMultiSet.fromArray([ + [1, 1], + [2, 2], + [3, 3], + [4, 4], + ]) + + const result = lazySet + .filter((data) => data % 2 === 0) + .map((data) => data * 2) + .getInner() + + expect(result).toEqual([ + [4, 2], + [8, 4], + ]) + }) +}) diff --git a/packages/d2mini/tests/operators/filter.test.ts b/packages/d2mini/tests/operators/filter.test.ts index 3607957..9d8d256 100644 --- a/packages/d2mini/tests/operators/filter.test.ts +++ b/packages/d2mini/tests/operators/filter.test.ts @@ -1,14 +1,14 @@ import { describe, test, expect } from 'vitest' import { D2 } from '../../src/d2.js' -import { MultiSet } from '../../src/multiset.js' -import { filter, map, output } from '../../src/operators/index.js' +import { MultiSet, IMultiSet } from '../../src/multiset.js' +import { filter, output } from '../../src/operators/index.js' describe('Operators', () => { describe('Filter operation', () => { test('basic filter operation', () => { const graph = new D2() const input = graph.newInput() - const messages: MultiSet[] = [] + const messages: IMultiSet[] = [] input.pipe( filter((x) => x % 2 === 0), @@ -29,13 +29,13 @@ describe('Operators', () => { graph.run() - expect(messages).toEqual([new MultiSet([[2, 1]])]) + expect(messages.map(m => m.getInner())).toEqual([[[2, 1]]]) }) test('filter with complex predicate', () => { const graph = new D2() const input = graph.newInput() - const messages: MultiSet[] = [] + const messages: IMultiSet[] = [] input.pipe( filter((x) => x > 2 && x < 5), @@ -58,22 +58,22 @@ describe('Operators', () => { graph.run() - expect(messages).toEqual([ - new MultiSet([ + expect(messages.map(m => m.getInner())).toEqual([ + [ [3, 1], [4, 1], - ]), + ], ]) }) test('filter with chained operations', () => { const graph = new D2() const input = graph.newInput() - const messages: MultiSet[] = [] + const messages: IMultiSet[] = [] input.pipe( - map((x) => x * 2), - filter((x) => x % 4 === 0), + filter((x) => x % 2 === 0), + filter((x) => x > 2), output((message) => { messages.push(message) }), @@ -87,16 +87,18 @@ describe('Operators', () => { [2, 1], [3, 1], [4, 1], + [5, 1], + [6, 1], ]), ) graph.run() - expect(messages).toEqual([ - new MultiSet([ + expect(messages.map(m => m.getInner())).toEqual([ + [ [4, 1], - [8, 1], - ]), + [6, 1], + ], ]) }) }) diff --git a/packages/d2mini/tests/operators/keying.test.ts b/packages/d2mini/tests/operators/keying.test.ts index 7f45adc..2061df6 100644 --- a/packages/d2mini/tests/operators/keying.test.ts +++ b/packages/d2mini/tests/operators/keying.test.ts @@ -2,7 +2,7 @@ import { describe, it, expect } from 'vitest' import { D2 } from '../../src/d2.js' import { keyBy, unkey, rekey } from '../../src/operators/keying.js' import { output } from '../../src/operators/index.js' -import { MultiSet } from '../../src/multiset.js' +import { MultiSet, IMultiSet } from '../../src/multiset.js' interface TestItem { id: number @@ -14,7 +14,7 @@ describe('keying operators', () => { it('should key a stream by a property', () => { const d2 = new D2() const input = d2.newInput() - const messages: MultiSet[] = [] + const messages: IMultiSet[] = [] const keyed = input.pipe(keyBy((item) => item.id)) const outputStream = keyed.pipe(unkey()) @@ -25,16 +25,16 @@ describe('keying operators', () => { d2.finalize() d2.run() - expect(messages).toEqual([ - new MultiSet([[{ id: 1, name: 'a', value: 10 }, 1]]), - new MultiSet([[{ id: 2, name: 'b', value: 20 }, 1]]), + expect(messages.map(m => m.getInner())).toEqual([ + [[{ id: 1, name: 'a', value: 10 }, 1]], + [[{ id: 2, name: 'b', value: 20 }, 1]], ]) }) it('should rekey a stream with new keys', () => { const d2 = new D2() const input = d2.newInput() - const messages: MultiSet[] = [] + const messages: IMultiSet[] = [] // First key by id const keyed = input.pipe(keyBy((item) => item.id)) @@ -48,16 +48,16 @@ describe('keying operators', () => { d2.finalize() d2.run() - expect(messages).toEqual([ - new MultiSet([[{ id: 1, name: 'a', value: 10 }, 1]]), - new MultiSet([[{ id: 2, name: 'b', value: 20 }, 1]]), + expect(messages.map(m => m.getInner())).toEqual([ + [[{ id: 1, name: 'a', value: 10 }, 1]], + [[{ id: 2, name: 'b', value: 20 }, 1]], ]) }) it('should handle multiple updates to the same key', () => { const d2 = new D2() const input = d2.newInput() - const messages: MultiSet[] = [] + const messages: IMultiSet[] = [] const keyed = input.pipe(keyBy((item) => item.id)) const outputStream = keyed.pipe(unkey()) @@ -68,9 +68,9 @@ describe('keying operators', () => { d2.finalize() d2.run() - expect(messages).toEqual([ - new MultiSet([[{ id: 1, name: 'a', value: 10 }, 1]]), - new MultiSet([[{ id: 1, name: 'a', value: 20 }, 1]]), + expect(messages.map(m => m.getInner())).toEqual([ + [[{ id: 1, name: 'a', value: 10 }, 1]], + [[{ id: 1, name: 'a', value: 20 }, 1]], ]) }) }) diff --git a/packages/d2mini/tests/operators/map.test.ts b/packages/d2mini/tests/operators/map.test.ts index f934c3f..4702da6 100644 --- a/packages/d2mini/tests/operators/map.test.ts +++ b/packages/d2mini/tests/operators/map.test.ts @@ -1,6 +1,6 @@ import { describe, test, expect } from 'vitest' import { D2 } from '../../src/d2.js' -import { MultiSet } from '../../src/multiset.js' +import { MultiSet, IMultiSet } from '../../src/multiset.js' import { map, output } from '../../src/operators/index.js' describe('Operators', () => { @@ -8,7 +8,7 @@ describe('Operators', () => { test('basic map operation', () => { const graph = new D2() const input = graph.newInput() - const messages: MultiSet[] = [] + const messages: IMultiSet[] = [] input.pipe( map((x) => x + 5), @@ -29,19 +29,19 @@ describe('Operators', () => { graph.run() - expect(messages).toEqual([ - new MultiSet([ + expect(messages.map(m => m.getInner())).toEqual([ + [ [6, 1], [7, 1], [8, 1], - ]), + ], ]) }) test('map with multiple transformations', () => { const graph = new D2() const input = graph.newInput() - const messages: MultiSet[] = [] + const messages: IMultiSet[] = [] input.pipe( map((x) => x * 2), @@ -63,19 +63,19 @@ describe('Operators', () => { graph.run() - expect(messages).toEqual([ - new MultiSet([ + expect(messages.map(m => m.getInner())).toEqual([ + [ [3, 1], [5, 1], [7, 1], - ]), + ], ]) }) test('map with negative multiplicities', () => { const graph = new D2() const input = graph.newInput() - const messages: MultiSet[] = [] + const messages: IMultiSet[] = [] input.pipe( map((x) => x + 1), @@ -96,12 +96,12 @@ describe('Operators', () => { graph.run() - expect(messages).toEqual([ - new MultiSet([ + expect(messages.map(m => m.getInner())).toEqual([ + [ [2, -1], [3, -2], [4, 1], - ]), + ], ]) }) }) diff --git a/packages/d2mini/tests/operators/negate.test.ts b/packages/d2mini/tests/operators/negate.test.ts index 51ad016..2dfa8ac 100644 --- a/packages/d2mini/tests/operators/negate.test.ts +++ b/packages/d2mini/tests/operators/negate.test.ts @@ -1,6 +1,6 @@ import { describe, test, expect } from 'vitest' import { D2 } from '../../src/d2.js' -import { MultiSet } from '../../src/multiset.js' +import { MultiSet, IMultiSet } from '../../src/multiset.js' import { map, negate, output } from '../../src/operators/index.js' describe('Operators', () => { @@ -8,7 +8,7 @@ describe('Operators', () => { test('basic negate operation', () => { const graph = new D2() const input = graph.newInput() - const messages: MultiSet[] = [] + const messages: IMultiSet[] = [] input.pipe( negate(), @@ -22,26 +22,26 @@ describe('Operators', () => { input.sendData( new MultiSet([ [1, 1], - [2, 1], - [3, 1], + [2, 2], + [3, 3], ]), ) graph.run() - expect(messages).toEqual([ - new MultiSet([ + expect(messages.map(m => m.getInner())).toEqual([ + [ [1, -1], - [2, -1], - [3, -1], - ]), + [2, -2], + [3, -3], + ], ]) }) test('negate with mixed multiplicities', () => { const graph = new D2() const input = graph.newInput() - const messages: MultiSet[] = [] + const messages: IMultiSet[] = [] input.pipe( negate(), @@ -55,26 +55,26 @@ describe('Operators', () => { input.sendData( new MultiSet([ [1, -1], - [2, -2], - [3, 1], + [2, 2], + [3, -3], ]), ) graph.run() - expect(messages).toEqual([ - new MultiSet([ + expect(messages.map(m => m.getInner())).toEqual([ + [ [1, 1], - [2, 2], - [3, -1], - ]), + [2, -2], + [3, 3], + ], ]) }) test('negate with already negative multiplicities', () => { const graph = new D2() const input = graph.newInput() - const messages: MultiSet[] = [] + const messages: IMultiSet[] = [] input.pipe( negate(), @@ -88,31 +88,30 @@ describe('Operators', () => { input.sendData( new MultiSet([ [1, -2], - [2, 1], + [2, -1], [3, -3], ]), ) graph.run() - expect(messages).toEqual([ - new MultiSet([ + expect(messages.map(m => m.getInner())).toEqual([ + [ [1, 2], - [2, -1], + [2, 1], [3, 3], - ]), + ], ]) }) test('negate with chained operations', () => { const graph = new D2() const input = graph.newInput() - const messages: MultiSet[] = [] + const messages: IMultiSet[] = [] input.pipe( - map((x) => x * 2), + map((x) => x + 2), negate(), - map((x) => x + 1), output((message) => { messages.push(message) }), @@ -123,17 +122,19 @@ describe('Operators', () => { input.sendData( new MultiSet([ [1, 1], - [2, 1], + [2, 2], + [3, 3], ]), ) graph.run() - expect(messages).toEqual([ - new MultiSet([ + expect(messages.map(m => m.getInner())).toEqual([ + [ [3, -1], - [5, -1], - ]), + [4, -2], + [5, -3], + ], ]) }) }) diff --git a/packages/d2mini/tests/operators/pipe.test.ts b/packages/d2mini/tests/operators/pipe.test.ts index 06480b6..15d748d 100644 --- a/packages/d2mini/tests/operators/pipe.test.ts +++ b/packages/d2mini/tests/operators/pipe.test.ts @@ -1,6 +1,6 @@ import { describe, test, expect } from 'vitest' import { D2 } from '../../src/d2.js' -import { MultiSet } from '../../src/multiset.js' +import { MultiSet, IMultiSet } from '../../src/multiset.js' import { map, output, pipe } from '../../src/operators/index.js' describe('Operators', () => { @@ -8,7 +8,7 @@ describe('Operators', () => { test('basic pipe operation', () => { const graph = new D2() const input = graph.newInput() - const messages: MultiSet[] = [] + const messages: IMultiSet[] = [] input.pipe( pipe( @@ -32,12 +32,12 @@ describe('Operators', () => { graph.run() - expect(messages).toEqual([ - new MultiSet([ + expect(messages.map(m => m.getInner())).toEqual([ + [ [12, 1], [14, 1], [16, 1], - ]), + ], ]) }) })