Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
10e46bf
Implement lazy evaluation with LazyMultiSet and iterator-based proces…
cursoragent Jul 10, 2025
b5f9586
Implement lazy evaluation for all d2mini operators with full compatib…
cursoragent Jul 10, 2025
8a458ca
Add detailed analysis of lazy evaluation for complex operators
cursoragent Jul 10, 2025
d7ffb82
Implement lazy evaluation with pseudo-lazy complex operators
cursoragent Jul 10, 2025
3ba5550
Implement lazy join generator for incremental join operations
cursoragent Jul 10, 2025
e34e062
Implement truly lazy processing for TopKWithFractionalIndex operator
cursoragent Jul 10, 2025
4e95a14
Revert true lazy evaluation, maintain pseudo-lazy for complex operators
cursoragent Jul 10, 2025
489d534
Refactor topK operator to use lazy processing and generator
cursoragent Jul 10, 2025
ee3e15d
Refactor ConsolidateOperator to use LazyMultiSet generator
cursoragent Jul 11, 2025
d35ae4a
Simplify ConsolidateOperator input messages handling
cursoragent Jul 11, 2025
5485d9a
Refactor distinct operator to use lazy generation and improve perform…
cursoragent Jul 11, 2025
e362f0a
Remove lazy evaluation demo test file
cursoragent Jul 11, 2025
251f357
Remove unnecessary constructor in NegateOperator
cursoragent Jul 11, 2025
e9c239a
Refactor reduce operator to use lazy generation and two-pass computation
cursoragent Jul 11, 2025
abba6f9
Checkpoint before follow-up message
cursoragent Jul 11, 2025
f76fc74
Optimize topK generator to avoid sending empty results
cursoragent Jul 11, 2025
047f7d6
Replace MultiSet.getInner() with direct iteration over messages
cursoragent Jul 11, 2025
34dd010
Remove unused imports from multiset-related files
cursoragent Jul 11, 2025
15bd4b7
Add type annotation for result array in multiset test
cursoragent Jul 11, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 14 additions & 14 deletions packages/d2mini/src/graph.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { MultiSet, MultiSetArray } from './multiset.js'
import { MultiSet, MultiSetArray, IMultiSet } from './multiset.js'
import {
IOperator,
IDifferenceStreamReader,
Expand All @@ -9,13 +9,13 @@ import {
* A read handle to a dataflow edge that receives data from a writer.
*/
export class DifferenceStreamReader<T> implements IDifferenceStreamReader<T> {
#queue: MultiSet<T>[]
#queue: IMultiSet<T>[]

constructor(queue: MultiSet<T>[]) {
constructor(queue: IMultiSet<T>[]) {
this.#queue = queue
}

drain(): MultiSet<T>[] {
drain(): IMultiSet<T>[] {
const out = [...this.#queue].reverse()
this.#queue.length = 0
return out
Expand All @@ -30,20 +30,20 @@ export class DifferenceStreamReader<T> implements IDifferenceStreamReader<T> {
* A write handle to a dataflow edge that is allowed to publish data.
*/
export class DifferenceStreamWriter<T> implements IDifferenceStreamWriter<T> {
#queues: MultiSet<T>[][] = []
#queues: IMultiSet<T>[][] = []

sendData(collection: MultiSet<T> | MultiSetArray<T>): void {
if (!(collection instanceof MultiSet)) {
sendData(collection: IMultiSet<T> | MultiSetArray<T>): void {
if (!(collection instanceof MultiSet) && !('getInner' in collection)) {
collection = new MultiSet(collection)
}

for (const q of this.#queues) {
q.unshift(collection)
q.unshift(collection as IMultiSet<T>)
}
}

newReader(): DifferenceStreamReader<T> {
const q: MultiSet<T>[] = []
const q: IMultiSet<T>[] = []
this.#queues.push(q)
return new DifferenceStreamReader(q)
}
Expand Down Expand Up @@ -88,8 +88,8 @@ export abstract class UnaryOperator<Tin, Tout = Tin> extends Operator<
super(id, [inputA], output)
}

inputMessages(): MultiSet<Tin>[] {
return this.inputs[0].drain() as MultiSet<Tin>[]
inputMessages(): IMultiSet<Tin>[] {
return this.inputs[0].drain() as IMultiSet<Tin>[]
}
}

Expand All @@ -107,11 +107,11 @@ export abstract class BinaryOperator<T> extends Operator<T> {
super(id, [inputA, inputB], output)
}

inputAMessages(): MultiSet<T>[] {
inputAMessages(): IMultiSet<T>[] {
return this.inputs[0].drain()
}

inputBMessages(): MultiSet<T>[] {
inputBMessages(): IMultiSet<T>[] {
return this.inputs[1].drain()
}
}
Expand All @@ -120,7 +120,7 @@ export abstract class BinaryOperator<T> extends Operator<T> {
* Base class for operators that process a single input stream
*/
export abstract class LinearUnaryOperator<T, U> extends UnaryOperator<T | U> {
abstract inner(collection: MultiSet<T | U>): MultiSet<U>
abstract inner(collection: IMultiSet<T | U>): IMultiSet<U>

run(): void {
for (const message of this.inputMessages()) {
Expand Down
28 changes: 28 additions & 0 deletions packages/d2mini/src/indexes.ts
Original file line number Diff line number Diff line change
Expand Up @@ -124,4 +124,32 @@ export class Index<K, V> {

return new MultiSet(result)
}

/**
 * Generator-based join of this index with `other`.
 *
 * Results are yielded one at a time as `[[key, [val1, val2]], multiplicity]`
 * rather than being collected into a container first. Only keys for which
 * both indexes report `has(key)` produce output. For each such key, every
 * pairing of a value from this index (`val1`) with a value from `other`
 * (`val2`) is yielded with multiplicity `mul1 * mul2`; pairings where either
 * multiplicity is zero are skipped.
 *
 * @param other - The index to join against.
 * @returns A generator of `[[key, [val1, val2]], multiplicity]` tuples; the
 *   value taken from this index is always first in the pair.
 */
*lazyJoin<V2>(other: Index<K, V2>): Generator<[[K, [V, V2]], number], void, unknown> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's remove the duplication between both branches like i did here: ac24ff0

// Walk the entries of whichever index reports the smaller `size`; the other
// index is only probed per key via `has`/`get`.
if (this.size <= other.size) {
for (const [key, valueMap] of this.entries()) {
if (!other.has(key)) continue
const otherValues = other.get(key)
for (const [val1, mul1] of valueMap.values()) {
for (const [val2, mul2] of otherValues) {
// A zero multiplicity on either side would give a zero product, so skip it.
if (mul1 !== 0 && mul2 !== 0) {
yield [[key, [val1, val2]], mul1 * mul2]
}
}
}
}
} else {
// Same logic with the roles swapped: `other` drives the outer loops, but the
// yielded pair still places this index's value (`val1`) first.
for (const [key, otherValueMap] of other.entries()) {
if (!this.has(key)) continue
const values = this.get(key)
for (const [val2, mul2] of otherValueMap.values()) {
for (const [val1, mul1] of values) {
if (mul1 !== 0 && mul2 !== 0) {
yield [[key, [val1, val2]], mul1 * mul2]
}
}
}
}
}
}
}
Loading