Skip to content

Conversation

ruifigueira
Copy link
Contributor

Proposal to add type safety to streaming calls.

Usage:

class MyAgent extends Agent<typeof env, {}> {
  @callable({ streaming: true })
  performStream(response: StreamingResponse<number, string>, result: string): void {
    response.send(1);
    response.send(2);
    response.end(result);
  }
}

const { stub } = useAgent<MyAgent, {}>({ agent: "my-agent" });

stub.performStream({
  onChunk: (chunk) => console.log('streaming: ', chunk),
  onDone: (finalChunk) => console.log('done: ', finalChunk),
}, "hello");

I'm opening this as a draft as I'm not very happy with the current solution, first argument being a StreamOptions doesn't seem right.

I'll gladly accept suggestions on how to improve it.

Copy link

changeset-bot bot commented May 24, 2025

🦋 Changeset detected

Latest commit: eea893b

The changes in this PR will be included in the next version bump.

This PR includes changesets to release 2 packages
Name Type
agents Minor
hono-agents Major

Not sure what this means? Click here to learn what changesets are.

Click here if you're a maintainer who wants to add another changeset to this PR

@ruifigueira
Copy link
Contributor Author

BTW, my idea is to integrate streaming properly into stubs and simplify everything by just keeping type safety in stubs (I think call function adds unnecessary complexity due to args being key optional, and that's why I need RequiredAgentMethods and OptionalAgentMethods types).

Copy link

pkg-pr-new bot commented Sep 29, 2025

Open in StackBlitz

npm i https://pkg.pr.new/cloudflare/agents@304

commit: eea893b

@ruifigueira ruifigueira marked this pull request as ready for review September 29, 2025 07:20
@ruifigueira ruifigueira marked this pull request as draft September 29, 2025 07:42
@ruifigueira
Copy link
Contributor Author

ruifigueira commented Sep 30, 2025

@whoiskatrin @threepointone I'm trying a different approach for streaming methods using async generators.

The example above will look like this:

class MyAgent extends Agent<typeof env, {}> {
  @callable({ streaming: true })
  performStream(response: StreamingResponse<number, string>, result: string): void {
    response.send(1);
    response.send(2);
    response.end(result);
  }
}

// in react

const { streamingStub } = useAgent<MyAgent, {}>({ agent: "my-agent" });

// we can consume chunks with for await but not return value
for await (const chunk of streamingStub.performStream('Hello')) {
  console.log('streaming: ', chunk);
}

For chunks (yielded values) consumption, this is a very clean approach. If we need to consume the onDone object, it's also possible but we need to iterate with .next() and check the done property..

With this approach, it would also be worth of supporting declaring streaming method implementations as async generator functions, something like:

class MyAgent extends Agent<typeof env, {}> {
  @callable({ streaming: true })
  async * performStream(result: string) { // returns a AsyncGenerator<number, string>
    yield 1;
    yield await Promise.resolve(2);
    return result;
  }
}

Edit: streamingStub implementation is flawed, we may loose chunks, but it's just a matter of improving it if you find this approach acceptable.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant