Commit aff60b6

Add timeout to process_visit.
This change protects against extremely long (possibly hours-long) delays when performing network I/O. This involves reinstating the WORKER_TIMEOUT environment variable, which was inadvertently removed in the migration from Gunicorn to Keda. Unlike the original worker-based implementation, this one lets the process try to clean up (through normal exception handling) instead of being hard-killed.
1 parent c64d454 commit aff60b6

1 file changed (+10, -1 lines changed)

python/activator/activator.py

Lines changed: 10 additions & 1 deletion
@@ -60,6 +60,8 @@
 instrument_name = os.environ["RUBIN_INSTRUMENT"]
 # The skymap to use in the central repo
 skymap = os.environ["SKYMAP"]
+# The maximum time to spend on a single visit.
+global_timeout = os.environ.get("WORKER_TIMEOUT", None)
 # URI to the main repository to contain processing results
 write_repo = os.environ["CENTRAL_REPO"]
 # URI to the main repository containing calibs and templates
@@ -535,7 +537,14 @@ async def _manage_process_visit(expected_visit: FannedOutVisit):
     This function encapsulates the use of asyncio by process_visit, while
     delegating all actual processing to `_process_visit_or_cancel`.
     """
-    await _process_visit_or_cancel(expected_visit)
+    try:
+        async with asyncio.timeout(global_timeout):  # No timeout if None
+            await _process_visit_or_cancel(expected_visit)
+    except TimeoutError:
+        _log.error(f"Processing of group {expected_visit.group} detector {expected_visit.detector} "
+                   f"failed to complete in {global_timeout} s. "
+                   "Outputs have *NOT* been written.")
+        raise
 
 
 async def _process_visit_or_cancel(expected_visit: FannedOutVisit):
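
One point worth checking when reading this diff: `os.environ.get` returns a string when the variable is set, while `asyncio.timeout` is documented as taking a float, an int, or `None` as its delay. Whether the value is converted elsewhere is not visible in this commit. The sketch below shows one possible conversion; `read_timeout` and its `environ` parameter are hypothetical names introduced for illustration only, and treating an empty value as "no timeout" and rejecting non-positive values are assumptions, not behaviour taken from the repository.

```python
import os


def read_timeout(name="WORKER_TIMEOUT", environ=None):
    """Return the timeout in seconds as a float, or None if unset or blank.

    Hypothetical helper; not part of activator.py.
    """
    env = os.environ if environ is None else environ
    raw = env.get(name)
    if raw is None or not raw.strip():
        return None  # no timeout configured
    value = float(raw)  # raises ValueError on malformed input
    if value <= 0:
        raise ValueError(f"{name} must be positive, got {raw!r}")
    return value
```

With such a helper, the module-level assignment could read `global_timeout = read_timeout()`, and the result could be passed to `asyncio.timeout` unchanged.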
