191 changes: 164 additions & 27 deletions snakemake_executor_plugin_slurm/__init__.py
@@ -5,6 +5,7 @@

import csv
from io import StringIO
import logging
import os
from pathlib import Path
import re
@@ -32,76 +33,131 @@
delete_empty_dirs,
set_gres_string,
)
from .job_status_query import (
get_min_job_age,
is_query_tool_available,
should_recommend_squeue_status_command,
)
from .efficiency_report import create_efficiency_report
from .submit_string import get_submit_command


def _get_status_command_default():
"""Get smart default for status_command based on cluster configuration."""
sacct_available = is_query_tool_available("sacct")
squeue_available = is_query_tool_available("squeue")
# at least one of the two query tools must be available

if not squeue_available and not sacct_available:
raise WorkflowError(
"Neither 'sacct' nor 'squeue' commands are available on this system. "
"At least one of these commands is required for job status queries."
)
if sacct_available:
return "sacct"
else:
return "squeue"


def _get_status_command_help():
"""Get help text with computed default."""
default_cmd = _get_status_command_default()
sacct_available = is_query_tool_available("sacct")
squeue_recommended = should_recommend_squeue_status_command()

base_help = "Command to query job status. Options: 'sacct', 'squeue'. "

if default_cmd == "sacct":
if sacct_available and not squeue_recommended:
info = (
"'sacct' detected and will be used "
"(MinJobAge may be too low for reliable 'squeue' usage)"
)
else:
info = "'sacct' detected and will be used"
else: # default_cmd == "squeue"
if squeue_recommended:
# cumbersome, due to black and the need to stay below 80 chars
msg_part1 = "'squeue' recommended (MinJobAge is sufficient"
msg_part2 = " for reliable usage)"
info = msg_part1 + msg_part2
elif not sacct_available:
info = (
"'sacct' not available, falling back to 'squeue'. "
"WARNING: 'squeue' may not work reliably if MinJobAge is "
"too low"
)
else:
info = (
"'squeue' will be used. "
"WARNING: MinJobAge may be too low for reliable 'squeue' usage"
)

return (
f"{base_help}Default: '{default_cmd}' ({info}). "
f"Set explicitly to override auto-detection."
)


@dataclass
class ExecutorSettings(ExecutorSettingsBase):
"""Settings for the SLURM executor plugin."""

logdir: Optional[Path] = field(
default=None,
metadata={
"help": "Per default the SLURM log directory is relative to "
"the working directory."
"the working directory. "
"This flag allows to set an alternative directory.",
"env_var": False,
"required": False,
},
)

keep_successful_logs: bool = field(
default=False,
metadata={
"help": "Per default SLURM log files will be deleted upon sucessful "
"completion of a job. Whenever a SLURM job fails, its log "
"file will be preserved. "
"help": "Per default SLURM log files will be deleted upon "
"successful completion of a job. Whenever a SLURM job fails, "
"its log file will be preserved. "
"This flag allows to keep all SLURM log files, even those "
"of successful jobs.",
"env_var": False,
"required": False,
},
)

delete_logfiles_older_than: Optional[int] = field(
default=10,
metadata={
"help": "Per default SLURM log files in the SLURM log directory "
"of a workflow will be deleted after 10 days. For this, "
"best leave the default log directory unaltered. "
"Setting this flag allows to change this behaviour. "
"If set to <=0, no old files will be deleted. ",
"If set to <=0, no old files will be deleted.",
},
)

init_seconds_before_status_checks: Optional[int] = field(
default=40,
metadata={
"help": "Defines the time in seconds before the first status "
"check is performed after job submission.",
"env_var": False,
"required": False,
},
)
status_attempts: Optional[int] = field(
default=5,
metadata={
"help": "Defines the number of attempts to query the status of "
"all active jobs. If the status query fails, the next attempt "
"will be performed after the next status check interval."
"The default is 5 status attempts before giving up. The maximum "
"time between status checks is 180 seconds.",
"env_var": False,
"required": False,
"check is performed on submitted jobs.",
},
)

requeue: bool = field(
default=False,
metadata={
"help": "Allow requeuing preempted of failed jobs, "
"help": "Requeue jobs if they fail with exit code != 0, "
"if no cluster default. Results in "
"`sbatch ... --requeue ...` "
"This flag has no effect, if not set.",
"env_var": False,
"required": False,
},
)

no_account: bool = field(
default=False,
metadata={
@@ -111,6 +167,7 @@ class ExecutorSettings(ExecutorSettingsBase):
"required": False,
},
)

efficiency_report: bool = field(
default=False,
metadata={
@@ -120,6 +177,7 @@ class ExecutorSettings(ExecutorSettingsBase):
"required": False,
},
)

efficiency_report_path: Optional[Path] = field(
default=None,
metadata={
@@ -132,14 +190,39 @@ class ExecutorSettings(ExecutorSettingsBase):
"required": False,
},
)

efficiency_threshold: Optional[float] = field(
default=0.8,
metadata={
"help": "The efficiency threshold for the efficiency report. "
"Jobs with an efficiency below this threshold will be reported. "
"This flag has no effect, if not set.",
"help": "Threshold for efficiency report. "
"Jobs with efficiency below this threshold will be reported.",
"env_var": False,
"required": False,
},
)

status_command: Optional[str] = field(
default_factory=_get_status_command_default,
metadata={
"help": _get_status_command_help(),
"env_var": False,
"required": False,
},
)
Comment on lines 210 to 217

⚠️ Potential issue

Do not call _get_status_command_help() in metadata; use a static string.

The call evaluates at import time, so the sacct/squeue detection runs on every import of the module. Replace it with the static help string suggested above.

Apply this diff:

-    status_command: Optional[str] = field(
-        default_factory=_get_status_command_default,
-        metadata={
-            "help": _get_status_command_help(),
-            "env_var": False,
-            "required": False,
-        },
-    )
+    status_command: Optional[str] = field(
+        default_factory=_get_status_command_default,
+        metadata={
+            "help": _get_status_command_help(),
+            "env_var": False,
+            "required": False,
+        },
+    )

Note: after applying the previous change, _get_status_command_help() is pure and safe to call here.

Committable suggestion skipped: line range outside the PR's diff.

🤖 Prompt for AI Agents
In snakemake_executor_plugin_slurm/__init__.py around lines 204 to 211, the
metadata "help" value currently calls _get_status_command_help() at import time;
replace that call with the static help string used above (i.e., the literal help
text defined for status_command earlier in the file) so metadata contains a
plain string instead of invoking the function at import. Do not add any new
function calls; just paste the existing static help text into the metadata
"help" field and keep the rest of the field declaration unchanged.


status_attempts: Optional[int] = field(
default=5,
metadata={
"help": "Defines the number of attempts to query the status of "
"all active jobs. If the status query fails, the next attempt "
"will be performed after the next status check interval. "
"The default is 5 status attempts before giving up. The maximum "
"time between status checks is 180 seconds.",
"env_var": False,
"required": False,
},
)

qos: Optional[str] = field(
default=None,
metadata={
@@ -148,6 +231,7 @@ class ExecutorSettings(ExecutorSettingsBase):
"required": False,
},
)

reservation: Optional[str] = field(
default=None,
metadata={
@@ -157,6 +241,11 @@ class ExecutorSettings(ExecutorSettingsBase):
},
)

def __post_init__(self):
"""Validate settings after initialization."""
# Add any validation logic here if needed in the future
pass


# Required:
# Specify common settings shared by various executors.
@@ -176,9 +265,6 @@ class ExecutorSettings(ExecutorSettingsBase):
pass_default_resources_args=True,
pass_envvar_declarations_to_cmd=False,
auto_deploy_default_storage_provider=False,
# wait a bit until slurmdbd has job info available
init_seconds_before_status_checks=40,
pass_group_args=True,
)


@@ -200,6 +286,57 @@ def __post_init__(self, test_mode: bool = False):
else Path(".snakemake/slurm_logs").resolve()
)

# Validate status_command configuration if the field exists
self._validate_status_command_settings()

def _validate_status_command_settings(self):
"""Validate and provide feedback about status_command configuration."""
if hasattr(self.workflow.executor_settings, "status_command"):
status_command = self.workflow.executor_settings.status_command
if status_command:
min_job_age = get_min_job_age()
sacct_available = is_query_tool_available("sacct")

# Calculate dynamic threshold: 3 times the initial status check interval
# The plugin starts with 40 seconds and increases, so we use (3 * 10) + 40 = 70 seconds as minimum
between_status_check_seconds = getattr(
self.workflow.executor_settings, "seconds_between_status_checks", 70
)
dynamic_check_threshold = 3 * between_status_check_seconds

if not sacct_available and status_command == "sacct":
self.logger.warning(
"The 'sacct' command is not available on this system. "
"Using 'squeue' instead for job status queries."
)
elif sacct_available and min_job_age is not None:
if (
min_job_age < dynamic_check_threshold
and status_command == "sacct"
):
self.logger.warning(
f"MinJobAge is {min_job_age} seconds (< {dynamic_check_threshold}s). "
f"This may cause 'sacct' to report inaccurate job states and the status_command option may be unreliable. "
f"(Threshold is 3x status check interval: 3 × {between_status_check_seconds}s = {dynamic_check_threshold}s)"
)
elif (
min_job_age >= dynamic_check_threshold
and status_command == "squeue"
):
self.logger.warning(
f"MinJobAge is {min_job_age} seconds (>= {dynamic_check_threshold}s). "
f"The 'squeue' command should work reliably for status queries. "
f"(Threshold is 3x status check interval: 3 × {between_status_check_seconds}s = {dynamic_check_threshold}s)"
)

Comment on lines 301 to 353

⚠️ Potential issue

Fix miswired warnings and threshold calculation in _validate_status_command_settings.

  • MinJobAge affects squeue visibility, not sacct. Current warnings invert this.
  • Threshold should derive from init_seconds_before_status_checks (40 by default), not a non-existent seconds_between_status_checks.
  • Replace the ambiguous × character in log messages.

Apply this diff:

-                # Calculate dynamic threshold: 3 times the initial status check interval
-                # The plugin starts with 40 seconds and increases, so we use (3 * 10) + 40 = 70 seconds as minimum
-                between_status_check_seconds = getattr(
-                    self.workflow.executor_settings, "seconds_between_status_checks", 70
-                )
-                dynamic_check_threshold = 3 * between_status_check_seconds
+                # Threshold: 3x initial status check interval (default 40s)
+                initial_interval = getattr(
+                    self.workflow.executor_settings,
+                    "init_seconds_before_status_checks",
+                    40,
+                )
+                dynamic_check_threshold = 3 * initial_interval
@@
-                elif sacct_available and min_job_age is not None:
-                    if (
-                        min_job_age < dynamic_check_threshold
-                        and status_command == "sacct"
-                    ):
-                        self.logger.warning(
-                            f"MinJobAge is {min_job_age} seconds (< {dynamic_check_threshold}s). "
-                            f"This may cause 'sacct' to report inaccurate job states and the status_command option may be unreliable. "
-                            f"(Threshold is 3x status check interval: 3 × {between_status_check_seconds}s = {dynamic_check_threshold}s)"
-                        )
-                    elif (
-                        min_job_age >= dynamic_check_threshold
-                        and status_command == "squeue"
-                    ):
-                        self.logger.warning(
-                            f"MinJobAge is {min_job_age} seconds (>= {dynamic_check_threshold}s). "
-                            f"The 'squeue' command should work reliably for status queries. "
-                            f"(Threshold is 3x status check interval: 3 × {between_status_check_seconds}s = {dynamic_check_threshold}s)"
-                        )
+                elif sacct_available and min_job_age is not None:
+                    if min_job_age < dynamic_check_threshold and status_command == "squeue":
+                        self.logger.warning(
+                            f"MinJobAge is {min_job_age} seconds (< {dynamic_check_threshold}s). "
+                            "This may cause 'squeue' to miss recently finished jobs; consider using 'sacct'. "
+                            f"(Threshold is 3x initial interval: 3 x {initial_interval}s = {dynamic_check_threshold}s)"
+                        )
+                    elif min_job_age >= dynamic_check_threshold and status_command == "sacct":
+                        self.logger.warning(
+                            f"MinJobAge is {min_job_age} seconds (>= {dynamic_check_threshold}s). "
+                            "The 'squeue' command should also work reliably for status queries. "
+                            f"(Threshold is 3x initial interval: 3 x {initial_interval}s = {dynamic_check_threshold}s)"
+                        )
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
def _validate_status_command_settings(self):
"""Validate and provide feedback about status_command configuration."""
if hasattr(self.workflow.executor_settings, "status_command"):
status_command = self.workflow.executor_settings.status_command
if status_command:
min_job_age = get_min_job_age()
sacct_available = is_query_tool_available("sacct")
# Calculate dynamic threshold: 3 times the initial status check interval
# The plugin starts with 40 seconds and increases, so we use (3 * 10) + 40 = 70 seconds as minimum
between_status_check_seconds = getattr(
self.workflow.executor_settings, "seconds_between_status_checks", 70
)
dynamic_check_threshold = 3 * between_status_check_seconds
if not sacct_available and status_command == "sacct":
self.logger.warning(
"The 'sacct' command is not available on this system. "
"Using 'squeue' instead for job status queries."
)
elif sacct_available and min_job_age is not None:
if (
min_job_age < dynamic_check_threshold
and status_command == "sacct"
):
self.logger.warning(
f"MinJobAge is {min_job_age} seconds (< {dynamic_check_threshold}s). "
f"This may cause 'sacct' to report inaccurate job states and the status_command option may be unreliable. "
f"(Threshold is 3x status check interval: 3 × {between_status_check_seconds}s = {dynamic_check_threshold}s)"
)
elif (
min_job_age >= dynamic_check_threshold
and status_command == "squeue"
):
self.logger.warning(
f"MinJobAge is {min_job_age} seconds (>= {dynamic_check_threshold}s). "
f"The 'squeue' command should work reliably for status queries. "
f"(Threshold is 3x status check interval: 3 × {between_status_check_seconds}s = {dynamic_check_threshold}s)"
)
def _validate_status_command_settings(self):
"""Validate and provide feedback about status_command configuration."""
if hasattr(self.workflow.executor_settings, "status_command"):
status_command = self.workflow.executor_settings.status_command
if status_command:
min_job_age = get_min_job_age()
sacct_available = is_query_tool_available("sacct")
# Threshold: 3x initial status check interval (default 40s)
initial_interval = getattr(
self.workflow.executor_settings,
"init_seconds_before_status_checks",
40,
)
dynamic_check_threshold = 3 * initial_interval
if not sacct_available and status_command == "sacct":
self.logger.warning(
"The 'sacct' command is not available on this system. "
"Using 'squeue' instead for job status queries."
)
elif sacct_available and min_job_age is not None:
if min_job_age < dynamic_check_threshold and status_command == "squeue":
self.logger.warning(
f"MinJobAge is {min_job_age} seconds (< {dynamic_check_threshold}s). "
"This may cause 'squeue' to miss recently finished jobs; consider using 'sacct'. "
f"(Threshold is 3x initial interval: 3 x {initial_interval}s = {dynamic_check_threshold}s)"
)
elif min_job_age >= dynamic_check_threshold and status_command == "sacct":
self.logger.warning(
f"MinJobAge is {min_job_age} seconds (>= {dynamic_check_threshold}s). "
"The 'squeue' command should also work reliably for status queries. "
f"(Threshold is 3x initial interval: 3 x {initial_interval}s = {dynamic_check_threshold}s)"
)
🧰 Tools
🪛 Ruff (0.12.2)

320-320: String contains ambiguous × (MULTIPLICATION SIGN). Did you mean x (LATIN SMALL LETTER X)?

(RUF001)


329-329: String contains ambiguous × (MULTIPLICATION SIGN). Did you mean x (LATIN SMALL LETTER X)?

(RUF001)
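
Both warnings depend on get_min_job_age(), which the diff imports from .job_status_query but does not show. As a hedged sketch of what such a helper could look like (not the PR's actual implementation), MinJobAge can be parsed from scontrol show config, which prints a line such as "MinJobAge               = 300 sec":

# Sketch only -- the real helper lives in .job_status_query and may differ.
import re
import subprocess
from typing import Optional


def get_min_job_age_sketch() -> Optional[int]:
    """Return SLURM's MinJobAge in seconds, or None if it cannot be determined."""
    try:
        out = subprocess.run(
            ["scontrol", "show", "config"],
            capture_output=True,
            text=True,
            check=True,
        ).stdout
    except (OSError, subprocess.CalledProcessError):
        return None  # scontrol missing or failing; callers must handle None
    # scontrol prints e.g. "MinJobAge               = 300 sec"
    match = re.search(r"^MinJobAge\s*=\s*(\d+)", out, flags=re.MULTILINE)
    return int(match.group(1)) if match else None

With that value, the corrected logic warns about 'squeue' only when min_job_age < 3 * init_seconds_before_status_checks, and otherwise merely notes that 'squeue' would also be reliable.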

def get_status_command(self):
"""Get the status command to use, with fallback logic."""
if hasattr(self.workflow.executor_settings, "status_command"):
return self.workflow.executor_settings.status_command
else:
# Fallback: determine the best command based on cluster configuration
return _get_status_command_default()

def shutdown(self) -> None:
"""
Shutdown the executor.