[OPIK-2618] [BE] Isolated Subprocess Executor #3693
Merged by thiagohora: 47 commits into main from thiagoh/OPIK-2618-isolated-subprocess-executor (Oct 21, 2025).
Commits (47), all by thiagohora:
c71aaee  [OPIK-2618] [BE] Java Python integration queue optimization studio
812af37  Fix implementation
88e1b17  OPIK-2618: Address PR review comments (cleanup and docs sync)
7646ac7  OPIK-2618: Address unresolved PR comments (import, docs, javadoc)
7504994  OPIK-2618: Apply @andrescrz review items (docs move, TTL 14d, snake_c…
8cbdbb7  OPIK-2618: Replace process_hello_world with process_optimizer_job acr…
35bc399  OPIK-2618: Align TTL defaults to config (no in-code defaults), rely o…
c8be166  OPIK-2618: Address remaining review items (UUID v7 IDs, simplify Redi…
9358b5c  OPIK-2618: Begin splitting rq_worker into smaller modules; re-export …
c382871  OPIK-2618: Extract MetricsWorker and NoOpDeathPenalty to opik_backend…
eb0cd0a  OPIK-2618: Use Instant for QueueMessage timestamps; honor provided In…
ded8562  OPIK-2618: Document immutable defaults on JobBuilder (args/kwargs)
df7d98c  OPIK-2618: Add queues.useJacksonSerialization flag and surface in con…
c34b22b  OPIK-2618: Introduce MapStruct RqJobMapper and use in RqJobUtils
685e968  OPIK-2618: Add fallback short join in RqWorkerManager.stop for forcef…
d52854c  OPIK-2618: Make Redis health_check_interval configurable via REDIS_HE…
151e2d5  OPIK-2618: Let client decode Redis responses (decode_responses=True) …
f6547b8  OPIK-2618: Use Lombok on JobStatus (@Getter, @RequiredArgsConstructor)
89dc074  OPIK-2618: Simplify RQ worker manager - single ping at startup, no re…
15c444f  OPIK-2618: Remove unused _connect_with_backoff and backoff settings; …
b251749  OPIK-2618: Add /health/liveness and /health/readiness endpoints; Redi…
edb28c0  OPIK-2618: Introduce shared Redis client singleton; reuse in worker m…
560e206  OPIK-2618: Delegate Redis connection to shared singleton; remove work…
af8df6f  OPIK-2618: DRY RQ_WORKER_ENABLED access via is_rq_worker_enabled(); g…
c827851  Address comments
f59dda8  OPIK-2618: Add utils package marker to fix imports
11a163a  OPIK-2618: Simplify RqJobMapper - rely on same-name mapping; only con…
0c003b3  Fix tests
29d3a6e  Fix tests
c3cc583  Fix tests
48b8309  [OPIK-2618] Implement IsolatedSubprocessExecutor with lifecycle manag…
69d2585  Merge branch 'main' into thiagoh/OPIK-2618-isolated-subprocess-executor
2e65908  Revision 2: Add stack memory limit (20MB) to prevent infinite recursi…
8041421  Merge branch 'thiagoh/OPIK-2618-isolated-subprocess-executor' of http…
c767df3  Revision 3: Fix Copilot review comments - add exception variables and…
f54aa76  Revision 4: Address additional Copilot comments - use 'raise' instead…
2753331  Revision 5: Address nitpick comments - shorten inline comment and ext…
63da946  Revision 6: Improve readability by using newline variable instead of …
78338b0  Update apps/opik-python-backend/src/opik_backend/executor_isolated.py
371664b  Remove doc
d04c749  Merge branch 'thiagoh/OPIK-2618-isolated-subprocess-executor' of http…
321cadd  Merge branch 'main' into thiagoh/OPIK-2618-isolated-subprocess-executor
17c624a  Revision 2: Remove dynamic code injection from IsolatedSubprocessExec…
3eed123  Revision 3: Add test for executing Python file contents
c8fc9f2  Revert "Revision 3: Add test for executing Python file contents"
eb1c32f  Revision 4: Refactor executor to accept file paths instead of inline …
11898fa  Revision 5: Address all PR code review comments
apps/opik-python-backend/src/opik_backend/executor_isolated.py (351 additions, 0 deletions)

@@ -0,0 +1,351 @@
| """Isolated subprocess executor with environment variable scoping.""" | ||
| import json | ||
| import logging | ||
| import os | ||
| import resource | ||
| import subprocess | ||
| import sys | ||
| import time | ||
| from typing import Optional | ||
|  | ||
| from opik_backend.executor import CodeExecutorBase | ||
|  | ||
| logger = logging.getLogger(__name__) | ||
|  | ||
| # Metrics setup | ||
| from opentelemetry import metrics | ||
|  | ||
| meter = metrics.get_meter("isolated_executor") | ||
| isolated_creation_histogram = meter.create_histogram( | ||
| name="isolated_subprocess_creation_latency", | ||
| description="Latency of isolated subprocess creation in milliseconds", | ||
| unit="ms", | ||
| ) | ||
| isolated_execution_histogram = meter.create_histogram( | ||
| name="isolated_subprocess_execution_latency", | ||
| description="Latency of isolated code execution in milliseconds", | ||
| unit="ms", | ||
| ) | ||
|  | ||
| # Memory limit for subprocesses in bytes (20MB) | ||
| SUBPROCESS_MEMORY_LIMIT_BYTES = 20 * 1024 * 1024 # 20MB | ||
|  | ||
|  | ||
| def _calculate_latency_ms(start_time): | ||
| """Calculate elapsed time in milliseconds.""" | ||
| return (time.time() - start_time) * 1000 | ||
|  | ||
|  | ||
| def _set_memory_limit(): | ||
| """Set memory limit for subprocess to 20MB. | ||
|  | ||
| Uses RLIMIT_STACK to limit only stack size. | ||
| This prevents deeply nested calls and excessive local variables | ||
| while allowing the Python interpreter and runtime heap to function normally. | ||
| """ | ||
| try: | ||
| # RLIMIT_STACK limits stack size only (local variables, call stack depth) | ||
| # Prevents stack overflow from deeply nested recursion | ||
| # Does NOT limit heap or runtime data structures | ||
| resource.setrlimit(resource.RLIMIT_STACK, (SUBPROCESS_MEMORY_LIMIT_BYTES, SUBPROCESS_MEMORY_LIMIT_BYTES)) | ||
| except Exception as e: | ||
| logger.warning(f"Failed to set stack memory limit: {e}") | ||
|  | ||
|  | ||
| class IsolatedSubprocessExecutor: | ||
| """ | ||
| Executes Python code in isolated subprocesses with environment variable scoping. | ||
|  | ||
| Each execution creates a fresh subprocess, ensuring that: | ||
| - Environment variables are scoped to each execution (no leakage between concurrent runs) | ||
| - Subprocesses are completely independent | ||
| - No shared state exists between executions | ||
| - Resources are properly cleaned up after each execution | ||
|  | ||
| This differs from ProcessExecutor which maintains a pool of reusable workers. | ||
| Use this when you need true isolation with custom environment variables per execution. | ||
| """ | ||
|  | ||
| def __init__(self, timeout_secs: int = 30): | ||
| """ | ||
| Initialize the isolated subprocess executor. | ||
|  | ||
| Args: | ||
| timeout_secs: Timeout for each execution in seconds | ||
| """ | ||
| self.timeout_secs = timeout_secs | ||
| self.logger = logging.getLogger(__name__) | ||
| self._active_processes = [] # Track active processes for cleanup | ||
| self._teardown_callbacks = [] # Callbacks to run on teardown | ||
|  | ||
| def execute( | ||
| self, | ||
| file_path: str, | ||
| data: dict, | ||
| env_vars: Optional[dict] = None, | ||
| timeout_secs: Optional[int] = None, | ||
| payload_type: Optional[str] = None, | ||
| ) -> dict: | ||
| """ | ||
| Execute Python file in an isolated subprocess with scoped environment variables. | ||
|  | ||
| Each call creates a fresh subprocess with its own isolated environment. | ||
| Environment variables passed in env_vars are scoped to the subprocess | ||
| and don't affect other concurrent executions. | ||
|  | ||
| Args: | ||
| file_path: Path to Python file to execute (e.g., '/path/to/metric.py') | ||
| data: Data dictionary to pass to the file via stdin | ||
| env_vars: Environment variables to scope to this subprocess (optional). | ||
| These override/augment the parent environment for this execution only. | ||
| timeout_secs: Execution timeout in seconds (uses default if not provided) | ||
| payload_type: Type of payload being executed (e.g., 'trace_thread') | ||
|  | ||
| Returns: | ||
| dict: Result dictionary with format: | ||
| - {"scores": [...]} on success | ||
| - {"code": error_code, "error": message} on failure | ||
| """ | ||
| timeout_secs = timeout_secs or self.timeout_secs | ||
| start_time = time.time() | ||
| process = None # Initialize to None for exception handling | ||
| result = None | ||
|  | ||
| try: | ||
| # Prepare environment for subprocess | ||
| subprocess_env = self._prepare_environment(env_vars) | ||
|  | ||
| # Create subprocess with python to execute the file directly | ||
| process = subprocess.Popen( | ||
| [sys.executable, "-u", file_path], | ||
| stdin=subprocess.PIPE, | ||
| stdout=subprocess.PIPE, | ||
| stderr=subprocess.PIPE, | ||
| env=subprocess_env, | ||
| text=True, | ||
| preexec_fn=_set_memory_limit, # Apply memory limit to subprocess | ||
| ) | ||
|  | ||
| # Track active process | ||
| self._active_processes.append(process) | ||
|  | ||
| creation_latency = _calculate_latency_ms(start_time) | ||
| isolated_creation_histogram.record(creation_latency) | ||
| self.logger.debug( | ||
| f"Created isolated subprocess. pid={process.pid}, creation_latency_ms={creation_latency:.3f}" | ||
| ) | ||
|  | ||
| # Execute code in subprocess | ||
| result = self._execute_in_subprocess( | ||
| process, data, payload_type, timeout_secs | ||
| ) | ||
|  | ||
| execution_latency = _calculate_latency_ms(start_time) | ||
| isolated_execution_histogram.record(execution_latency) | ||
| self.logger.debug( | ||
| f"Isolated subprocess execution completed. total_latency_ms={execution_latency:.3f}" | ||
| ) | ||
|  | ||
| return result | ||
|  | ||
| except subprocess.TimeoutExpired: | ||
| self.logger.error( | ||
| f"Subprocess execution timed out. timeout_secs={timeout_secs}" | ||
| ) | ||
| return { | ||
| "code": 500, | ||
| "error": f"Execution timed out after {timeout_secs} seconds", | ||
| } | ||
| except Exception as e: | ||
| self.logger.error( | ||
| f"Error during subprocess execution. error={str(e)}", exc_info=True | ||
| ) | ||
| return {"code": 500, "error": f"Failed to execute file: {str(e)}"} | ||
| finally: | ||
| # Always remove process from active list and measure total latency | ||
| self._remove_active_process(process) | ||
| total_latency = _calculate_latency_ms(start_time) | ||
| self.logger.debug( | ||
| f"Subprocess execution finished. total_latency_ms={total_latency:.3f}" | ||
| ) | ||
|  | ||
| def _remove_active_process(self, process: Optional[subprocess.Popen]) -> None: | ||
| """Remove process from active processes list if it exists.""" | ||
| if process in self._active_processes: | ||
| self._active_processes.remove(process) | ||
|  | ||
| def _prepare_environment(self, env_vars: Optional[dict] = None) -> dict: | ||
| """ | ||
| Prepare environment variables for the subprocess. | ||
|  | ||
| Starts with a copy of the parent environment and applies overrides. | ||
| This ensures the subprocess has all necessary environment variables | ||
| while allowing specific variables to be scoped to this execution. | ||
|  | ||
| Args: | ||
| env_vars: Environment variables to override/add | ||
|  | ||
| Returns: | ||
| dict: Complete environment dictionary for the subprocess | ||
| """ | ||
| env = os.environ.copy() | ||
| if env_vars: | ||
| env.update(env_vars) | ||
| return env | ||
|  | ||
| def _execute_in_subprocess( | ||
| self, | ||
| process: subprocess.Popen, | ||
| data: dict, | ||
| payload_type: Optional[str], | ||
| timeout_secs: int, | ||
| ) -> dict: | ||
| """ | ||
| Execute file in the subprocess and collect result. | ||
|  | ||
| Passes data as JSON via stdin and collects stdout for results. | ||
|  | ||
| Args: | ||
| process: Subprocess Popen instance | ||
| data: Data dictionary | ||
| payload_type: Type of payload | ||
| timeout_secs: Execution timeout | ||
|  | ||
| Returns: | ||
| dict: Execution result | ||
| """ | ||
| # Prepare input as JSON to pass via stdin | ||
| input_json = json.dumps( | ||
| { | ||
| "data": data, | ||
| "payload_type": payload_type, | ||
| } | ||
| ) | ||
|  | ||
| try: | ||
| # Send data via stdin, collect stdout | ||
| stdout, stderr = process.communicate( | ||
| input=input_json, | ||
| timeout=timeout_secs, | ||
| ) | ||
|  | ||
| # Parse result from stdout | ||
| if process.returncode == 0: | ||
| try: | ||
| result = json.loads(stdout.strip()) | ||
| return result | ||
| except json.JSONDecodeError as e: | ||
| self.logger.error(f"Failed to parse subprocess output: {stdout}") | ||
| return { | ||
| "code": 500, | ||
| "error": f"Invalid JSON response from subprocess: {str(e)}", | ||
| } | ||
| else: | ||
| self.logger.error( | ||
| f"Subprocess exited with code {process.returncode}. stderr: {stderr}" | ||
| ) | ||
| return { | ||
| "code": 500, | ||
| "error": f"Subprocess execution failed: {stderr}", | ||
| } | ||
|  | ||
| except subprocess.TimeoutExpired: | ||
| process.kill() | ||
| try: | ||
| process.wait(timeout=2) | ||
| except subprocess.TimeoutExpired: | ||
| process.kill() | ||
| raise | ||
|  | ||
| def register_teardown_callback(self, callback): | ||
| """ | ||
| Register a callback to run during teardown. | ||
|  | ||
| Args: | ||
| callback: A callable that takes no arguments and runs cleanup logic | ||
| """ | ||
| self._teardown_callbacks.append(callback) | ||
|  | ||
| def kill_process(self, pid: int, timeout: int = 2): | ||
| """ | ||
| Terminate a specific process by PID. | ||
|  | ||
| Args: | ||
| pid: Process ID to terminate | ||
| timeout: Seconds to wait before force killing | ||
|  | ||
| Returns: | ||
| bool: True if process was terminated, False if not found | ||
| """ | ||
| try: | ||
| process = next((p for p in self._active_processes if p.pid == pid), None) | ||
| if process is None: | ||
| self.logger.warning(f"Process with PID {pid} not found in active processes") | ||
| return False | ||
|  | ||
| process.terminate() | ||
| try: | ||
| process.wait(timeout=timeout) | ||
| self.logger.info(f"Process {pid} terminated gracefully") | ||
| except subprocess.TimeoutExpired: | ||
| process.kill() | ||
| process.wait() | ||
| self.logger.warning(f"Process {pid} force killed after timeout") | ||
|  | ||
| self._active_processes.remove(process) | ||
| return True | ||
| except Exception as e: | ||
| self.logger.error(f"Error killing process {pid}: {e}") | ||
| return False | ||
|  | ||
| def kill_all_processes(self, timeout: int = 2): | ||
| """ | ||
| Terminate all active processes. | ||
|  | ||
| Args: | ||
| timeout: Seconds to wait before force killing each process | ||
| """ | ||
| for process in list(self._active_processes): | ||
| try: | ||
| process.terminate() | ||
| except Exception as e: | ||
| self.logger.error(f"Error terminating process {process.pid}: {e}") | ||
|  | ||
| for process in list(self._active_processes): | ||
| try: | ||
| process.wait(timeout=timeout) | ||
| except subprocess.TimeoutExpired: | ||
| process.kill() | ||
| process.wait() | ||
| except Exception as e: | ||
| self.logger.error(f"Error waiting for process {process.pid}: {e}") | ||
|  | ||
| self._active_processes.clear() | ||
| self.logger.info("All active processes terminated") | ||
|  | ||
| def teardown(self): | ||
| """ | ||
| Run cleanup logic including killing all processes and executing teardown callbacks. | ||
| """ | ||
| self.logger.info("Running teardown...") | ||
|  | ||
| # Kill all active processes | ||
| self.kill_all_processes() | ||
|  | ||
| # Execute all teardown callbacks | ||
| for callback in self._teardown_callbacks: | ||
| try: | ||
| callback() | ||
| except Exception as e: | ||
| self.logger.error(f"Error in teardown callback: {e}") | ||
|  | ||
| self.logger.info("Teardown complete") | ||
|  | ||
| def __enter__(self): | ||
| """Context manager entry.""" | ||
| return self | ||
|  | ||
| def __exit__(self, exc_type, exc_val, exc_tb): | ||
| """Context manager exit - runs teardown.""" | ||
| self.teardown() | ||
| return False | 
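
For reviewers who want to exercise the new executor locally, the stdin/stdout contract implied by _execute_in_subprocess is small enough to sketch. The script below is a hypothetical example, not part of this PR: the file name user_metric.py, the has_output score, and the OPIK_API_KEY variable are placeholders; only the JSON shapes (a {"data": ..., "payload_type": ...} object on stdin, a {"scores": [...]} object on stdout, exit code 0) follow the code above.

# user_metric.py (hypothetical) -- a script compatible with IsolatedSubprocessExecutor.
# Reads {"data": ..., "payload_type": ...} as JSON from stdin, prints {"scores": [...]} to stdout.
import json
import os
import sys

payload = json.loads(sys.stdin.read())
data = payload.get("data") or {}

# Placeholder scoring logic; OPIK_API_KEY is used here only to illustrate that
# variables passed via env_vars are visible inside this subprocess and nowhere else.
_ = os.environ.get("OPIK_API_KEY")
score = 1.0 if data.get("output") else 0.0

print(json.dumps({"scores": [{"name": "has_output", "value": score}]}))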
      
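
On the caller side, a minimal sketch looks like the following, assuming the module is importable as opik_backend.executor_isolated (matching the file path above); the paths and values are illustrative. Running the executor as a context manager ensures teardown() executes and any stray subprocesses are killed.

# Hypothetical caller; file path and env var values are placeholders.
from opik_backend.executor_isolated import IsolatedSubprocessExecutor

with IsolatedSubprocessExecutor(timeout_secs=10) as executor:
    result = executor.execute(
        file_path="/tmp/user_metric.py",        # hypothetical script shown above
        data={"output": "hello"},
        env_vars={"OPIK_API_KEY": "test-key"},  # scoped to this subprocess only
        payload_type="trace_thread",
    )

# result is either {"scores": [...]} on success or {"code": 500, "error": "..."} on failure.
print(result)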