diff --git a/app/agent/base.py b/app/agent/base.py index 65f660073..be8b6c736 100644 --- a/app/agent/base.py +++ b/app/agent/base.py @@ -1,6 +1,7 @@ from abc import ABC, abstractmethod from contextlib import asynccontextmanager from typing import List, Optional +import uuid from pydantic import BaseModel, Field, model_validator @@ -8,6 +9,7 @@ from app.logger import logger from app.sandbox.client import SANDBOX_CLIENT from app.schema import ROLE_TYPE, AgentState, Memory, Message +from app.event.init import ensure_event_system_initialized, publish_agent_step_start class BaseAgent(BaseModel, ABC): @@ -42,17 +44,34 @@ class BaseAgent(BaseModel, ABC): duplicate_threshold: int = 2 + # Event system integration + conversation_id: Optional[str] = Field(default=None, description="Current conversation ID for event tracking") + enable_events: bool = Field(default=True, description="Whether to publish events during execution") + class Config: arbitrary_types_allowed = True extra = "allow" # Allow extra fields for flexibility in subclasses @model_validator(mode="after") def initialize_agent(self) -> "BaseAgent": - """Initialize agent with default settings if not provided.""" + """Initialize agent with default settings and event system.""" if self.llm is None or not isinstance(self.llm, LLM): self.llm = LLM(config_name=self.name.lower()) if not isinstance(self.memory, Memory): self.memory = Memory() + + # Initialize system prompt if provided + if self.system_prompt: + self.memory.add_message(Message(role="system", content=self.system_prompt)) + + # Ensure event system is initialized if events are enabled + if self.enable_events: + ensure_event_system_initialized() + + # Generate conversation ID if not provided + if self.conversation_id is None: + self.conversation_id = str(uuid.uuid4()) + return self @asynccontextmanager @@ -138,8 +157,36 @@ async def run(self, request: Optional[str] = None) -> str: ): self.current_step += 1 logger.info(f"Executing step 
{self.current_step}/{self.max_steps}") + + # Publish step start event + if self.enable_events: + try: + await publish_agent_step_start( + agent_name=self.name, + agent_type=self.__class__.__name__, + step_number=self.current_step, + conversation_id=self.conversation_id + ) + except Exception as e: + logger.warning(f"Failed to publish agent step start event: {e}") + step_result = await self.step() + # Publish step complete event + if self.enable_events: + try: + from app.event import create_agent_step_start_event, publish_event, AgentStepCompleteEvent + complete_event = AgentStepCompleteEvent( + agent_name=self.name, + agent_type=self.__class__.__name__, + step_number=self.current_step, + result=step_result, + conversation_id=self.conversation_id + ) + await publish_event(complete_event) + except Exception as e: + logger.warning(f"Failed to publish agent step complete event: {e}") + # Check for stuck state if self.is_stuck(): self.handle_stuck_state() @@ -194,3 +241,46 @@ def messages(self) -> List[Message]: def messages(self, value: List[Message]): """Set the list of messages in the agent's memory.""" self.memory.messages = value + + # Event system integration methods + + def set_conversation_id(self, conversation_id: str) -> None: + """Set the conversation ID for event tracking.""" + self.conversation_id = conversation_id + + def enable_event_publishing(self, enabled: bool = True) -> None: + """Enable or disable event publishing.""" + self.enable_events = enabled + if enabled: + ensure_event_system_initialized() + + async def publish_custom_event(self, event_type: str, data: dict) -> bool: + """Publish a custom agent event. 
+ + Args: + event_type: Type of the event (e.g., "agent.custom.thinking") + data: Event data dictionary + + Returns: + bool: True if event was published successfully + """ + if not self.enable_events: + return False + + try: + from app.event import BaseEvent, publish_event + + event = BaseEvent( + event_type=event_type, + data={ + "agent_name": self.name, + "agent_type": self.__class__.__name__, + "conversation_id": self.conversation_id, + **data + }, + source=self.name + ) + return await publish_event(event) + except Exception as e: + logger.warning(f"Failed to publish custom event {event_type}: {e}") + return False diff --git a/app/agent/browser.py b/app/agent/browser.py index 92d8ea62f..306ed6daf 100644 --- a/app/agent/browser.py +++ b/app/agent/browser.py @@ -8,7 +8,7 @@ from app.prompt.browser import NEXT_STEP_PROMPT, SYSTEM_PROMPT from app.schema import Message, ToolChoice from app.tool import BrowserUseTool, Terminate, ToolCollection - +from app.tool.crawl4ai import Crawl4aiTool # Avoid circular import if BrowserAgent needs BrowserContextHelper if TYPE_CHECKING: @@ -98,7 +98,7 @@ class BrowserAgent(ToolCallAgent): # Configure the available tools available_tools: ToolCollection = Field( - default_factory=lambda: ToolCollection(BrowserUseTool(), Terminate()) + default_factory=lambda: ToolCollection(BrowserUseTool(), Terminate(), Crawl4aiTool()) ) # Use Auto for tool choice to allow both tool usage and free-form responses diff --git a/app/agent/manus.py b/app/agent/manus.py index df40edbba..bfd2d4cb7 100644 --- a/app/agent/manus.py +++ b/app/agent/manus.py @@ -13,6 +13,7 @@ from app.tool.mcp import MCPClients, MCPClientTool from app.tool.python_execute import PythonExecute from app.tool.str_replace_editor import StrReplaceEditor +from app.tool.crawl4ai import Crawl4aiTool class Manus(ToolCallAgent): @@ -38,6 +39,7 @@ class Manus(ToolCallAgent): StrReplaceEditor(), AskHuman(), Terminate(), + Crawl4aiTool(), ) ) diff --git a/app/event/__init__.py 
b/app/event/__init__.py new file mode 100644 index 000000000..e9f49e9b2 --- /dev/null +++ b/app/event/__init__.py @@ -0,0 +1,174 @@ +"""Event system package for OpenManus. + +This is the main entry point for the event system, providing a clean API +that abstracts the internal layered architecture. +""" + +# Core abstractions +from app.event.core import ( + BaseEvent, + BaseEventHandler, + BaseEventBus, + ChainableEvent, + EventContext, + EventStatus, + ToolExecutionStatus +) + +# Infrastructure components +from app.event.infrastructure import ( + EventHandlerRegistry, + HandlerInfo, + event_handler, + get_global_registry, + BaseMiddleware, + MiddlewareChain, + MiddlewareContext, + LoggingMiddleware, + RetryMiddleware, + ErrorIsolationMiddleware, + MetricsMiddleware, + create_default_middleware_chain, + SimpleEventBus, + ChainableEventBus, + get_global_bus, + set_global_bus, + publish_event, + subscribe_handler, + unsubscribe_handler, + get_bus_stats +) + +# Domain events +from app.event.domain import ( + # Agent events + AgentEvent, + AgentStepStartEvent, + AgentStepCompleteEvent, + ChainableAgentEvent, + ChainableAgentStepStartEvent, + ChainableAgentStepCompleteEvent, + + # Conversation events + ConversationEvent, + ConversationCreatedEvent, + ConversationClosedEvent, + UserInputEvent, + InterruptEvent, + AgentResponseEvent, + LLMStreamEvent, + ToolResultDisplayEvent, + + # Tool events + ToolEvent, + ToolExecutionEvent, + ToolResultEvent, + ChainableToolEvent, + ChainableToolExecutionRequestEvent, + ChainableToolExecutionCompletedEvent, + + # System events + SystemEvent, + SystemErrorEvent, + ChainableSystemEvent, + ChainableLogWriteEvent, + ChainableMetricsUpdateEvent, + ChainableStreamEvent, + ChainableStreamStartEvent, + ChainableStreamChunkEvent, + ChainableStreamEndEvent, + ChainableStreamInterruptEvent +) + +# Event factory functions +from app.event.interfaces import ( + create_agent_step_start_event, + create_chainable_agent_step_start_event, + 
create_conversation_created_event, + create_user_input_event, + create_interrupt_event, + create_tool_execution_event, + create_chainable_tool_execution_request_event, + create_system_error_event +) + +__all__ = [ + # Core abstractions + "BaseEvent", + "BaseEventHandler", + "BaseEventBus", + "ChainableEvent", + "EventContext", + "EventStatus", + "ToolExecutionStatus", + + # Infrastructure components + "EventHandlerRegistry", + "HandlerInfo", + "event_handler", + "get_global_registry", + "BaseMiddleware", + "MiddlewareChain", + "MiddlewareContext", + "LoggingMiddleware", + "RetryMiddleware", + "ErrorIsolationMiddleware", + "MetricsMiddleware", + "create_default_middleware_chain", + "SimpleEventBus", + "ChainableEventBus", + "get_global_bus", + "set_global_bus", + "publish_event", + "subscribe_handler", + "unsubscribe_handler", + "get_bus_stats", + + # Agent events + "AgentEvent", + "AgentStepStartEvent", + "AgentStepCompleteEvent", + "ChainableAgentEvent", + "ChainableAgentStepStartEvent", + "ChainableAgentStepCompleteEvent", + + # Conversation events + "ConversationEvent", + "ConversationCreatedEvent", + "ConversationClosedEvent", + "UserInputEvent", + "InterruptEvent", + "AgentResponseEvent", + "LLMStreamEvent", + "ToolResultDisplayEvent", + + # Tool events + "ToolEvent", + "ToolExecutionEvent", + "ToolResultEvent", + "ChainableToolEvent", + "ChainableToolExecutionRequestEvent", + "ChainableToolExecutionCompletedEvent", + + # System events + "SystemEvent", + "SystemErrorEvent", + "ChainableSystemEvent", + "ChainableLogWriteEvent", + "ChainableMetricsUpdateEvent", + "ChainableStreamEvent", + "ChainableStreamStartEvent", + "ChainableStreamChunkEvent", + "ChainableStreamEndEvent", + "ChainableStreamInterruptEvent", + + # Event factory functions + "create_agent_step_start_event", + "create_chainable_agent_step_start_event", + "create_conversation_created_event", + "create_user_input_event", + "create_interrupt_event", + "create_tool_execution_event", + 
"create_chainable_tool_execution_request_event", + "create_system_error_event", +] diff --git a/app/event/core/__init__.py b/app/event/core/__init__.py new file mode 100644 index 000000000..9adb28ad2 --- /dev/null +++ b/app/event/core/__init__.py @@ -0,0 +1,23 @@ +"""Core event system components. + +This module contains the fundamental abstractions and types for the event system. +""" + +from .base import ( + BaseEvent, + BaseEventHandler, + BaseEventBus, + ChainableEvent, + EventContext +) +from .types import EventStatus, ToolExecutionStatus + +__all__ = [ + "BaseEvent", + "BaseEventHandler", + "BaseEventBus", + "ChainableEvent", + "EventContext", + "EventStatus", + "ToolExecutionStatus" +] diff --git a/app/event/core/base.py b/app/event/core/base.py new file mode 100644 index 000000000..aa0443de1 --- /dev/null +++ b/app/event/core/base.py @@ -0,0 +1,352 @@ +"""Base classes for the event bus system.""" + +import asyncio +import uuid +from abc import ABC, abstractmethod +from datetime import datetime +from typing import Any, Dict, List, Optional, Type, Union +from dataclasses import dataclass, field + +from pydantic import BaseModel, Field + +from app.logger import logger +from .types import EventStatus + + +@dataclass +class EventContext: + """事件执行上下文,用于管理事件链和中断""" + root_event_id: str # 根事件ID + conversation_id: str # 对话ID + agent_id: str # 智能体ID + execution_chain: List[str] # 事件执行链 ["event1", "event2", "event3"] + cancellation_token: asyncio.Event # 取消令牌 + metadata: Dict[str, Any] = field(default_factory=dict) + + +class BaseEvent(BaseModel): + """Base class for all events in the system. + + Provides common event properties and serialization capabilities. + All custom events should inherit from this class. 
+ """ + + # Core event properties + event_id: str = Field(default_factory=lambda: str(uuid.uuid4()), description="Unique event identifier") + event_type: str = Field(..., description="Type/name of the event") + timestamp: datetime = Field(default_factory=datetime.now, description="Event creation timestamp") + + # Event metadata + source: Optional[str] = Field(None, description="Source component that generated the event") + status: EventStatus = Field(default=EventStatus.PENDING, description="Current event status") + + # Event data + data: Dict[str, Any] = Field(default_factory=dict, description="Event payload data") + metadata: Dict[str, Any] = Field(default_factory=dict, description="Additional event metadata") + + # Processing tracking + processed_by: List[str] = Field(default_factory=list, description="List of handlers that processed this event") + error_message: Optional[str] = Field(None, description="Error message if event processing failed") + + class Config: + arbitrary_types_allowed = True + extra = "allow" # Allow extra fields for flexibility + + def mark_processing(self, handler_name: str) -> None: + """Mark event as being processed by a handler.""" + self.status = EventStatus.PROCESSING + if handler_name not in self.processed_by: + self.processed_by.append(handler_name) + + def mark_completed(self) -> None: + """Mark event as successfully completed.""" + self.status = EventStatus.COMPLETED + + def mark_failed(self, error: str) -> None: + """Mark event as failed with error message.""" + self.status = EventStatus.FAILED + self.error_message = error + + def mark_cancelled(self) -> None: + """Mark event as cancelled.""" + self.status = EventStatus.CANCELLED + + +class ChainableEvent(BaseEvent): + """支持事件链和中断的扩展事件类 + + 继承自 BaseEvent,添加了事件链管理和中断支持功能。 + 适用于需要支持级联事件和中断的场景。 + """ + + # Event context for chain management and interruption + context: Optional[EventContext] = Field(None, description="Event execution context") + + def create_child_event(self, 
event_type: str, data: Dict[str, Any], **kwargs) -> 'ChainableEvent': + """创建子事件,继承上下文 + + Args: + event_type: 子事件类型 + data: 子事件数据 + **kwargs: 其他事件属性 + + Returns: + ChainableEvent: 继承了上下文的子事件 + """ + child_event = ChainableEvent( + event_type=event_type, + data=data, + source=self.source, + **kwargs + ) + + if self.context: + # 继承父事件的上下文 + child_event.context = EventContext( + root_event_id=self.context.root_event_id, + conversation_id=self.context.conversation_id, + agent_id=self.context.agent_id, + execution_chain=self.context.execution_chain + [self.event_id], + cancellation_token=self.context.cancellation_token, + metadata=self.context.metadata.copy() + ) + + return child_event + + def is_cancelled(self) -> bool: + """检查事件是否被取消 + + Returns: + bool: 如果事件被取消返回True + """ + return self.context and self.context.cancellation_token.is_set() + + def get_conversation_id(self) -> Optional[str]: + """获取对话ID + + Returns: + Optional[str]: 对话ID,如果没有上下文则返回None + """ + if self.context: + return self.context.conversation_id + return self.data.get('conversation_id') + + def get_agent_id(self) -> Optional[str]: + """获取智能体ID + + Returns: + Optional[str]: 智能体ID,如果没有上下文则返回None + """ + if self.context: + return self.context.agent_id + return self.data.get('agent_id') + + def get_execution_chain(self) -> List[str]: + """获取事件执行链 + + Returns: + List[str]: 事件执行链 + """ + if self.context: + return self.context.execution_chain + [self.event_id] + return [self.event_id] + + def get_root_event_id(self) -> str: + """获取根事件ID + + Returns: + str: 根事件ID + """ + if self.context: + return self.context.root_event_id + return self.event_id + + +class BaseEventHandler(ABC, BaseModel): + """Abstract base class for event handlers. + + Event handlers process specific types of events. Each handler should + implement the handle method to define its processing logic. 
+ """ + + # Handler identification + name: str = Field(..., description="Unique name of the event handler") + description: Optional[str] = Field(None, description="Optional handler description") + + # Handler configuration + enabled: bool = Field(default=True, description="Whether the handler is enabled") + + # Event filtering + supported_events: List[str] = Field(default_factory=list, description="List of event types this handler supports") + + class Config: + arbitrary_types_allowed = True + extra = "allow" + + @abstractmethod + async def handle(self, event: BaseEvent) -> bool: + """Handle an event. + + Args: + event: The event to process + + Returns: + bool: True if event was handled successfully, False otherwise + + Raises: + Exception: If event processing fails + """ + pass + + def can_handle(self, event: BaseEvent) -> bool: + """Check if this handler can process the given event. + + Args: + event: The event to check + + Returns: + bool: True if handler can process the event + """ + if not self.enabled: + return False + + # If no specific events are configured, handle all events + if not self.supported_events: + return True + + return event.event_type in self.supported_events + + async def safe_handle(self, event: BaseEvent) -> bool: + """Safely handle an event with error handling and logging. 
+ + Args: + event: The event to process + + Returns: + bool: True if event was handled successfully, False otherwise + """ + if not self.can_handle(event): + return False + + try: + event.mark_processing(self.name) + logger.debug(f"Handler '{self.name}' processing event {event.event_id} ({event.event_type})") + + result = await self.handle(event) + + if result: + logger.debug(f"Handler '{self.name}' successfully processed event {event.event_id}") + else: + logger.warning(f"Handler '{self.name}' failed to process event {event.event_id}") + + return result + + except Exception as e: + error_msg = f"Handler '{self.name}' error processing event {event.event_id}: {str(e)}" + logger.error(error_msg) + event.mark_failed(error_msg) + return False + + +class BaseEventBus(ABC, BaseModel): + """Abstract base class for event bus implementations. + + The event bus is responsible for routing events to appropriate handlers + and managing the overall event processing lifecycle. + """ + + # Bus configuration + name: str = Field(default="EventBus", description="Name of the event bus") + max_concurrent_events: int = Field(default=10, description="Maximum concurrent event processing") + + # Handler management + handlers: Dict[str, BaseEventHandler] = Field(default_factory=dict, description="Registered event handlers") + + # Processing state + active_events: Dict[str, BaseEvent] = Field(default_factory=dict, description="Currently processing events") + event_history: List[BaseEvent] = Field(default_factory=list, description="Event processing history") + max_history_size: int = Field(default=1000, description="Maximum number of events to keep in history") + + class Config: + arbitrary_types_allowed = True + extra = "allow" + + @abstractmethod + async def publish(self, event: BaseEvent) -> bool: + """Publish an event to the bus for processing. 
+ + Args: + event: The event to publish + + Returns: + bool: True if event was published successfully + """ + pass + + @abstractmethod + async def subscribe(self, handler: BaseEventHandler) -> bool: + """Subscribe a handler to the event bus. + + Args: + handler: The event handler to register + + Returns: + bool: True if handler was registered successfully + """ + pass + + @abstractmethod + async def unsubscribe(self, handler_name: str) -> bool: + """Unsubscribe a handler from the event bus. + + Args: + handler_name: Name of the handler to unregister + + Returns: + bool: True if handler was unregistered successfully + """ + pass + + def get_handler(self, name: str) -> Optional[BaseEventHandler]: + """Get a registered handler by name. + + Args: + name: Name of the handler + + Returns: + Optional[BaseEventHandler]: The handler if found, None otherwise + """ + return self.handlers.get(name) + + def add_to_history(self, event: BaseEvent) -> None: + """Add an event to the processing history. + + Args: + event: The event to add to history + """ + self.event_history.append(event) + + # Maintain history size limit + if len(self.event_history) > self.max_history_size: + self.event_history = self.event_history[-self.max_history_size:] + + def get_event_stats(self) -> Dict[str, Any]: + """Get statistics about event processing. 
+ + Returns: + Dict[str, Any]: Event processing statistics + """ + total_events = len(self.event_history) + active_count = len(self.active_events) + + status_counts = {} + for event in self.event_history: + status = event.status.value + status_counts[status] = status_counts.get(status, 0) + 1 + + return { + "total_events": total_events, + "active_events": active_count, + "registered_handlers": len(self.handlers), + "status_distribution": status_counts, + } diff --git a/app/event/core/types.py b/app/event/core/types.py new file mode 100644 index 000000000..dfab7e092 --- /dev/null +++ b/app/event/core/types.py @@ -0,0 +1,21 @@ +"""Event system type definitions.""" + +from enum import Enum + + +class EventStatus(str, Enum): + """Enumeration of possible event statuses.""" + + PENDING = "pending" + PROCESSING = "processing" + COMPLETED = "completed" + FAILED = "failed" + CANCELLED = "cancelled" + + +class ToolExecutionStatus(str, Enum): + """Tool execution status.""" + STARTED = "started" + COMPLETED = "completed" + FAILED = "failed" + TIMEOUT = "timeout" diff --git a/app/event/domain/__init__.py b/app/event/domain/__init__.py new file mode 100644 index 000000000..b97b5e232 --- /dev/null +++ b/app/event/domain/__init__.py @@ -0,0 +1,106 @@ +"""Domain events package. + +This module contains all domain-specific events organized by business domain. 
+""" + +# Agent domain events +from .agent import ( + AgentEvent, + AgentStepStartEvent, + AgentStepCompleteEvent, + create_agent_step_start_event, + ChainableAgentEvent, + ChainableAgentStepStartEvent, + ChainableAgentStepCompleteEvent, + create_chainable_agent_step_start_event +) + +# Conversation domain events +from .conversation import ( + ConversationEvent, + ConversationCreatedEvent, + ConversationClosedEvent, + UserInputEvent, + InterruptEvent, + AgentResponseEvent, + LLMStreamEvent, + ToolResultDisplayEvent, + create_conversation_created_event, + create_user_input_event, + create_interrupt_event +) + +# Tool domain events +from .tool import ( + ToolEvent, + ToolExecutionEvent, + ToolResultEvent, + create_tool_execution_event, + ChainableToolEvent, + ChainableToolExecutionRequestEvent, + ChainableToolExecutionCompletedEvent, + create_chainable_tool_execution_request_event +) + +# System domain events +from .system import ( + SystemEvent, + SystemErrorEvent, + create_system_error_event, + ChainableSystemEvent, + ChainableLogWriteEvent, + ChainableMetricsUpdateEvent, + ChainableStreamEvent, + ChainableStreamStartEvent, + ChainableStreamChunkEvent, + ChainableStreamEndEvent, + ChainableStreamInterruptEvent +) + +__all__ = [ + # Agent events + "AgentEvent", + "AgentStepStartEvent", + "AgentStepCompleteEvent", + "create_agent_step_start_event", + "ChainableAgentEvent", + "ChainableAgentStepStartEvent", + "ChainableAgentStepCompleteEvent", + "create_chainable_agent_step_start_event", + + # Conversation events + "ConversationEvent", + "ConversationCreatedEvent", + "ConversationClosedEvent", + "UserInputEvent", + "InterruptEvent", + "AgentResponseEvent", + "LLMStreamEvent", + "ToolResultDisplayEvent", + "create_conversation_created_event", + "create_user_input_event", + "create_interrupt_event", + + # Tool events + "ToolEvent", + "ToolExecutionEvent", + "ToolResultEvent", + "create_tool_execution_event", + "ChainableToolEvent", + 
"ChainableToolExecutionRequestEvent", + "ChainableToolExecutionCompletedEvent", + "create_chainable_tool_execution_request_event", + + # System events + "SystemEvent", + "SystemErrorEvent", + "create_system_error_event", + "ChainableSystemEvent", + "ChainableLogWriteEvent", + "ChainableMetricsUpdateEvent", + "ChainableStreamEvent", + "ChainableStreamStartEvent", + "ChainableStreamChunkEvent", + "ChainableStreamEndEvent", + "ChainableStreamInterruptEvent" +] diff --git a/app/event/domain/agent/__init__.py b/app/event/domain/agent/__init__.py new file mode 100644 index 000000000..1eb9d5be3 --- /dev/null +++ b/app/event/domain/agent/__init__.py @@ -0,0 +1,33 @@ +"""Agent domain events. + +This module contains all agent-related events including step events, +response events, and chainable agent events. +""" + +from .events import ( + AgentEvent, + AgentStepStartEvent, + AgentStepCompleteEvent, + create_agent_step_start_event +) + +from .chainable import ( + ChainableAgentEvent, + ChainableAgentStepStartEvent, + ChainableAgentStepCompleteEvent, + create_chainable_agent_step_start_event +) + +__all__ = [ + # Basic agent events + "AgentEvent", + "AgentStepStartEvent", + "AgentStepCompleteEvent", + "create_agent_step_start_event", + + # Chainable agent events + "ChainableAgentEvent", + "ChainableAgentStepStartEvent", + "ChainableAgentStepCompleteEvent", + "create_chainable_agent_step_start_event" +] diff --git a/app/event/domain/agent/chainable.py b/app/event/domain/agent/chainable.py new file mode 100644 index 000000000..4a733964d --- /dev/null +++ b/app/event/domain/agent/chainable.py @@ -0,0 +1,73 @@ +"""Chainable agent events.""" + +from datetime import datetime +from typing import Optional + +from app.event.core.base import ChainableEvent + + +class ChainableAgentEvent(ChainableEvent): + """支持链式的智能体事件基类""" + + def __init__(self, agent_name: str, agent_type: str, conversation_id: Optional[str] = None, **kwargs): + super().__init__( + 
event_type=f"agent.{self.__class__.__name__.lower().replace('event', '')}", + data={ + "agent_name": agent_name, + "agent_type": agent_type, + "conversation_id": conversation_id, + "agent_id": agent_name # 用于上下文管理 + }, + **kwargs + ) + + +class ChainableAgentStepStartEvent(ChainableAgentEvent): + """智能体开始处理步骤事件(支持链式)""" + + def __init__(self, agent_name: str, agent_type: str, step_number: int, + conversation_id: Optional[str] = None, **kwargs): + super().__init__( + agent_name=agent_name, + agent_type=agent_type, + conversation_id=conversation_id, + **kwargs + ) + self.data.update({ + "step_number": step_number, + "start_time": datetime.now().isoformat(), + }) + + +class ChainableAgentStepCompleteEvent(ChainableAgentEvent): + """智能体完成处理步骤事件(支持链式)""" + + def __init__(self, agent_name: str, agent_type: str, step_number: int, + result: Optional[str] = None, conversation_id: Optional[str] = None, **kwargs): + super().__init__( + agent_name=agent_name, + agent_type=agent_type, + conversation_id=conversation_id, + **kwargs + ) + self.data.update({ + "step_number": step_number, + "result": result, + "complete_time": datetime.now().isoformat(), + }) + + +def create_chainable_agent_step_start_event( + agent_name: str, + agent_type: str, + step_number: int, + conversation_id: Optional[str] = None +) -> ChainableAgentStepStartEvent: + """创建智能体步骤开始事件""" + return ChainableAgentStepStartEvent( + agent_name=agent_name, + agent_type=agent_type, + step_number=step_number, + conversation_id=conversation_id, + source=agent_name + ) diff --git a/app/event/domain/agent/events.py b/app/event/domain/agent/events.py new file mode 100644 index 000000000..fc484aa79 --- /dev/null +++ b/app/event/domain/agent/events.py @@ -0,0 +1,69 @@ +"""Basic agent events.""" + +from datetime import datetime +from typing import Optional + +from app.event.core.base import BaseEvent + + +class AgentEvent(BaseEvent): + """Base class for all agent-related events.""" + + def __init__(self, agent_name: str, 
agent_type: str, conversation_id: Optional[str] = None, **kwargs): + super().__init__( + event_type=f"agent.{self.__class__.__name__.lower().replace('event', '')}", + data={ + "agent_name": agent_name, + "agent_type": agent_type, + }, + **kwargs + ) + if conversation_id: + self.conversation_id = conversation_id + + +class AgentStepStartEvent(AgentEvent): + """智能体开始处理事件""" + + def __init__(self, agent_name: str, agent_type: str, step_number: int, + conversation_id: Optional[str] = None, **kwargs): + super().__init__( + agent_name=agent_name, + agent_type=agent_type, + conversation_id=conversation_id, + **kwargs + ) + self.data.update({ + "step_number": step_number, + "start_time": datetime.now().isoformat(), + }) + + +class AgentStepCompleteEvent(AgentEvent): + """智能体完成处理事件""" + + def __init__(self, agent_name: str, agent_type: str, step_number: int, + result: Optional[str] = None, conversation_id: Optional[str] = None, **kwargs): + super().__init__( + agent_name=agent_name, + agent_type=agent_type, + conversation_id=conversation_id, + **kwargs + ) + self.data.update({ + "step_number": step_number, + "result": result, + "complete_time": datetime.now().isoformat(), + }) + + +def create_agent_step_start_event(agent_name: str, agent_type: str, step_number: int, + conversation_id: Optional[str] = None) -> AgentStepStartEvent: + """创建智能体开始处理事件""" + return AgentStepStartEvent( + agent_name=agent_name, + agent_type=agent_type, + step_number=step_number, + conversation_id=conversation_id, + source=agent_name + ) diff --git a/app/event/domain/conversation/__init__.py b/app/event/domain/conversation/__init__.py new file mode 100644 index 000000000..a087163e2 --- /dev/null +++ b/app/event/domain/conversation/__init__.py @@ -0,0 +1,36 @@ +"""Conversation domain events. + +This module contains all conversation-related events including user input, +agent responses, streaming events, and conversation lifecycle events. 
+""" + +from .events import ( + ConversationEvent, + ConversationCreatedEvent, + ConversationClosedEvent, + UserInputEvent, + InterruptEvent, + AgentResponseEvent, + LLMStreamEvent, + ToolResultDisplayEvent, + create_conversation_created_event, + create_user_input_event, + create_interrupt_event +) + +__all__ = [ + # Basic conversation events + "ConversationEvent", + "ConversationCreatedEvent", + "ConversationClosedEvent", + "UserInputEvent", + "InterruptEvent", + "AgentResponseEvent", + "LLMStreamEvent", + "ToolResultDisplayEvent", + + # Factory functions + "create_conversation_created_event", + "create_user_input_event", + "create_interrupt_event" +] diff --git a/app/event/domain/conversation/events.py b/app/event/domain/conversation/events.py new file mode 100644 index 000000000..e828ff099 --- /dev/null +++ b/app/event/domain/conversation/events.py @@ -0,0 +1,172 @@ +"""Conversation domain events.""" + +import uuid +from datetime import datetime +from typing import Optional + +from app.event.core.base import BaseEvent + + +class ConversationEvent(BaseEvent): + """Base class for all conversation-related events.""" + + def __init__(self, conversation_id: str, user_id: str, **kwargs): + super().__init__( + event_type=f"conversation.{self.__class__.__name__.lower().replace('event', '')}", + data={ + "conversation_id": conversation_id, + "user_id": user_id, + }, + **kwargs + ) + # 设置追踪信息 + self.conversation_id = conversation_id + self.user_id = user_id + + +class ConversationCreatedEvent(ConversationEvent): + """对话创建事件""" + + def __init__(self, conversation_id: str, user_id: str, title: Optional[str] = None, **kwargs): + super().__init__(conversation_id=conversation_id, user_id=user_id, **kwargs) + self.data.update({ + "title": title, + "created_at": datetime.now().isoformat(), + }) + + +class ConversationClosedEvent(ConversationEvent): + """对话关闭事件""" + + def __init__(self, conversation_id: str, user_id: str, reason: str = "user_closed", **kwargs): + 
super().__init__(conversation_id=conversation_id, user_id=user_id, **kwargs) + self.data.update({ + "reason": reason, + "closed_at": datetime.now().isoformat(), + }) + + +class UserInputEvent(ConversationEvent): + """用户输入事件""" + + def __init__(self, conversation_id: str, user_id: str, message: str, + message_id: Optional[str] = None, **kwargs): + super().__init__(conversation_id=conversation_id, user_id=user_id, **kwargs) + self.data.update({ + "message": message, + "message_id": message_id or str(uuid.uuid4()), + "input_length": len(message), + }) + + +class InterruptEvent(ConversationEvent): + """中断事件""" + + def __init__(self, conversation_id: str, user_id: str, reason: str = "user_interrupt", + interrupted_event_id: Optional[str] = None, **kwargs): + super().__init__(conversation_id=conversation_id, user_id=user_id, **kwargs) + self.data.update({ + "reason": reason, + "interrupted_event_id": interrupted_event_id, + "interrupt_time": datetime.now().isoformat(), + }) + + +class AgentResponseEvent(ConversationEvent): + """智能体响应事件""" + + def __init__(self, agent_name: str, agent_type: str, response: str, + conversation_id: str, user_id: Optional[str] = None, response_type: str = "text", **kwargs): + # For conversation events, we need user_id, but for agent responses it might not be directly available + # We'll use a placeholder or get it from the conversation context + super().__init__( + conversation_id=conversation_id, + user_id=user_id or "system", # Use system as fallback + **kwargs + ) + self.data.update({ + "response": response, + "response_type": response_type, + "response_length": len(response), + "response_time": datetime.now().isoformat(), + "agent_name": agent_name, + "agent_type": agent_type, + }) + + +class LLMStreamEvent(ConversationEvent): + """LLM流式输出事件""" + + def __init__(self, agent_name: str, agent_type: str, content: str, + is_complete: bool = False, conversation_id: str = None, + user_id: Optional[str] = None, **kwargs): + super().__init__( + 
conversation_id=conversation_id, + user_id=user_id or "system", + **kwargs + ) + self.data.update({ + "content": content, + "is_complete": is_complete, + "agent_name": agent_name, + "agent_type": agent_type, + "timestamp": datetime.now().isoformat(), + }) + + +class ToolResultDisplayEvent(ConversationEvent): + """工具结果显示事件""" + + def __init__(self, tool_name: str, result: str, conversation_id: str = None, + user_id: Optional[str] = None, truncated: bool = False, **kwargs): + super().__init__( + conversation_id=conversation_id, + user_id=user_id or "system", + **kwargs + ) + self.data.update({ + "tool_name": tool_name, + "result": result, + "truncated": truncated, + "timestamp": datetime.now().isoformat(), + }) + + +# ============================================================================ +# Event Factory Functions +# ============================================================================ + +def create_conversation_created_event(conversation_id: str, user_id: str, + title: Optional[str] = None) -> ConversationCreatedEvent: + """创建对话创建事件""" + return ConversationCreatedEvent( + conversation_id=conversation_id, + user_id=user_id, + title=title, + source="conversation_service" + ) + + +def create_user_input_event(conversation_id: str, user_id: str, message: str, + parent_event_id: Optional[str] = None) -> UserInputEvent: + """创建用户输入事件""" + event = UserInputEvent( + conversation_id=conversation_id, + user_id=user_id, + message=message, + source="user_interface" + ) + if parent_event_id: + event.parent_events = [parent_event_id] + return event + + +def create_interrupt_event(conversation_id: str, user_id: str, + interrupted_event_id: Optional[str] = None) -> InterruptEvent: + """创建中断事件""" + return InterruptEvent( + conversation_id=conversation_id, + user_id=user_id, + interrupted_event_id=interrupted_event_id, + source="user_interface" + ) diff --git a/app/event/domain/system/__init__.py b/app/event/domain/system/__init__.py new file mode 100644 index 
"""Chainable system events."""

from datetime import datetime
from typing import Any, Dict, Optional

from app.event.core.base import ChainableEvent


class ChainableSystemEvent(ChainableEvent):
    """Base class for chain-aware system events.

    The event type is derived from the subclass name, e.g.
    ``ChainableLogWriteEvent`` -> ``system.chainablelogwrite``.
    """

    def __init__(self, component: str, conversation_id: Optional[str] = None, **kwargs):
        derived_type = "system." + type(self).__name__.lower().replace("event", "")
        super().__init__(
            event_type=derived_type,
            data={"component": component, "conversation_id": conversation_id},
            **kwargs
        )


class ChainableLogWriteEvent(ChainableSystemEvent):
    """Chain-aware log write event."""

    def __init__(self, component: str, log_level: str, message: str,
                 conversation_id: Optional[str] = None, **kwargs):
        super().__init__(component=component, conversation_id=conversation_id, **kwargs)
        self.data.update(
            log_level=log_level,
            message=message,
            timestamp=datetime.now().isoformat(),
        )


class ChainableMetricsUpdateEvent(ChainableSystemEvent):
    """Chain-aware metrics update event."""

    def __init__(self, component: str, metric_name: str, value: Any,
                 conversation_id: Optional[str] = None, **kwargs):
        super().__init__(component=component, conversation_id=conversation_id, **kwargs)
        self.data.update(
            metric_name=metric_name,
            value=value,
            timestamp=datetime.now().isoformat(),
        )


class ChainableStreamEvent(ChainableEvent):
    """Base class for chain-aware streaming-output events.

    Note: uses the ``stream.`` prefix, not ``system.``.
    """

    def __init__(self, agent_id: str, conversation_id: str, stream_id: str, **kwargs):
        derived_type = "stream." + type(self).__name__.lower().replace("event", "")
        super().__init__(
            event_type=derived_type,
            data={
                "agent_id": agent_id,
                "conversation_id": conversation_id,
                "stream_id": stream_id,
            },
            **kwargs
        )


class ChainableStreamStartEvent(ChainableStreamEvent):
    """Marks the beginning of a streamed response."""

    def __init__(self, agent_id: str, conversation_id: str, stream_id: str, **kwargs):
        super().__init__(agent_id=agent_id, conversation_id=conversation_id,
                         stream_id=stream_id, **kwargs)
        self.data.update(start_time=datetime.now().isoformat())


class ChainableStreamChunkEvent(ChainableStreamEvent):
    """Carries one chunk of streamed output."""

    def __init__(self, agent_id: str, conversation_id: str, stream_id: str,
                 chunk_data: str, chunk_index: int, **kwargs):
        super().__init__(agent_id=agent_id, conversation_id=conversation_id,
                         stream_id=stream_id, **kwargs)
        self.data.update(
            chunk_data=chunk_data,
            chunk_index=chunk_index,
            timestamp=datetime.now().isoformat(),
        )


class ChainableStreamEndEvent(ChainableStreamEvent):
    """Marks the normal end of a streamed response."""

    def __init__(self, agent_id: str, conversation_id: str, stream_id: str,
                 total_chunks: int, **kwargs):
        super().__init__(agent_id=agent_id, conversation_id=conversation_id,
                         stream_id=stream_id, **kwargs)
        self.data.update(
            total_chunks=total_chunks,
            end_time=datetime.now().isoformat(),
        )


class ChainableStreamInterruptEvent(ChainableStreamEvent):
    """Signals that a stream was interrupted by new user input."""

    def __init__(self, agent_id: str, conversation_id: str, stream_id: str,
                 new_user_input: str, **kwargs):
        super().__init__(agent_id=agent_id, conversation_id=conversation_id,
                         stream_id=stream_id, **kwargs)
        self.data.update(
            new_user_input=new_user_input,
            interrupt_time=datetime.now().isoformat(),
        )


# --- Basic (non-chainable) system events ------------------------------------

class SystemEvent(BaseEvent):
    """Base class for all system-related events."""

    def __init__(self, component: str, **kwargs):
        super().__init__(
            event_type="system." + type(self).__name__.lower().replace("event", ""),
            data={"component": component},
            **kwargs
        )


class SystemErrorEvent(SystemEvent):
    """Reports an error raised by a system component."""

    def __init__(self, component: str, error_type: str, error_message: str,
                 context: Optional[Dict[str, Any]] = None, conversation_id: Optional[str] = None, **kwargs):
        super().__init__(component=component, **kwargs)
        self.data.update(
            error_type=error_type,
            error_message=error_message,
            context=context or {},
        )
        # The attribute is only attached when an id was provided, preserving
        # the original conditional behaviour.
        if conversation_id:
            self.conversation_id = conversation_id


def create_system_error_event(component: str, error_type: str, error_message: str,
                              conversation_id: Optional[str] = None) -> SystemErrorEvent:
    """Build a SystemErrorEvent whose source is the failing component."""
    return SystemErrorEvent(
        component=component,
        error_type=error_type,
        error_message=error_message,
        conversation_id=conversation_id,
        source=component
    )
"""Test events for the event system."""

from app.event.core.base import BaseEvent


class TestEvent(BaseEvent):
    """Basic test event.

    NOTE(review): ``priority`` is accepted for signature compatibility but is
    never forwarded to the base class or the payload -- confirm whether
    BaseEvent should receive it.
    """

    # Prevent pytest from collecting this non-test class (its name matches
    # the default ``Test*`` collection pattern).
    __test__ = False

    def __init__(self, a: str, b: str, priority: int = 0, **kwargs):
        super().__init__(
            event_type="test.basic",
            data={"a": a, "b": b},
            **kwargs
        )


class TestAddEvent(BaseEvent):
    """Test event for addition operations.

    NOTE(review): ``priority`` is accepted but unused, same as TestEvent.
    """

    # Prevent pytest collection of this non-test class.
    __test__ = False

    def __init__(self, a: str, b: str, priority: int = 0, **kwargs):
        super().__init__(
            event_type="test.testadd",
            data={"a": a, "b": b},
            **kwargs
        )
+""" + +from .events import ( + ToolEvent, + ToolExecutionEvent, + ToolResultEvent, + create_tool_execution_event +) + +from .chainable import ( + ChainableToolEvent, + ChainableToolExecutionRequestEvent, + ChainableToolExecutionCompletedEvent, + create_chainable_tool_execution_request_event +) + +__all__ = [ + # Basic tool events + "ToolEvent", + "ToolExecutionEvent", + "ToolResultEvent", + "create_tool_execution_event", + + # Chainable tool events + "ChainableToolEvent", + "ChainableToolExecutionRequestEvent", + "ChainableToolExecutionCompletedEvent", + "create_chainable_tool_execution_request_event" +] diff --git a/app/event/domain/tool/chainable.py b/app/event/domain/tool/chainable.py new file mode 100644 index 000000000..05c291f4b --- /dev/null +++ b/app/event/domain/tool/chainable.py @@ -0,0 +1,72 @@ +"""Chainable tool events.""" + +from datetime import datetime +from typing import Any, Dict, Optional + +from app.event.core.base import ChainableEvent + + +class ChainableToolEvent(ChainableEvent): + """支持链式的工具事件基类""" + + def __init__(self, tool_name: str, agent_id: str, conversation_id: Optional[str] = None, **kwargs): + super().__init__( + event_type=f"tool.{self.__class__.__name__.lower().replace('event', '')}", + data={ + "tool_name": tool_name, + "agent_id": agent_id, + "conversation_id": conversation_id, + }, + **kwargs + ) + + +class ChainableToolExecutionRequestEvent(ChainableToolEvent): + """工具执行请求事件(支持链式)""" + + def __init__(self, tool_name: str, args: Dict[str, Any], agent_id: str, + conversation_id: Optional[str] = None, **kwargs): + super().__init__( + tool_name=tool_name, + agent_id=agent_id, + conversation_id=conversation_id, + **kwargs + ) + self.data.update({ + "args": args, + "request_time": datetime.now().isoformat(), + }) + + +class ChainableToolExecutionCompletedEvent(ChainableToolEvent): + """工具执行完成事件(支持链式)""" + + def __init__(self, tool_name: str, agent_id: str, result: Any, success: bool = True, + conversation_id: Optional[str] = None, 
**kwargs): + super().__init__( + tool_name=tool_name, + agent_id=agent_id, + conversation_id=conversation_id, + **kwargs + ) + self.data.update({ + "result": result, + "success": success, + "complete_time": datetime.now().isoformat(), + }) + + +def create_chainable_tool_execution_request_event( + tool_name: str, + args: Dict[str, Any], + agent_id: str, + conversation_id: Optional[str] = None +) -> ChainableToolExecutionRequestEvent: + """创建工具执行请求事件""" + return ChainableToolExecutionRequestEvent( + tool_name=tool_name, + args=args, + agent_id=agent_id, + conversation_id=conversation_id, + source=agent_id + ) diff --git a/app/event/domain/tool/events.py b/app/event/domain/tool/events.py new file mode 100644 index 000000000..6f3483b12 --- /dev/null +++ b/app/event/domain/tool/events.py @@ -0,0 +1,74 @@ +"""Basic tool events.""" + +from datetime import datetime +from typing import Optional, Dict, Any + +from app.event.core.base import BaseEvent +from app.event.core.types import ToolExecutionStatus + + +class ToolEvent(BaseEvent): + """Base class for all tool-related events.""" + + def __init__(self, tool_name: str, tool_type: str, conversation_id: Optional[str] = None, **kwargs): + super().__init__( + event_type=f"tool.{self.__class__.__name__.lower().replace('event', '')}", + data={ + "tool_name": tool_name, + "tool_type": tool_type, + }, + **kwargs + ) + if conversation_id: + self.conversation_id = conversation_id + + +class ToolExecutionEvent(ToolEvent): + """工具执行事件""" + + def __init__(self, tool_name: str, tool_type: str, status: ToolExecutionStatus, + parameters: Dict[str, Any], result: Any = None, + execution_time: Optional[float] = None, conversation_id: Optional[str] = None, **kwargs): + super().__init__( + tool_name=tool_name, + tool_type=tool_type, + conversation_id=conversation_id, + **kwargs + ) + self.data.update({ + "status": status.value, + "parameters": parameters, + "result": str(result) if result is not None else None, + "execution_time": 
execution_time, + }) + + +class ToolResultEvent(ToolEvent): + """工具结果事件""" + + def __init__(self, tool_name: str, tool_type: str, result: Any, success: bool = True, + error_message: Optional[str] = None, conversation_id: Optional[str] = None, **kwargs): + super().__init__( + tool_name=tool_name, + tool_type=tool_type, + conversation_id=conversation_id, + **kwargs + ) + self.data.update({ + "result": str(result) if result is not None else None, + "success": success, + "error_message": error_message, + }) + + +def create_tool_execution_event(tool_name: str, tool_type: str, status: str, + parameters: Dict[str, Any], conversation_id: Optional[str] = None) -> ToolExecutionEvent: + """创建工具执行事件""" + return ToolExecutionEvent( + tool_name=tool_name, + tool_type=tool_type, + status=ToolExecutionStatus(status), + parameters=parameters, + conversation_id=conversation_id, + source="tool_system" + ) diff --git a/app/event/infrastructure/__init__.py b/app/event/infrastructure/__init__.py new file mode 100644 index 000000000..8dd0cb051 --- /dev/null +++ b/app/event/infrastructure/__init__.py @@ -0,0 +1,65 @@ +"""Infrastructure layer components. + +This module contains the concrete implementations of the event system +infrastructure including registries, middleware, and bus implementations. 
+""" + +# Registry system +from .registry import ( + EventHandlerRegistry, + HandlerInfo, + event_handler, + get_global_registry +) + +# Middleware system +from .middleware import ( + BaseMiddleware, + MiddlewareChain, + MiddlewareContext, + LoggingMiddleware, + RetryMiddleware, + ErrorIsolationMiddleware, + MetricsMiddleware, + create_default_middleware_chain +) + +# Bus implementations +from .bus import ( + SimpleEventBus, + ChainableEventBus, + get_global_bus, + set_global_bus, + publish_event, + subscribe_handler, + unsubscribe_handler, + get_bus_stats +) + +__all__ = [ + # Registry system + "EventHandlerRegistry", + "HandlerInfo", + "event_handler", + "get_global_registry", + + # Middleware system + "BaseMiddleware", + "MiddlewareChain", + "MiddlewareContext", + "LoggingMiddleware", + "RetryMiddleware", + "ErrorIsolationMiddleware", + "MetricsMiddleware", + "create_default_middleware_chain", + + # Bus implementations + "SimpleEventBus", + "ChainableEventBus", + "get_global_bus", + "set_global_bus", + "publish_event", + "subscribe_handler", + "unsubscribe_handler", + "get_bus_stats" +] diff --git a/app/event/infrastructure/bus/__init__.py b/app/event/infrastructure/bus/__init__.py new file mode 100644 index 000000000..99b35975e --- /dev/null +++ b/app/event/infrastructure/bus/__init__.py @@ -0,0 +1,31 @@ +"""Event bus implementations. + +This module contains concrete implementations of event buses including +simple bus and chainable bus with interrupt support. 
+""" + +from .simple_bus import ( + SimpleEventBus, + get_global_bus, + set_global_bus, + publish_event, + subscribe_handler, + unsubscribe_handler, + get_bus_stats +) + +from .chainable_bus import ChainableEventBus + +__all__ = [ + # Simple event bus + "SimpleEventBus", + "get_global_bus", + "set_global_bus", + "publish_event", + "subscribe_handler", + "unsubscribe_handler", + "get_bus_stats", + + # Chainable event bus + "ChainableEventBus" +] diff --git a/app/event/infrastructure/bus/basic_bus.py b/app/event/infrastructure/bus/basic_bus.py new file mode 100644 index 000000000..7eed2fc12 --- /dev/null +++ b/app/event/infrastructure/bus/basic_bus.py @@ -0,0 +1,187 @@ +"""Basic event bus implementation. + +This module provides a simple, clean event bus implementation focused on +core functionality: event publishing, handler dispatch, and middleware processing. +""" + +import asyncio +from typing import Any, Dict, List, Optional +from concurrent.futures import ThreadPoolExecutor + +from app.logger import logger +from app.event.core.base import BaseEvent, BaseEventBus +from app.event.infrastructure.registry import EventHandlerRegistry, get_global_registry +from app.event.infrastructure.middleware import MiddlewareChain, MiddlewareContext, create_default_middleware_chain + + +class BasicEventBus(BaseEventBus): + """Basic event bus with simple async dispatch and registry support. + + This implementation focuses on core functionality: + - Event publishing and handler dispatch + - Middleware processing + - Basic statistics and metrics + + For advanced features like event chains and interruption, extend this class. 
+ """ + + def __init__( + self, + name: str = "BasicEventBus", + max_concurrent_events: int = 10, + registry: Optional[EventHandlerRegistry] = None, + middleware_chain: Optional[MiddlewareChain] = None + ): + super().__init__(name=name, max_concurrent_events=max_concurrent_events) + + # Use provided registry or global registry + self.registry = registry or get_global_registry() + + # Use provided middleware chain or create default + self.middleware_chain = middleware_chain or create_default_middleware_chain() + + # Processing semaphore to limit concurrent events + self.processing_semaphore = asyncio.Semaphore(max_concurrent_events) + + async def publish(self, event: BaseEvent) -> bool: + """Publish an event to the bus for processing. + + Args: + event: The event to publish + + Returns: + bool: True if at least one handler processed the event successfully + """ + # Add to active events + self.active_events[event.event_id] = event + + try: + # Get matching handlers + handlers = self.registry.get_handlers_for_event(event.event_type) + + if not handlers: + logger.debug(f"No handlers found for event type: {event.event_type}") + event.mark_completed() + return False + + # Process handlers + success_count = 0 + async with self.processing_semaphore: + for handler_info in handlers: + try: + # Create middleware context + context = MiddlewareContext( + event=event, + handler_name=handler_info.name + ) + + # Process through middleware chain + async def handler_wrapper(): + return await handler_info.handler(event) + + success = await self.middleware_chain.process(context, handler_wrapper) + + if success: + success_count += 1 + logger.debug(f"Handler '{handler_info.name}' successfully processed event {event.event_id}") + else: + logger.warning(f"Handler '{handler_info.name}' failed to process event {event.event_id}") + + except Exception as e: + logger.error(f"Error in handler '{handler_info.name}' for event {event.event_id}: {str(e)}") + + # Mark event as completed if at least 
one handler succeeded + if success_count > 0: + event.mark_completed() + return True + else: + event.mark_failed("No handlers processed the event successfully") + return False + + except Exception as e: + logger.error(f"Error publishing event {event.event_id}: {str(e)}") + event.mark_failed(str(e)) + return False + finally: + # Remove from active events and add to history + if event.event_id in self.active_events: + del self.active_events[event.event_id] + self.add_to_history(event) + + async def subscribe(self, handler) -> bool: + """Subscribe a handler to the event bus. + + Args: + handler: The event handler to register + + Returns: + bool: True if handler was registered successfully + """ + # This is handled by the registry system + logger.info(f"Handler subscription should be done through the registry system") + return True + + async def unsubscribe(self, handler_name: str) -> bool: + """Unsubscribe a handler from the event bus. + + Args: + handler_name: Name of the handler to unregister + + Returns: + bool: True if handler was unregistered successfully + """ + return self.registry.unregister_handler(handler_name) + + def get_metrics(self) -> Dict[str, Any]: + """Get bus-specific metrics. 
+ + Returns: + Dict[str, Any]: Bus metrics + """ + return { + "bus_name": self.name, + "max_concurrent_events": self.max_concurrent_events, + "registry_handlers": len(self.registry._handlers), + "middleware_enabled": self.middleware_chain is not None, + } + + +# Global bus instance +_global_bus: Optional[BasicEventBus] = None + + +def get_global_bus() -> BasicEventBus: + """Get the global event bus instance.""" + global _global_bus + if _global_bus is None: + _global_bus = BasicEventBus() + return _global_bus + + +def set_global_bus(bus: BasicEventBus) -> None: + """Set the global event bus instance.""" + global _global_bus + _global_bus = bus + + +async def publish_event(event: BaseEvent) -> bool: + """Publish an event using the global bus.""" + return await get_global_bus().publish(event) + + +async def subscribe_handler(handler) -> bool: + """Subscribe a handler using the global bus.""" + return await get_global_bus().subscribe(handler) + + +async def unsubscribe_handler(handler_name: str) -> bool: + """Unsubscribe a handler using the global bus.""" + return await get_global_bus().unsubscribe(handler_name) + + +def get_bus_stats() -> Dict[str, Any]: + """Get statistics from the global bus.""" + bus = get_global_bus() + stats = bus.get_event_stats() + stats.update(bus.get_metrics()) + return stats diff --git a/app/event/infrastructure/bus/chainable_bus.py b/app/event/infrastructure/bus/chainable_bus.py new file mode 100644 index 000000000..c01fd1835 --- /dev/null +++ b/app/event/infrastructure/bus/chainable_bus.py @@ -0,0 +1,285 @@ +"""Chainable event bus implementation with interrupt support. + +This module provides an enhanced event bus that extends SimpleEventBus with: +1. Event chain management and context propagation +2. Cascading event interruption support +3. Conversation/agent-based event grouping +4. 
Context-aware event processing + +This follows the layered architecture principle - ChainableEventBus builds upon +SimpleEventBus core functionality and adds advanced chain management features. +""" + +import asyncio +from typing import Any, Dict, List, Optional, Set, Union + +from app.logger import logger +from app.event.core.base import BaseEvent, ChainableEvent, EventContext +from app.event.infrastructure.bus.simple_bus import SimpleEventBus + + +class ChainableEventBus(SimpleEventBus): + """Event bus with support for event chains and interruption. + + Extends SimpleEventBus with advanced features: + - Event chain management and context propagation + - Cascading event interruption + - Conversation/agent-based event grouping + - Context-aware event processing + + Design principle: Build upon SimpleEventBus core functionality, + adding chain management without breaking the basic event processing. + """ + + def __init__(self, **kwargs): + super().__init__(**kwargs) + + # Event chain management for interruption support + self.conversation_contexts: Dict[str, EventContext] = {} + self.active_event_chains: Dict[str, Set[str]] = {} # root_event_id -> {child_event_ids} + + async def publish(self, event: Union[BaseEvent, ChainableEvent]) -> bool: + """Publish an event with optional chain and interruption support. + + Args: + event: The event to publish (BaseEvent or ChainableEvent) + + Returns: + bool: True if at least one handler processed the event successfully + """ + # If it's a ChainableEvent, use enhanced processing with context management + if isinstance(event, ChainableEvent): + return await self._publish_chainable_event(event) + else: + # For regular BaseEvent, use parent class logic + return await super().publish(event) + + async def _publish_chainable_event(self, event: ChainableEvent) -> bool: + """Publish a chainable event with context management and interruption support.""" + + # 1. 
Check if event is already cancelled + if event.is_cancelled(): + logger.info(f"ChainableEvent {event.event_id} was cancelled before processing") + return False + + # 2. Initialize context if this is a root event + if not event.context: + self._initialize_event_context(event) + + # 3. Add event to execution chain + self._add_to_execution_chain(event) + + # 4. Use parent class processing but with context awareness + try: + # Add to active events for tracking + self.active_events[event.event_id] = event + + # Get matching handlers from registry + independent_handlers, dependent_handlers = self.registry.get_handlers_for_event(event.event_type) + all_handlers = independent_handlers + dependent_handlers + + if not all_handlers: + logger.debug(f"No handlers found for event type: {event.event_type}") + event.mark_completed() + return False + + # Process handlers with context awareness and interruption checks + success_count = 0 + async with self.processing_semaphore: + for handler_info in all_handlers: + # Check for cancellation before each handler + if event.is_cancelled(): + logger.info(f"Event {event.event_id} was cancelled during processing") + break + + try: + # Create middleware context + from app.event.infrastructure.middleware import MiddlewareContext + context = MiddlewareContext( + event=event, + handler_name=handler_info.name + ) + + # Define context-aware handler wrapper + async def handler_wrapper(middleware_context): + # Final cancellation check before handler execution + if event.is_cancelled(): + return False + # Check if handler is async or sync + import inspect + if inspect.iscoroutinefunction(handler_info.handler): + return await handler_info.handler(event) + else: + return handler_info.handler(event) + + # Process through middleware chain + success = await self.middleware_chain.process(context, handler_wrapper) + + if success: + success_count += 1 + logger.debug(f"Handler '{handler_info.name}' successfully processed chainable event {event.event_id}") + 
else: + logger.warning(f"Handler '{handler_info.name}' failed to process chainable event {event.event_id}") + + except Exception as e: + logger.error(f"Error in handler '{handler_info.name}' for chainable event {event.event_id}: {str(e)}") + + # Update event status based on results + if success_count > 0: + event.mark_completed() + return True + else: + event.mark_failed("No handlers processed the chainable event successfully") + return False + + except Exception as e: + logger.error(f"Error publishing chainable event {event.event_id}: {str(e)}") + event.mark_failed(str(e)) + return False + finally: + # Cleanup: remove from active events and add to history + if event.event_id in self.active_events: + del self.active_events[event.event_id] + self.add_to_history(event) + self._cleanup_event_chain(event) + + def _initialize_event_context(self, event: ChainableEvent) -> None: + """Initialize event context for a root event.""" + conversation_id = event.get_conversation_id() or 'default' + agent_id = event.get_agent_id() or 'unknown' + + cancellation_token = asyncio.Event() + event.context = EventContext( + root_event_id=event.event_id, + conversation_id=conversation_id, + agent_id=agent_id, + execution_chain=[], + cancellation_token=cancellation_token + ) + + # Save context for conversation-level management + self.conversation_contexts[conversation_id] = event.context + self.active_event_chains[event.event_id] = {event.event_id} + + logger.debug(f"Created new event chain for conversation {conversation_id}, root event {event.event_id}") + + def _add_to_execution_chain(self, event: ChainableEvent) -> None: + """Add event to the execution chain.""" + if event.context and event.context.root_event_id in self.active_event_chains: + self.active_event_chains[event.context.root_event_id].add(event.event_id) + event.context.execution_chain.append(event.event_id) + + def _cleanup_event_chain(self, event: ChainableEvent) -> None: + """Clean up event chain tracking. 
+ + Note: We keep the chain and context alive for potential interruption + until explicitly cleaned up or after a timeout. + """ + if event.context and event.context.root_event_id in self.active_event_chains: + chain = self.active_event_chains[event.context.root_event_id] + # Remove this specific event from the chain but keep the chain alive + chain.discard(event.event_id) + + # Only clean up if this is explicitly a chain termination event + # or if the chain has been marked for cleanup + # For now, we keep chains alive to support interruption + + async def interrupt_conversation(self, conversation_id: str, reason: str = "user_interrupt") -> bool: + """Interrupt all active events in a conversation. + + Args: + conversation_id: The conversation to interrupt + reason: Reason for interruption + + Returns: + bool: True if interruption was successful + """ + if conversation_id not in self.conversation_contexts: + logger.warning(f"No active conversation found for ID: {conversation_id}") + return False + + context = self.conversation_contexts[conversation_id] + context.cancellation_token.set() + + logger.info(f"Interrupted conversation {conversation_id}, reason: {reason}") + return True + + async def interrupt_event_chain(self, root_event_id: str, reason: str = "chain_interrupt") -> bool: + """Interrupt a specific event chain. 
+ + Args: + root_event_id: The root event ID of the chain to interrupt + reason: Reason for interruption + + Returns: + bool: True if interruption was successful + """ + if root_event_id not in self.active_event_chains: + logger.warning(f"No active event chain found for root event: {root_event_id}") + return False + + # Find the context and set cancellation token + for context in self.conversation_contexts.values(): + if context.root_event_id == root_event_id: + context.cancellation_token.set() + logger.info(f"Interrupted event chain {root_event_id}, reason: {reason}") + return True + + logger.warning(f"Could not find context for event chain: {root_event_id}") + return False + + async def end_event_chain(self, root_event_id: str, reason: str = "chain_complete") -> bool: + """Explicitly end an event chain and clean up resources. + + Args: + root_event_id: The root event ID of the chain to end + reason: Reason for ending the chain + + Returns: + bool: True if chain was ended successfully + """ + if root_event_id not in self.active_event_chains: + logger.warning(f"No active event chain found for root event: {root_event_id}") + return False + + # Clean up the chain + del self.active_event_chains[root_event_id] + + # Clean up associated conversation context + for conv_id, context in list(self.conversation_contexts.items()): + if context.root_event_id == root_event_id: + del self.conversation_contexts[conv_id] + break + + logger.info(f"Ended event chain {root_event_id}, reason: {reason}") + return True + + def get_active_chains(self) -> Dict[str, Any]: + """Get information about active event chains. 
+ + Returns: + Dict containing active chain information + """ + return { + "active_conversations": list(self.conversation_contexts.keys()), + "active_chains": { + root_id: list(chain) + for root_id, chain in self.active_event_chains.items() + }, + "total_active_events": sum(len(chain) for chain in self.active_event_chains.values()) + } + + def get_metrics(self) -> Dict[str, Any]: + """Get bus-specific metrics including chain information. + + Returns: + Dict[str, Any]: Enhanced bus metrics with chain data + """ + metrics = super().get_metrics() + metrics.update({ + "active_conversations": len(self.conversation_contexts), + "active_event_chains": len(self.active_event_chains), + "total_chained_events": sum(len(chain) for chain in self.active_event_chains.values()), + "supports_interruption": True, + }) + return metrics diff --git a/app/event/infrastructure/bus/simple_bus.py b/app/event/infrastructure/bus/simple_bus.py new file mode 100644 index 000000000..3022e795c --- /dev/null +++ b/app/event/infrastructure/bus/simple_bus.py @@ -0,0 +1,242 @@ +"""Simple event bus implementation. + +This module provides a clean, simple event bus focused on core functionality: +- Event publishing and handler dispatch +- Middleware processing +- Basic statistics and metrics + +This follows the single responsibility principle - SimpleEventBus only handles +basic event processing. For advanced features like event chains and interruption, +use ChainableEventBus which extends this class. +""" + +import asyncio +from typing import Any, Dict, Optional + +from app.logger import logger +from app.event.core.base import BaseEvent, BaseEventBus +from app.event.infrastructure.registry import EventHandlerRegistry, get_global_registry +from app.event.infrastructure.middleware import MiddlewareChain, MiddlewareContext, create_default_middleware_chain + + +class SimpleEventBus(BaseEventBus): + """Simple event bus with basic async dispatch and registry support. 
class SimpleEventBus(BaseEventBus):
    """Simple event bus with basic async dispatch and registry support.

    This implementation focuses on core functionality:
    - Event publishing and handler dispatch
    - Middleware processing
    - Basic statistics and metrics

    Design principle: Keep it simple and focused. Advanced features like
    event chains, interruption, and context management are handled by
    ChainableEventBus which extends this class.
    """

    def __init__(
        self,
        name: str = "SimpleEventBus",
        max_concurrent_events: int = 10,
        registry: Optional[EventHandlerRegistry] = None,
        middleware_chain: Optional[MiddlewareChain] = None
    ):
        super().__init__(name=name, max_concurrent_events=max_concurrent_events)

        # Fall back to the process-wide registry / default middleware when
        # no explicit dependency is injected.
        self.registry = registry or get_global_registry()
        self.middleware_chain = middleware_chain or create_default_middleware_chain()

        # Bounds how many publish() calls may dispatch concurrently.
        self.processing_semaphore = asyncio.Semaphore(max_concurrent_events)

    async def publish(self, event: BaseEvent) -> bool:
        """Publish an event to the bus for processing.

        This is the core method that:
        1. Finds matching handlers for the event
        2. Processes each handler through the middleware chain
        3. Tracks success/failure and updates event status

        Args:
            event: The event to publish

        Returns:
            bool: True if at least one handler processed the event successfully
        """
        # Track the event while it is in flight.
        self.active_events[event.event_id] = event

        try:
            independent_handlers, dependent_handlers = self.registry.get_handlers_for_event(
                event.event_type
            )
            all_handlers = independent_handlers + dependent_handlers

            if not all_handlers:
                logger.debug(f"No handlers found for event type: {event.event_type}")
                event.mark_completed()
                return False

            success_count = 0
            # Handlers below run sequentially; the semaphore limits how many
            # events are processed concurrently across publish() calls.
            async with self.processing_semaphore:
                for handler_info in all_handlers:
                    try:
                        context = MiddlewareContext(
                            event=event,
                            handler_name=handler_info.name
                        )

                        # Pass the handler straight through: the middleware
                        # chain's terminal dispatcher already handles both
                        # sync and async handlers, so the previous per-handler
                        # wrapper (with its loop-local `import inspect`) was
                        # redundant.
                        success = await self.middleware_chain.process(
                            context, handler_info.handler
                        )

                        if success:
                            success_count += 1
                            logger.debug(f"Handler '{handler_info.name}' successfully processed event {event.event_id}")
                        else:
                            logger.warning(f"Handler '{handler_info.name}' failed to process event {event.event_id}")

                    except Exception as e:
                        # One failing handler must not abort the others.
                        logger.error(f"Error in handler '{handler_info.name}' for event {event.event_id}: {str(e)}")

            # A single successful handler counts as overall success.
            if success_count > 0:
                event.mark_completed()
                return True
            event.mark_failed("No handlers processed the event successfully")
            return False

        except Exception as e:
            logger.error(f"Error publishing event {event.event_id}: {str(e)}")
            event.mark_failed(str(e))
            return False
        finally:
            # Always move the event from the active set into history.
            if event.event_id in self.active_events:
                del self.active_events[event.event_id]
            self.add_to_history(event)

    async def subscribe(self, handler) -> bool:
        """Subscribe a handler to the event bus (intentional no-op).

        Note: Handler registration is managed by the registry system.
        Use the @event_handler decorator or registry.register_handler() instead.

        Args:
            handler: The event handler to register

        Returns:
            bool: True (registration is handled by registry)
        """
        logger.info("Handler subscription should be done through the registry system using @event_handler decorator")
        return True

    async def unsubscribe(self, handler_name: str) -> bool:
        """Unsubscribe a handler from the event bus.

        Args:
            handler_name: Name of the handler to unregister

        Returns:
            bool: True if handler was unregistered successfully
        """
        return self.registry.unregister_handler(handler_name)

    def get_metrics(self) -> Dict[str, Any]:
        """Get bus-specific metrics.

        Returns:
            Dict[str, Any]: Bus metrics including registry and middleware info
        """
        return {
            "bus_name": self.name,
            "max_concurrent_events": self.max_concurrent_events,
            # Use the registry's public API rather than its private _handlers.
            "registry_handlers": len(self.registry.list_handlers()),
            "middleware_enabled": self.middleware_chain is not None,
        }


# Process-wide singleton, created lazily by get_global_bus().
_global_bus: Optional[SimpleEventBus] = None


def get_global_bus() -> SimpleEventBus:
    """Get the global event bus instance, creating a default one on first use.

    Returns:
        SimpleEventBus: The global bus instance
    """
    global _global_bus
    if _global_bus is None:
        _global_bus = SimpleEventBus()
    return _global_bus


def set_global_bus(bus: SimpleEventBus) -> None:
    """Set the global event bus instance.

    Args:
        bus: The bus instance to set as global
    """
    global _global_bus
    _global_bus = bus


# Convenience functions for global bus operations
async def publish_event(event: BaseEvent) -> bool:
    """Publish an event using the global bus.

    Args:
        event: The event to publish

    Returns:
        bool: True if event was processed successfully
    """
    return await get_global_bus().publish(event)


async def subscribe_handler(handler) -> bool:
    """Subscribe a handler using the global bus.

    Args:
        handler: The handler to subscribe

    Returns:
        bool: True if subscription was successful
    """
    return await get_global_bus().subscribe(handler)


async def unsubscribe_handler(handler_name: str) -> bool:
    """Unsubscribe a handler using the global bus.

    Args:
        handler_name: Name of the handler to unsubscribe

    Returns:
        bool: True if unsubscription was successful
    """
    return await get_global_bus().unsubscribe(handler_name)


def get_bus_stats() -> Dict[str, Any]:
    """Get statistics from the global bus.

    Returns:
        Dict[str, Any]: Combined event stats and bus metrics
    """
    bus = get_global_bus()
    stats = bus.get_event_stats()
    stats.update(bus.get_metrics())
    return stats
+""" + +import asyncio +import time +from abc import ABC, abstractmethod +from typing import Any, Callable, Dict, List +from dataclasses import dataclass + +from app.logger import logger +from app.event.core.base import BaseEvent + + +@dataclass +class MiddlewareContext: + """Context passed through middleware chain.""" + + event: BaseEvent + handler_name: str + attempt: int = 1 + start_time: float = 0.0 + metadata: Dict[str, Any] = None + + def __post_init__(self): + if self.metadata is None: + self.metadata = {} + if self.start_time == 0.0: + self.start_time = time.time() + + +class BaseMiddleware(ABC): + """Base class for event processing middleware.""" + + def __init__(self, name: str): + self.name = name + + @abstractmethod + async def process(self, context: MiddlewareContext, next_middleware: Callable) -> bool: + """Process the event through this middleware. + + Args: + context: The middleware context + next_middleware: The next middleware in the chain + + Returns: + bool: True if processing should continue, False otherwise + """ + pass + + +class TestMiddleware(BaseMiddleware): + """Middleware for testing.""" + + def __init__(self): + super().__init__("test") + + async def process(self, context: MiddlewareContext, next_middleware: Callable) -> bool: + """Process the event through this middleware.""" + print("Test middleware processing") + return await next_middleware(context) + + +class LoggingMiddleware(BaseMiddleware): + """Middleware for logging event processing.""" + + def __init__(self, log_level: str = "DEBUG"): + super().__init__("logging") + self.log_level = log_level.upper() + + async def process(self, context: MiddlewareContext, next_middleware: Callable) -> bool: + """Log event processing details.""" + event = context.event + handler_name = context.handler_name + + # Log start + if self.log_level == "DEBUG": + logger.debug( + f"[{handler_name}] Processing event {event.event_id} " + f"({event.event_type}) - attempt {context.attempt}" + ) + + 
start_time = time.time() + + try: + # Call next middleware + result = await next_middleware(context) + + # Log success + duration = time.time() - start_time + if result: + logger.info( + f"[{handler_name}] Successfully processed event {event.event_id} " + f"in {duration:.3f}s" + ) + else: + logger.warning( + f"[{handler_name}] Failed to process event {event.event_id} " + f"after {duration:.3f}s" + ) + + return result + + except Exception as e: + # Log error + duration = time.time() - start_time + logger.error( + f"[{handler_name}] Error processing event {event.event_id} " + f"after {duration:.3f}s: {str(e)}" + ) + raise + + +class RetryMiddleware(BaseMiddleware): + """Middleware for handling retries on failure.""" + + def __init__(self, max_retries: int = 3, base_delay: float = 1.0, backoff_factor: float = 2.0): + super().__init__("retry") + self.max_retries = max_retries + self.base_delay = base_delay + self.backoff_factor = backoff_factor + + async def process(self, context: MiddlewareContext, next_middleware: Callable) -> bool: + """Handle retries for failed event processing.""" + last_exception = None + + for attempt in range(1, self.max_retries + 1): + context.attempt = attempt + + try: + result = await next_middleware(context) + if result: + return True + + # If handler returned False, don't retry + if attempt == 1: + return False + + except Exception as e: + last_exception = e + + if attempt == self.max_retries: + # Last attempt failed, re-raise the exception + logger.error( + f"Handler '{context.handler_name}' failed after {self.max_retries} attempts: {str(e)}" + ) + raise + + # Calculate delay for next attempt + delay = self.base_delay * (self.backoff_factor ** (attempt - 1)) + logger.warning( + f"Handler '{context.handler_name}' attempt {attempt} failed: {str(e)}. " + f"Retrying in {delay:.1f}s..." 
+ ) + + await asyncio.sleep(delay) + + return False + + +class ErrorIsolationMiddleware(BaseMiddleware): + """Middleware for isolating errors to prevent cascade failures.""" + + def __init__(self, isolate_errors: bool = True): + super().__init__("error_isolation") + self.isolate_errors = isolate_errors + + async def process(self, context: MiddlewareContext, next_middleware: Callable) -> bool: + """Isolate errors to prevent them from affecting other handlers.""" + if not self.isolate_errors: + return await next_middleware(context) + + try: + return await next_middleware(context) + except Exception as e: + # Log the error but don't let it propagate + logger.error( + f"Handler '{context.handler_name}' failed with isolated error: {str(e)}" + ) + + # Mark event as failed + context.event.mark_failed(f"Handler '{context.handler_name}' error: {str(e)}") + + # Return False to indicate failure, but don't raise + return False + + +class MetricsMiddleware(BaseMiddleware): + """Middleware for collecting processing metrics.""" + + def __init__(self): + super().__init__("metrics") + self.metrics = { + "total_events": 0, + "successful_events": 0, + "failed_events": 0, + "handler_stats": {}, + "event_type_stats": {}, + } + + async def process(self, context: MiddlewareContext, next_middleware: Callable) -> bool: + """Collect metrics during event processing.""" + event = context.event + handler_name = context.handler_name + + # Initialize handler stats if needed + if handler_name not in self.metrics["handler_stats"]: + self.metrics["handler_stats"][handler_name] = { + "total": 0, + "successful": 0, + "failed": 0, + "total_duration": 0.0, + "avg_duration": 0.0 + } + + # Initialize event type stats if needed + if event.event_type not in self.metrics["event_type_stats"]: + self.metrics["event_type_stats"][event.event_type] = { + "total": 0, + "successful": 0, + "failed": 0 + } + + start_time = time.time() + + try: + result = await next_middleware(context) + duration = time.time() - 
start_time + + # Update metrics + self.metrics["total_events"] += 1 + self.metrics["handler_stats"][handler_name]["total"] += 1 + self.metrics["handler_stats"][handler_name]["total_duration"] += duration + self.metrics["event_type_stats"][event.event_type]["total"] += 1 + + if result: + self.metrics["successful_events"] += 1 + self.metrics["handler_stats"][handler_name]["successful"] += 1 + self.metrics["event_type_stats"][event.event_type]["successful"] += 1 + else: + self.metrics["failed_events"] += 1 + self.metrics["handler_stats"][handler_name]["failed"] += 1 + self.metrics["event_type_stats"][event.event_type]["failed"] += 1 + + # Update average duration + handler_stats = self.metrics["handler_stats"][handler_name] + handler_stats["avg_duration"] = handler_stats["total_duration"] / handler_stats["total"] + + return result + + except Exception as e: + duration = time.time() - start_time + + # Update failure metrics + self.metrics["total_events"] += 1 + self.metrics["failed_events"] += 1 + self.metrics["handler_stats"][handler_name]["total"] += 1 + self.metrics["handler_stats"][handler_name]["failed"] += 1 + self.metrics["handler_stats"][handler_name]["total_duration"] += duration + self.metrics["event_type_stats"][event.event_type]["total"] += 1 + self.metrics["event_type_stats"][event.event_type]["failed"] += 1 + + # Update average duration + handler_stats = self.metrics["handler_stats"][handler_name] + handler_stats["avg_duration"] = handler_stats["total_duration"] / handler_stats["total"] + + raise + + def get_metrics(self) -> Dict[str, Any]: + """Get collected metrics.""" + return self.metrics.copy() + + def reset_metrics(self) -> None: + """Reset all metrics.""" + self.metrics = { + "total_events": 0, + "successful_events": 0, + "failed_events": 0, + "handler_stats": {}, + "event_type_stats": {}, + } + + +class MiddlewareChain: + """Manages a chain of middleware for event processing.""" + + def __init__(self, middlewares: List[BaseMiddleware] = None): + 
self.middlewares = middlewares or [] + + def add_middleware(self, middleware: BaseMiddleware) -> None: + """Add middleware to the chain.""" + self.middlewares.append(middleware) + + def remove_middleware(self, name: str) -> bool: + """Remove middleware by name.""" + for i, middleware in enumerate(self.middlewares): + if middleware.name == name: + del self.middlewares[i] + return True + return False + + async def process(self, context: MiddlewareContext, handler: Callable) -> bool: + """Process event through the middleware chain.""" + if not self.middlewares: + # No middleware, call handler directly + return await self._call_handler(handler, context) + + # Create the middleware chain + async def create_chain(index: int): + if index >= len(self.middlewares): + # End of chain, call the actual handler + return await self._call_handler(handler, context) + + # Call current middleware with next middleware as continuation + middleware = self.middlewares[index] + return await middleware.process(context, lambda _: create_chain(index + 1)) + + return await create_chain(0) + + async def _call_handler(self, handler: Callable, context: MiddlewareContext) -> bool: + """Call the actual event handler.""" + import inspect + + if inspect.iscoroutinefunction(handler): + return await handler(context.event) + else: + return handler(context.event) + + +# Default middleware chain factory +def create_default_middleware_chain( + enable_logging: bool = True, + enable_retry: bool = True, + enable_error_isolation: bool = True, + enable_metrics: bool = True, + enable_Test: bool = True, + max_retries: int = 3, + retry_delay: float = 1.0, + log_level: str = "INFO" +) -> MiddlewareChain: + """Create a default middleware chain with common middleware.""" + middlewares = [] + + if enable_logging: + middlewares.append(LoggingMiddleware(log_level)) + + if enable_metrics: + middlewares.append(MetricsMiddleware()) + + if enable_error_isolation: + middlewares.append(ErrorIsolationMiddleware()) + + if 
enable_retry: + middlewares.append(RetryMiddleware(max_retries, retry_delay)) + + if enable_Test: + middlewares.append(TestMiddleware()) + + return MiddlewareChain(middlewares) diff --git a/app/event/infrastructure/registry.py b/app/event/infrastructure/registry.py new file mode 100644 index 000000000..3948193c8 --- /dev/null +++ b/app/event/infrastructure/registry.py @@ -0,0 +1,333 @@ +"""Event handler registry system with decorator support. + +This module provides automatic handler registration, wildcard pattern matching, +dependency resolution for event handlers. +""" + +import fnmatch +from typing import Callable, Dict, List, Optional, Set +from dataclasses import dataclass, field + +from app.logger import logger + + +@dataclass +class HandlerInfo: + """Information about a registered event handler.""" + + name: str + handler: Callable + patterns: List[str] + depends_on: List[str] = field(default_factory=list) + retry_count: int = 3 + retry_delay: float = 1.0 + enabled: bool = True + + def matches_event(self, event_type: str) -> bool: + """Check if this handler matches the given event type.""" + return any(fnmatch.fnmatch(event_type, pattern) for pattern in self.patterns) + + +class EventHandlerRegistry: + """Registry for managing event handlers with advanced features.""" + + def __init__(self): + self._handlers: Dict[str, HandlerInfo] = {} + self._dependency_graph: Dict[str, Set[str]] = {} + self._execution_order_cache: Dict[str, List[str]] = {} + + def register_handler( + self, + name: str, + handler: Callable, + patterns: List[str], + depends_on: Optional[List[str]] = None, + retry_count: int = 3, + retry_delay: float = 1.0, + enabled: bool = True + ) -> None: + """Register an event handler with the registry. 
class EventHandlerRegistry:
    """Registry for managing event handlers with advanced features.

    Supports wildcard pattern matching, handler dependencies with
    topological ordering, enable/disable toggles, and a per-event-type
    lookup cache.
    """

    def __init__(self):
        self._handlers: Dict[str, HandlerInfo] = {}
        # handler name -> set of handler names it depends on
        self._dependency_graph: Dict[str, Set[str]] = {}
        # event_type -> (independent_handlers, dependent_handlers).
        # Fixed annotation: the cache stores tuples of HandlerInfo lists
        # (what get_handlers_for_event returns), not List[str].
        self._execution_order_cache: Dict[str, tuple[List[HandlerInfo], List[HandlerInfo]]] = {}

    def register_handler(
        self,
        name: str,
        handler: Callable,
        patterns: List[str],
        depends_on: Optional[List[str]] = None,
        retry_count: int = 3,
        retry_delay: float = 1.0,
        enabled: bool = True
    ) -> None:
        """Register an event handler with the registry.

        Args:
            name: Unique name for the handler
            handler: The handler function (async or sync)
            patterns: List of event type patterns to match (supports wildcards)
            depends_on: List of handler names this handler depends on
            retry_count: Number of retry attempts on failure
            retry_delay: Delay between retries in seconds
            enabled: Whether the handler is enabled
        """
        if name in self._handlers:
            logger.warning(f"Handler '{name}' already registered, overwriting")

        depends_on = depends_on or []

        # NOTE(review): this only warns about dependencies not registered
        # *yet*, so registration order affects whether the warning fires.
        for dep in depends_on:
            if dep not in self._handlers and dep != name:
                logger.warning(f"Handler '{name}' depends on unregistered handler '{dep}'")

        handler_info = HandlerInfo(
            name=name,
            handler=handler,
            patterns=patterns,
            depends_on=depends_on,
            retry_count=retry_count,
            retry_delay=retry_delay,
            enabled=enabled
        )

        self._handlers[name] = handler_info
        self._update_dependency_graph(name, depends_on)
        self._clear_cache()

        logger.debug(f"Registered handler '{name}' for patterns {patterns}")

    def unregister_handler(self, name: str) -> bool:
        """Unregister a handler by name.

        Args:
            name: Name of the handler to unregister

        Returns:
            bool: True if handler was found and removed
        """
        if name in self._handlers:
            del self._handlers[name]
            self._remove_from_dependency_graph(name)
            self._clear_cache()
            logger.debug(f"Unregistered handler '{name}'")
            return True
        return False

    def get_handlers_for_event(self, event_type: str) -> tuple[List[HandlerInfo], List[HandlerInfo]]:
        """Get handlers that can process the given event type, split by dependency.

        Args:
            event_type: The event type to match

        Returns:
            tuple: (independent_handlers, dependent_handlers); the dependent
            list is topologically ordered by declared dependencies.
        """
        # Cache is invalidated on any register/unregister/enable/disable.
        if event_type in self._execution_order_cache:
            return self._execution_order_cache[event_type]

        matching_handlers = [
            handler for handler in self._handlers.values()
            if handler.enabled and handler.matches_event(event_type)
        ]

        independent_handlers = []
        dependent_handlers = []
        for handler in matching_handlers:
            if handler.depends_on:
                dependent_handlers.append(handler)
            else:
                independent_handlers.append(handler)

        # Only dependent handlers need an execution order.
        if dependent_handlers:
            dependent_handlers = self._resolve_execution_order(dependent_handlers)

        result = (independent_handlers, dependent_handlers)
        self._execution_order_cache[event_type] = result
        return result

    def get_handler_info(self, name: str) -> Optional[HandlerInfo]:
        """Get information about a specific handler.

        Args:
            name: Name of the handler

        Returns:
            Optional[HandlerInfo]: Handler information if found
        """
        return self._handlers.get(name)

    def list_handlers(self) -> List[HandlerInfo]:
        """Get list of all registered handlers.

        Returns:
            List[HandlerInfo]: All registered handlers
        """
        return list(self._handlers.values())

    def enable_handler(self, name: str) -> bool:
        """Enable a handler.

        Args:
            name: Name of the handler to enable

        Returns:
            bool: True if handler was found and enabled
        """
        if name in self._handlers:
            self._handlers[name].enabled = True
            self._clear_cache()
            return True
        return False

    def disable_handler(self, name: str) -> bool:
        """Disable a handler.

        Args:
            name: Name of the handler to disable

        Returns:
            bool: True if handler was found and disabled
        """
        if name in self._handlers:
            self._handlers[name].enabled = False
            self._clear_cache()
            return True
        return False

    def _update_dependency_graph(self, name: str, depends_on: List[str]) -> None:
        """Record (or replace) the dependency edges for a handler."""
        self._dependency_graph[name] = set(depends_on)

    def _remove_from_dependency_graph(self, name: str) -> None:
        """Remove a handler node and every edge pointing at it."""
        if name in self._dependency_graph:
            del self._dependency_graph[name]
        for handler_deps in self._dependency_graph.values():
            handler_deps.discard(name)

    def _resolve_execution_order(self, handlers: List[HandlerInfo]) -> List[HandlerInfo]:
        """Resolve the execution order based on dependencies.

        Uses a depth-first topological sort restricted to the given handlers.
        Circular dependencies are logged and broken (the in-progress node is
        skipped), never fatal.

        Args:
            handlers: List of handlers to order

        Returns:
            List[HandlerInfo]: Handlers in dependency order
        """
        if not handlers:
            return []

        handler_names = {h.name for h in handlers}
        # Restrict the graph to the handlers of interest.
        subgraph = {
            name: deps & handler_names
            for name, deps in self._dependency_graph.items()
            if name in handler_names
        }

        ordered = []
        visited = set()
        temp_visited = set()  # nodes on the current DFS path (cycle detection)

        def visit(name: str) -> None:
            if name in temp_visited:
                logger.warning(f"Circular dependency detected involving handler '{name}'")
                return
            if name in visited:
                return

            temp_visited.add(name)
            for dep in subgraph.get(name, set()):
                if dep in handler_names:
                    visit(dep)
            temp_visited.remove(name)
            visited.add(name)
            ordered.append(name)

        for handler in handlers:
            if handler.name not in visited:
                visit(handler.name)

        handler_map = {h.name: h for h in handlers}
        return [handler_map[name] for name in ordered if name in handler_map]

    def _clear_cache(self) -> None:
        """Clear the execution order cache."""
        self._execution_order_cache.clear()


# Global registry instance used by the @event_handler decorator.
_global_registry = EventHandlerRegistry()


def get_global_registry() -> EventHandlerRegistry:
    """Get the global event handler registry."""
    return _global_registry


def event_handler(
    patterns: str | List[str],
    depends_on: Optional[List[str]] = None,
    retry_count: int = 3,
    retry_delay: float = 1.0,
    name: Optional[str] = None,
    enabled: bool = True
):
    """Decorator for registering event handlers at import time.

    Args:
        patterns: Event type pattern(s) to match (supports wildcards)
        depends_on: List of handler names this handler depends on (optional)
        retry_count: Number of retry attempts on failure
        retry_delay: Delay between retries in seconds
        name: Custom name for the handler (defaults to function name)
        enabled: Whether the handler is enabled

    Example:
        @event_handler("user.*")
        async def handle_user_events(event: BaseEvent) -> bool:
            return True

        @event_handler(["agent.step.*", "agent.complete"], depends_on=["logger"])
        async def handle_agent_events(event: BaseEvent) -> bool:
            return True
    """
    if isinstance(patterns, str):
        patterns = [patterns]

    def decorator(func: Callable) -> Callable:
        handler_name = name or func.__name__

        _global_registry.register_handler(
            name=handler_name,
            handler=func,
            patterns=patterns,
            depends_on=depends_on,
            retry_count=retry_count,
            retry_delay=retry_delay,
            enabled=enabled
        )

        # Attach registration metadata for introspection/tests.
        func._event_handler_info = {
            'name': handler_name,
            'patterns': patterns,
            'depends_on': depends_on,
            'retry_count': retry_count,
            'retry_delay': retry_delay,
            'enabled': enabled
        }

        return func

    return decorator
/dev/null +++ b/app/event/init.py @@ -0,0 +1,255 @@ +"""Event system initialization and configuration module. + +This module provides utilities for initializing and configuring the event system +in the OpenManus project. It handles bus setup, middleware configuration, +and global event handlers registration. +""" + +import asyncio +from typing import Optional, Dict, Any + +from app.logger import logger +from app.event import ( + SimpleEventBus, + ChainableEventBus, + get_global_bus, + set_global_bus, + create_default_middleware_chain, + event_handler +) + + +class EventSystemConfig: + """Configuration class for the event system.""" + + def __init__( + self, + bus_type: str = "simple", # "simple" or "chainable" + max_concurrent_events: int = 100, + enable_logging: bool = False, + enable_retry: bool = True, + enable_error_isolation: bool = True, + enable_metrics: bool = True, + log_level: str = "INFO" + ): + self.bus_type = bus_type + self.max_concurrent_events = max_concurrent_events + self.enable_logging = enable_logging + self.enable_retry = enable_retry + self.enable_error_isolation = enable_error_isolation + self.enable_metrics = enable_metrics + self.log_level = log_level + + +def initialize_event_system(config: Optional[EventSystemConfig] = None) -> None: + """Initialize the global event system with the given configuration. + + Args: + config: Event system configuration. If None, uses default configuration. 
+ """ + if config is None: + config = EventSystemConfig() + + logger.info(f"Initializing event system with {config.bus_type} bus...") + + # Create middleware chain + middleware_chain = create_default_middleware_chain( + enable_logging=config.enable_logging, + enable_retry=config.enable_retry, + enable_error_isolation=config.enable_error_isolation, + enable_metrics=config.enable_metrics + ) + + # Create and configure the event bus + if config.bus_type == "chainable": + bus = ChainableEventBus( + name="GlobalChainableEventBus", + max_concurrent_events=config.max_concurrent_events, + middleware_chain=middleware_chain + ) + else: + bus = SimpleEventBus( + name="GlobalSimpleEventBus", + max_concurrent_events=config.max_concurrent_events, + middleware_chain=middleware_chain + ) + + # Set as global bus + set_global_bus(bus) + + logger.info(f"Event system initialized successfully with {config.bus_type} bus") + + +def get_event_system_status() -> Dict[str, Any]: + """Get the current status of the event system. 
+ + Returns: + Dict containing event system status information + """ + try: + bus = get_global_bus() + stats = bus.get_event_stats() + metrics = bus.get_metrics() + + return { + "initialized": True, + "bus_type": bus.__class__.__name__, + "bus_name": bus.name, + "stats": stats, + "metrics": metrics + } + except Exception as e: + return { + "initialized": False, + "error": str(e) + } + + +# Global event handlers that can be registered automatically +@event_handler("system.*", name="system_logger") +async def log_system_events(event): + """Log all system events for debugging and monitoring.""" + component = event.data.get('component', 'unknown') + logger.info(f"System Event: {event.event_type} from {component}") + return True + + +@event_handler("agent.*", name="agent_monitor") +async def monitor_agent_events(event): + """Monitor agent events for performance tracking.""" + agent_name = event.data.get('agent_name', 'unknown') + step_number = event.data.get('step_number', '') + step_info = f" (step {step_number})" if step_number else "" + logger.info(f"Agent Event: {event.event_type} from '{agent_name}'{step_info}") + return True + + +@event_handler("tool.*", name="tool_tracker") +async def track_tool_events(event): + """Track tool execution events.""" + tool_name = event.data.get('tool_name', 'unknown') + logger.info(f"Tool Event: {event.event_type} for tool '{tool_name}'") + return True + + +@event_handler("conversation.*", name="conversation_logger") +async def log_conversation_events(event): + """Log conversation events for session tracking.""" + conv_id = event.data.get('conversation_id', 'unknown') + user_id = event.data.get('user_id', 'unknown') + logger.info(f"Conversation Event: {event.event_type} (conv: {conv_id}, user: {user_id})") + return True + + +@event_handler("*", name="global_metrics") +async def collect_global_metrics(event): + """Collect global metrics for all events.""" + # This could be extended to send metrics to external systems + 
logger.debug(f"Event processed: {event.event_type} (id: {event.event_id})") + return True + + +def register_default_handlers() -> None: + """Register default global event handlers. + + This function is called automatically during initialization to register + system-level event handlers for logging, monitoring, and metrics collection. + """ + logger.info("Default event handlers registered successfully") + + +async def shutdown_event_system() -> None: + """Gracefully shutdown the event system. + + This should be called during application shutdown to ensure all events + are processed and resources are cleaned up properly. + """ + try: + bus = get_global_bus() + + # Wait for active events to complete (with timeout) + max_wait = 30 # seconds + wait_time = 0 + + while wait_time < max_wait: + stats = bus.get_event_stats() + active_events = stats.get('active_events', 0) + + if active_events == 0: + break + + logger.info(f"Waiting for {active_events} active events to complete...") + await asyncio.sleep(1) + wait_time += 1 + + # Force cleanup if needed + if hasattr(bus, 'shutdown'): + await bus.shutdown() + + logger.info("Event system shutdown completed") + + except Exception as e: + logger.error(f"Error during event system shutdown: {str(e)}") + + +# Convenience functions for common event publishing +async def publish_agent_step_start(agent_name: str, agent_type: str, step_number: int, + conversation_id: Optional[str] = None) -> bool: + """Convenience function to publish agent step start event.""" + from app.event import create_agent_step_start_event, publish_event + + event = create_agent_step_start_event( + agent_name=agent_name, + agent_type=agent_type, + step_number=step_number, + conversation_id=conversation_id + ) + return await publish_event(event) + + +async def publish_tool_execution(tool_name: str, tool_type: str, status: str, + parameters: Dict[str, Any], + conversation_id: Optional[str] = None) -> bool: + """Convenience function to publish tool execution 
event.""" + from app.event import create_tool_execution_event, publish_event + + event = create_tool_execution_event( + tool_name=tool_name, + tool_type=tool_type, + status=status, + parameters=parameters, + conversation_id=conversation_id + ) + return await publish_event(event) + + +async def publish_system_error(component: str, error_type: str, error_message: str, + conversation_id: Optional[str] = None) -> bool: + """Convenience function to publish system error event.""" + from app.event import create_system_error_event, publish_event + + event = create_system_error_event( + component=component, + error_type=error_type, + error_message=error_message, + conversation_id=conversation_id + ) + return await publish_event(event) + + +# Auto-initialization flag +_auto_initialized = False + + +def ensure_event_system_initialized(config: Optional[EventSystemConfig] = None) -> None: + """Ensure the event system is initialized (idempotent). + + This function can be called multiple times safely. It will only initialize + the event system once. + """ + global _auto_initialized + + if not _auto_initialized: + initialize_event_system(config) + register_default_handlers() + _auto_initialized = True diff --git a/app/event/interfaces/__init__.py b/app/event/interfaces/__init__.py new file mode 100644 index 000000000..9dbb1b7a5 --- /dev/null +++ b/app/event/interfaces/__init__.py @@ -0,0 +1,51 @@ +"""Interface layer for the event system. + +This module provides the main public API for the event system, +including factory functions and global API functions. 
+""" + +# Event factory functions +from .factories import ( + create_agent_step_start_event, + create_chainable_agent_step_start_event, + create_conversation_created_event, + create_user_input_event, + create_interrupt_event, + create_tool_execution_event, + create_chainable_tool_execution_request_event, + create_system_error_event +) + +# Global API functions +from .global_api import ( + get_global_bus, + set_global_bus, + publish_event, + subscribe_handler, + unsubscribe_handler, + get_bus_stats, + event_handler, + get_global_registry +) + +__all__ = [ + # Event factory functions + "create_agent_step_start_event", + "create_chainable_agent_step_start_event", + "create_conversation_created_event", + "create_user_input_event", + "create_interrupt_event", + "create_tool_execution_event", + "create_chainable_tool_execution_request_event", + "create_system_error_event", + + # Global API functions + "get_global_bus", + "set_global_bus", + "publish_event", + "subscribe_handler", + "unsubscribe_handler", + "get_bus_stats", + "event_handler", + "get_global_registry" +] diff --git a/app/event/interfaces/factories.py b/app/event/interfaces/factories.py new file mode 100644 index 000000000..98cf00a97 --- /dev/null +++ b/app/event/interfaces/factories.py @@ -0,0 +1,44 @@ +"""Event factory functions. + +This module provides convenient factory functions for creating domain events. +All factory functions are re-exported from their respective domain modules. 
+""" + +# Import factory functions from domain modules +from app.event.domain.agent import ( + create_agent_step_start_event, + create_chainable_agent_step_start_event +) + +from app.event.domain.conversation import ( + create_conversation_created_event, + create_user_input_event, + create_interrupt_event +) + +from app.event.domain.tool import ( + create_tool_execution_event, + create_chainable_tool_execution_request_event +) + +from app.event.domain.system import ( + create_system_error_event +) + +__all__ = [ + # Agent event factories + "create_agent_step_start_event", + "create_chainable_agent_step_start_event", + + # Conversation event factories + "create_conversation_created_event", + "create_user_input_event", + "create_interrupt_event", + + # Tool event factories + "create_tool_execution_event", + "create_chainable_tool_execution_request_event", + + # System event factories + "create_system_error_event" +] diff --git a/app/event/interfaces/global_api.py b/app/event/interfaces/global_api.py new file mode 100644 index 000000000..d69411276 --- /dev/null +++ b/app/event/interfaces/global_api.py @@ -0,0 +1,34 @@ +"""Global event system API. + +This module provides the main public API for the event system, +including global bus functions and handler registration. 
+""" + +# Import global functions from infrastructure +from app.event.infrastructure.bus import ( + get_global_bus, + set_global_bus, + publish_event, + subscribe_handler, + unsubscribe_handler, + get_bus_stats +) + +from app.event.infrastructure.registry import ( + event_handler, + get_global_registry +) + +__all__ = [ + # Global bus functions + "get_global_bus", + "set_global_bus", + "publish_event", + "subscribe_handler", + "unsubscribe_handler", + "get_bus_stats", + + # Handler registration + "event_handler", + "get_global_registry" +] diff --git a/app/flow/base.py b/app/flow/base.py index dc57b3996..c27426992 100644 --- a/app/flow/base.py +++ b/app/flow/base.py @@ -1,9 +1,12 @@ from abc import ABC, abstractmethod from typing import Dict, List, Optional, Union +import uuid -from pydantic import BaseModel +from pydantic import BaseModel, Field from app.agent.base import BaseAgent +from app.event.init import ensure_event_system_initialized +from app.logger import logger class BaseFlow(BaseModel, ABC): @@ -13,6 +16,11 @@ class BaseFlow(BaseModel, ABC): tools: Optional[List] = None primary_agent_key: Optional[str] = None + # Event system integration + conversation_id: Optional[str] = Field(default=None, description="Conversation ID for event tracking") + enable_events: bool = Field(default=True, description="Whether to publish flow events") + flow_name: str = Field(default="BaseFlow", description="Name of the flow for event identification") + class Config: arbitrary_types_allowed = True @@ -36,9 +44,24 @@ def __init__( # Set the agents dictionary data["agents"] = agents_dict + # Generate conversation ID if not provided + if "conversation_id" not in data or data["conversation_id"] is None: + data["conversation_id"] = str(uuid.uuid4()) + + # Set flow name if not provided + if "flow_name" not in data: + data["flow_name"] = self.__class__.__name__ + # Initialize using BaseModel's init super().__init__(**data) + # Initialize event system if events are enabled + if 
self.enable_events: + ensure_event_system_initialized() + + # Propagate conversation ID to agents + self._propagate_conversation_id() + @property def primary_agent(self) -> Optional[BaseAgent]: """Get the primary agent for the flow""" @@ -51,7 +74,111 @@ def get_agent(self, key: str) -> Optional[BaseAgent]: def add_agent(self, key: str, agent: BaseAgent) -> None: """Add a new agent to the flow""" self.agents[key] = agent + # Propagate conversation ID to new agent + if hasattr(agent, 'set_conversation_id') and self.conversation_id: + agent.set_conversation_id(self.conversation_id) @abstractmethod async def execute(self, input_text: str) -> str: """Execute the flow with given input""" + pass + + # Event system integration methods + + def _propagate_conversation_id(self) -> None: + """Propagate conversation ID to all agents.""" + if self.conversation_id: + for agent in self.agents.values(): + if hasattr(agent, 'set_conversation_id'): + agent.set_conversation_id(self.conversation_id) + + def set_conversation_id(self, conversation_id: str) -> None: + """Set the conversation ID for the flow and all agents.""" + self.conversation_id = conversation_id + self._propagate_conversation_id() + + def enable_event_publishing(self, enabled: bool = True) -> None: + """Enable or disable event publishing for the flow.""" + self.enable_events = enabled + if enabled: + ensure_event_system_initialized() + + async def publish_flow_start_event(self, input_text: str) -> bool: + """Publish flow execution start event.""" + if not self.enable_events: + return False + + try: + from app.event import BaseEvent, publish_event + + event = BaseEvent( + event_type="flow.execution.start", + data={ + "flow_name": self.flow_name, + "flow_type": self.__class__.__name__, + "conversation_id": self.conversation_id, + "input_text": input_text, + "agent_count": len(self.agents), + "primary_agent": self.primary_agent_key + }, + source=self.flow_name + ) + return await publish_event(event) + except Exception 
as e: + logger.warning(f"Failed to publish flow start event: {e}") + return False + + async def publish_flow_complete_event(self, result: str, success: bool = True) -> bool: + """Publish flow execution complete event.""" + if not self.enable_events: + return False + + try: + from app.event import BaseEvent, publish_event + + event = BaseEvent( + event_type="flow.execution.complete", + data={ + "flow_name": self.flow_name, + "flow_type": self.__class__.__name__, + "conversation_id": self.conversation_id, + "result": result, + "success": success + }, + source=self.flow_name + ) + return await publish_event(event) + except Exception as e: + logger.warning(f"Failed to publish flow complete event: {e}") + return False + + async def publish_custom_flow_event(self, event_type: str, data: dict) -> bool: + """Publish a custom flow event. + + Args: + event_type: Type of the event (e.g., "flow.custom.decision") + data: Event data dictionary + + Returns: + bool: True if event was published successfully + """ + if not self.enable_events: + return False + + try: + from app.event import BaseEvent, publish_event + + event = BaseEvent( + event_type=event_type, + data={ + "flow_name": self.flow_name, + "flow_type": self.__class__.__name__, + "conversation_id": self.conversation_id, + **data + }, + source=self.flow_name + ) + return await publish_event(event) + except Exception as e: + logger.warning(f"Failed to publish custom flow event {event_type}: {e}") + return False diff --git a/app/tool/__init__.py b/app/tool/__init__.py index b1d25e24c..f9861ebe5 100644 --- a/app/tool/__init__.py +++ b/app/tool/__init__.py @@ -2,6 +2,7 @@ from app.tool.bash import Bash from app.tool.browser_use_tool import BrowserUseTool from app.tool.create_chat_completion import CreateChatCompletion +from app.tool.crawl4ai import Crawl4aiTool from app.tool.planning import PlanningTool from app.tool.str_replace_editor import StrReplaceEditor from app.tool.terminate import Terminate @@ -13,6 +14,7 @@ 
"BaseTool", "Bash", "BrowserUseTool", + "Crawl4aiTool", "Terminate", "StrReplaceEditor", "WebSearch", diff --git a/app/tool/base.py b/app/tool/base.py index ba4084db9..40414a941 100644 --- a/app/tool/base.py +++ b/app/tool/base.py @@ -1,20 +1,53 @@ from abc import ABC, abstractmethod from typing import Any, Dict, Optional +import time from pydantic import BaseModel, Field +from app.event.init import ensure_event_system_initialized +from app.logger import logger + class BaseTool(ABC, BaseModel): name: str description: str parameters: Optional[dict] = None + # Event system integration + enable_events: bool = Field(default=True, description="Whether to publish tool execution events") + conversation_id: Optional[str] = Field(default=None, description="Current conversation ID for event tracking") + class Config: arbitrary_types_allowed = True async def __call__(self, **kwargs) -> Any: - """Execute the tool with given parameters.""" - return await self.execute(**kwargs) + """Execute the tool with given parameters and publish events.""" + if self.enable_events: + ensure_event_system_initialized() + + # Publish tool execution start event + start_time = time.time() + if self.enable_events: + await self._publish_tool_start_event(kwargs) + + try: + result = await self.execute(**kwargs) + execution_time = time.time() - start_time + + # Publish tool execution success event + if self.enable_events: + await self._publish_tool_complete_event(kwargs, result, True, execution_time) + + return result + + except Exception as e: + execution_time = time.time() - start_time + + # Publish tool execution failure event + if self.enable_events: + await self._publish_tool_complete_event(kwargs, str(e), False, execution_time) + + raise @abstractmethod async def execute(self, **kwargs) -> Any: @@ -31,6 +64,90 @@ def to_param(self) -> Dict: }, } + # Event system integration methods + + def set_conversation_id(self, conversation_id: str) -> None: + """Set the conversation ID for event tracking.""" 
+ self.conversation_id = conversation_id + + def enable_event_publishing(self, enabled: bool = True) -> None: + """Enable or disable event publishing.""" + self.enable_events = enabled + if enabled: + ensure_event_system_initialized() + + async def _publish_tool_start_event(self, parameters: Dict[str, Any]) -> bool: + """Publish tool execution start event.""" + try: + from app.event import create_tool_execution_event, publish_event + + event = create_tool_execution_event( + tool_name=self.name, + tool_type=self.__class__.__name__, + status="started", + parameters=parameters, + conversation_id=self.conversation_id + ) + return await publish_event(event) + except Exception as e: + logger.warning(f"Failed to publish tool start event: {e}") + return False + + async def _publish_tool_complete_event(self, parameters: Dict[str, Any], result: Any, + success: bool, execution_time: float) -> bool: + """Publish tool execution complete event.""" + try: + from app.event import BaseEvent, publish_event + + event = BaseEvent( + event_type="tool.execution.complete", + data={ + "tool_name": self.name, + "tool_type": self.__class__.__name__, + "parameters": parameters, + "result": str(result) if result is not None else None, + "success": success, + "execution_time": execution_time, + "conversation_id": self.conversation_id + }, + source=self.name + ) + return await publish_event(event) + except Exception as e: + logger.warning(f"Failed to publish tool complete event: {e}") + return False + + async def publish_custom_tool_event(self, event_type: str, data: dict) -> bool: + """Publish a custom tool event. 
+ + Args: + event_type: Type of the event (e.g., "tool.custom.progress") + data: Event data dictionary + + Returns: + bool: True if event was published successfully + """ + if not self.enable_events: + return False + + try: + from app.event import BaseEvent, publish_event + + event = BaseEvent( + event_type=event_type, + data={ + "tool_name": self.name, + "tool_type": self.__class__.__name__, + "conversation_id": self.conversation_id, + **data + }, + source=self.name + ) + return await publish_event(event) + except Exception as e: + logger.warning(f"Failed to publish custom tool event {event_type}: {e}") + return False + class ToolResult(BaseModel): """Represents the result of a tool execution.""" diff --git a/app/tool/crawl4ai.py b/app/tool/crawl4ai.py new file mode 100644 index 000000000..71987492f --- /dev/null +++ b/app/tool/crawl4ai.py @@ -0,0 +1,249 @@ + +""" +Crawl4AI Web Crawler Tool for OpenManus + +This tool integrates Crawl4AI, a high-performance web crawler designed for LLMs and AI agents, +providing fast, precise, and AI-ready data extraction with clean Markdown generation. +""" + +import asyncio +from typing import Union, List +from urllib.parse import urlparse + +from app.logger import logger +from app.tool.base import BaseTool, ToolResult + + +class Crawl4aiTool(BaseTool): + """ + Web crawler tool powered by Crawl4AI. + + Provides clean markdown extraction optimized for AI processing. + """ + + name: str = "crawl4ai" + description: str = """Web crawler that extracts clean, AI-ready content from web pages. 
+ + Features: + - Extracts clean markdown content optimized for LLMs + - Handles JavaScript-heavy sites and dynamic content + - Supports multiple URLs in a single request + - Fast and reliable with built-in error handling + + Perfect for content analysis, research, and feeding web content to AI models.""" + + parameters: dict = { + "type": "object", + "properties": { + "urls": { + "type": "array", + "items": {"type": "string"}, + "description": "(required) List of URLs to crawl. Can be a single URL or multiple URLs.", + "minItems": 1 + }, + "timeout": { + "type": "integer", + "description": "(optional) Timeout in seconds for each URL. Default is 30.", + "default": 30, + "minimum": 5, + "maximum": 120 + }, + "bypass_cache": { + "type": "boolean", + "description": "(optional) Whether to bypass cache and fetch fresh content. Default is false.", + "default": False + }, + "word_count_threshold": { + "type": "integer", + "description": "(optional) Minimum word count for content blocks. Default is 10.", + "default": 10, + "minimum": 1 + } + }, + "required": ["urls"] + } + + async def execute( + self, + urls: Union[str, List[str]], + timeout: int = 30, + bypass_cache: bool = False, + word_count_threshold: int = 10, + ) -> ToolResult: + """ + Execute web crawling for the specified URLs. 
+ + Args: + urls: Single URL string or list of URLs to crawl + timeout: Timeout in seconds for each URL + bypass_cache: Whether to bypass cache + word_count_threshold: Minimum word count for content blocks + + Returns: + ToolResult with crawl results + """ + # Normalize URLs to list + if isinstance(urls, str): + url_list = [urls] + else: + url_list = urls + + # Validate URLs + valid_urls = [] + for url in url_list: + if self._is_valid_url(url): + valid_urls.append(url) + else: + logger.warning(f"Invalid URL skipped: {url}") + + if not valid_urls: + return ToolResult(error="No valid URLs provided") + + try: + # Import crawl4ai components + from crawl4ai import AsyncWebCrawler, BrowserConfig, CrawlerRunConfig, CacheMode + + # Configure browser settings + browser_config = BrowserConfig( + headless=True, + verbose=False, + browser_type="chromium", + ignore_https_errors=True, + java_script_enabled=True + ) + + # Configure crawler settings + run_config = CrawlerRunConfig( + cache_mode=CacheMode.BYPASS if bypass_cache else CacheMode.ENABLED, + word_count_threshold=word_count_threshold, + process_iframes=True, + remove_overlay_elements=True, + excluded_tags=['script', 'style'], + page_timeout=timeout * 1000, # Convert to milliseconds + verbose=False, + wait_until="domcontentloaded" + ) + + results = [] + successful_count = 0 + failed_count = 0 + + # Process each URL + async with AsyncWebCrawler(config=browser_config) as crawler: + for url in valid_urls: + try: + logger.info(f"🕷️ Crawling URL: {url}") + start_time = asyncio.get_event_loop().time() + + result = await crawler.arun(url=url, config=run_config) + + end_time = asyncio.get_event_loop().time() + execution_time = end_time - start_time + + if result.success: + # Count words in markdown + word_count = 0 + if hasattr(result, 'markdown') and result.markdown: + word_count = len(result.markdown.split()) + + # Count links + links_count = 0 + if hasattr(result, 'links') and result.links: + internal_links = 
result.links.get('internal', []) + external_links = result.links.get('external', []) + links_count = len(internal_links) + len(external_links) + + # Count images + images_count = 0 + if hasattr(result, 'media') and result.media: + images = result.media.get('images', []) + images_count = len(images) + + results.append({ + 'url': url, + 'success': True, + 'status_code': getattr(result, 'status_code', 200), + 'title': result.metadata.get('title') if result.metadata else None, + 'markdown': result.markdown if hasattr(result, 'markdown') else None, + 'word_count': word_count, + 'links_count': links_count, + 'images_count': images_count, + 'execution_time': execution_time + }) + successful_count += 1 + logger.info(f"✅ Successfully crawled {url} in {execution_time:.2f}s") + + else: + results.append({ + 'url': url, + 'success': False, + 'error_message': getattr(result, 'error_message', 'Unknown error'), + 'execution_time': execution_time + }) + failed_count += 1 + logger.warning(f"❌ Failed to crawl {url}") + + except Exception as e: + error_msg = f"Error crawling {url}: {str(e)}" + logger.error(error_msg) + results.append({ + 'url': url, + 'success': False, + 'error_message': error_msg + }) + failed_count += 1 + + # Format output + output_lines = [f"🕷️ Crawl4AI Results Summary:"] + output_lines.append(f"📊 Total URLs: {len(valid_urls)}") + output_lines.append(f"✅ Successful: {successful_count}") + output_lines.append(f"❌ Failed: {failed_count}") + output_lines.append("") + + for i, result in enumerate(results, 1): + output_lines.append(f"{i}. {result['url']}") + + if result['success']: + output_lines.append(f" ✅ Status: Success (HTTP {result.get('status_code', 'N/A')})") + if result.get('title'): + output_lines.append(f" 📄 Title: {result['title']}") + + if result.get('markdown'): + # Show first 300 characters of markdown content + content_preview = result['markdown'][:300] + if len(result['markdown']) > 300: + content_preview += "..." 
+ output_lines.append(f" 📝 Content: {content_preview}") + + output_lines.append(f" 📊 Stats: {result.get('word_count', 0)} words, {result.get('links_count', 0)} links, {result.get('images_count', 0)} images") + + if result.get('execution_time'): + output_lines.append(f" ⏱️ Time: {result['execution_time']:.2f}s") + else: + output_lines.append(f" ❌ Status: Failed") + if result.get('error_message'): + output_lines.append(f" 🚫 Error: {result['error_message']}") + + output_lines.append("") + + return ToolResult(output="\n".join(output_lines)) + + except ImportError as e: + error_msg = "Crawl4AI is not installed. Please install it with: pip install crawl4ai" + logger.error(error_msg) + return ToolResult(error=error_msg) + except Exception as e: + error_msg = f"Crawl4AI execution failed: {str(e)}" + logger.error(error_msg) + return ToolResult(error=error_msg) + + def _is_valid_url(self, url: str) -> bool: + """Validate if a URL is properly formatted.""" + try: + result = urlparse(url) + return all([result.scheme, result.netloc]) and result.scheme in ['http', 'https'] + except Exception: + return False + + + diff --git a/requirements.txt b/requirements.txt index 3324283fc..aa7e6dc93 100644 --- a/requirements.txt +++ b/requirements.txt @@ -36,6 +36,7 @@ boto3~=1.37.18 requests~=2.32.3 beautifulsoup4~=4.13.3 +crawl4ai~=0.6.3 huggingface-hub~=0.29.2 setuptools~=75.8.0 diff --git a/tests/event/test_event.py b/tests/event/test_event.py new file mode 100644 index 000000000..97c306734 --- /dev/null +++ b/tests/event/test_event.py @@ -0,0 +1,71 @@ +import sys +import asyncio +from pathlib import Path + +# Add the project root to Python path +project_root = Path(__file__).parent.parent.parent +sys.path.insert(0, str(project_root)) + +# Import from the new restructured event system +from app.event import ( + event_handler, + get_global_bus, + publish_event, + create_default_middleware_chain +) +from app.event.domain.system.test_events import TestEvent, TestAddEvent + 
+@event_handler("test.*", name="test_handler") +async def handle_test_events(event): + """Handle test events.""" + print(f"Handled test event: {event.event_type} - Basic test handler") + return True + +@event_handler("test.testadd") +async def handle_test_add_events(event): + """Handle test add events.""" + a = event.data.get("a", 0) + b = event.data.get("b", 0) + print(f"Addition test: {a} + {b} = {int(a) + int(b)}") + return True + +async def run(): + """Run the test scenario.""" + bus = get_global_bus() + + # Configure middleware chain + bus.middleware_chain = create_default_middleware_chain( + enable_logging=True, + enable_retry=False, + enable_error_isolation=True, + enable_metrics=True + ) + + # Create test events + print("Creating test events...") + test_event = TestEvent("4", "3") + test_add_event = TestAddEvent("4", "9") + + # Publish events + print("Publishing events...") + await publish_event(test_event) + await publish_event(test_add_event) + + # Get and display statistics + stats = bus.get_event_stats() + metrics = bus.get_metrics() + stats.update(metrics) + + print("\n=== Event System Statistics ===") + print(f"Total Events: {stats.get('total_events', 0)}") + print(f"Active Events: {stats.get('active_events', 0)}") + print(f"Registered Handlers: {stats.get('registered_handlers', 0)}") + print(f"Status Distribution: {stats.get('status_distribution', {})}") + + if 'handler_metrics' in stats: + print(f"Handler Metrics: {stats['handler_metrics']}") + +if __name__ == "__main__": + print("Starting event system test...") + asyncio.run(run()) + print("Test completed.")