73 changes: 73 additions & 0 deletions docs/further.md
@@ -64,6 +64,79 @@ See the [snakemake documentation on profiles](https://snakemake.readthedocs.io/e
You can make resource configurations depend on factors like input file size, or increase the runtime with every `attempt` of running a job (if [`--retries` is greater than `0`](https://snakemake.readthedocs.io/en/stable/executing/cli.html#snakemake.cli-get_argument_parser-behavior)).
[There are detailed examples for these in the snakemake documentation.](https://snakemake.readthedocs.io/en/stable/snakefiles/rules.html#dynamic-resources)
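
As a minimal sketch of what such a dynamic specification can look like (the rule, file, and command names are invented for illustration; `attempt` and `input.size_mb` are standard Snakemake features):

```python
# Sketch: scale runtime and memory with each retry attempt
# (retries only happen when snakemake runs with --retries > 0).
rule assemble:
    input:
        "reads.fq"
    output:
        "contigs.fa"
    resources:
        # 60 minutes on the first attempt, 120 on the second, ...
        runtime=lambda wildcards, attempt: 60 * attempt,
        # scale memory with the input size, with a floor of 1000 MB
        mem_mb=lambda wildcards, input, attempt: max(2 * input.size_mb, 1000) * attempt,
    shell:
        "assembler {input} > {output}"
```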

#### Automatic Partition Selection

The SLURM executor plugin supports automatic partition selection based on job resource requirements, via the command line option `--slurm-partition-config`. This feature allows the plugin to choose the most appropriate partition for each job, without the need to manually specify partitions for different job types. This also enables variable partition selection as a job's resource requirements change based on [dynamic resources](#dynamic-resource-specification), ensuring that jobs are always scheduled to an appropriate partition.

*Jobs that explicitly specify a `slurm_partition` resource will bypass automatic selection and use the specified partition directly.*
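
For example, assuming the partition limits are stored in a file called `partitions.yaml` (the file name is arbitrary), a run could be started like this:

```bash
# enable automatic partition selection for all jobs of this run
snakemake --executor slurm \
    --slurm-partition-config partitions.yaml \
    --jobs 100
```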

##### Partition Limits Specification

To enable automatic partition selection, create a YAML configuration file that defines the available partitions and their resource limits. This file should be structured as follows:

```yaml
partitions:
  some_partition:
    max_runtime: 100
  another_partition:
    ...
```
Here, `some_partition` and `another_partition` are the names of partitions on your cluster, as reported by `sinfo`.
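
If you are unsure of the exact names, you can list the partitions available on your cluster, for example with:

```bash
# print one partition name per line; the default partition is marked with '*'
sinfo --noheader --format="%P" | sort -u
```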

The following limits can be defined for each partition:

| Parameter | Type | Description | Default |
| ----------------------- | --------- | ---------------------------------- | --------- |
| `max_runtime` | int | Maximum walltime in minutes | unlimited |
| `max_mem_mb` | int | Maximum total memory in MB | unlimited |
| `max_mem_mb_per_cpu` | int | Maximum memory per CPU in MB | unlimited |
| `max_cpus_per_task` | int | Maximum CPUs per task | unlimited |
| `max_nodes` | int | Maximum number of nodes | unlimited |
| `max_tasks` | int | Maximum number of tasks | unlimited |
| `max_tasks_per_node` | int | Maximum tasks per node | unlimited |
| `max_gpu` | int | Maximum number of GPUs | 0 |
| `available_gpu_models` | list[str] | List of available GPU models | none |
| `max_cpus_per_gpu` | int | Maximum CPUs per GPU | unlimited |
| `supports_mpi` | bool | Whether MPI jobs are supported | true |
| `max_mpi_tasks` | int | Maximum MPI tasks | unlimited |
| `available_constraints` | list[str] | List of available node constraints | none |

##### Example Partition Configuration

```yaml
partitions:
  standard:
    max_runtime: 720 # 12 hours
    max_mem_mb: 64000 # 64 GB
    max_cpus_per_task: 24
    max_nodes: 1

  highmem:
    max_runtime: 1440 # 24 hours
    max_mem_mb: 512000 # 512 GB
    max_mem_mb_per_cpu: 16000
    max_cpus_per_task: 48
    max_nodes: 1

  gpu:
    max_runtime: 2880 # 48 hours
    max_mem_mb: 128000 # 128 GB
    max_cpus_per_task: 32
    max_gpu: 8
    available_gpu_models: ["a100", "v100", "rtx3090"]
    max_cpus_per_gpu: 8
```

##### How Partition Selection Works

When automatic partition selection is enabled, the plugin evaluates each job's resource requirements against the defined partition limits to ensure the job is placed on a partition that can accommodate all of its requirements. When multiple partitions are compatible, the plugin uses a scoring algorithm that favors partitions with limits closer to the job's needs, preventing jobs from being assigned to partitions with excessively high resource limits.

The scoring algorithm calculates a score by summing the ratios of requested resources to partition limits (e.g., if a job requests 8 CPUs and a partition allows 16, this contributes 0.5 to the score). Higher scores indicate better resource utilization, so a job requesting 8 CPUs would prefer a 16-CPU partition (score 0.5) over a 64-CPU partition (score 0.125).
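
The following minimal sketch illustrates the idea (this is not the plugin's actual implementation; resource and limit names are simplified for illustration):

```python
from typing import Optional


def score_partition(requested: dict, limits: dict) -> Optional[float]:
    """Sum of requested/limit ratios; None means the job does not fit."""
    score = 0.0
    for resource, request in requested.items():
        limit = limits.get(resource)
        if limit is None:
            continue  # treated as unlimited: contributes nothing to the score
        if request > limit:
            return None  # partition cannot accommodate this job
        score += request / limit
    return score


# a job asking for 8 CPUs fits both partitions, but the tighter fit wins:
print(score_partition({"cpus_per_task": 8}, {"cpus_per_task": 16}))  # 0.5
print(score_partition({"cpus_per_task": 8}, {"cpus_per_task": 64}))  # 0.125
```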

##### Fallback Behavior

If no suitable partition is found based on the job's resource requirements, the plugin falls back to the default SLURM behavior, which typically uses the cluster's default partition or any partition specified explicitly in the job's resources.
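
As noted above, a rule can always opt out of automatic selection by pinning a partition explicitly (rule and partition names here are illustrative):

```python
rule big_memory_step:
    input:
        "matrix.h5"
    output:
        "result.h5"
    resources:
        slurm_partition="highmem",  # bypasses automatic partition selection
        mem_mb=256000,
        runtime=600,
    shell:
        "process_matrix {input} {output}"
```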


#### Standard Resources

23 changes: 22 additions & 1 deletion snakemake_executor_plugin_slurm/__init__.py
@@ -34,6 +34,7 @@
)
from .efficiency_report import create_efficiency_report
from .submit_string import get_submit_command
from .partitions import read_partition_file, get_best_partition


@dataclass
@@ -111,6 +112,13 @@ class ExecutorSettings(ExecutorSettingsBase):
            "required": False,
        },
    )
    partition_config: Optional[Path] = field(
        default=None,
        metadata={
            "help": "Path to YAML file defining partition limits for dynamic "
            "partition selection. When provided, jobs will be dynamically "
            "assigned to the best-fitting partition based on "
            "See documentation for complete list of available limits.",
    efficiency_report: bool = field(
Comment on lines 118 to 127
@coderabbitai coderabbitai bot (Contributor) commented on Oct 30, 2025:

⚠️ Potential issue | 🔴 Critical

Close the partition_config field definition and finish the help text.

As written, the metadata={... block never closes, so the module won’t even parse. At the same time the help string still stops mid‑sentence (previously flagged). Please close the dict/field and complete the description:

```diff
-        metadata={
-            "help": "Path to YAML file defining partition limits for dynamic "
-            "partition selection. When provided, jobs will be dynamically "
-            "assigned to the best-fitting partition based on "
-            "See documentation for complete list of available limits.",
+        metadata={
+            "help": "Path to YAML file defining partition limits for dynamic "
+            "partition selection. When provided, jobs will be dynamically "
+            "assigned to the best-fitting partition based on their resource "
+            "requirements. See documentation for complete list of available limits.",
         },
     )
```
🧰 Tools
🪛 Ruff (0.14.2)

122-122: Expected ',', found '=' (invalid-syntax)

🤖 Prompt for AI Agents
In snakemake_executor_plugin_slurm/__init__.py around lines 115 to 122, the
partition_config field's metadata dict and field call are left unclosed and the
help string is incomplete; close the metadata dict and the field(...) call,
finish the help text to a full sentence (e.g. "based on resource requirements
and limits specified in the YAML file; see documentation for available
limits."), and ensure proper comma/paren balancing so the module parses.

@cmeesters (Member) replied:

@coderabbitai - that issue should be fixed. Isn't it?

@coderabbitai coderabbitai bot (Contributor) replied:
🧩 Analysis chain

🏁 Script executed:

```bash
#!/bin/bash
# Extract the partition_config field definition to verify its current state
sed -n '114,122p' snakemake_executor_plugin_slurm/__init__.py

echo "---"
echo "Checking for Python syntax errors in the file:"
python -m py_compile snakemake_executor_plugin_slurm/__init__.py 2>&1 || echo "Syntax check complete"
```

Length of output: 370


🏁 Script executed:

```bash
#!/bin/bash
# Extract more lines to see the complete partition_config field definition
sed -n '114,125p' snakemake_executor_plugin_slurm/__init__.py
```

Length of output: 465


@cmeesters, good news and almost there! The structural issue is fixed ✓ – the field definition is now properly closed and the file compiles without syntax errors.

However, the help text is still incomplete. It currently reads:

```python
"...assigned to the best-fitting partition based on "
"See documentation for complete list of available limits."
```

The phrase "based on " ends mid-thought. You need to complete it before moving to the next sentence. Here's a suggested fix:

```python
metadata={
    "help": "Path to YAML file defining partition limits for dynamic "
    "partition selection. When provided, jobs will be dynamically "
    "assigned to the best-fitting partition based on their resource "
    "requirements. See documentation for complete list of available limits.",
},
```

Just add "their resource requirements. " (or similar phrasing) to bridge the gap between "based on" and "See documentation".


🧬 Code Graph Analysis Results
• File: snakemake_executor_plugin_slurm/efficiency_report.py, lines 231-260

```python
def create_efficiency_report(e_threshold, run_uuid, e_report_path, logger):
    """
    Fetch sacct job data for a Snakemake workflow
    and compute efficiency metrics.
    """
    lines = get_sacct_data(run_uuid, logger)

    if lines is None or not lines:
        return None

    df = parse_sacct_data(lines, e_threshold, run_uuid, logger)

    # we construct a path object to allow for a custom
    # logdir, if specified
    p = Path()

    # Save the report to a CSV file
    logfile = f"efficiency_report_{run_uuid}.csv"
    if e_report_path:
        logfile = Path(e_report_path) / logfile
    else:
        logfile = p.cwd() / logfile
    # ensure the directory exists
    logfile.parent.mkdir(parents=True, exist_ok=True)
    df.to_csv(logfile)

    # write out the efficiency report at normal verbosity in any case
    logger.info(f"Efficiency report for workflow {run_uuid} saved to {logfile}.")
    # state directory contents for debugging purposes
    logger.debug(f"Current directory contents in '{p.cwd()}': {os.listdir(p.cwd())}")
```
• File: snakemake_executor_plugin_slurm/validation.py, lines 48-75

```python
def validate_slurm_extra(job):
    """
    Validate that slurm_extra doesn't contain executor-managed options.

    Args:
        job: Snakemake job object with resources attribute

    Raises:
        WorkflowError: If forbidden SLURM options are found in slurm_extra
    """
    # skip testing if no slurm_extra is set
    slurm_extra = getattr(job.resources, "slurm_extra", None)
    if not slurm_extra:
        return

    forbidden_options = get_forbidden_slurm_options()

    for pattern, option_name in forbidden_options.items():
        if re.search(pattern, slurm_extra):
            raise WorkflowError(
                f"The --{option_name.replace(' ', '-')} option is not "
                f"allowed in the 'slurm_extra' parameter. "
                f"The {option_name} is set by the snakemake executor plugin "
                f"and must not be overwritten. "
                f"Please use the appropriate snakemake resource "
                f"specification instead. "
                f"Consult the documentation for proper resource configuration."
            )
```
• File: snakemake_executor_plugin_slurm/partitions.py, lines 13-27

```python
def read_partition_file(partition_file: Path) -> List["Partition"]:
    with open(partition_file, "r") as f:
        out = []
        partitions_dict = yaml.safe_load(f)["partitions"]
        for partition_name, partition_config in partitions_dict.items():
            if not partition_name or not partition_name.strip():
                raise KeyError("Partition name cannot be empty")

            out.append(
                Partition(
                    name=partition_name,
                    limits=PartitionLimits(**partition_config),
                )
            )
        return out
```
• File: snakemake_executor_plugin_slurm/partitions.py, lines 30-54

```python
def get_best_partition(
    candidate_partitions: List["Partition"],
    job: JobExecutorInterface,
    logger: LoggerExecutorInterface,
) -> Optional[str]:
    scored_partitions = [
        (p, score)
        for p in candidate_partitions
        if (score := p.score_job_fit(job)) is not None
    ]

    if scored_partitions:
        best_partition, best_score = max(scored_partitions, key=lambda x: x[1])
        partition = best_partition.name
        logger.warning(
            f"Auto-selected partition '{partition}' for job {job.name} "
            f"with score {best_score:.3f}"
        )
        return partition
    else:
        logger.warning(
            f"No suitable partition found for job {job.name} based on "
            f"resource requirements. Falling back to default behavior."
        )
        return None
```
• File: snakemake_executor_plugin_slurm/utils.py, lines 13-23

```python
def delete_slurm_environment():
    """
    Function to delete all environment variables
    starting with 'SLURM_'. The parent shell will
    still have this environment. This is needed to
    submit within a SLURM job context to avoid
    conflicting environments.
    """
    for var in os.environ:
        if var.startswith("SLURM_"):
            del os.environ[var]
```
• File: snakemake_executor_plugin_slurm/utils.py, lines 26-48

```python
def delete_empty_dirs(path: Path) -> None:
    """
    Function to delete all empty directories in a given path.
    This is needed to clean up the working directory after
    a job has successfully finished. This function is needed because
    the shutil.rmtree() function does not delete empty
    directories.
    """
    if not path.is_dir():
        return

    # Process subdirectories first (bottom-up)
    for child in path.iterdir():
        if child.is_dir():
            delete_empty_dirs(child)

    try:
        # Check if directory is now empty after processing children
        if not any(path.iterdir()):
            path.rmdir()
    except (OSError, FileNotFoundError) as e:
        # Provide more context in the error message
        raise OSError(f"Failed to remove empty directory {path}: {e}") from e
```
• File: snakemake_executor_plugin_slurm/utils.py, lines 51-123

```python
def set_gres_string(job: JobExecutorInterface) -> str:
    """
    Function to set the gres string for the SLURM job
    based on the resources requested in the job.
    """
    # generic resources (GRES) arguments can be of type
    # "string:int" or "string:string:int"
    gres_re = re.compile(r"^[a-zA-Z0-9_]+(:[a-zA-Z0-9_]+)?:\d+$")
    # gpu model arguments can be of type "string"
    gpu_model_re = re.compile(r"^[a-zA-Z0-9_]+$")
    # any arguments should not start and end with ticks or
    # quotation marks:
    string_check = re.compile(r"^[^'\"].*[^'\"]$")
    # The Snakemake resources can only be of type "int",
    # hence no further regex is needed.

    gpu_string = None
    if job.resources.get("gpu"):
        gpu_string = str(job.resources.get("gpu"))

    gpu_model = None
    if job.resources.get("gpu_model"):
        gpu_model = job.resources.get("gpu_model")

    # ensure that gres is not set, if gpu and gpu_model are set
    if job.resources.get("gres") and gpu_string:
        raise WorkflowError(
            "GRES and GPU are set. Please only set one of them.", rule=job.rule
        )
    elif not job.resources.get("gres") and not gpu_model and not gpu_string:
        return ""

    if job.resources.get("gres"):
        # Validate GRES format (e.g., "gpu:1", "gpu:tesla:2")
        gres = job.resources.gres
        if not gres_re.match(gres):
            if not string_check.match(gres):
                raise WorkflowError(
                    "GRES format should not be a nested string (start "
                    "and end with ticks or quotation marks). "
                    "Expected format: "
                    "'<name>:<number>' or '<name>:<type>:<number>' "
                    "(e.g., 'gpu:1' or 'gpu:tesla:2')"
                )
            else:
                raise WorkflowError(
                    f"Invalid GRES format: {gres}. Expected format: "
                    "'<name>:<number>' or '<name>:<type>:<number>' "
                    "(e.g., 'gpu:1' or 'gpu:tesla:2')"
                )
        return f" --gres={job.resources.gres}"

    if gpu_model and gpu_string:
        # validate GPU model format
        if not gpu_model_re.match(gpu_model):
            if not string_check.match(gpu_model):
                raise WorkflowError(
                    "GPU model format should not be a nested string (start "
                    "and end with ticks or quotation marks). "
                    "Expected format: '<name>' (e.g., 'tesla')"
                )
            else:
                raise WorkflowError(
                    f"Invalid GPU model format: {gpu_model}."
                    " Expected format: '<name>' (e.g., 'tesla')"
                )
        return f" --gpus={gpu_model}:{gpu_string}"
    elif gpu_model and not gpu_string:
        raise WorkflowError("GPU model is set, but no GPU number is given")
    elif gpu_string:
        # we assume here, that the validator ensures that the 'gpu_string'
        # is an integer
        return f" --gpus={gpu_string}"
```
• File: snakemake_executor_plugin_slurm/submit_string.py, lines 19-130

```python
def get_submit_command(job, params):
    """
    Return the submit command for the job.
    """
    # Convert params dict to a SimpleNamespace for attribute-style access
    params = SimpleNamespace(**params)

    call = (
        "sbatch "
        "--parsable "
        f"--job-name {safe_quote(params.run_uuid)} "
        f"--output {safe_quote(params.slurm_logfile)} "
        "--export=ALL "
        f"--comment {safe_quote(params.comment_str)}"
    )

    # No account or partition checking is required here.
    # Checking is done in the submit function.

    # here, only the string is used, as it already contains
    # "-A '{account_name}'"
    call += f" {params.account}"
    # here, only the string is used, as it already contains
    # "-p '{partition_name}'"
    call += f" {params.partition}"

    if job.resources.get("clusters"):
        call += f" --clusters {safe_quote(job.resources.clusters)}"

    if job.resources.get("runtime"):
        call += f" -t {safe_quote(job.resources.runtime)}"

    # Both constraint and qos are optional.
    # If not set, they will not be added to the sbatch call.
    # If explicitly set to an empty string,
    # `--constraint ''` or `--qos ''` will be added.
    constraint = job.resources.get("constraint")
    if constraint is not None:
        call += f" -C {safe_quote(constraint)}"

    qos = job.resources.get("qos")
    if qos is not None:
        call += f" --qos={safe_quote(qos)}"

    if job.resources.get("mem_mb_per_cpu"):
        call += f" --mem-per-cpu {job.resources.mem_mb_per_cpu}"
    elif job.resources.get("mem_mb"):
        call += f" --mem {job.resources.mem_mb}"

    if job.resources.get("nodes", False):
        call += f" --nodes={job.resources.get('nodes', 1)}"

    gpu_job = job.resources.get("gpu") or "gpu" in job.resources.get("gres", "")
    if gpu_job:
        # fixes #316 - allow unsetting of tasks per gpu
        # apparently, python's internal process management interferes with SLURM
        # e.g. for pytorch
        ntasks_per_gpu = job.resources.get("tasks_per_gpu")
        if ntasks_per_gpu is None:
            ntasks_per_gpu = job.resources.get("tasks")
        if ntasks_per_gpu is None:
            ntasks_per_gpu = 1

        if ntasks_per_gpu >= 1:
            call += f" --ntasks-per-gpu={ntasks_per_gpu}"
    else:
        # fixes #40 - set ntasks regardless of mpi, because
        # SLURM v22.05 will require it for all jobs
        # if the job is a MPI job, ntasks will be set later
        if not job.resources.get("mpi", False):
            call += f" --ntasks={job.resources.get('tasks') or 1}"

    # if the job is an MPI job, we need to have some task setting:
    if job.resources.get("mpi", False):
        if not job.resources.get("tasks_per_node") and not job.resources.get("tasks"):
            raise WorkflowError(
                "For MPI jobs, please specify either "
                "'tasks_per_node' or 'tasks' (at least one is required)."
            )
        # raise an error if both task settings are used
        if job.resources.get("tasks_per_node") and job.resources.get("tasks"):
            raise WorkflowError(
                "For MPI jobs, please specify either 'tasks_per_node' or 'tasks', "
                "but not both."
            )
        if job.resources.get("tasks_per_node"):
            if job.resources.get("tasks_per_node") <= 1:
                raise WorkflowError(
                    "For MPI jobs, 'tasks_per_node' must be greater than 1."
                )
            call += f" --ntasks-per-node={job.resources.tasks_per_node}"
        elif job.resources.get("tasks"):
            if job.resources.get("tasks") == 1:
                raise WorkflowError("For MPI jobs, 'tasks' must be greater than 1.")
            call += f" --ntasks={job.resources.tasks}"
        # nodes CAN be set independently of tasks or tasks_per_node
        # this is at a user's discretion. The nodes flag might already
        # be set above, if the user specified it.

    # we need to set cpus-per-task OR cpus-per-gpu, the function
    # will return a string with the corresponding value
    call += f" {get_cpu_setting(job, gpu_job)}"
    if job.resources.get("slurm_extra"):
        call += f" {job.resources.slurm_extra}"

    # ensure that workdir is set correctly
    # use short argument as this is the same in all slurm versions
    # (see https://github.com/snakemake/snakemake/issues/2014)
    if params.workdir:
        call += f" -D {safe_quote(params.workdir)}"

    return call
```

🧠 Learnings used
Learnt from: cmeesters
PR: snakemake/snakemake-executor-plugin-slurm#249
File: snakemake_executor_plugin_slurm/submit_string.py:21-29
Timestamp: 2025-03-31T11:52:05.373Z
Learning: In the snakemake-executor-plugin-slurm project, account and partition parameters are validated in the submit function before reaching the get_submit_command function. By design, these parameters are either empty strings or already properly formatted with SLURM parameter tags, making additional validation in get_submit_command unnecessary.

Learnt from: johanneskoester
PR: snakemake/snakemake-executor-plugin-slurm#173
File: docs/further.md:96-96
Timestamp: 2025-03-10T15:20:51.829Z
Learning: PR #173 in snakemake-executor-plugin-slurm implements GPU job support by adding resources: `gres` for generic resource specifications (e.g., 'gpu:1'), `gpu`/`gpus` for specifying GPU counts, and `gpu_model`/`gpu_manufacturer` for specifying GPU types, allowing users to request GPU resources directly rather than having to use slurm_extra.

Learnt from: cmeesters
PR: snakemake/snakemake-executor-plugin-slurm#0
File: :0-0
Timestamp: 2025-01-13T09:54:22.950Z
Learning: PR #173 (adding gres resource specification) depends on PR #28 in snakemake-executor-plugin-slurm-jobstep repository, as changes were required in the cpu-settings function of the jobstep-Executor module.

Learnt from: cmeesters
PR: snakemake/snakemake-executor-plugin-slurm#178
File: snakemake_executor_plugin_slurm/__init__.py:0-0
Timestamp: 2024-12-11T14:17:08.749Z
Learning: In the `snakemake-executor-plugin-slurm` project, when handling exceptions in `snakemake_executor_plugin_slurm/__init__.py`, prefer concise error messages and avoid unnecessary verbosity or exception chaining when it's not necessary.

        default=False,
        metadata={
@@ -199,6 +207,12 @@ def __post_init__(self, test_mode: bool = False):
            if self.workflow.executor_settings.logdir
            else Path(".snakemake/slurm_logs").resolve()
        )
        self._partitions = (
            read_partition_file(self.workflow.executor_settings.partition_config)
            if self.workflow.executor_settings.partition_config
            else None
        )
        atexit.register(self.clean_old_logs)

    def shutdown(self) -> None:
        """
@@ -303,6 +317,8 @@ def run_job(self, job: JobExecutorInterface):
        if job.resources.get("slurm_extra"):
            self.check_slurm_extra(job)

        # NOTE: partition was removed from the dict below, so that partition
        # selection can benefit from resource checking as the call is built up.
        job_params = {
            "run_uuid": self.run_uuid,
            "slurm_logfile": slurm_logfile,
@@ -350,6 +366,7 @@
                "Probably not what you want."
            )


        exec_job = self.format_job_exec(job)

        # and finally the job to execute with all the snakemake parameters
@@ -706,9 +723,13 @@ def get_partition_arg(self, job: JobExecutorInterface):
        returns a default partition, if applicable
        else raises an error - implicitly.
        """
        partition = None
        if job.resources.get("slurm_partition"):
            partition = job.resources.slurm_partition
        else:
        elif self._partitions:
            partition = get_best_partition(self._partitions, job, self.logger)
        # we didn't get a partition yet, so try the fallback.
        if not partition:
            if self._fallback_partition is None:
                self._fallback_partition = self.get_default_partition(job)
            partition = self._fallback_partition