Support Token Streaming for Enhanced User Experience #450

@xingyaoww

Description

Summary

Add support for LLM token streaming to improve user experience, especially with slower models like GPT-5. Currently, users must wait for the complete LLM response before seeing any output, which can create poor UX during long reasoning sessions.

Problem Statement

The current implementation explicitly rejects streaming requests:

# In openhands/sdk/llm/llm.py:354
if kwargs.get("stream", False):
    raise ValueError("Streaming is not supported")

This creates several UX issues:

  • Long wait times: Users see no feedback during LLM processing, especially problematic with slower models
  • Perceived unresponsiveness: No indication that the system is actively working
  • Poor competitive positioning: Streaming is now a standard feature in most AI applications

Current Architecture Analysis

The current event-driven architecture presents both challenges and opportunities:

Current Flow

  1. Agent.step() calls self.llm.completion() with complete message history
  2. LLM returns a complete ModelResponse
  3. Agent processes the response and generates ActionEvents or MessageEvents
  4. Events are sent via on_event callback to the conversation layer

Challenge

  • Events expect complete, structured data (actions, observations, messages)
  • Streaming returns incremental deltas, not complete events (see the sketch after this list)
  • The agent's event ontology doesn't naturally accommodate partial responses
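
To make the mismatch concrete, here is a rough sketch assuming an OpenAI/litellm-style client; the model name, message contents, and field access are illustrative, not the SDK's internal types. A streamed call yields many small deltas that must be accumulated, whereas the event layer expects one complete message.

import litellm

messages = [{"role": "user", "content": "Hello"}]

# Complete (non-streaming) call: one object carrying the full message
response = litellm.completion(model="gpt-4o-mini", messages=messages)
full_text = response.choices[0].message.content

# Streaming call: an iterator of partial deltas that must be accumulated
parts = []
for chunk in litellm.completion(model="gpt-4o-mini", messages=messages, stream=True):
    delta = chunk.choices[0].delta.content or ""  # each delta is a fragment, not an event
    parts.append(delta)
full_text = "".join(parts)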

Proposed Solution

Implement a dual-callback approach that preserves the existing event architecture while adding streaming capabilities:

1. Add on_token Callback to Agent

class Agent(AgentBase):
    def step(
        self,
        state: ConversationState,
        on_event: ConversationCallbackType,
        on_token: TokenCallbackType | None = None,  # NEW
    ) -> None:
        # ... existing logic ...
        
        response = self.llm.completion(
            messages=_messages,
            tools=list(self.tools_map.values()),
            on_token=on_token,  # Pass through to LLM
            # ... other params ...
        )

2. Extend LLM Class for Streaming

class LLM(BaseModel, RetryMixin, NonNativeToolCallingMixin):
    def completion(
        self,
        messages: list[Message],
        tools: Sequence[ToolBase] | None = None,
        on_token: TokenCallbackType | None = None,  # NEW
        **kwargs,
    ) -> ModelResponse:
        """
        If on_token is provided, stream tokens via callback while
        still returning complete ModelResponse for event processing.
        """
        if on_token and not kwargs.get("stream", False):
            kwargs["stream"] = True
            
        # Handle streaming internally, call on_token for each delta
        # Still return complete response for existing event processing
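
The comments above gloss over the internal loop. A minimal method sketch for the LLM class of what it could look like, assuming litellm-style chunks and litellm's stream_chunk_builder helper; the name _stream_and_collect and the kwargs plumbing are hypothetical, and Message-to-dict conversion is elided:

import litellm

def _stream_and_collect(self, messages, on_token, **kwargs):
    """Hypothetical internal helper: stream deltas to on_token while rebuilding
    a complete ModelResponse for the unchanged event-processing path."""
    chunks = []
    for chunk in litellm.completion(messages=messages, stream=True, **kwargs):
        chunks.append(chunk)
        delta = chunk.choices[0].delta.content
        if delta:
            on_token(delta)  # real-time feedback to the caller
    # Reassemble the buffered chunks into a complete ModelResponse so the
    # existing non-streaming code path downstream stays untouched.
    return litellm.stream_chunk_builder(chunks, messages=messages)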

3. Update Conversation Layer

class LocalConversation:
    def __init__(
        self,
        # ... existing params ...
        on_token: TokenCallbackType | None = None,  # NEW
    ):
        self._on_token = on_token
        
    def _run_step(self):
        self.agent.step(
            self._state, 
            on_event=self._on_event,
            on_token=self._on_token,  # Pass through
        )

Implementation Benefits

Preserves Existing Architecture

  • No changes to event ontology or agent structure
  • Existing on_event flow remains unchanged
  • Backward compatibility maintained

Clean Separation of Concerns

  • on_token: Real-time streaming feedback
  • on_event: Structured event processing
  • Agent treats LLM calls as non-streaming internally

Server Integration Ready

  • on_token can stream directly to frontend via WebSocket
  • No complex event transformation needed
  • Simple token-by-token forwarding

Usage Examples

Basic Streaming

def token_handler(token: str):
    print(token, end="", flush=True)

conversation = Conversation(
    agent=agent,
    on_token=token_handler,
)

WebSocket Server Integration

import time

# "websocket" is assumed to be an already-accepted connection object
# (for example, a FastAPI/Starlette WebSocket).
async def websocket_token_handler(token: str):
    await websocket.send_json({
        "type": "token",
        "content": token,
        "timestamp": time.time()
    })

conversation = Conversation(
    agent=agent,
    on_token=websocket_token_handler,
)

Technical Considerations

Retry Logic

  • Streaming responses need special retry handling
  • May need to buffer tokens for retry scenarios
  • Consider fallback to non-streaming on retry failures (see the sketch after this list)
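
One way the fallback could look, reusing the hypothetical _stream_and_collect helper sketched earlier; this is a sketch under those assumptions, not the SDK's actual retry machinery:

def _completion_with_fallback(self, messages, on_token, **kwargs):
    """Hypothetical wrapper: try the streaming path once, then fall back to the
    existing non-streaming completion if the stream breaks mid-response."""
    try:
        return self._stream_and_collect(messages, on_token, **kwargs)
    except Exception:
        # Tokens already emitted via on_token are best-effort; the ModelResponse
        # returned by the non-streaming retry is the source of truth.
        kwargs.pop("stream", None)
        return self.completion(messages=messages, **kwargs)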

Error Handling

  • Stream interruption scenarios
  • Partial response recovery
  • Token callback exception handling (see the sketch after this list)
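
A possible guard for the callback case; _safe_emit is a hypothetical helper, not an existing SDK function:

import logging

logger = logging.getLogger(__name__)

def _safe_emit(on_token, token: str) -> None:
    """Hypothetical guard: a raising on_token callback is logged and swallowed
    so it cannot abort the in-flight LLM response."""
    try:
        on_token(token)
    except Exception:
        logger.warning("on_token callback raised; continuing stream", exc_info=True)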

Performance

  • Minimal overhead when streaming disabled
  • Efficient token buffering and forwarding
  • Memory management for long responses

Alternative Approaches Considered

Event-Based Streaming

  • Would require new event types (TokenEvent, PartialResponseEvent)
  • Breaks existing event ontology
  • Complex integration with current architecture

Separate Streaming Channel

  • Using the status/error channel for tokens
  • Channel not available to LLM/Agent layers
  • Architectural violation

Response Transformation

  • Converting deltas to events in real-time
  • Complex state management
  • Potential inconsistencies

Implementation Plan

Phase 1: Core Infrastructure

  • Add TokenCallbackType type definition (see the sketch after this list)
  • Extend LLM.completion() with on_token parameter
  • Implement streaming logic in LLM class
  • Add tests for streaming functionality
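
A possible definition for the new type; whether the payload should be a plain string delta or a richer chunk object is an open design question.

from collections.abc import Callable

# One possible shape for the new callback type: a function that receives each
# text delta and returns nothing.
TokenCallbackType = Callable[[str], None]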

Phase 2: Agent Integration

  • Update Agent.step() signature
  • Pass on_token through agent layer
  • Ensure backward compatibility
  • Add agent-level streaming tests

Phase 3: Conversation Layer

  • Update Conversation constructors
  • Implement token callback forwarding
  • Add conversation-level streaming examples
  • Integration testing

Phase 4: Documentation & Examples

  • Update API documentation
  • Create streaming examples
  • Performance benchmarking
  • Migration guide for existing users

Success Metrics

  • Zero breaking changes to existing API
  • Streaming works with all supported LLM providers
  • <100ms latency for first token
  • Graceful fallback when streaming unavailable
  • Memory usage remains bounded during streaming

Related Issues

This addresses user feedback about poor UX with slower models and brings the SDK in line with modern AI application standards where streaming is expected functionality.


Priority: High - UX improvement that affects all users, especially critical with slower models like GPT-5.

Complexity: Medium - Requires careful integration but leverages existing callback architecture.

Breaking Changes: None - Purely additive API changes with backward compatibility.
