Description
What problem are you trying to solve?
The chunks of a stream can contain any type of value. Some of these values may need a specific method called on them before they can be properly cleaned up. When a ReadableStream or a WritableStream is cancelled prematurely, each has a `cancel` method that will be called so it can clean up any resources it has locked down.
BUT, this is insufficient when the chunks themselves contain resources that need to be manually freed. For example, a readable stream may produce chunks of objects that themselves carry more readable streams. If these chunks are sitting in the internal queue waiting for the next `pull` call when `cancel` is invoked, then the readable streams on those chunks will never have their `cancel` methods called.
Below is an example of this, where three chunks have been enqueued, but only the first was able to be cleaned up. What happens to the other two? We don't know.
function createSource(i: number): ReadableStream<Uint8Array> {
  return new ReadableStream({
    start(_controller) {
      console.log(`${i} Started!`);
    },
    pull(controller) {
      controller.enqueue(new Uint8Array(100));
    },
    cancel() {
      console.log(`${i} Cancelled!`);
    },
  });
}

type Input = { path: string; readable: ReadableStream<Uint8Array> };

ReadableStream.from<Input>([
  { path: "potato", readable: createSource(1) },
  { path: "cake", readable: createSource(2) },
  { path: "carrot", readable: createSource(3) },
])
  .pipeThrough(
    new TransformStream<Input, unknown>({
      transform(chunk, controller) {
        const reader = chunk.readable.getReader();
        try {
          // Has an error while processing.
          throw "STOP";
        } catch (e) {
          reader.cancel(e);
          controller.error(e);
        }
      },
    }),
  )
  .pipeTo(new WritableStream())
  .catch(() => {});
What solutions exist today?
The only reliable way currently to clean up these resources is to catch the error at the transform step, to stop it reaching the next readable up, and then manually cancel each chunk's readable ourselves, because the stream won't do it for us.
In the above example, all three sources are in the queue, but realistically we don't know how many are queued or how many more chunks are still to come.
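The manual workaround can be sketched roughly as follows. The helper name `drainAndCancel` is purely illustrative; it pulls every remaining chunk out of the source by hand and cancels each chunk's inner readable, and it only terminates if the source is finite:

```typescript
// Illustrative sketch of today's manual cleanup. `Input` matches the type
// from the example above; `drainAndCancel` is a hypothetical helper name.
type Input = { path: string; readable: ReadableStream<Uint8Array> };

async function drainAndCancel(
  reader: ReadableStreamDefaultReader<Input>,
  reason: unknown,
): Promise<void> {
  // Keep reading until the source is exhausted, cancelling each chunk's
  // inner readable, since calling cancel() on the outer stream would
  // silently drop whatever is sitting in its internal queue.
  while (true) {
    const result = await reader.read();
    if (result.done) break;
    await result.value.readable.cancel(reason);
  }
}
```

Note that this forces the whole source to be consumed just to free its chunks, which is exactly the kind of busywork the proposal below avoids.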
How would you solve it?
The easiest way to solve this would be to pass the internal queue to `cancel` upon cancellation, so any resources held by queued chunks can be cleaned up as well.
{
  async cancel(reason: any, internalQueue: Input[]): Promise<void> {
    for (const chunk of internalQueue) {
      await chunk.readable.cancel(reason);
    }
  }
}
Anything else?
TransformStreams don't have a `cancel` method, so if a transform step holds resources that need releasing, one can't use the TransformStream constructor to create the transform stream and must instead shim a WritableStream and a ReadableStream together. Although, from my experience, one would still just use a TransformStream internally, merely to convert the writable into a readable.
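That shim might look something like the sketch below, assuming a Node 18+/browser environment with web streams. The names `cancellableTransform` and `onCancel` are illustrative, not part of any spec; the TransformStream is still used internally for the writable-to-readable plumbing, while a wrapping ReadableStream supplies the missing cancel hook:

```typescript
// Hypothetical shim: a transform-like { writable, readable } pair whose
// readable side has a cancel hook, which TransformStream itself lacks.
function cancellableTransform<I, O>(
  transform: (chunk: I, controller: TransformStreamDefaultController<O>) => void,
  onCancel: (reason: unknown) => void | Promise<void>,
): { writable: WritableStream<I>; readable: ReadableStream<O> } {
  // A plain TransformStream still does the writable-to-readable plumbing...
  const inner = new TransformStream<I, O>({ transform });
  const innerReader = inner.readable.getReader();
  // ...while a wrapping ReadableStream forwards its chunks and adds the
  // cancel hook that runs our cleanup.
  const readable = new ReadableStream<O>({
    async pull(controller) {
      const result = await innerReader.read();
      if (result.done) controller.close();
      else controller.enqueue(result.value);
    },
    async cancel(reason) {
      await innerReader.cancel(reason);
      await onCancel(reason);
    },
  });
  return { writable: inner.writable, readable };
}
```

Since `pipeThrough` only requires a `{ writable, readable }` pair, the result can be dropped straight into an existing pipeline in place of a TransformStream.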