Skip to content

Conversation

AstraBert
Copy link
Member

Closes #31

In this PR, we try to expose (through stream_events) internal processes within the workflow (step marked as in progress, step running, state of queues...)

Optimally, we should also be able to capture and report the changes in the State

Example usage:

from workflows.events import StartEvent, StopEvent, Event, InternalDispatchEvent
from workflows import Context, Workflow, step

class SecondEvent(Event):
    greeting: str

class TestWf(Workflow):
    @step
    async def step_one(self, ev: StartEvent, ctx: Context) -> SecondEvent:
        print(ev.message)
        ctx.write_event_to_stream(ev)
        return SecondEvent(greeting="hello")
    @step
    async def step_two(self, ev: SecondEvent, ctx: Context) -> StopEvent:
        print(ev.greeting)
        ctx.write_event_to_stream(ev)
        return StopEvent(result="done")

async def main():
    wf = TestWf()
    handler = wf.run(message="hi")
    async for ev in handler.stream_events(expose_internal=True):
        if isinstance(ev, InternalDispatchEvent):
            print(f"Event {type(ev.data)}: {ev.data}")

    result = await handler
    print(result)

if __name__ == "__main__":
    import asyncio

    asyncio.run(main())

Output:

Event <class 'workflows.events._QueueStateEvent'>: queue_name='_done' queue_size=1
Event <class 'workflows.events._QueueStateEvent'>: queue_name='step_one' queue_size=1
Event <class 'workflows.events._QueueStateEvent'>: queue_name='step_two' queue_size=1
hi
hello
Event <class 'workflows.events._InProgressStepEvent'>: ev=StartEvent() name='step_one' in_progress=True
Event <class 'workflows.events._RunningStepEvent'>: name='step_one' running=True
Event <class 'workflows.events._RunningStepEvent'>: name='step_one' running=False
Event <class 'workflows.events._InProgressStepEvent'>: ev=StartEvent() name='step_one' in_progress=False
Event <class 'workflows.events._QueueStateEvent'>: queue_name='_done' queue_size=1
Event <class 'workflows.events._QueueStateEvent'>: queue_name='step_one' queue_size=1
Event <class 'workflows.events._QueueStateEvent'>: queue_name='step_two' queue_size=2
Event <class 'workflows.events._InProgressStepEvent'>: ev=SecondEvent(greeting='hello') name='step_two' in_progress=True
Event <class 'workflows.events._RunningStepEvent'>: name='step_two' running=True
Event <class 'workflows.events._RunningStepEvent'>: name='step_two' running=False
Event <class 'workflows.events._InProgressStepEvent'>: ev=SecondEvent(greeting='hello') name='step_two' in_progress=False
Event <class 'workflows.events._QueueStateEvent'>: queue_name='_done' queue_size=2
Event <class 'workflows.events._QueueStateEvent'>: queue_name='step_one' queue_size=1
Event <class 'workflows.events._QueueStateEvent'>: queue_name='step_two' queue_size=1
Event <class 'workflows.events._InProgressStepEvent'>: ev=StopEvent() name='_done' in_progress=True
Event <class 'workflows.events._RunningStepEvent'>: name='_done' running=True
done

@coveralls
Copy link

coveralls commented Aug 21, 2025

Pull Request Test Coverage Report for Build 17152041363

Warning: This coverage report may be inaccurate.

This pull request's base commit is no longer the HEAD commit of its target branch. This means it includes changes from outside the original pull request, including, potentially, unrelated coverage changes.

Details

  • 46 of 47 (97.87%) changed or added relevant lines in 3 files are covered.
  • No unchanged relevant lines lost coverage.
  • Overall coverage increased (+0.05%) to 96.219%

Changes Missing Coverage Covered Lines Changed/Added Lines %
src/workflows/events.py 27 28 96.43%
Totals Coverage Status
Change from base Build 17077019314: 0.05%
Covered Lines: 1196
Relevant Lines: 1243

💛 - Coveralls

@AstraBert AstraBert marked this pull request as ready for review August 21, 2025 19:05
@AstraBert AstraBert changed the title wip: expose internal events feat: expose internal events Aug 21, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

[Feature Request] Expose a public API for observing the runtime
3 participants