Skip to content

Conversation

@pavelgj
Copy link
Collaborator

@pavelgj pavelgj commented Nov 3, 2025

Genkit flows can be slow operations that stream intermediate progress. In scenarios with unreliable network conditions or client-side interruptions (e.g., page reloads), clients can lose their connection to the stream. Without a durability mechanism, the stream is lost, and the client cannot retrieve the in-progress or final results.

This PR introduces the concept of a StreamManager which is responsible for persisting the state of a stream. A StreamManager can be provided when defining a flow handler.

Three StreamManager implementations are included:

  • InMemoryStreamManager: Stores stream state in memory. This is useful for development and testing but is not truly durable as it will lose state if the process restarts.
  • firestoreStreamManager: A durable StreamManager that uses Firebase Firestore to store stream state.
  • rtdbStreamManager: A durable StreamManager that uses the Firebase Realtime Database to store stream state.

To make a flow's streams durable, pass a streamManager instance to the handler. This is supported in both the Express (expressHandler) and Next.js (appRoute) plugins. The new durable-streaming sample app demonstrates this for Express:

// From js/testapps/durable-streaming/src/index.ts

const fApp = initializeApp();
export const firestore = firestoreStreamManager({
  firebaseApp: fApp,
  db: getFirestore(fApp),
  collection: 'streamy',
});

// ...

app.post(
  '/streamyFirestore',
  expressHandler(streamy, { streamManager: firestore })
);

The sample app includes a streamyThrowy flow that intentionally throws an error mid-stream to demonstrate how a durable stream manager can preserve the stream's state up to the point of failure.

A client can then connect to a durable stream and resume it if necessary:

// Start a new stream
const result = streamFlow({
  url: `http://localhost:8080/myFlowDurable`,
  input: 'tell me a long story',
});
const streamId = await result.streamId; // Save this ID

// ... later, reconnect if needed ...
const reconnectedResult = streamFlow({
  url: `http://localhost:8080/myFlowDurable`,
  streamId: streamId,
});

Beta Feature: This is to be released as a beta feature.

Checklist (if applicable):

Genkit flows can be slow operations that stream intermediate progress. In scenarios with unreliable network conditions or client-side interruptions (e.g., page reloads), clients can lose their connection to the stream. Without a durability mechanism, the stream is lost, and the client cannot retrieve the in-progress or final results.

This PR introduces the concept of a `StreamManager` which is responsible for persisting the state of a stream. A `StreamManager` can be provided when defining a flow handler.

Three `StreamManager` implementations are included:

*   `InMemoryStreamManager`: Stores stream state in memory. This is useful for development and testing but is not truly durable as it will lose state if the process restarts.
*   `firestoreStreamManager`: A durable `StreamManager` that uses Firebase Firestore to store stream state.
*   `rtdbStreamManager`: A durable `StreamManager` that uses the Firebase Realtime Database to store stream state.

To make a flow's streams durable, pass a `streamManager` instance to the handler. This is supported in both the Express (`expressHandler`) and Next.js (`appRoute`) plugins. The new `durable-streaming` sample app demonstrates this for Express:

```typescript
// From js/testapps/durable-streaming/src/index.ts

const fApp = initializeApp();
export const firestore = firestoreStreamManager({
  firebaseApp: fApp,
  db: getFirestore(fApp),
  collection: 'streamy',
});

// ...

app.post(
  '/streamyFirestore',
  expressHandler(streamy, { streamManager: firestore })
);
```

The sample app includes a `streamyThrowy` flow that intentionally throws an error mid-stream to demonstrate how a durable stream manager can preserve the stream's state up to the point of failure.

A client can then connect to a durable stream and resume it if necessary:

```typescript
// Start a new stream
const result = streamFlow({
  url: `http://localhost:8080/myFlowDurable`,
  input: 'tell me a long story',
});
const streamId = await result.streamId; // Save this ID

// ... later, reconnect if needed ...
const reconnectedResult = streamFlow({
  url: `http://localhost:8080/myFlowDurable`,
  streamId: streamId,
});
```

**Beta Feature:** This is to be released as a beta feature.
@github-actions github-actions bot added docs Improvements or additions to documentation js config root labels Nov 3, 2025
@pavelgj pavelgj requested review from apascal07 and ssbushi November 3, 2025 22:03
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

config docs Improvements or additions to documentation js root

Projects

Status: No status

Development

Successfully merging this pull request may close these issues.

1 participant