From 4897ddf4abf7b7fa23e3727ce954f7acafe41940 Mon Sep 17 00:00:00 2001 From: Christian Meesters Date: Tue, 12 Aug 2025 19:15:37 +0200 Subject: [PATCH 1/3] refactor: first step to outsource query command --- .../job_status_query.py | 66 +++++++++++++++++++ 1 file changed, 66 insertions(+) create mode 100644 snakemake_executor_plugin_slurm/job_status_query.py diff --git a/snakemake_executor_plugin_slurm/job_status_query.py b/snakemake_executor_plugin_slurm/job_status_query.py new file mode 100644 index 0000000..3253567 --- /dev/null +++ b/snakemake_executor_plugin_slurm/job_status_query.py @@ -0,0 +1,66 @@ +import shlex +from datetime import datetime, timedelta + + + +def query_job_status_sacct(jobuid, timeout: int = 30) -> Dict[str, JobStatus]: + """ + Query job status using sacct command + + Args: + job_ids: List of SLURM job IDs + timeout: Timeout in seconds for subprocess call + + Returns: + Dictionary mapping job ID to JobStatus object + """ + if not jobuid: + return {} + + # We use this sacct syntax for argument 'starttime' to keep it compatible + # with slurm < 20.11 + sacct_starttime = f"{datetime.now() - timedelta(days=2):%Y-%m-%dT%H:00}" + # previously we had + # f"--starttime now-2days --endtime now --name {self.run_uuid}" + # in line 218 - once v20.11 is definitively not in use any more, + # the more readable version ought to be re-adapted + + try: + # -X: only show main job, no substeps + query_command = f"""sacct -X --parsable2 \ + --clusters all \ + --noheader --format=JobIdRaw,State \ + --starttime {sacct_starttime} \ + --endtime now --name {self.run_uuid}""" + + # for better redability in verbose output + query_command = " ".join(shlex.split(query_command)) + + return query_command + +def query_job_status_squeue(job_ids: List[str], timeout: int = 30) -> Dict[str, JobStatus]: + """ + Query job status using squeue command (newer SLURM functionality) + + Args: + job_ids: List of SLURM job IDs + timeout: Timeout in seconds for subprocess call + + Returns: + Dictionary mapping job ID to JobStatus object + """ + if not job_ids: + return {} + + try: + # Build squeue command + query_command = """ + squeue \ + --format=%i|%T \ + --states=all \ + --noheader \ + --name {self.run_uuid} + """ + query_command = shlex.split(query_command) + + return query_command \ No newline at end of file From 44c42d7447a37f7999ede488622006e8e9a42283 Mon Sep 17 00:00:00 2001 From: meesters Date: Wed, 13 Aug 2025 16:25:57 +0200 Subject: [PATCH 2/3] refactor: working on the interface - attempt to make an option optional --- snakemake_executor_plugin_slurm/__init__.py | 358 ++++++++++++------ .../job_status_query.py | 136 +++++-- 2 files changed, 355 insertions(+), 139 deletions(-) diff --git a/snakemake_executor_plugin_slurm/__init__.py b/snakemake_executor_plugin_slurm/__init__.py index 64502de..0f274a0 100644 --- a/snakemake_executor_plugin_slurm/__init__.py +++ b/snakemake_executor_plugin_slurm/__init__.py @@ -5,6 +5,7 @@ import csv from io import StringIO +import logging import os from pathlib import Path import re @@ -32,122 +33,219 @@ delete_empty_dirs, set_gres_string, ) +from .job_status_query import ( + get_min_job_age, + is_sacct_available, + should_enable_status_command_option, +) from .efficiency_report import create_efficiency_report from .submit_string import get_submit_command +from .job_status_query import ( + should_enable_status_command_option, + get_min_job_age, + is_query_tool_available, +) + + +def _get_status_command_default(): + """Get smart default for status_command based on 
cluster configuration.""" + should_enable = should_enable_status_command_option() + sacct_available = is_query_tool_available("sacct") + squeue_available = is_query_tool_available("squeue") + + if should_enable and sacct_available: + return "sacct" + elif not should_enable and squeue_available: + return "squeue" + else: # neither command is available or reliable + raise WorkflowError( + "No suitable job status query tool available: " + "'sacct' seems missing and 'squeue' can only display a job status " + f"for finished jobs for {get_min_job_age()} seconds." + ) + + +def _create_executor_settings_class(): + """Create ExecutorSettings class with conditional fields based on cluster configuration.""" + should_enable = should_enable_status_command_option() + + # Base fields that are always present + base_fields = { + "logdir": ( + Optional[Path], + field( + default=None, + metadata={ + "help": "Per default the SLURM log directory is relative to " + "the working directory." + "This flag allows to set an alternative directory.", + "env_var": False, + "required": False, + }, + ), + ), + "keep_successful_logs": ( + bool, + field( + default=False, + metadata={ + "help": "Per default SLURM log files will be deleted upon sucessful " + "completion of a job. Whenever a SLURM job fails, its log " + "file will be preserved. " + "This flag allows to keep all SLURM log files, even those " + "of successful jobs.", + "env_var": False, + "required": False, + }, + ), + ), + "delete_logfiles_older_than": ( + Optional[int], + field( + default=10, + metadata={ + "help": "Per default SLURM log files in the SLURM log directory " + "of a workflow will be deleted after 10 days. For this, " + "best leave the default log directory unaltered. " + "Setting this flag allows to change this behaviour. " + "If set to <=0, no old files will be deleted. ", + }, + ), + ), + "init_seconds_before_status_checks": ( + Optional[int], + field( + default=40, + metadata={ + "help": "Defines the time in seconds before the first status " + "check is performed after job submission.", + "env_var": False, + "required": False, + }, + ), + ), + "status_attempts": ( + Optional[int], + field( + default=5, + metadata={ + "help": "Defines the number of attempts to query the status of " + "all active jobs. If the status query fails, the next attempt " + "will be performed after the next status check interval." + "The default is 5 status attempts before giving up. The maximum " + "time between status checks is 180 seconds.", + "env_var": False, + "required": False, + }, + ), + ), + "requeue": ( + bool, + field( + default=False, + metadata={ + "help": "Allow requeuing preempted of failed jobs, " + "if no cluster default. Results in " + "`sbatch ... --requeue ...` " + "This flag has no effect, if not set.", + "env_var": False, + "required": False, + }, + ), + ), + "no_account": ( + bool, + field( + default=False, + metadata={ + "help": "Do not use any account for submission. " + "This flag has no effect, if not set.", + "env_var": False, + "required": False, + }, + ), + ), + "efficiency_report": ( + bool, + field( + default=False, + metadata={ + "help": "Generate an efficiency report at the end of the workflow. " + "This flag has no effect, if not set.", + "env_var": False, + "required": False, + }, + ), + ), + "efficiency_report_path": ( + Optional[Path], + field( + default=None, + metadata={ + "help": "Path to the efficiency report file. 
" + "If not set, the report will be written to " + "the current working directory with the name " + "'efficiency_report_.csv'. " + "This flag has no effect, if not set.", + "env_var": False, + "required": False, + }, + ), + ), + "efficiency_threshold": ( + Optional[float], + field( + default=0.8, + metadata={ + "help": "The efficiency threshold for the efficiency report. " + "Jobs with an efficiency below this threshold will be reported. " + "This flag has no effect, if not set.", + }, + ), + ), + "reservation": ( + Optional[str], + field( + default=None, + metadata={ + "help": "If set, the given reservation will be used for job submission.", + "env_var": False, + "required": False, + }, + ), + ), + } + + # Add status_command field only if needed + if should_enable: + base_fields["status_command"] = ( + Optional[str], + field( + default_factory=_get_status_command_default, + metadata={ + "help": "Allows to choose between 'sacct' (the default) and 'squeue'.", + "env_var": False, + "required": False, + }, + ), + ) + + def post_init(self): + """Validate settings after initialization.""" + # Note: Validation with logging is handled in the Executor class + # where self.logger is available. This method is kept for potential + # future validation that doesn't require logging. + pass + + # Add the __post_init__ method + base_fields["__post_init__"] = post_init + # Create the class dynamically + return dataclass(type("ExecutorSettings", (ExecutorSettingsBase,), base_fields)) -@dataclass -class ExecutorSettings(ExecutorSettingsBase): - logdir: Optional[Path] = field( - default=None, - metadata={ - "help": "Per default the SLURM log directory is relative to " - "the working directory." - "This flag allows to set an alternative directory.", - "env_var": False, - "required": False, - }, - ) - keep_successful_logs: bool = field( - default=False, - metadata={ - "help": "Per default SLURM log files will be deleted upon sucessful " - "completion of a job. Whenever a SLURM job fails, its log " - "file will be preserved. " - "This flag allows to keep all SLURM log files, even those " - "of successful jobs.", - "env_var": False, - "required": False, - }, - ) - delete_logfiles_older_than: Optional[int] = field( - default=10, - metadata={ - "help": "Per default SLURM log files in the SLURM log directory " - "of a workflow will be deleted after 10 days. For this, " - "best leave the default log directory unaltered. " - "Setting this flag allows to change this behaviour. " - "If set to <=0, no old files will be deleted. ", - }, - ) - init_seconds_before_status_checks: Optional[int] = field( - default=40, - metadata={ - "help": "Defines the time in seconds before the first status " - "check is performed after job submission.", - "env_var": False, - "required": False, - }, - ) - status_attempts: Optional[int] = field( - default=5, - metadata={ - "help": "Defines the number of attempts to query the status of " - "all active jobs. If the status query fails, the next attempt " - "will be performed after the next status check interval." - "The default is 5 status attempts before giving up. The maximum " - "time between status checks is 180 seconds.", - "env_var": False, - "required": False, - }, - ) - requeue: bool = field( - default=False, - metadata={ - "help": "Allow requeuing preempted of failed jobs, " - "if no cluster default. Results in " - "`sbatch ... 
--requeue ...` " - "This flag has no effect, if not set.", - "env_var": False, - "required": False, - }, - ) - no_account: bool = field( - default=False, - metadata={ - "help": "Do not use any account for submission. " - "This flag has no effect, if not set.", - "env_var": False, - "required": False, - }, - ) - efficiency_report: bool = field( - default=False, - metadata={ - "help": "Generate an efficiency report at the end of the workflow. " - "This flag has no effect, if not set.", - "env_var": False, - "required": False, - }, - ) - efficiency_report_path: Optional[Path] = field( - default=None, - metadata={ - "help": "Path to the efficiency report file. " - "If not set, the report will be written to " - "the current working directory with the name " - "'efficiency_report_.csv'. " - "This flag has no effect, if not set.", - "env_var": False, - "required": False, - }, - ) - efficiency_threshold: Optional[float] = field( - default=0.8, - metadata={ - "help": "The efficiency threshold for the efficiency report. " - "Jobs with an efficiency below this threshold will be reported. " - "This flag has no effect, if not set.", - }, - ) - reservation: Optional[str] = field( - default=None, - metadata={ - "help": "If set, the given reservation will be used for job submission.", - "env_var": False, - "required": False, - }, - ) + +# Create the actual ExecutorSettings class +ExecutorSettings = _create_executor_settings_class() # Required: @@ -192,6 +290,42 @@ def __post_init__(self, test_mode: bool = False): else Path(".snakemake/slurm_logs").resolve() ) + # Validate status_command configuration if the field exists + self._validate_status_command_settings() + + def _validate_status_command_settings(self): + """Validate and provide feedback about status_command configuration.""" + if hasattr(self.workflow.executor_settings, "status_command"): + status_command = self.workflow.executor_settings.status_command + if status_command: + min_job_age = get_min_job_age() + sacct_available = is_sacct_available() + + if not sacct_available and status_command == "sacct": + self.logger.warning( + "The 'sacct' command is not available on this system. " + "Using 'squeue' instead for job status queries." + ) + elif sacct_available and min_job_age is not None: + if min_job_age < 43200 and status_command == "sacct": + self.logger.warning( + f"MinJobAge is {min_job_age} seconds (< 43200s). " + "This may cause 'sacct' to report inaccurate job states and the status_command option may be unreliable." + ) + elif min_job_age >= 43200 and status_command == "squeue": + self.logger.warning( + f"MinJobAge is {min_job_age} seconds (>= 43200s). " + "The 'squeue' command should work reliably within 12 hours run time." + ) + + def get_status_command(self): + """Get the status command to use, with fallback logic.""" + if hasattr(self.workflow.executor_settings, "status_command"): + return self.workflow.executor_settings.status_command + else: + # Fallback: determine the best command based on cluster configuration + return _get_status_command_default() + def shutdown(self) -> None: """ Shutdown the executor. 
diff --git a/snakemake_executor_plugin_slurm/job_status_query.py b/snakemake_executor_plugin_slurm/job_status_query.py index 3253567..c41e376 100644 --- a/snakemake_executor_plugin_slurm/job_status_query.py +++ b/snakemake_executor_plugin_slurm/job_status_query.py @@ -1,22 +1,108 @@ import shlex +import subprocess +import re from datetime import datetime, timedelta +def get_min_job_age(): + """ + Runs 'scontrol show config', parses the output, and extracts the MinJobAge value. + Returns the value as an integer (seconds), or None if not found or parse error. + Handles various time units: s/sec/secs/seconds, h/hours, or no unit (assumes seconds). + """ + try: + cmd = "scontrol show config" + cmd = shlex.split(cmd) + output = subprocess.check_output(cmd, text=True, stderr=subprocess.PIPE) + except subprocess.CalledProcessError: + return None + + for line in output.splitlines(): + if line.strip().startswith("MinJobAge"): + # Example: MinJobAge = 300 sec + # MinJobAge = 1h + # MinJobAge = 3600 + parts = line.split("=") + if len(parts) < 2: + continue + value_part = parts[1].strip() + + # Use regex to parse value and optional unit + # Pattern matches: number + optional whitespace + optional unit + match = re.match(r"^(\d+)\s*([a-zA-Z]*)", value_part) + if not match: + continue + + value_str = match.group(1) + unit = match.group(2).lower() if match.group(2) else "" + + try: + value = int(value_str) + + # Convert to seconds based on unit + if unit in ("h", "hour", "hours"): + return value * 3600 + elif unit in ("s", "sec", "secs", "second", "seconds", ""): + return value + else: + # Unknown unit, assume seconds + return value + + except ValueError: + return None + return None + + +def is_query_tool_available(tool_name): + """ + Check if the sacct command is available on the system. + Returns True if sacct is available, False otherwise. + """ + cmd = f"which {tool_name}" + cmd = shlex.split(cmd) + try: + subprocess.check_output(cmd, stderr=subprocess.PIPE) + return True + except subprocess.CalledProcessError: + return False + -def query_job_status_sacct(jobuid, timeout: int = 30) -> Dict[str, JobStatus]: +def should_enable_status_command_option(): + """ + Determine if the status_command option should be available based on: + 1. MinJobAge configuration (if very low, squeue might not work well) + 2. Availability of sacct command + + Returns True if the option should be available, False otherwise. + """ + min_job_age = get_min_job_age() + sacct_available = is_sacct_available() + + # If MinJobAge is very low (e.g., > 43200 seconds), squeue might work for job status queries + # Howver, `sacct` is the preferred command for job status queries: + # The SLURM accounting database will answer queries for a huge number of jobs + # more reliably than `squeue`, which might not be configured to show past jobs + # on every cluster. 
+    if (
+        min_job_age is not None and min_job_age > 43200 and sacct_available
+    ):  # 43200 seconds = 12 hours
+        return True
+
+    # In other cases, sacct should work fine and the option might not be needed
+    return False
+
+
+def query_job_status_sacct(jobuid) -> str:
     """
     Query job status using sacct command
-    
+
     Args:
         job_ids: List of SLURM job IDs
         timeout: Timeout in seconds for subprocess call
-    
+
     Returns:
         Dictionary mapping job ID to JobStatus object
     """
-    if not jobuid:
-        return {}
-
     # We use this sacct syntax for argument 'starttime' to keep it compatible
     # with slurm < 20.11
     sacct_starttime = f"{datetime.now() - timedelta(days=2):%Y-%m-%dT%H:00}"
@@ -25,42 +111,38 @@ def query_job_status_sacct(jobuid, timeout: int = 30) -> Dict[str, JobStatus]:
     # in line 218 - once v20.11 is definitively not in use any more,
     # the more readable version ought to be re-adapted
 
-    try:
-        # -X: only show main job, no substeps
-        query_command = f"""sacct -X --parsable2 \
+    # -X: only show main job, no substeps
+    query_command = f"""sacct -X --parsable2 \
         --clusters all \
         --noheader --format=JobIdRaw,State \
         --starttime {sacct_starttime} \
-        --endtime now --name {self.run_uuid}"""
-
-        # for better redability in verbose output
-        query_command = " ".join(shlex.split(query_command))
+        --endtime now --name {jobuid}"""
+
+    # for better readability in verbose output
+    query_command = " ".join(shlex.split(query_command))
+
+    return query_command
 
-        return query_command
-
-def query_job_status_squeue(job_ids: List[str], timeout: int = 30) -> Dict[str, JobStatus]:
+def query_job_status_squeue(jobuid) -> list:
     """
     Query job status using squeue command (newer SLURM functionality)
-    
+
     Args:
         job_ids: List of SLURM job IDs
         timeout: Timeout in seconds for subprocess call
-    
+
     Returns:
         Dictionary mapping job ID to JobStatus object
     """
-    if not job_ids:
-        return {}
-
-    try:
-        # Build squeue command
-        query_command = """
-            squeue \
+    # Build squeue command
+    query_command = f"""
+        squeue \
         --format=%i|%T \
         --states=all \
         --noheader \
-            --name {self.run_uuid}
+        --name {jobuid}
         """
-        query_command = shlex.split(query_command)
+    query_command = shlex.split(query_command)
 
-    return query_command
\ No newline at end of file
+    return query_command

From 82cae3f9407aa24dbfb008979e8467e7523d9c66 Mon Sep 17 00:00:00 2001
From: meesters
Date: Mon, 18 Aug 2025 14:05:06 +0200
Subject: [PATCH 3/3] feat: dynamically derive the threshold from query wait
 times instead of a statically fulfilled MinJobAge

---
 snakemake_executor_plugin_slurm/__init__.py   | 27 ++++++++++++-------
 .../job_status_query.py                       | 16 ++++++++++------
 2 files changed, 28 insertions(+), 15 deletions(-)

diff --git a/snakemake_executor_plugin_slurm/__init__.py b/snakemake_executor_plugin_slurm/__init__.py
index 0f274a0..15dc978 100644
--- a/snakemake_executor_plugin_slurm/__init__.py
+++ b/snakemake_executor_plugin_slurm/__init__.py
@@ -35,7 +35,7 @@
 )
 from .job_status_query import (
     get_min_job_age,
-    is_sacct_available,
+    is_query_tool_available,
     should_enable_status_command_option,
 )
 from .efficiency_report import create_efficiency_report
@@ -67,7 +67,9 @@ def _get_status_command_default():
 
 def _create_executor_settings_class():
     """Create ExecutorSettings class with conditional fields based on cluster configuration."""
-    should_enable = should_enable_status_command_option()
+    # Use a conservative default threshold for class creation (3 * 40s = 120s).
+    # The executor's later validation uses the configured status check interval.
+    should_enable = 
should_enable_status_command_option(min_threshold_seconds=120)
 
     # Base fields that are always present
     base_fields = {
@@ -299,7 +301,12 @@ def _validate_status_command_settings(self):
             status_command = self.workflow.executor_settings.status_command
             if status_command:
                 min_job_age = get_min_job_age()
-                sacct_available = is_sacct_available()
+                sacct_available = is_query_tool_available("sacct")
+
+                # Calculate a dynamic threshold: 3 times the status check interval,
+                # falling back to 70 seconds per interval if the setting is unavailable.
+                between_status_check_seconds = getattr(self.workflow.executor_settings, 'seconds_between_status_checks', 70)
+                dynamic_check_threshold = 3 * between_status_check_seconds
 
                 if not sacct_available and status_command == "sacct":
                     self.logger.warning(
@@ -307,15 +314,17 @@ def _validate_status_command_settings(self):
                         "Using 'squeue' instead for job status queries."
                     )
                 elif sacct_available and min_job_age is not None:
-                    if min_job_age < 43200 and status_command == "sacct":
+                    if min_job_age < dynamic_check_threshold and status_command == "sacct":
                         self.logger.warning(
-                            f"MinJobAge is {min_job_age} seconds (< 43200s). "
-                            "This may cause 'sacct' to report inaccurate job states and the status_command option may be unreliable."
+                            f"MinJobAge is {min_job_age} seconds (< {dynamic_check_threshold}s). "
+                            "This may cause 'sacct' to report inaccurate job states and the status_command option may be unreliable. "
+                            f"(Threshold is 3x the status check interval: 3 × {between_status_check_seconds}s = {dynamic_check_threshold}s)"
                         )
-                    elif min_job_age >= 43200 and status_command == "squeue":
+                    elif min_job_age >= dynamic_check_threshold and status_command == "squeue":
                         self.logger.warning(
-                            f"MinJobAge is {min_job_age} seconds (>= 43200s). "
-                            "The 'squeue' command should work reliably within 12 hours run time."
+                            f"MinJobAge is {min_job_age} seconds (>= {dynamic_check_threshold}s). "
+                            "The 'squeue' command should work reliably for status queries. "
+                            f"(Threshold is 3x the status check interval: 3 × {between_status_check_seconds}s = {dynamic_check_threshold}s)"
                         )
 
     def get_status_command(self):
diff --git a/snakemake_executor_plugin_slurm/job_status_query.py b/snakemake_executor_plugin_slurm/job_status_query.py
index c41e376..90167ca 100644
--- a/snakemake_executor_plugin_slurm/job_status_query.py
+++ b/snakemake_executor_plugin_slurm/job_status_query.py
@@ -70,22 +70,26 @@ def is_query_tool_available(tool_name):
-def should_enable_status_command_option():
+def should_enable_status_command_option(min_threshold_seconds: int = 120):
     """
     Determine if the status_command option should be available based on:
     1. MinJobAge configuration (if very low, squeue might not work well)
     2. Availability of sacct command
 
+    Args:
+        min_threshold_seconds: Minimum MinJobAge (in seconds) that is considered
+            sufficient. Defaults to 120 s (3 x the default 40 s status check interval).
+
     Returns True if the option should be available, False otherwise.
     """
     min_job_age = get_min_job_age()
-    sacct_available = is_sacct_available()
+    sacct_available = is_query_tool_available("sacct")
 
-    # If MinJobAge is very low (e.g., > 43200 seconds), squeue might work for job status queries
-    # Howver, `sacct` is the preferred command for job status queries:
+    # If MinJobAge is sufficient (>= threshold), squeue might work for job status queries
+    # However, `sacct` is the preferred command for job status queries:
     # The SLURM accounting database will answer queries for a huge number of jobs
     # more reliably than `squeue`, which might not be configured to show past jobs
     # on every cluster. 
if ( - min_job_age is not None and min_job_age > 43200 and sacct_available - ): # 43200 seconds = 12 hours + min_job_age is not None and min_job_age >= min_threshold_seconds and sacct_available + ): return True # In other cases, sacct should work fine and the option might not be needed
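The sketch below is illustrative only and not part of the patch series. It shows how the threshold logic of PATCH 3 could be exercised in a unit test, assuming the parameterized signature should_enable_status_command_option(min_threshold_seconds=...) and pytest's monkeypatch fixture; the test file name, test names, and MinJobAge values are made-up examples.

# test_job_status_query.py -- illustrative sketch, not part of the patches
import snakemake_executor_plugin_slurm.job_status_query as jsq


def test_option_enabled_when_min_job_age_exceeds_threshold(monkeypatch):
    # Pretend the cluster reports MinJobAge = 300 s and both query tools exist.
    monkeypatch.setattr(jsq, "get_min_job_age", lambda: 300)
    monkeypatch.setattr(jsq, "is_query_tool_available", lambda tool: True)
    assert jsq.should_enable_status_command_option(min_threshold_seconds=120) is True


def test_option_disabled_when_min_job_age_below_threshold(monkeypatch):
    # A MinJobAge below the threshold keeps the conditional option disabled.
    monkeypatch.setattr(jsq, "get_min_job_age", lambda: 60)
    monkeypatch.setattr(jsq, "is_query_tool_available", lambda tool: True)
    assert jsq.should_enable_status_command_option(min_threshold_seconds=120) is False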