Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
47 commits
Select commit Hold shift + click to select a range
c71aaee
[OPIK-2618] [BE] Java Python integration queue optimization studio
thiagohora Oct 15, 2025
812af37
Fix implementation
thiagohora Oct 15, 2025
88e1b17
OPIK-2618: Address PR review comments (cleanup and docs sync)
thiagohora Oct 15, 2025
7646ac7
OPIK-2618: Address unresolved PR comments (import, docs, javadoc)
thiagohora Oct 15, 2025
7504994
OPIK-2618: Apply @andrescrz review items (docs move, TTL 14d, snake_c…
thiagohora Oct 16, 2025
8cbdbb7
OPIK-2618: Replace process_hello_world with process_optimizer_job acr…
thiagohora Oct 16, 2025
35bc399
OPIK-2618: Align TTL defaults to config (no in-code defaults), rely o…
thiagohora Oct 16, 2025
c8be166
OPIK-2618: Address remaining review items (UUID v7 IDs, simplify Redi…
thiagohora Oct 16, 2025
9358b5c
OPIK-2618: Begin splitting rq_worker into smaller modules; re-export …
thiagohora Oct 16, 2025
c382871
OPIK-2618: Extract MetricsWorker and NoOpDeathPenalty to opik_backend…
thiagohora Oct 16, 2025
eb0cd0a
OPIK-2618: Use Instant for QueueMessage timestamps; honor provided In…
thiagohora Oct 16, 2025
ded8562
OPIK-2618: Document immutable defaults on JobBuilder (args/kwargs)
thiagohora Oct 16, 2025
df7d98c
OPIK-2618: Add queues.useJacksonSerialization flag and surface in con…
thiagohora Oct 16, 2025
c34b22b
OPIK-2618: Introduce MapStruct RqJobMapper and use in RqJobUtils
thiagohora Oct 16, 2025
685e968
OPIK-2618: Add fallback short join in RqWorkerManager.stop for forcef…
thiagohora Oct 16, 2025
d52854c
OPIK-2618: Make Redis health_check_interval configurable via REDIS_HE…
thiagohora Oct 16, 2025
151e2d5
OPIK-2618: Let client decode Redis responses (decode_responses=True) …
thiagohora Oct 16, 2025
f6547b8
OPIK-2618: Use Lombok on JobStatus (@Getter, @RequiredArgsConstructor)
thiagohora Oct 16, 2025
89dc074
OPIK-2618: Simplify RQ worker manager - single ping at startup, no re…
thiagohora Oct 16, 2025
15c444f
OPIK-2618: Remove unused _connect_with_backoff and backoff settings; …
thiagohora Oct 16, 2025
b251749
OPIK-2618: Add /health/liveness and /health/readiness endpoints; Redi…
thiagohora Oct 16, 2025
edb28c0
OPIK-2618: Introduce shared Redis client singleton; reuse in worker m…
thiagohora Oct 16, 2025
560e206
OPIK-2618: Delegate Redis connection to shared singleton; remove work…
thiagohora Oct 16, 2025
af8df6f
OPIK-2618: DRY RQ_WORKER_ENABLED access via is_rq_worker_enabled(); g…
thiagohora Oct 16, 2025
c827851
Address comments
thiagohora Oct 16, 2025
f59dda8
OPIK-2618: Add utils package marker to fix imports
thiagohora Oct 16, 2025
11a163a
OPIK-2618: Simplify RqJobMapper - rely on same-name mapping; only con…
thiagohora Oct 16, 2025
0c003b3
Fix tests
thiagohora Oct 16, 2025
29d3a6e
Fix tests
thiagohora Oct 16, 2025
c3cc583
Fix tests
thiagohora Oct 16, 2025
48b8309
[OPIK-2618] Implement IsolatedSubprocessExecutor with lifecycle manag…
thiagohora Oct 17, 2025
69d2585
Merge branch 'main' into thiagoh/OPIK-2618-isolated-subprocess-executor
thiagohora Oct 17, 2025
2e65908
Revision 2: Add stack memory limit (20MB) to prevent infinite recursi…
thiagohora Oct 17, 2025
8041421
Merge branch 'thiagoh/OPIK-2618-isolated-subprocess-executor' of http…
thiagohora Oct 17, 2025
c767df3
Revision 3: Fix Copilot review comments - add exception variables and…
thiagohora Oct 17, 2025
f54aa76
Revision 4: Address additional Copilot comments - use 'raise' instead…
thiagohora Oct 17, 2025
2753331
Revision 5: Address nitpick comments - shorten inline comment and ext…
thiagohora Oct 17, 2025
63da946
Revision 6: Improve readability by using newline variable instead of …
thiagohora Oct 17, 2025
78338b0
Update apps/opik-python-backend/src/opik_backend/executor_isolated.py
thiagohora Oct 20, 2025
371664b
Remove doc
thiagohora Oct 20, 2025
d04c749
Merge branch 'thiagoh/OPIK-2618-isolated-subprocess-executor' of http…
thiagohora Oct 20, 2025
321cadd
Merge branch 'main' into thiagoh/OPIK-2618-isolated-subprocess-executor
thiagohora Oct 20, 2025
17c624a
Revision 2: Remove dynamic code injection from IsolatedSubprocessExec…
thiagohora Oct 21, 2025
3eed123
Revision 3: Add test for executing Python file contents
thiagohora Oct 21, 2025
c8fc9f2
Revert "Revision 3: Add test for executing Python file contents"
thiagohora Oct 21, 2025
eb1c32f
Revision 4: Refactor executor to accept file paths instead of inline …
thiagohora Oct 21, 2025
11898fa
Revision 5: Address all PR code review comments
thiagohora Oct 21, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions apps/opik-python-backend/src/opik_backend/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,16 +91,16 @@ def close_redis_client(exception):
try:
client = get_redis_client()
client.close()
except Exception:
pass
except Exception as e:
app.logger.warning(f"Error closing Redis client: {e}")

# Also close on process exit
def _close_redis_on_exit():
try:
client = get_redis_client()
client.close()
except Exception:
pass
except Exception as e:
app.logger.warning(f"Error closing Redis client: {e}")

atexit.register(_close_redis_on_exit)

Expand Down
351 changes: 351 additions & 0 deletions apps/opik-python-backend/src/opik_backend/executor_isolated.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,351 @@
"""Isolated subprocess executor with environment variable scoping."""
import json
import logging
import os
import resource
import subprocess
import sys
import time
from typing import Optional

from opik_backend.executor import CodeExecutorBase

logger = logging.getLogger(__name__)

# Metrics setup
from opentelemetry import metrics

# OpenTelemetry histograms tracking subprocess lifecycle latencies (ms):
# one for how long Popen creation takes, one for end-to-end execution.
meter = metrics.get_meter("isolated_executor")
isolated_creation_histogram = meter.create_histogram(
name="isolated_subprocess_creation_latency",
description="Latency of isolated subprocess creation in milliseconds",
unit="ms",
)
isolated_execution_histogram = meter.create_histogram(
name="isolated_subprocess_execution_latency",
description="Latency of isolated code execution in milliseconds",
unit="ms",
)

# Memory limit for subprocesses in bytes (20MB)
# NOTE: despite the broad name, this value is applied only to RLIMIT_STACK
# in _set_memory_limit(), so it bounds stack size, not total memory.
SUBPROCESS_MEMORY_LIMIT_BYTES = 20 * 1024 * 1024 # 20MB


def _calculate_latency_ms(start_time):
"""Calculate elapsed time in milliseconds."""
return (time.time() - start_time) * 1000


def _set_memory_limit():
    """Cap the subprocess's stack size at SUBPROCESS_MEMORY_LIMIT_BYTES (20MB).

    Only RLIMIT_STACK is constrained: this guards against runaway recursion
    and oversized stack-allocated locals, while leaving the heap and the
    Python interpreter's own runtime structures unrestricted.

    Intended to run as a Popen preexec_fn in the child, so it must never
    raise: any failure is logged as a warning and otherwise ignored.
    """
    limits = (SUBPROCESS_MEMORY_LIMIT_BYTES, SUBPROCESS_MEMORY_LIMIT_BYTES)
    try:
        resource.setrlimit(resource.RLIMIT_STACK, limits)
    except Exception as e:
        logger.warning(f"Failed to set stack memory limit: {e}")


class IsolatedSubprocessExecutor:
    """
    Executes Python code in isolated subprocesses with environment variable scoping.

    Each execution creates a fresh subprocess, ensuring that:
    - Environment variables are scoped to each execution (no leakage between concurrent runs)
    - Subprocesses are completely independent
    - No shared state exists between executions
    - Resources are properly cleaned up after each execution

    This differs from ProcessExecutor which maintains a pool of reusable workers.
    Use this when you need true isolation with custom environment variables per execution.
    """

    def __init__(self, timeout_secs: int = 30):
        """
        Initialize the isolated subprocess executor.

        Args:
            timeout_secs: Default timeout for each execution in seconds
        """
        self.timeout_secs = timeout_secs
        self.logger = logging.getLogger(__name__)
        # NOTE(review): these lists are mutated without any locking; this
        # assumes a single instance is driven from one thread at a time --
        # confirm before sharing an instance across threads.
        self._active_processes = []  # Popen objects created but not yet cleaned up
        self._teardown_callbacks = []  # Zero-argument callables run by teardown()

    def execute(
        self,
        file_path: str,
        data: dict,
        env_vars: Optional[dict] = None,
        timeout_secs: Optional[int] = None,
        payload_type: Optional[str] = None,
    ) -> dict:
        """
        Execute a Python file in an isolated subprocess with scoped environment variables.

        Each call creates a fresh subprocess with its own isolated environment.
        Environment variables passed in env_vars are visible only to that
        subprocess and don't affect other concurrent executions.

        Args:
            file_path: Path to Python file to execute (e.g., '/path/to/metric.py')
            data: Data dictionary to pass to the file as JSON via stdin
            env_vars: Environment variables to scope to this subprocess (optional).
                These override/augment the parent environment for this execution only.
            timeout_secs: Execution timeout in seconds (uses instance default if not provided)
            payload_type: Type of payload being executed (e.g., 'trace_thread')

        Returns:
            dict: Result dictionary with format:
                - {"scores": [...]} on success
                - {"code": error_code, "error": message} on failure
        """
        timeout_secs = timeout_secs or self.timeout_secs
        start_time = time.time()
        process = None  # Stays None if Popen itself fails; finally block handles that

        try:
            # Prepare environment for subprocess
            subprocess_env = self._prepare_environment(env_vars)

            # Create subprocess with python to execute the file directly.
            # -u keeps stdout unbuffered so results arrive promptly.
            # NOTE(review): preexec_fn is documented as unsafe in threaded
            # parents -- confirm the parent process is effectively
            # single-threaded at spawn time.
            process = subprocess.Popen(
                [sys.executable, "-u", file_path],
                stdin=subprocess.PIPE,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                env=subprocess_env,
                text=True,
                preexec_fn=_set_memory_limit,  # Apply stack limit in the child
            )

            # Track active process so kill_process/kill_all_processes can see it
            self._active_processes.append(process)

            creation_latency = _calculate_latency_ms(start_time)
            isolated_creation_histogram.record(creation_latency)
            self.logger.debug(
                f"Created isolated subprocess. pid={process.pid}, creation_latency_ms={creation_latency:.3f}"
            )

            # Execute code in subprocess (blocks until completion or timeout)
            result = self._execute_in_subprocess(
                process, data, payload_type, timeout_secs
            )

            execution_latency = _calculate_latency_ms(start_time)
            isolated_execution_histogram.record(execution_latency)
            self.logger.debug(
                f"Isolated subprocess execution completed. total_latency_ms={execution_latency:.3f}"
            )

            return result

        except subprocess.TimeoutExpired:
            # Child was already killed and reaped by _execute_in_subprocess
            # before this exception propagated here.
            self.logger.error(
                f"Subprocess execution timed out. timeout_secs={timeout_secs}"
            )
            return {
                "code": 500,
                "error": f"Execution timed out after {timeout_secs} seconds",
            }
        except Exception as e:
            self.logger.error(
                f"Error during subprocess execution. error={str(e)}", exc_info=True
            )
            return {"code": 500, "error": f"Failed to execute file: {str(e)}"}
        finally:
            # Always remove process from active list and measure total latency
            self._remove_active_process(process)
            total_latency = _calculate_latency_ms(start_time)
            self.logger.debug(
                f"Subprocess execution finished. total_latency_ms={total_latency:.3f}"
            )

    def _remove_active_process(self, process: Optional[subprocess.Popen]) -> None:
        """Remove process from active processes list if present (no-op for None)."""
        if process in self._active_processes:
            self._active_processes.remove(process)

    def _prepare_environment(self, env_vars: Optional[dict] = None) -> dict:
        """
        Prepare environment variables for the subprocess.

        Starts with a copy of the parent environment and applies overrides.
        This ensures the subprocess has all necessary environment variables
        while allowing specific variables to be scoped to this execution.

        Args:
            env_vars: Environment variables to override/add

        Returns:
            dict: Complete environment dictionary for the subprocess
        """
        env = os.environ.copy()
        if env_vars:
            env.update(env_vars)
        return env

    def _execute_in_subprocess(
        self,
        process: subprocess.Popen,
        data: dict,
        payload_type: Optional[str],
        timeout_secs: int,
    ) -> dict:
        """
        Execute the file in the subprocess and collect its result.

        Passes data as JSON via stdin and parses stdout as JSON for the result.

        Args:
            process: Subprocess Popen instance
            data: Data dictionary
            payload_type: Type of payload
            timeout_secs: Execution timeout

        Returns:
            dict: Execution result

        Raises:
            subprocess.TimeoutExpired: If the subprocess exceeds timeout_secs
                (the child is killed and reaped before re-raising).
        """
        # Prepare input as JSON to pass via stdin
        input_json = json.dumps(
            {
                "data": data,
                "payload_type": payload_type,
            }
        )

        try:
            # Send data via stdin, collect stdout
            stdout, stderr = process.communicate(
                input=input_json,
                timeout=timeout_secs,
            )

            # Parse result from stdout
            if process.returncode == 0:
                try:
                    return json.loads(stdout.strip())
                except json.JSONDecodeError as e:
                    self.logger.error(f"Failed to parse subprocess output: {stdout}")
                    return {
                        "code": 500,
                        "error": f"Invalid JSON response from subprocess: {str(e)}",
                    }
            else:
                self.logger.error(
                    f"Subprocess exited with code {process.returncode}. stderr: {stderr}"
                )
                return {
                    "code": 500,
                    "error": f"Subprocess execution failed: {stderr}",
                }

        except subprocess.TimeoutExpired:
            # Cleanup pattern recommended by the subprocess docs: kill the
            # child, then call communicate() once more to drain its pipes and
            # reap it so no zombie is left behind. kill() sends SIGKILL, so
            # repeating it would be a no-op.
            process.kill()
            process.communicate()
            raise

    def register_teardown_callback(self, callback):
        """
        Register a callback to run during teardown.

        Args:
            callback: A callable that takes no arguments and runs cleanup logic
        """
        self._teardown_callbacks.append(callback)

    def kill_process(self, pid: int, timeout: int = 2):
        """
        Terminate a specific process by PID.

        Attempts graceful termination (SIGTERM) first, then force kills
        (SIGKILL) if the process does not exit within the timeout.

        Args:
            pid: Process ID to terminate
            timeout: Seconds to wait before force killing

        Returns:
            bool: True if process was terminated, False if not found
        """
        try:
            process = next((p for p in self._active_processes if p.pid == pid), None)
            if process is None:
                self.logger.warning(f"Process with PID {pid} not found in active processes")
                return False

            process.terminate()
            try:
                process.wait(timeout=timeout)
                self.logger.info(f"Process {pid} terminated gracefully")
            except subprocess.TimeoutExpired:
                process.kill()
                process.wait()
                self.logger.warning(f"Process {pid} force killed after timeout")

            self._active_processes.remove(process)
            return True
        except Exception as e:
            self.logger.error(f"Error killing process {pid}: {e}")
            return False

    def kill_all_processes(self, timeout: int = 2):
        """
        Terminate all active processes.

        Sends SIGTERM to every process first so they can exit concurrently,
        then waits on each one, force killing any that don't exit in time.

        Args:
            timeout: Seconds to wait before force killing each process
        """
        for process in list(self._active_processes):
            try:
                process.terminate()
            except Exception as e:
                self.logger.error(f"Error terminating process {process.pid}: {e}")

        for process in list(self._active_processes):
            try:
                process.wait(timeout=timeout)
            except subprocess.TimeoutExpired:
                process.kill()
                process.wait()
            except Exception as e:
                self.logger.error(f"Error waiting for process {process.pid}: {e}")

        self._active_processes.clear()
        self.logger.info("All active processes terminated")

    def teardown(self):
        """
        Run cleanup logic including killing all processes and executing teardown callbacks.
        """
        self.logger.info("Running teardown...")

        # Kill all active processes
        self.kill_all_processes()

        # Execute all teardown callbacks; one failing callback must not
        # prevent the remaining ones from running.
        for callback in self._teardown_callbacks:
            try:
                callback()
            except Exception as e:
                self.logger.error(f"Error in teardown callback: {e}")

        self.logger.info("Teardown complete")

    def __enter__(self):
        """Context manager entry."""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Context manager exit - runs teardown."""
        self.teardown()
        return False
Loading