diff --git a/openhands/runtime/base.py b/openhands/runtime/base.py index 6e27cffa80b6..0a776ec8a97f 100644 --- a/openhands/runtime/base.py +++ b/openhands/runtime/base.py @@ -857,7 +857,7 @@ def get_microagents_from_selected_repo( # If the instructions file is not found in the workspace root, try to load it from the repo root self.log( 'debug', - f'.openhands_instructions not present, trying to load from repository {microagents_dir=}', + f'.openhands_instructions not present, trying to load from repository microagents_dir={microagents_dir}', ) obs = self.read( FileReadAction(path=str(repo_root / '.openhands_instructions')) diff --git a/poetry.lock b/poetry.lock index 376e5c759871..f050c9205811 100644 --- a/poetry.lock +++ b/poetry.lock @@ -1,4 +1,4 @@ -# This file is automatically @generated by Poetry 2.1.3 and should not be changed by hand. +# This file is automatically @generated by Poetry 2.1.4 and should not be changed by hand. [[package]] name = "aiofiles" @@ -2257,15 +2257,15 @@ files = [ [[package]] name = "e2b" -version = "1.7.0" +version = "2.0.0" description = "E2B SDK that give agents cloud environments" optional = true python-versions = "<4.0,>=3.9" groups = ["main"] markers = "extra == \"third-party-runtimes\"" files = [ - {file = "e2b-1.7.0-py3-none-any.whl", hash = "sha256:6bd3d935249fcf5684494a97178d4d58446b4ed4018ac09087e4000046e82aab"}, - {file = "e2b-1.7.0.tar.gz", hash = "sha256:7783408c2cdf7aee9b088d31759364f2b13b21100cc4e132ba36fd84cfc72e31"}, + {file = "e2b-2.0.0-py3-none-any.whl", hash = "sha256:a6621b905cb2a883a9c520736ae98343a6184fc90c29b4f2f079d720294a0df0"}, + {file = "e2b-2.0.0.tar.gz", hash = "sha256:4d033d937b0a09b8428e73233321a913cbaef8e7299fc731579c656e9d53a144"}, ] [package.dependencies] @@ -2273,10 +2273,28 @@ attrs = ">=23.2.0" httpcore = ">=1.0.5,<2.0.0" httpx = ">=0.27.0,<1.0.0" packaging = ">=24.1" -protobuf = ">=5.29.4,<6.0.0" +protobuf = ">=4.21.0" python-dateutil = ">=2.8.2" typing-extensions = ">=4.1.0" +[[package]] +name = "e2b-code-interpreter" +version = "2.0.0" +description = "E2B Code Interpreter - Stateful code execution" +optional = true +python-versions = "<4.0,>=3.9" +groups = ["main"] +markers = "extra == \"third-party-runtimes\"" +files = [ + {file = "e2b_code_interpreter-2.0.0-py3-none-any.whl", hash = "sha256:273642d4dd78f09327fb1553fe4f7ddcf17892b78f98236e038d29985e42dca5"}, + {file = "e2b_code_interpreter-2.0.0.tar.gz", hash = "sha256:19136916be8de60bfd0a678742501d1d0335442bb6e86405c7dd6f98059b73c4"}, +] + +[package.dependencies] +attrs = ">=21.3.0" +e2b = ">=2.0.0,<3.0.0" +httpx = ">=0.20.0,<1.0.0" + [[package]] name = "english-words" version = "2.0.1" @@ -11845,9 +11863,9 @@ cffi = {version = ">=1.11", markers = "platform_python_implementation == \"PyPy\ cffi = ["cffi (>=1.11)"] [extras] -third-party-runtimes = ["daytona", "e2b", "modal", "runloop-api-client"] +third-party-runtimes = ["daytona", "e2b-code-interpreter", "modal", "runloop-api-client"] [metadata] lock-version = "2.1" python-versions = "^3.12,<3.14" -content-hash = "a0ae2cee596dde71f89c06e9669efda58ee8f8f019fad3dbe9df068005c32904" +content-hash = "5135db5c5c744f7b2aab0ccb6921343d2268d8ef950e024ddc3bce25c597140a" diff --git a/pyproject.toml b/pyproject.toml index ce480e3084d7..dda3c4453c7c 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -96,14 +96,14 @@ memory-profiler = "^0.61.0" jupyter_kernel_gateway = "*" # Third-party runtime dependencies (optional) -e2b = { version = ">=1.0.5,<1.8.0", optional = true } modal = { version = ">=0.66.26,<1.2.0", optional = true } runloop-api-client = { version = "0.50.0", optional = true } daytona = { version = "0.24.2", optional = true } httpx-aiohttp = "^0.1.8" +e2b-code-interpreter = { version = "^2.0.0", optional = true } [tool.poetry.extras] -third_party_runtimes = [ "e2b", "modal", "runloop-api-client", "daytona" ] +third_party_runtimes = [ "e2b-code-interpreter", "modal", "runloop-api-client", "daytona" ] [tool.poetry.group.dev] optional = true diff --git a/third_party/runtime/impl/e2b/e2b_runtime.py b/third_party/runtime/impl/e2b/e2b_runtime.py index bd43219da1f5..92ec319edfde 100644 --- a/third_party/runtime/impl/e2b/e2b_runtime.py +++ b/third_party/runtime/impl/e2b/e2b_runtime.py @@ -1,32 +1,50 @@ +import os from typing import Callable from openhands.core.config import OpenHandsConfig +from openhands.core.logger import openhands_logger as logger from openhands.events.action import ( + BrowseURLAction, + BrowseInteractiveAction, + CmdRunAction, + FileEditAction, FileReadAction, FileWriteAction, + IPythonRunCellAction, ) from openhands.events.observation import ( + BrowserOutputObservation, + CmdOutputObservation, ErrorObservation, + FileEditObservation, FileReadObservation, FileWriteObservation, + IPythonRunCellObservation, Observation, ) from openhands.events.stream import EventStream from openhands.integrations.provider import PROVIDER_TOKEN_TYPE +from openhands.llm.llm_registry import LLMRegistry from openhands.runtime.impl.action_execution.action_execution_client import ( ActionExecutionClient, ) -from third_party.runtime.impl.e2b.filestore import E2BFileStore -from third_party.runtime.impl.e2b.sandbox import E2BSandbox from openhands.runtime.plugins import PluginRequirement +from openhands.runtime.runtime_status import RuntimeStatus from openhands.runtime.utils.files import insert_lines, read_lines +from openhands.utils.async_utils import call_sync_from_async +from third_party.runtime.impl.e2b.filestore import E2BFileStore +from third_party.runtime.impl.e2b.sandbox import E2BBox, E2BSandbox class E2BRuntime(ActionExecutionClient): + # Class-level cache for sandbox IDs + _sandbox_id_cache: dict[str, str] = {} + def __init__( self, config: OpenHandsConfig, event_stream: EventStream, + llm_registry: LLMRegistry, sid: str = "default", plugins: list[PluginRequirement] | None = None, env_vars: dict[str, str] | None = None, @@ -37,42 +55,348 @@ def __init__( git_provider_tokens: PROVIDER_TOKEN_TYPE | None = None, sandbox: E2BSandbox | None = None, ): + if config.workspace_base is not None: + logger.warning( + "Setting workspace_base is not supported in the E2B runtime. " + "E2B provides its own isolated filesystem." + ) + super().__init__( - config, - event_stream, - sid, - plugins, - env_vars, - status_callback, - attach_to_existing, - headless_mode, - user_id, - git_provider_tokens, + config=config, + event_stream=event_stream, + llm_registry=llm_registry, + sid=sid, + plugins=plugins, + env_vars=env_vars, + status_callback=status_callback, + attach_to_existing=attach_to_existing, + headless_mode=headless_mode, + user_id=user_id, + git_provider_tokens=git_provider_tokens, ) - if sandbox is None: - self.sandbox = E2BSandbox(config.sandbox) - if not isinstance(self.sandbox, E2BSandbox): - raise ValueError("E2BRuntime requires an E2BSandbox") - self.file_store = E2BFileStore(self.sandbox.filesystem) + self.sandbox = sandbox + self.file_store = None + self.api_url = None + self._action_server_port = 8000 + self._runtime_initialized = False + + @property + def action_execution_server_url(self) -> str: + """Return the URL of the action execution server.""" + if not self.api_url: + raise RuntimeError("E2B runtime not connected. Call connect() first.") + return self.api_url + + async def connect(self) -> None: + """Initialize E2B sandbox and start action execution server.""" + self.set_runtime_status(RuntimeStatus.STARTING_RUNTIME) + + try: + if self.attach_to_existing and self.sandbox is None: + try: + cached_sandbox_id = self.__class__._sandbox_id_cache.get(self.sid) + + if cached_sandbox_id: + try: + self.sandbox = E2BBox(self.config.sandbox, sandbox_id=cached_sandbox_id) + logger.info(f"Successfully attached to existing E2B sandbox: {cached_sandbox_id}") + except Exception as e: + logger.warning(f"Failed to connect to cached sandbox {cached_sandbox_id}: {e}") + del self.__class__._sandbox_id_cache[self.sid] + self.sandbox = None + + except Exception as e: + logger.warning(f"Failed to attach to existing sandbox: {e}. Will create a new one.") + + # Create E2B sandbox if not provided + if self.sandbox is None: + try: + self.sandbox = E2BSandbox(self.config.sandbox) + sandbox_id = self.sandbox.sandbox.sandbox_id + logger.info(f"E2B sandbox created with ID: {sandbox_id}") + + self.__class__._sandbox_id_cache[self.sid] = sandbox_id + except Exception as e: + logger.error(f"Failed to create E2B sandbox: {e}") + raise + + if not isinstance(self.sandbox, (E2BSandbox, E2BBox)): + raise ValueError("E2BRuntime requires an E2BSandbox or E2BBox") + + self.file_store = E2BFileStore(self.sandbox.filesystem) + + # E2B doesn't use action execution server - set dummy URL + self.api_url = "direct://e2b-sandbox" + + workspace_dir = self.config.workspace_mount_path_in_sandbox + if workspace_dir: + try: + exit_code, output = self.sandbox.execute(f"sudo mkdir -p {workspace_dir}") + if exit_code == 0: + self.sandbox.execute(f"sudo chmod 777 {workspace_dir}") + logger.info(f"Created workspace directory: {workspace_dir}") + else: + logger.warning(f"Failed to create workspace directory: {output}") + except Exception as e: + logger.warning(f"Failed to create workspace directory: {e}") + + await call_sync_from_async(self.setup_initial_env) + + self._runtime_initialized = True + self.set_runtime_status(RuntimeStatus.READY) + logger.info("E2B runtime connected successfully") + + except Exception as e: + logger.error(f"Failed to connect E2B runtime: {e}") + self.set_runtime_status(RuntimeStatus.FAILED) + raise + + async def close(self) -> None: + """Close the E2B runtime.""" + if self._runtime_closed: + return + + self._runtime_closed = True + + if self.sandbox: + try: + + if not self.attach_to_existing: + self.sandbox.close() + if self.sid in self.__class__._sandbox_id_cache: + del self.__class__._sandbox_id_cache[self.sid] + logger.info("E2B sandbox closed and removed from cache") + else: + logger.info("E2B runtime connection closed, sandbox kept running for reuse") + + except Exception as e: + logger.error(f"Error closing E2B sandbox: {e}") + + parent_close = super().close() + if parent_close is not None: + await parent_close + + def run(self, action: CmdRunAction) -> Observation: + """Execute command using E2B's native execute method.""" + if self.sandbox is None: + return ErrorObservation("E2B sandbox not initialized") + + try: + timeout = action.timeout if action.timeout else self.config.sandbox.timeout + exit_code, output = self.sandbox.execute(action.command, timeout=timeout) + return CmdOutputObservation( + content=output, + command=action.command, + exit_code=exit_code + ) + except Exception as e: + return ErrorObservation(f"Failed to execute command: {e}") + + def run_ipython(self, action: IPythonRunCellAction) -> Observation: + """Execute IPython code using E2B's code interpreter.""" + if self.sandbox is None: + return ErrorObservation("E2B sandbox not initialized") + + try: + result = self.sandbox.sandbox.run_code(action.code) + + outputs = [] + if hasattr(result, 'results') and result.results: + for r in result.results: + if hasattr(r, 'text') and r.text: + outputs.append(r.text) + elif hasattr(r, 'html') and r.html: + outputs.append(r.html) + elif hasattr(r, 'png') and r.png: + outputs.append(f"[Image data: {len(r.png)} bytes]") + + if hasattr(result, 'error') and result.error: + return ErrorObservation(f"IPython error: {result.error}") + + return IPythonRunCellObservation( + content='\n'.join(outputs) if outputs else '', + code=action.code + ) + except Exception as e: + return ErrorObservation(f"Failed to execute IPython code: {e}") def read(self, action: FileReadAction) -> Observation: - content = self.file_store.read(action.path) - lines = read_lines(content.split("\n"), action.start, action.end) - code_view = "".join(lines) - return FileReadObservation(code_view, path=action.path) + if self.file_store is None: + return ErrorObservation("E2B file store not initialized. Call connect() first.") + + try: + content = self.file_store.read(action.path) + lines = read_lines(content.split("\n"), action.start, action.end) + code_view = "".join(lines) + return FileReadObservation(code_view, path=action.path) + except Exception as e: + return ErrorObservation(f"Failed to read file: {e}") def write(self, action: FileWriteAction) -> Observation: - if action.start == 0 and action.end == -1: - self.file_store.write(action.path, action.content) - return FileWriteObservation(content="", path=action.path) - files = self.file_store.list(action.path) - if action.path in files: - all_lines = self.file_store.read(action.path).split("\n") - new_file = insert_lines( - action.content.split("\n"), all_lines, action.start, action.end + if self.file_store is None: + return ErrorObservation("E2B file store not initialized. Call connect() first.") + + try: + if action.start == 0 and action.end == -1: + self.file_store.write(action.path, action.content) + return FileWriteObservation(content="", path=action.path) + + files = self.file_store.list(action.path) + if action.path in files: + all_lines = self.file_store.read(action.path).split("\n") + new_file = insert_lines( + action.content.split("\n"), all_lines, action.start, action.end + ) + self.file_store.write(action.path, "".join(new_file)) + return FileWriteObservation("", path=action.path) + else: + # Create a new file + self.file_store.write(action.path, action.content) + return FileWriteObservation(content="", path=action.path) + except Exception as e: + return ErrorObservation(f"Failed to write file: {e}") + + def edit(self, action: FileEditAction) -> Observation: + """Edit a file using E2B's file system.""" + if self.file_store is None: + return ErrorObservation("E2B file store not initialized. Call connect() first.") + + try: + if action.path in self.file_store.list(action.path): + content = self.file_store.read(action.path) + else: + return ErrorObservation(f"File {action.path} not found") + + lines = content.split('\n') + if action.start < 0 or action.end > len(lines): + return ErrorObservation(f"Invalid line range: {action.start}-{action.end}") + + new_lines = lines[:action.start] + action.content.split('\n') + lines[action.end:] + new_content = '\n'.join(new_lines) + + self.file_store.write(action.path, new_content) + + return FileEditObservation( + content='', + path=action.path, + old_content='\n'.join(lines[action.start:action.end]), + start=action.start, + end=action.end ) - self.file_store.write(action.path, "".join(new_file)) - return FileWriteObservation("", path=action.path) - else: - # FIXME: we should create a new file here - return ErrorObservation(f"File not found: {action.path}") + except Exception as e: + return ErrorObservation(f"Failed to edit file: {e}") + + def browse(self, action: BrowseURLAction) -> Observation: + """Browse a URL using E2B's browser capabilities.""" + if self.sandbox is None: + return ErrorObservation("E2B sandbox not initialized") + + try: + exit_code, output = self.sandbox.execute(f"curl -s -L '{action.url}'") + if exit_code != 0: + exit_code, output = self.sandbox.execute(f"wget -qO- '{action.url}'") + + if exit_code != 0: + return ErrorObservation(f"Failed to fetch URL: {output}") + + return BrowserOutputObservation( + content=output, + url=action.url, + screenshot=None, + error=None if exit_code == 0 else output + ) + except Exception as e: + return ErrorObservation(f"Failed to browse URL: {e}") + + def browse_interactive(self, action: BrowseInteractiveAction) -> Observation: + """Interactive browsing is not supported in E2B.""" + return ErrorObservation( + "Interactive browsing is not supported in E2B runtime. " + "Use browse() for simple URL fetching or consider using a different runtime." + ) + + def list_files(self, path: str | None = None) -> list[str]: + """List files in the sandbox.""" + if self.sandbox is None: + logger.warning("Cannot list files: E2B sandbox not initialized") + return [] + + if path is None: + path = self.config.workspace_mount_path_in_sandbox or '/workspace' + + try: + exit_code, output = self.sandbox.execute(f"find {path} -maxdepth 1 -type f -o -type d") + if exit_code == 0: + files = [line.strip() for line in output.strip().split('\n') if line.strip()] + return [f.replace(path + '/', '') if f.startswith(path + '/') else f for f in files] + else: + logger.warning(f"Failed to list files in {path}: {output}") + return [] + except Exception as e: + logger.warning(f"Error listing files: {e}") + return [] + + def add_env_vars(self, env_vars: dict[str, str]) -> None: + """Add environment variables to the E2B sandbox.""" + if self.sandbox is None: + logger.warning("Cannot add env vars: E2B sandbox not initialized") + return + + if not hasattr(self, '_env_vars'): + self._env_vars = {} + self._env_vars.update(env_vars) + + for key, value in env_vars.items(): + try: + escaped_value = value.replace("'", "'\"'\"'") + cmd = f"export {key}='{escaped_value}'" + self.sandbox.execute(cmd) + logger.debug(f"Set env var: {key}") + except Exception as e: + logger.warning(f"Failed to set env var {key}: {e}") + + def get_working_directory(self) -> str: + """Get the current working directory.""" + if self.sandbox is None: + return self.config.workspace_mount_path_in_sandbox or '/workspace' + try: + exit_code, output = self.sandbox.execute("pwd") + if exit_code == 0: + return output.strip() + except Exception: + pass + return self.config.workspace_mount_path_in_sandbox or '/workspace' + + def get_mcp_config(self, extra_stdio_servers: list | None = None) -> dict: + """Get MCP configuration for E2B runtime.""" + return { + 'stdio_servers': extra_stdio_servers or [] + } + + def check_if_alive(self) -> None: + """Check if the E2B sandbox is alive.""" + if self.sandbox is None: + raise RuntimeError("E2B sandbox not initialized") + return + + def copy_to(self, host_src: str, sandbox_dest: str, recursive: bool = False) -> None: + """Copy files to the E2B sandbox.""" + if self.sandbox is None: + raise RuntimeError("E2B sandbox not initialized") + self.sandbox.copy_to(host_src, sandbox_dest, recursive) + + def get_vscode_token(self) -> str: + """E2B doesn't support VSCode integration.""" + return "" + + @classmethod + def setup(cls, config: OpenHandsConfig, headless_mode: bool = False) -> None: + """Set up the E2B runtime environment.""" + logger.info("E2B runtime setup called") + pass + + @classmethod + def teardown(cls, config: OpenHandsConfig) -> None: + """Tear down the E2B runtime environment.""" + logger.info("E2B runtime teardown called") + pass diff --git a/third_party/runtime/impl/e2b/sandbox.py b/third_party/runtime/impl/e2b/sandbox.py index 842dfeb091e8..ff5cbdbe079a 100644 --- a/third_party/runtime/impl/e2b/sandbox.py +++ b/third_party/runtime/impl/e2b/sandbox.py @@ -3,7 +3,7 @@ import tarfile from glob import glob -from e2b import Sandbox as E2BSandbox +from e2b_code_interpreter import Sandbox from e2b.exceptions import TimeoutException from openhands.core.config import SandboxConfig @@ -19,7 +19,7 @@ class E2BBox: def __init__( self, config: SandboxConfig, - template: str = "openhands", + sandbox_id: str | None = None, ): self.config = copy.deepcopy(config) self.initialize_plugins: bool = config.initialize_plugins @@ -30,20 +30,41 @@ def __init__( raise ValueError( "E2B_API_KEY environment variable is required for E2B runtime" ) + + # Read custom E2B domain if provided + e2b_domain = os.getenv("E2B_DOMAIN") + if e2b_domain: + logger.info(f'Using custom E2B domain: {e2b_domain}') - self.sandbox = E2BSandbox( - api_key=e2b_api_key, - template=template, - # It's possible to stream stdout and stderr from sandbox and from each process - on_stderr=lambda x: logger.debug(f"E2B sandbox stderr: {x}"), - on_stdout=lambda x: logger.debug(f"E2B sandbox stdout: {x}"), - cwd=self._cwd, # Default workdir inside sandbox - ) - logger.debug(f'Started E2B sandbox with ID "{self.sandbox.id}"') + # E2B v2 requires using create() method or connect to existing + try: + # Configure E2B client with custom domain if provided + create_kwargs = {} + connect_kwargs = {} + + if e2b_domain: + # Set up custom domain configuration + # Note: This depends on E2B SDK version and may need adjustment + os.environ['E2B_API_URL'] = f'https://{e2b_domain}' + logger.info(f'Set E2B_API_URL to https://{e2b_domain}') + + if sandbox_id: + # Connect to existing sandbox + self.sandbox = Sandbox.connect(sandbox_id, **connect_kwargs) + logger.info(f'Connected to existing E2B sandbox with ID "{sandbox_id}"') + else: + # Create new sandbox (e2b-code-interpreter doesn't need template) + self.sandbox = Sandbox.create(**create_kwargs) + sandbox_id = self.sandbox.sandbox_id + logger.info(f'Created E2B sandbox with ID "{sandbox_id}"') + except Exception as e: + logger.error(f"Failed to create/connect E2B sandbox: {e}") + raise @property def filesystem(self): - return self.sandbox.filesystem + # E2B v2 uses 'files' instead of 'filesystem' + return getattr(self.sandbox, 'files', None) or getattr(self.sandbox, 'filesystem', None) def _archive(self, host_src: str, recursive: bool = False): if recursive: @@ -70,21 +91,23 @@ def _archive(self, host_src: str, recursive: bool = False): def execute(self, cmd: str, timeout: int | None = None) -> tuple[int, str]: timeout = timeout if timeout is not None else self.config.timeout - process = self.sandbox.process.start(cmd, env_vars=self._env) + + # E2B code-interpreter uses commands.run() try: - process_output = process.wait(timeout=timeout) + result = self.sandbox.commands.run(cmd) + output = "" + if hasattr(result, 'stdout') and result.stdout: + output += result.stdout + if hasattr(result, 'stderr') and result.stderr: + output += result.stderr + exit_code = getattr(result, 'exit_code', 0) or 0 + return exit_code, output except TimeoutException: - logger.debug("Command timed out, killing process...") - process.kill() + logger.debug("Command timed out") return -1, f'Command: "{cmd}" timed out' - - logs = [m.line for m in process_output.messages] - logs_str = "\n".join(logs) - if process.exit_code is None: - return -1, logs_str - - assert process_output.exit_code is not None - return process_output.exit_code, logs_str + except Exception as e: + logger.error(f"Command execution failed: {e}") + return -1, str(e) def copy_to(self, host_src: str, sandbox_dest: str, recursive: bool = False): """Copies a local file or directory to the sandbox.""" @@ -98,24 +121,28 @@ def copy_to(self, host_src: str, sandbox_dest: str, recursive: bool = False): uploaded_path = self.sandbox.upload_file(tar_file) # Check if sandbox_dest exists. If not, create it. - process = self.sandbox.process.start_and_wait(f"test -d {sandbox_dest}") - if process.exit_code != 0: - self.sandbox.filesystem.make_dir(sandbox_dest) + exit_code, _ = self.execute(f"test -d {sandbox_dest}") + if exit_code != 0: + self.execute(f"mkdir -p {sandbox_dest}") # Extract the archive into the destination and delete the archive - process = self.sandbox.process.start_and_wait( + exit_code, output = self.execute( f"sudo tar -xf {uploaded_path} -C {sandbox_dest} && sudo rm {uploaded_path}" ) - if process.exit_code != 0: + if exit_code != 0: raise Exception( - f"Failed to extract {uploaded_path} to {sandbox_dest}: {process.stderr}" + f"Failed to extract {uploaded_path} to {sandbox_dest}: {output}" ) # Delete the local archive os.remove(tar_filename) def close(self): - self.sandbox.close() + # E2B v2 uses kill() instead of close() + if hasattr(self.sandbox, 'kill'): + self.sandbox.kill() + elif hasattr(self.sandbox, 'close'): + self.sandbox.close() def get_working_directory(self): return self.sandbox.cwd