Support Token Streaming for Enhanced User Experience
Summary
Add support for LLM token streaming to improve user experience, especially with slower models like GPT-5. Currently, users must wait for the complete LLM response before seeing any output, which can create poor UX during long reasoning sessions.
Problem Statement
The current implementation explicitly rejects streaming requests:
```python
# In openhands/sdk/llm/llm.py:354
if kwargs.get("stream", False):
    raise ValueError("Streaming is not supported")
```

This creates several UX issues:
- Long wait times: Users see no feedback during LLM processing, especially problematic with slower models
- Perceived unresponsiveness: No indication that the system is actively working
- Poor competitive positioning: Streaming is now a standard feature in most AI applications
Current Architecture Analysis
The current event-driven architecture presents both challenges and opportunities:
Current Flow
- `Agent.step()` calls `self.llm.completion()` with the complete message history
- LLM returns a complete `ModelResponse`
- Agent processes the response and generates `ActionEvent`s or `MessageEvent`s
- Events are sent via the `on_event` callback to the conversation layer
Challenge
- Events expect complete, structured data (actions, observations, messages)
- Streaming returns incremental deltas, not complete events
- The agent's event ontology doesn't naturally accommodate partial responses
Proposed Solution
Implement a dual-callback approach that preserves the existing event architecture while adding streaming capabilities:
1. Add on_token Callback to Agent
```python
class Agent(AgentBase):
    def step(
        self,
        state: ConversationState,
        on_event: ConversationCallbackType,
        on_token: TokenCallbackType | None = None,  # NEW
    ) -> None:
        # ... existing logic ...
        response = self.llm.completion(
            messages=_messages,
            tools=list(self.tools_map.values()),
            on_token=on_token,  # Pass through to LLM
            # ... other params ...
        )
```
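The `TokenCallbackType` used above does not exist yet (adding it is part of Phase 1 below); a minimal sketch of what it could be, assuming the simplest case of one text delta per call (the module placement is illustrative, not decided here):

```python
from typing import Callable

# Receives each text delta as it arrives from the provider.
# Placement (e.g. openhands/sdk/llm/streaming.py) is an assumption.
TokenCallbackType = Callable[[str], None]
```

A plain string callback matches the usage examples later in this issue; a richer signature (e.g. structured deltas) could be layered on later without breaking it.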
2. Extend LLM Class for Streaming

```python
class LLM(BaseModel, RetryMixin, NonNativeToolCallingMixin):
    def completion(
        self,
        messages: list[Message],
        tools: Sequence[ToolBase] | None = None,
        on_token: TokenCallbackType | None = None,  # NEW
        **kwargs,
    ) -> ModelResponse:
        """
        If on_token is provided, stream tokens via callback while
        still returning complete ModelResponse for event processing.
        """
        if on_token and not kwargs.get("stream", False):
            kwargs["stream"] = True
        # Handle streaming internally, call on_token for each delta
        # Still return complete response for existing event processing
```
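For illustration, the internal streaming path might look roughly like the sketch below. It assumes the LLM class keeps delegating to litellm (as the existing non-streaming path does) and that litellm's `stream_chunk_builder` utility is used to reassemble the chunks; the `_stream_completion` helper name is hypothetical.

```python
from typing import Callable

import litellm


def _stream_completion(
    on_token: Callable[[str], None],  # stands in for TokenCallbackType
    **call_kwargs,
) -> litellm.ModelResponse:
    """Hypothetical helper: emit text deltas via on_token, then rebuild a full response."""
    chunks = []
    for chunk in litellm.completion(stream=True, **call_kwargs):
        chunks.append(chunk)
        delta = chunk.choices[0].delta.content
        if delta:  # role-only or tool-call deltas carry no text
            on_token(delta)
    # stream_chunk_builder stitches the streamed chunks back into a complete
    # ModelResponse, so downstream event processing stays unchanged.
    return litellm.stream_chunk_builder(chunks, messages=call_kwargs.get("messages"))
```

This keeps the public contract identical to today: callers still receive one complete `ModelResponse`; streaming only changes what happens while the call is in flight.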
3. Update Conversation Layer

```python
class LocalConversation:
    def __init__(
        self,
        # ... existing params ...
        on_token: TokenCallbackType | None = None,  # NEW
    ):
        self._on_token = on_token

    def _run_step(self):
        self.agent.step(
            self._state,
            on_event=self._on_event,
            on_token=self._on_token,  # Pass through
        )
```

Implementation Benefits
✅ Preserves Existing Architecture
- No changes to event ontology or agent structure
- Existing `on_event` flow remains unchanged
- Backward compatibility maintained
✅ Clean Separation of Concerns
- `on_token`: Real-time streaming feedback
- `on_event`: Structured event processing
- Agent treats LLM calls as non-streaming internally
✅ Server Integration Ready
- `on_token` can stream directly to frontend via WebSocket
- No complex event transformation needed
- Simple token-by-token forwarding
Usage Examples
Basic Streaming
```python
def token_handler(token: str):
    print(token, end="", flush=True)


conversation = Conversation(
    agent=agent,
    on_token=token_handler,
)
```

WebSocket Server Integration
```python
import time


# `websocket` is an active connection object provided by the server framework.
async def websocket_token_handler(token: str):
    await websocket.send_json({
        "type": "token",
        "content": token,
        "timestamp": time.time()
    })


conversation = Conversation(
    agent=agent,
    on_token=websocket_token_handler,
)
```

Technical Considerations
Retry Logic
- Streaming responses need special retry handling
- May need to buffer tokens for retry scenarios
- Consider fallback to non-streaming on retry failures (see the sketch below)
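A possible shape for that fallback, as a sketch only (`complete_with_fallback` and the `_complete` callable are placeholders, not existing SDK API):

```python
def complete_with_fallback(_complete, on_token, **kwargs):
    """Hypothetical wrapper: try streaming first, retry without it on failure."""
    try:
        return _complete(on_token=on_token, stream=True, **kwargs)
    except Exception:
        # Tokens already emitted via on_token cannot be withdrawn; callers should
        # treat the non-streaming retry's response as the authoritative result.
        return _complete(on_token=None, stream=False, **kwargs)
```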
Error Handling
- Stream interruption scenarios
- Partial response recovery
- Token callback exception handling (see the sketch below)
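For the callback-exception case specifically, one option is to isolate the user-supplied callback so a buggy handler cannot abort the stream; a minimal sketch (the `_safe_on_token` name is hypothetical):

```python
import logging

logger = logging.getLogger(__name__)


def _safe_on_token(on_token, token: str) -> None:
    """Hypothetical guard: never let a user callback break the streaming loop."""
    try:
        on_token(token)
    except Exception:
        logger.warning("on_token callback raised; continuing stream", exc_info=True)
```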
Performance
- Minimal overhead when streaming disabled
- Efficient token buffering and forwarding
- Memory management for long responses
Alternative Approaches Considered
❌ Event-Based Streaming
- Would require new event types (`TokenEvent`, `PartialResponseEvent`)
- Breaks existing event ontology
- Complex integration with current architecture
❌ Separate Streaming Channel
- Using status/error channel for tokens
- Channel not available to LLM/Agent layers
- Architectural violation
❌ Response Transformation
- Converting deltas to events in real-time
- Complex state management
- Potential inconsistencies
Implementation Plan
Phase 1: Core Infrastructure
- Add `TokenCallbackType` type definition
- Extend `LLM.completion()` with `on_token` parameter
- Implement streaming logic in LLM class
- Add tests for streaming functionality
Phase 2: Agent Integration
- Update `Agent.step()` signature
- Pass `on_token` through agent layer
- Ensure backward compatibility
- Add agent-level streaming tests
Phase 3: Conversation Layer
- Update `Conversation` constructors
- Implement token callback forwarding
- Add conversation-level streaming examples
- Integration testing
Phase 4: Documentation & Examples
- Update API documentation
- Create streaming examples
- Performance benchmarking
- Migration guide for existing users
Success Metrics
- Zero breaking changes to existing API
- Streaming works with all supported LLM providers
- <100ms latency for first token
- Graceful fallback when streaming unavailable
- Memory usage remains constant during streaming
Related Issues
This addresses user feedback about poor UX with slower models and brings the SDK in line with modern AI application standards where streaming is expected functionality.
Priority: High - UX improvement that affects all users, especially critical with slower models like GPT-5.
Complexity: Medium - Requires careful integration but leverages existing callback architecture.
Breaking Changes: None - Purely additive API changes with backward compatibility.