367 changes: 255 additions & 112 deletions snakemake_executor_plugin_slurm/__init__.py
@@ -5,6 +5,7 @@

import csv
from io import StringIO
from dataclasses import make_dataclass
import logging
import os
from pathlib import Path
import re
@@ -32,122 +33,221 @@
delete_empty_dirs,
set_gres_string,
)
from .job_status_query import (
    get_min_job_age,
    is_query_tool_available,
    should_enable_status_command_option,
)
from .efficiency_report import create_efficiency_report
from .submit_string import get_submit_command


def _get_status_command_default():
"""Get smart default for status_command based on cluster configuration."""
should_enable = should_enable_status_command_option()
sacct_available = is_query_tool_available("sacct")
squeue_available = is_query_tool_available("squeue")

if should_enable and sacct_available:
return "sacct"
elif not should_enable and squeue_available:
return "squeue"
else: # neither command is available or reliable
        raise WorkflowError(
            "No suitable job status query tool is available: "
            "'sacct' appears to be missing, and 'squeue' retains the status "
            f"of finished jobs for only {get_min_job_age()} seconds."
        )
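
# Resolution order implemented above (a sketch; assumes the helpers in
# .job_status_query report cluster capabilities as their names suggest):
#
#     should_enable_status_command_option() and 'sacct' available -> "sacct"
#     option not needed and 'squeue' available                    -> "squeue"
#     anything else                                                -> WorkflowError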


def _create_executor_settings_class():
"""Create ExecutorSettings class with conditional fields based on cluster configuration."""
# Use a conservative default threshold for class creation (3 * 40s = 120s)
# The actual validation in the executor will use the configured init_seconds_before_status_checks
should_enable = should_enable_status_command_option(min_threshold_seconds=120)

# Base fields that are always present
base_fields = {
"logdir": (
Optional[Path],
field(
default=None,
metadata={
"help": "Per default the SLURM log directory is relative to "
"the working directory."
"This flag allows to set an alternative directory.",
"env_var": False,
"required": False,
},
),
),
"keep_successful_logs": (
bool,
field(
default=False,
metadata={
"help": "Per default SLURM log files will be deleted upon sucessful "
"completion of a job. Whenever a SLURM job fails, its log "
"file will be preserved. "
"This flag allows to keep all SLURM log files, even those "
"of successful jobs.",
"env_var": False,
"required": False,
},
),
),
"delete_logfiles_older_than": (
Optional[int],
field(
default=10,
metadata={
"help": "Per default SLURM log files in the SLURM log directory "
"of a workflow will be deleted after 10 days. For this, "
"best leave the default log directory unaltered. "
"Setting this flag allows to change this behaviour. "
"If set to <=0, no old files will be deleted. ",
},
),
),
"init_seconds_before_status_checks": (
Optional[int],
field(
default=40,
metadata={
"help": "Defines the time in seconds before the first status "
"check is performed after job submission.",
"env_var": False,
"required": False,
},
),
),
"status_attempts": (
Optional[int],
field(
default=5,
metadata={
"help": "Defines the number of attempts to query the status of "
"all active jobs. If the status query fails, the next attempt "
"will be performed after the next status check interval."
"The default is 5 status attempts before giving up. The maximum "
"time between status checks is 180 seconds.",
"env_var": False,
"required": False,
},
),
),
"requeue": (
bool,
field(
default=False,
metadata={
"help": "Allow requeuing preempted of failed jobs, "
"if no cluster default. Results in "
"`sbatch ... --requeue ...` "
"This flag has no effect, if not set.",
"env_var": False,
"required": False,
},
),
),
"no_account": (
bool,
field(
default=False,
metadata={
"help": "Do not use any account for submission. "
"This flag has no effect, if not set.",
"env_var": False,
"required": False,
},
),
),
"efficiency_report": (
bool,
field(
default=False,
metadata={
"help": "Generate an efficiency report at the end of the workflow. "
"This flag has no effect, if not set.",
"env_var": False,
"required": False,
},
),
),
"efficiency_report_path": (
Optional[Path],
field(
default=None,
metadata={
"help": "Path to the efficiency report file. "
"If not set, the report will be written to "
"the current working directory with the name "
"'efficiency_report_<run_uuid>.csv'. "
"This flag has no effect, if not set.",
"env_var": False,
"required": False,
},
),
),
"efficiency_threshold": (
Optional[float],
field(
default=0.8,
metadata={
"help": "The efficiency threshold for the efficiency report. "
"Jobs with an efficiency below this threshold will be reported. "
"This flag has no effect, if not set.",
},
),
),
"reservation": (
Optional[str],
field(
default=None,
metadata={
"help": "If set, the given reservation will be used for job submission.",
"env_var": False,
"required": False,
},
),
),
}

# Add status_command field only if needed
if should_enable:
base_fields["status_command"] = (
Optional[str],
field(
default_factory=_get_status_command_default,
metadata={
"help": "Allows to choose between 'sacct' (the default) and 'squeue'.",
"env_var": False,
"required": False,
},
),
)

def post_init(self):
"""Validate settings after initialization."""
# Note: Validation with logging is handled in the Executor class
# where self.logger is available. This method is kept for potential
# future validation that doesn't require logging.
pass

    # Create the class dynamically. A plain `type()` call does not work here:
    # @dataclass discovers fields via __annotations__, which (type, field)
    # tuples do not populate; make_dataclass accepts this layout directly
    # and attaches __post_init__ via its namespace argument.
    return make_dataclass(
        "ExecutorSettings",
        fields=[(name, typ, fld) for name, (typ, fld) in base_fields.items()],
        bases=(ExecutorSettingsBase,),
        namespace={"__post_init__": post_init},
    )
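
# A hypothetical usage sketch (illustrative only; values assume the defaults
# defined above and that ExecutorSettingsBase requires no positional fields):
#
#     Settings = _create_executor_settings_class()
#     s = Settings(logdir=Path("slurm_logs"), requeue=True)
#     s.status_attempts             # -> 5
#     hasattr(s, "status_command")  # -> True only when
#                                   #    should_enable_status_command_option() held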


@dataclass
class ExecutorSettings(ExecutorSettingsBase):
logdir: Optional[Path] = field(
default=None,
metadata={
"help": "Per default the SLURM log directory is relative to "
"the working directory."
"This flag allows to set an alternative directory.",
"env_var": False,
"required": False,
},
)
keep_successful_logs: bool = field(
default=False,
metadata={
"help": "Per default SLURM log files will be deleted upon sucessful "
"completion of a job. Whenever a SLURM job fails, its log "
"file will be preserved. "
"This flag allows to keep all SLURM log files, even those "
"of successful jobs.",
"env_var": False,
"required": False,
},
)
delete_logfiles_older_than: Optional[int] = field(
default=10,
metadata={
"help": "Per default SLURM log files in the SLURM log directory "
"of a workflow will be deleted after 10 days. For this, "
"best leave the default log directory unaltered. "
"Setting this flag allows to change this behaviour. "
"If set to <=0, no old files will be deleted. ",
},
)
init_seconds_before_status_checks: Optional[int] = field(
default=40,
metadata={
"help": "Defines the time in seconds before the first status "
"check is performed after job submission.",
"env_var": False,
"required": False,
},
)
status_attempts: Optional[int] = field(
default=5,
metadata={
"help": "Defines the number of attempts to query the status of "
"all active jobs. If the status query fails, the next attempt "
"will be performed after the next status check interval."
"The default is 5 status attempts before giving up. The maximum "
"time between status checks is 180 seconds.",
"env_var": False,
"required": False,
},
)
requeue: bool = field(
default=False,
metadata={
"help": "Allow requeuing preempted of failed jobs, "
"if no cluster default. Results in "
"`sbatch ... --requeue ...` "
"This flag has no effect, if not set.",
"env_var": False,
"required": False,
},
)
no_account: bool = field(
default=False,
metadata={
"help": "Do not use any account for submission. "
"This flag has no effect, if not set.",
"env_var": False,
"required": False,
},
)
efficiency_report: bool = field(
default=False,
metadata={
"help": "Generate an efficiency report at the end of the workflow. "
"This flag has no effect, if not set.",
"env_var": False,
"required": False,
},
)
efficiency_report_path: Optional[Path] = field(
default=None,
metadata={
"help": "Path to the efficiency report file. "
"If not set, the report will be written to "
"the current working directory with the name "
"'efficiency_report_<run_uuid>.csv'. "
"This flag has no effect, if not set.",
"env_var": False,
"required": False,
},
)
efficiency_threshold: Optional[float] = field(
default=0.8,
metadata={
"help": "The efficiency threshold for the efficiency report. "
"Jobs with an efficiency below this threshold will be reported. "
"This flag has no effect, if not set.",
},
)
reservation: Optional[str] = field(
default=None,
metadata={
"help": "If set, the given reservation will be used for job submission.",
"env_var": False,
"required": False,
},
)
# Create the actual ExecutorSettings class
ExecutorSettings = _create_executor_settings_class()


# Required:
@@ -192,6 +292,49 @@ def __post_init__(self, test_mode: bool = False):
else Path(".snakemake/slurm_logs").resolve()
)

# Validate status_command configuration if the field exists
self._validate_status_command_settings()

def _validate_status_command_settings(self):
"""Validate and provide feedback about status_command configuration."""
if hasattr(self.workflow.executor_settings, "status_command"):
status_command = self.workflow.executor_settings.status_command
if status_command:
min_job_age = get_min_job_age()
sacct_available = is_query_tool_available("sacct")

                # Dynamic threshold: 3 times the status check interval. The
                # interval starts at 40 s and grows, so 70 s (40 + 3 * 10)
                # serves as the fallback when the setting is unavailable.
                between_status_check_seconds = getattr(
                    self.workflow.executor_settings,
                    "seconds_between_status_checks", 70,
                )
                dynamic_check_threshold = 3 * between_status_check_seconds
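                # Example: with the 70 s fallback interval the threshold is
                # 3 * 70 = 210 s, so MinJobAge below 210 s yields the 'sacct'
                # warning below, while 210 s or more logs a note if 'squeue'
                # was chosen.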

                if not sacct_available and status_command == "sacct":
                    self.logger.warning(
                        "The 'sacct' command is not available on this system. "
                        "Using 'squeue' instead for job status queries."
                    )
                elif sacct_available and min_job_age is not None:
                    if min_job_age < dynamic_check_threshold and status_command == "sacct":
                        self.logger.warning(
                            f"MinJobAge is {min_job_age} seconds "
                            f"(< {dynamic_check_threshold}s). This may cause "
                            "'sacct' to report inaccurate job states, and the "
                            "status_command option may be unreliable. "
                            "(Threshold is 3x the status check interval: "
                            f"3 × {between_status_check_seconds}s = "
                            f"{dynamic_check_threshold}s.)"
                        )
                    elif min_job_age >= dynamic_check_threshold and status_command == "squeue":
                        self.logger.warning(
                            f"MinJobAge is {min_job_age} seconds "
                            f"(>= {dynamic_check_threshold}s). The 'squeue' "
                            "command should work reliably for status queries. "
                            "(Threshold is 3x the status check interval: "
                            f"3 × {between_status_check_seconds}s = "
                            f"{dynamic_check_threshold}s.)"
                        )

def get_status_command(self):
"""Get the status command to use, with fallback logic."""
if hasattr(self.workflow.executor_settings, "status_command"):
return self.workflow.executor_settings.status_command
else:
# Fallback: determine the best command based on cluster configuration
return _get_status_command_default()
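
    # Illustrative only: how a caller might branch on the returned value. The
    # flags shown are standard sacct/squeue options, not necessarily the exact
    # query strings this plugin issues.
    #
    #     cmd = executor.get_status_command()
    #     if cmd == "sacct":
    #         query = f"sacct -X --parsable2 --format=JobID,State --jobs {jobs}"
    #     else:
    #         query = f"squeue --noheader --format='%i|%T' --jobs {jobs}"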

def shutdown(self) -> None:
"""
Shutdown the executor.