Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
92 changes: 91 additions & 1 deletion app/agent/base.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
from abc import ABC, abstractmethod
from contextlib import asynccontextmanager
from typing import List, Optional
import uuid

from pydantic import BaseModel, Field, model_validator

from app.llm import LLM
from app.logger import logger
from app.sandbox.client import SANDBOX_CLIENT
from app.schema import ROLE_TYPE, AgentState, Memory, Message
from app.event.init import ensure_event_system_initialized, publish_agent_step_start


class BaseAgent(BaseModel, ABC):
Expand Down Expand Up @@ -42,17 +44,34 @@ class BaseAgent(BaseModel, ABC):

duplicate_threshold: int = 2

# Event system integration
conversation_id: Optional[str] = Field(default=None, description="Current conversation ID for event tracking")
enable_events: bool = Field(default=True, description="Whether to publish events during execution")

class Config:
arbitrary_types_allowed = True
extra = "allow" # Allow extra fields for flexibility in subclasses

@model_validator(mode="after")
def initialize_agent(self) -> "BaseAgent":
"""Initialize agent with default settings if not provided."""
"""Initialize agent with default settings and event system."""
if self.llm is None or not isinstance(self.llm, LLM):
self.llm = LLM(config_name=self.name.lower())
if not isinstance(self.memory, Memory):
self.memory = Memory()

# Initialize system prompt if provided
if self.system_prompt:
self.memory.add_message(Message(role="system", content=self.system_prompt))

# Ensure event system is initialized if events are enabled
if self.enable_events:
ensure_event_system_initialized()

# Generate conversation ID if not provided
if self.conversation_id is None:
self.conversation_id = str(uuid.uuid4())

return self

@asynccontextmanager
Expand Down Expand Up @@ -138,8 +157,36 @@ async def run(self, request: Optional[str] = None) -> str:
):
self.current_step += 1
logger.info(f"Executing step {self.current_step}/{self.max_steps}")

# Publish step start event
if self.enable_events:
try:
await publish_agent_step_start(
agent_name=self.name,
agent_type=self.__class__.__name__,
step_number=self.current_step,
conversation_id=self.conversation_id
)
except Exception as e:
logger.warning(f"Failed to publish agent step start event: {e}")

step_result = await self.step()

# Publish step complete event
if self.enable_events:
try:
from app.event import create_agent_step_start_event, publish_event, AgentStepCompleteEvent
complete_event = AgentStepCompleteEvent(
agent_name=self.name,
agent_type=self.__class__.__name__,
step_number=self.current_step,
result=step_result,
conversation_id=self.conversation_id
)
await publish_event(complete_event)
except Exception as e:
logger.warning(f"Failed to publish agent step complete event: {e}")

# Check for stuck state
if self.is_stuck():
self.handle_stuck_state()
Expand Down Expand Up @@ -194,3 +241,46 @@ def messages(self) -> List[Message]:
def messages(self, value: List[Message]):
"""Set the list of messages in the agent's memory."""
self.memory.messages = value

# Event system integration methods

def set_conversation_id(self, conversation_id: str) -> None:
"""Set the conversation ID for event tracking."""
self.conversation_id = conversation_id

def enable_event_publishing(self, enabled: bool = True) -> None:
"""Enable or disable event publishing."""
self.enable_events = enabled
if enabled:
ensure_event_system_initialized()

async def publish_custom_event(self, event_type: str, data: dict) -> bool:
"""Publish a custom agent event.

Args:
event_type: Type of the event (e.g., "agent.custom.thinking")
data: Event data dictionary

Returns:
bool: True if event was published successfully
"""
if not self.enable_events:
return False

try:
from app.event import BaseEvent, publish_event

event = BaseEvent(
event_type=event_type,
data={
"agent_name": self.name,
"agent_type": self.__class__.__name__,
"conversation_id": self.conversation_id,
**data
},
source=self.name
)
return await publish_event(event)
except Exception as e:
logger.warning(f"Failed to publish custom event {event_type}: {e}")
return False
4 changes: 2 additions & 2 deletions app/agent/browser.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from app.prompt.browser import NEXT_STEP_PROMPT, SYSTEM_PROMPT
from app.schema import Message, ToolChoice
from app.tool import BrowserUseTool, Terminate, ToolCollection

from app.tool.crawl4ai import Crawl4aiTool

# Avoid circular import if BrowserAgent needs BrowserContextHelper
if TYPE_CHECKING:
Expand Down Expand Up @@ -98,7 +98,7 @@ class BrowserAgent(ToolCallAgent):

# Configure the available tools
available_tools: ToolCollection = Field(
default_factory=lambda: ToolCollection(BrowserUseTool(), Terminate())
default_factory=lambda: ToolCollection(BrowserUseTool(), Terminate(), Crawl4aiTool())
)

# Use Auto for tool choice to allow both tool usage and free-form responses
Expand Down
2 changes: 2 additions & 0 deletions app/agent/manus.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from app.tool.mcp import MCPClients, MCPClientTool
from app.tool.python_execute import PythonExecute
from app.tool.str_replace_editor import StrReplaceEditor
from app.tool.crawl4ai import Crawl4aiTool


class Manus(ToolCallAgent):
Expand All @@ -38,6 +39,7 @@ class Manus(ToolCallAgent):
StrReplaceEditor(),
AskHuman(),
Terminate(),
Crawl4aiTool(),
)
)

Expand Down
174 changes: 174 additions & 0 deletions app/event/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,174 @@
"""Event system package for OpenManus.

This is the main entry point for the event system, providing a clean API
that abstracts the internal layered architecture.
"""

# Core abstractions
from app.event.core import (
BaseEvent,
BaseEventHandler,
BaseEventBus,
ChainableEvent,
EventContext,
EventStatus,
ToolExecutionStatus
)

# Infrastructure components
from app.event.infrastructure import (
EventHandlerRegistry,
HandlerInfo,
event_handler,
get_global_registry,
BaseMiddleware,
MiddlewareChain,
MiddlewareContext,
LoggingMiddleware,
RetryMiddleware,
ErrorIsolationMiddleware,
MetricsMiddleware,
create_default_middleware_chain,
SimpleEventBus,
ChainableEventBus,
get_global_bus,
set_global_bus,
publish_event,
subscribe_handler,
unsubscribe_handler,
get_bus_stats
)

# Domain events
from app.event.domain import (
# Agent events
AgentEvent,
AgentStepStartEvent,
AgentStepCompleteEvent,
ChainableAgentEvent,
ChainableAgentStepStartEvent,
ChainableAgentStepCompleteEvent,

# Conversation events
ConversationEvent,
ConversationCreatedEvent,
ConversationClosedEvent,
UserInputEvent,
InterruptEvent,
AgentResponseEvent,
LLMStreamEvent,
ToolResultDisplayEvent,

# Tool events
ToolEvent,
ToolExecutionEvent,
ToolResultEvent,
ChainableToolEvent,
ChainableToolExecutionRequestEvent,
ChainableToolExecutionCompletedEvent,

# System events
SystemEvent,
SystemErrorEvent,
ChainableSystemEvent,
ChainableLogWriteEvent,
ChainableMetricsUpdateEvent,
ChainableStreamEvent,
ChainableStreamStartEvent,
ChainableStreamChunkEvent,
ChainableStreamEndEvent,
ChainableStreamInterruptEvent
)

# Event factory functions
from app.event.interfaces import (
create_agent_step_start_event,
create_chainable_agent_step_start_event,
create_conversation_created_event,
create_user_input_event,
create_interrupt_event,
create_tool_execution_event,
create_chainable_tool_execution_request_event,
create_system_error_event
)

__all__ = [
# Core abstractions
"BaseEvent",
"BaseEventHandler",
"BaseEventBus",
"ChainableEvent",
"EventContext",
"EventStatus",
"ToolExecutionStatus",

# Infrastructure components
"EventHandlerRegistry",
"HandlerInfo",
"event_handler",
"get_global_registry",
"BaseMiddleware",
"MiddlewareChain",
"MiddlewareContext",
"LoggingMiddleware",
"RetryMiddleware",
"ErrorIsolationMiddleware",
"MetricsMiddleware",
"create_default_middleware_chain",
"SimpleEventBus",
"ChainableEventBus",
"get_global_bus",
"set_global_bus",
"publish_event",
"subscribe_handler",
"unsubscribe_handler",
"get_bus_stats",

# Agent events
"AgentEvent",
"AgentStepStartEvent",
"AgentStepCompleteEvent",
"ChainableAgentEvent",
"ChainableAgentStepStartEvent",
"ChainableAgentStepCompleteEvent",

# Conversation events
"ConversationEvent",
"ConversationCreatedEvent",
"ConversationClosedEvent",
"UserInputEvent",
"InterruptEvent",
"AgentResponseEvent",
"LLMStreamEvent",
"ToolResultDisplayEvent",

# Tool events
"ToolEvent",
"ToolExecutionEvent",
"ToolResultEvent",
"ChainableToolEvent",
"ChainableToolExecutionRequestEvent",
"ChainableToolExecutionCompletedEvent",

# System events
"SystemEvent",
"SystemErrorEvent",
"ChainableSystemEvent",
"ChainableLogWriteEvent",
"ChainableMetricsUpdateEvent",
"ChainableStreamEvent",
"ChainableStreamStartEvent",
"ChainableStreamChunkEvent",
"ChainableStreamEndEvent",
"ChainableStreamInterruptEvent",

# Event factory functions
"create_agent_step_start_event",
"create_chainable_agent_step_start_event",
"create_conversation_created_event",
"create_user_input_event",
"create_interrupt_event",
"create_tool_execution_event",
"create_chainable_tool_execution_request_event",
"create_system_error_event",
]
23 changes: 23 additions & 0 deletions app/event/core/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
"""Core event system components.

This module contains the fundamental abstractions and types for the event system.
"""

from .base import (
BaseEvent,
BaseEventHandler,
BaseEventBus,
ChainableEvent,
EventContext
)
from .types import EventStatus, ToolExecutionStatus

__all__ = [
"BaseEvent",
"BaseEventHandler",
"BaseEventBus",
"ChainableEvent",
"EventContext",
"EventStatus",
"ToolExecutionStatus"
]
Loading