diff --git a/fireworks/features/multi_launcher.py b/fireworks/features/multi_launcher.py
index 8004a3bad..ff9a308dd 100644
--- a/fireworks/features/multi_launcher.py
+++ b/fireworks/features/multi_launcher.py
@@ -6,6 +6,7 @@
 import threading
 import time
 from multiprocessing import Manager, Process
+import math

 from fireworks.core.rocket_launcher import rapidfire
 from fireworks.fw_config import (
@@ -20,6 +21,7 @@
     get_my_host,
     log_multi,
 )
+from warnings import warn

 __author__ = "Xiaohui Qu, Anubhav Jain"
 __copyright__ = "Copyright 2013, The Material Project & The Electrolyte Genome Project"
@@ -54,7 +56,8 @@ def ping_multilaunch(port, stop_event):


 def rapidfire_process(
-    fworker, nlaunches, sleep, loglvl, port, node_list, sub_nproc, timeout, running_ids_dict, local_redirect
+    fworker, nlaunches, sleep, loglvl, port, node_list, sub_nproc, timeout, running_ids_dict, local_redirect,
+    gpu_id=None,
 ):
     """
     Initializes shared data with multiprocessing parameters and starts a rapidfire.
@@ -70,7 +73,13 @@
         sub_nproc (int): number of processors of the sub job
         timeout (int): # of seconds after which to stop the rapidfire process
         local_redirect (bool): redirect standard input and output to local file
+        gpu_id (int): GPU ID to use for the sub job
     """
+    if gpu_id is not None:
+        # If the sub job is using GPU, set the CUDA_VISIBLE_DEVICES environment variable
+        # This will limit the GPU usage to only the specified GPU
+        os.environ['CUDA_VISIBLE_DEVICES'] = str(gpu_id)
+
     ds = DataServer(address=("127.0.0.1", port), authkey=DS_PASSWORD)
     ds.connect()
     launchpad = ds.LaunchPad()
@@ -147,6 +156,7 @@
     timeout=None,
     running_ids_dict=None,
     local_redirect=False,
+    gpu_lists=None,
 ):
     """
     Create each sub job and start a rocket launch in each one
@@ -168,9 +178,10 @@
     processes = [
         Process(
             target=rapidfire_process,
-            args=(fworker, nlaunches, sleep, loglvl, port, nl, sub_nproc, timeout, running_ids_dict, local_redirect),
+            args=(fworker, nlaunches, sleep, loglvl, port, nl, sub_nproc,
+                  timeout, running_ids_dict, local_redirect, gpu_id),
         )
-        for nl, sub_nproc in zip(node_lists, sub_nproc_list)
+        for nl, sub_nproc, gpu_id in zip(node_lists, sub_nproc_list, gpu_lists)
     ]
     for p in processes:
         p.start()
@@ -178,7 +189,7 @@
     return processes


-def split_node_lists(num_jobs, total_node_list=None, ppn=24):
+def split_node_lists(num_jobs, total_node_list=None, ppn=24, gpus_per_node=None):
     """
     Parse node list and processor list from nodefile contents

@@ -198,10 +209,19 @@
         sub_nnodes = nnodes // num_jobs
         sub_nproc_list = [sub_nnodes * ppn] * num_jobs
         node_lists = [orig_node_list[i : i + sub_nnodes] for i in range(0, nnodes, sub_nnodes)]
+
+        if gpus_per_node is not None:
+            gpu_lists = list(range(gpus_per_node)) * nnodes
+        else:
+            gpu_lists = [None] * nnodes
     else:
         sub_nproc_list = [ppn] * num_jobs
         node_lists = [None] * num_jobs
-    return node_lists, sub_nproc_list
+        if gpus_per_node is not None:
+            gpu_lists = (list(range(gpus_per_node)) * math.ceil(num_jobs / gpus_per_node))[:num_jobs]
+        else:
+            gpu_lists = [None] * num_jobs
+    return node_lists, sub_nproc_list, gpu_lists


 # TODO: why is loglvl a required parameter??? Also nlaunches and sleep_time could have a sensible default??
@@ -217,6 +237,7 @@
     timeout=None,
     exclude_current_node=False,
     local_redirect=False,
+    use_gpu=False,
 ):
     """
     Launch the jobs in the job packing mode.
@@ -234,6 +255,23 @@ def launch_multiprocess(
         exclude_current_node: Don't use the script launching node as a compute node
         local_redirect (bool): redirect standard input and output to local file
     """
+    gpus_per_node = None
+    cuda_devices = os.environ.get("CUDA_VISIBLE_DEVICES", None)
+    if use_gpu and cuda_devices is not None:
+        # Count the number of GPUs visible on each node
+        gpus_per_node = len(cuda_devices.split(','))
+        num_gpu = gpus_per_node
+        if total_node_list is not None:
+            # If the node list is specified, multiply the per-node GPU count by the
+            # number of nodes. Otherwise we assume it is a single-node job.
+            num_gpu = gpus_per_node * len(total_node_list)
+            if num_jobs > num_gpu:
+                raise ValueError(f"More jobs than GPUs requested. num_jobs={num_jobs},"
+                                 f" num_gpu={num_gpu}")
+        else:
+            warn('No node list specified, assuming the number of requested jobs is less'
+                 ' than the number of total GPUs available.')
+
     # parse node file contents
     if exclude_current_node:
         host = get_my_host()
@@ -244,7 +282,7 @@
             total_node_list.remove(host)
         else:
             log_multi(l_logger, "The current node is not in the node list, keep the node list as is")
-    node_lists, sub_nproc_list = split_node_lists(num_jobs, total_node_list, ppn)
+    node_lists, sub_nproc_list, gpu_lists = split_node_lists(num_jobs, total_node_list, ppn, gpus_per_node)

     # create shared dataserver
     ds = DataServer.setup(launchpad)
@@ -264,6 +302,7 @@
         timeout=timeout,
         running_ids_dict=running_ids_dict,
         local_redirect=local_redirect,
+        gpu_lists=gpu_lists,
     )

     FWData().Running_IDs = running_ids_dict
diff --git a/fireworks/scripts/rlaunch_run.py b/fireworks/scripts/rlaunch_run.py
index d51c0478e..3e8be9b9b 100644
--- a/fireworks/scripts/rlaunch_run.py
+++ b/fireworks/scripts/rlaunch_run.py
@@ -104,6 +104,7 @@ def rlaunch(argv: Optional[Sequence[str]] = None) -> int:
     multi_parser.add_argument(
         "--local_redirect", help="Redirect stdout and stderr to the launch directory", action="store_true"
     )
+    multi_parser.add_argument("--use_gpu", help="Whether or not the job uses GPU compute", action="store_true")

     parser.add_argument("-l", "--launchpad_file", help="path to launchpad file")
     parser.add_argument("-w", "--fworker_file", help="path to fworker file")
@@ -187,6 +188,7 @@ def rlaunch(argv: Optional[Sequence[str]] = None) -> int:
             timeout=args.timeout,
             exclude_current_node=args.exclude_current_node,
             local_redirect=args.local_redirect,
+            use_gpu=args.use_gpu,
         )
     else:
         launch_rocket(launchpad, fworker, args.fw_id, args.loglvl, pdb_on_exception=args.pdb)
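For reference, a minimal, self-contained sketch of the round-robin GPU assignment that the diff adds to `split_node_lists` (the `assign_gpus` helper name and the example values are illustrative only, not part of the PR):

```python
import math
import os


def assign_gpus(num_jobs, gpus_per_node):
    """Cycle GPU ids 0..gpus_per_node-1 across num_jobs sub jobs,
    mirroring the single-node branch added to split_node_lists."""
    if gpus_per_node is None:
        return [None] * num_jobs
    return (list(range(gpus_per_node)) * math.ceil(num_jobs / gpus_per_node))[:num_jobs]


if __name__ == "__main__":
    # e.g. the launching shell exported CUDA_VISIBLE_DEVICES="0,1,2,3"
    cuda_devices = os.environ.get("CUDA_VISIBLE_DEVICES", "0,1,2,3")
    gpus_per_node = len(cuda_devices.split(","))

    gpu_lists = assign_gpus(4, gpus_per_node)
    print(gpu_lists)  # -> [0, 1, 2, 3]

    # Each sub process then pins itself to its assigned device, as
    # rapidfire_process does before connecting to the DataServer:
    # os.environ["CUDA_VISIBLE_DEVICES"] = str(gpu_id)
```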
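Assuming the existing `rlaunch ... multi <num_jobs>` entry point, a packed GPU launch would then look something like `CUDA_VISIBLE_DEVICES=0,1,2,3 rlaunch multi 4 --use_gpu`; each of the four rapidfire processes restricts itself to a single device through its own `CUDA_VISIBLE_DEVICES`.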