7 changes: 7 additions & 0 deletions templates/configs/compute/bon_echo/a40_4x.yaml
@@ -0,0 +1,7 @@
nodes: 1
gpus_per_node: 4
cpus_per_task: 32
mem_gb: 64
timeout_min: 60
slurm:
  partition: a40
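
For context, these fields roughly correspond to the Slurm resources requested when this compute config is selected. A minimal sketch of the equivalent direct `submitit` call, assuming the standard `AutoExecutor` mapping (the log folder is illustrative and not part of this PR):

```python
# Rough equivalent of the resources requested by compute=bon_echo/a40_4x;
# assumed mapping onto submitit's AutoExecutor, folder name is illustrative.
import submitit

executor = submitit.AutoExecutor(folder="submitit_logs")
executor.update_parameters(
    nodes=1,
    gpus_per_node=4,
    cpus_per_task=32,
    mem_gb=64,
    timeout_min=60,
    slurm_partition="a40",  # slurm.partition maps to the slurm_-prefixed parameter
)
```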
25 changes: 22 additions & 3 deletions templates/src/llm/README.md
@@ -1,5 +1,24 @@
### LLM training templates
# LLM Training Templates

This directory includes templates for LLM training tasks:
This directory includes templates for language-model workloads:

- [text_classification](text_classification/): Fine-tunes a small Transformer on AG News using Hugging Face Trainer.
- [text_classification](text_classification/): fine-tunes a small LLM on AG News via Hugging Face Trainer.
- [finetune_distributed](finetune_distributed/): distributed finetuning example adapted from VectorLM (https://github.com/VectorInstitute/vectorlm).

## Finetune Distributed (DDP/FSDP)

Run the distributed template:
```bash
uv run python -m llm.finetune_distributed.launch \
compute=bon_echo/a40_4x \
+trainer.dist.mode=fsdp --multirun
```
You can choose **DDP** or **FSDP** mode by setting the `+trainer.dist.mode` argument (`ddp` or `fsdp`).

A few notes on how the pieces of this template fit together:
- **`launch.py`** is the Hydra entrypoint; it merges config layers and hands the resolved config to Submitit.
- **`distributed_launcher.py`** is a Submitit helper; it shells out to `torch.distributed.run` so that torchrun controls per-rank workers without re-entering Hydra (the same pattern used in VectorLM).
- **`train.py`** is the torchrun worker; it loads the saved config, builds tokenizer, dataloaders, model, and optimizer, and then delegates to the Trainer.
- **`trainer_core.py`** is a minimal trainer (adapted from VectorLM’s `trainer.py`); it handles gradient accumulation, checkpointing, optional evaluation, and works with either DDP or FSDP.

Hydra and Submitit resolve and submit jobs once. Torchrun (DDP/FSDP) needs to own process creation per GPU. Launching `torch.distributed.run` in a subprocess is the standard Hydra + Submitit approach: it avoids nested Hydra invocations, keeps the Hydra run directory stable for requeues and checkpoints, and makes local debugging under `torchrun` straightforward.
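
`train.py` itself is not part of this excerpt; below is a hypothetical skeleton of the worker it describes, where only the `--config` round-trip and the `dist` settings come from this diff and everything else is an assumption:

```python
# Hypothetical sketch of the torchrun worker entrypoint (train.py is not shown
# in this diff); the --config flag matches what distributed_launcher.py passes.
import argparse

import torch.distributed as dist
from omegaconf import OmegaConf


def main() -> None:
    parser = argparse.ArgumentParser()
    parser.add_argument("--config", required=True)  # resolved config saved by the launcher
    args = parser.parse_args()
    cfg = OmegaConf.load(args.config)

    # torchrun exports RANK / LOCAL_RANK / WORLD_SIZE for each worker,
    # so env:// initialization needs no extra arguments.
    if cfg.dist.mode in ("ddp", "fsdp"):
        dist.init_process_group(backend=cfg.dist.backend)

    # ... build tokenizer, dataloaders, model, and optimizer here, then hand
    # off to the trainer in trainer_core.py (interface not shown in this diff).


if __name__ == "__main__":
    main()
```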
1 change: 1 addition & 0 deletions templates/src/llm/finetune_distributed/__init__.py
@@ -0,0 +1 @@
"""LLM training template: Fine-tuning using distributed training."""
35 changes: 35 additions & 0 deletions templates/src/llm/finetune_distributed/config.yaml
@@ -0,0 +1,35 @@
trainer:
  model:
    name: "TinyLlama/TinyLlama-1.1B-Chat-v1.0"
    gradient_checkpointing: false

  data:
    name: "karpathy/tiny_shakespeare"
    text_key: "text"
    max_length: 512
    trust_remote_code: true

  train:
    num_train_epochs: 1
    per_device_train_batch_size: 1
    per_device_eval_batch_size: 1
    gradient_accumulation_steps: 8
    learning_rate: 2.0e-5
    weight_decay: 0.0
    logging_steps: 20
    eval_steps: 200
    save_steps: 200
    eval_strategy: "steps"

  dist:
    mode: "fsdp" # none | ddp | fsdp
    backend: "nccl"
    bf16: false
    fp16: true
    fsdp: "full_shard auto_wrap"
    fsdp_config:
      use_orig_params: true
      limit_all_gathers: true
      forward_prefetch: true
      sync_module_states: true
      activation_checkpointing: false
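
The keys under `fsdp_config` correspond to PyTorch FSDP constructor arguments. Since the model-wrapping code (`train.py` / `trainer_core.py`) is not part of this excerpt, here is a hedged sketch of how the `dist` block could be applied with plain PyTorch; the helper name and device placement are assumptions:

```python
# Illustrative sketch only: how the dist section above could drive DDP/FSDP
# wrapping in plain PyTorch. The template's real wrapping code is not in this diff.
from torch.distributed.fsdp import FullyShardedDataParallel as FSDP
from torch.nn.parallel import DistributedDataParallel as DDP


def wrap_model(model, dist_cfg, local_rank: int):
    model = model.cuda(local_rank)
    if dist_cfg.mode == "ddp":
        return DDP(model, device_ids=[local_rank])
    if dist_cfg.mode == "fsdp":
        return FSDP(
            model,
            use_orig_params=dist_cfg.fsdp_config.use_orig_params,
            limit_all_gathers=dist_cfg.fsdp_config.limit_all_gathers,
            forward_prefetch=dist_cfg.fsdp_config.forward_prefetch,
            sync_module_states=dist_cfg.fsdp_config.sync_module_states,
        )
    return model  # mode == "none": single-process fallback
```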
115 changes: 115 additions & 0 deletions templates/src/llm/finetune_distributed/distributed_launcher.py
@@ -0,0 +1,115 @@
"""Launch script for checkpointable distributed finetuning with Hydra + Submitit."""

from __future__ import annotations

import os
import subprocess
import sys

import submitit
from omegaconf import OmegaConf


def _under_torchrun() -> bool:
return "LOCAL_RANK" in os.environ or "TORCHELASTIC_RUN_ID" in os.environ


def _running_inside_slurm() -> bool:
return "SLURM_JOB_ID" in os.environ


def _slurm_world():
nnodes = int(os.environ.get("SLURM_NNODES", "1"))
node_rank = int(os.environ.get("SLURM_NODEID", "0"))
nodelist = os.environ.get("SLURM_NODELIST") or os.environ.get("SLURM_JOB_NODELIST")
if not nodelist:
master_addr = "127.0.0.1"
else:
out = subprocess.check_output(["scontrol", "show", "hostnames", nodelist])
master_addr = out.decode().splitlines()[0].strip()
master_port = os.environ.get("MASTER_PORT", "29500")
return nnodes, node_rank, master_addr, master_port


def _resolve_work_dir(cfg) -> str:
env_dir = os.environ.get("HYDRA_LAUNCHER_RUN_DIR") or os.environ.get(
"HYDRA_RUN_DIR"
)
if env_dir:
return env_dir
work_dir = getattr(cfg, "work_dir", None)
if isinstance(work_dir, str) and "${" not in work_dir:
return work_dir
return os.getcwd()


def _save_resolved_config(cfg, work_dir: str) -> str:
OmegaConf.set_struct(cfg, False)
cfg.work_dir = work_dir
if "paths" in cfg:
cfg.paths["work_dir"] = work_dir
cfg.paths["work_root"] = os.path.dirname(work_dir)
base = os.path.basename(work_dir)
if base.isdigit():
cfg.experiment_name = os.path.basename(os.path.dirname(work_dir))
else:
cfg.experiment_name = base
cfg_path = os.path.join(work_dir, "_fsdp_cfg.yaml")
OmegaConf.save(cfg, cfg_path)
return cfg_path


def _launch_torchrun(cfg, world_size: int, nproc_per_node: int) -> int:
if world_size <= 1 or _under_torchrun() or not _running_inside_slurm():
return 0
nnodes, node_rank, master_addr, master_port = _slurm_world()
work_dir = _resolve_work_dir(cfg)
os.makedirs(work_dir, exist_ok=True)
cfg_path = _save_resolved_config(cfg, work_dir)
cmd = [
sys.executable,
"-m",
"torch.distributed.run",
f"--nproc_per_node={nproc_per_node}",
f"--nnodes={nnodes}",
f"--node_rank={node_rank}",
f"--master_addr={master_addr}",
f"--master_port={master_port}",
"--module",
"llm.finetune_distributed.train",
"--config",
cfg_path,
]
return subprocess.run(cmd, check=False).returncode


class DistributedLauncher(submitit.helpers.Checkpointable):
"""Submitit helper that spins up torchrun or falls back to a local run."""

def __call__(self, cfg):
"""Dispatch the training job based on the selected distributed mode."""
nnodes = int(getattr(cfg.compute, "nodes", 1))
gpn = int(getattr(cfg.compute, "gpus_per_node", 1))
world_size = nnodes * gpn

if getattr(cfg.dist, "mode", "none") in {"ddp", "fsdp"}:
return _launch_torchrun(cfg, world_size, gpn)

work_dir = _resolve_work_dir(cfg)
os.makedirs(work_dir, exist_ok=True)
cfg_path = _save_resolved_config(cfg, work_dir)
cmd = [
sys.executable,
"-m",
"llm.finetune_distributed.train",
"--config",
cfg_path,
]
return subprocess.run(cmd, check=False).returncode

def checkpoint(self, *args, **kwargs):
"""Checkpoint the launcher so Submitit can requeue the job."""
return submitit.helpers.DelayedSubmission(self, *args, **kwargs)


__all__ = ["DistributedLauncher"]
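
For quick local debugging without Slurm, the launcher's single-process fallback (no torchrun) can be exercised directly. A hypothetical smoke test; the config path, scratch directory, and compute override are illustrative:

```python
# Hypothetical local smoke test of the fallback branch (dist.mode="none");
# paths and overrides below are illustrative, not part of the template.
from omegaconf import OmegaConf

from llm.finetune_distributed.distributed_launcher import DistributedLauncher

raw = OmegaConf.load("templates/src/llm/finetune_distributed/config.yaml")
cfg = raw.trainer                 # launch.py flattens `trainer` before dispatch
cfg.dist.mode = "none"            # skip torchrun, run one local worker
cfg.compute = OmegaConf.create({"nodes": 1, "gpus_per_node": 1})
cfg.work_dir = "/tmp/finetune_debug"  # assumed scratch directory

print("exit code:", DistributedLauncher()(cfg))
```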
41 changes: 41 additions & 0 deletions templates/src/llm/finetune_distributed/launch.py
@@ -0,0 +1,41 @@
"""Launch script for checkpointable distributed finetuning with Hydra + Submitit."""

import os

import hydra
from hydra.core.hydra_config import HydraConfig
from llm.finetune_distributed.distributed_launcher import DistributedLauncher
from omegaconf import DictConfig, OmegaConf


_CONFIG_PATH = os.path.normpath(
os.path.join(os.path.dirname(__file__), "../../../configs")
)


@hydra.main(config_path=_CONFIG_PATH, config_name="_global", version_base=None)
def main(cfg: DictConfig):
"""Hydra entrypoint that merges configs before launching training."""
local_cfg_path = os.path.join(os.path.dirname(__file__), "config.yaml")
local_cfg = OmegaConf.load(local_cfg_path)
OmegaConf.set_struct(cfg, False)
cfg = OmegaConf.merge(local_cfg, cfg)

hydra_run_dir = HydraConfig.get().runtime.output_dir
if hydra_run_dir is not None:
cfg.work_dir = hydra_run_dir
if "paths" in cfg:
cfg.paths.work_dir = hydra_run_dir
cfg.paths.work_root = os.path.dirname(hydra_run_dir)

if "trainer" in cfg:
trainer_cfg = cfg.trainer
cfg = OmegaConf.merge(cfg, trainer_cfg)
del cfg.trainer

runner = DistributedLauncher()
return runner(cfg)


if __name__ == "__main__":
main()
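
The merge order in `main` is what lets CLI overrides such as `+trainer.dist.mode=ddp` end up at `cfg.dist.mode` by the time the launcher sees the config: the local `config.yaml` is merged first so Hydra/CLI values win, and the `trainer` subtree is then flattened to the top level. A self-contained OmegaConf illustration with toy values:

```python
# Toy illustration of the merge-then-flatten behaviour in launch.py;
# the values are made up and only the mechanism matches the code above.
from omegaconf import OmegaConf

local_cfg = OmegaConf.create(
    {"trainer": {"dist": {"mode": "fsdp"}, "train": {"learning_rate": 2.0e-5}}}
)
cli_cfg = OmegaConf.create({"trainer": {"dist": {"mode": "ddp"}}})  # +trainer.dist.mode=ddp

cfg = OmegaConf.merge(local_cfg, cli_cfg)  # later config wins -> mode becomes "ddp"
OmegaConf.set_struct(cfg, False)
cfg = OmegaConf.merge(cfg, cfg.trainer)    # hoist the trainer subtree to the top level
del cfg.trainer

print(cfg.dist.mode)            # ddp
print(cfg.train.learning_rate)  # 2e-05
```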