From 2d3228749bd29df9623f6162ab14cdb8ef12caeb Mon Sep 17 00:00:00 2001 From: Wilson Beebe Date: Fri, 21 Mar 2025 14:41:35 -0700 Subject: [PATCH 01/14] First draft of simple workflow to produce uncertainties on USDF --- src/kbmod_wf/wu_to_uncertainties_workflow.py | 253 +++++++++++++++++++ uncertainties_runtime_config.toml | 24 ++ 2 files changed, 277 insertions(+) create mode 100644 src/kbmod_wf/wu_to_uncertainties_workflow.py create mode 100644 uncertainties_runtime_config.toml diff --git a/src/kbmod_wf/wu_to_uncertainties_workflow.py b/src/kbmod_wf/wu_to_uncertainties_workflow.py new file mode 100644 index 00000000..7d43e5af --- /dev/null +++ b/src/kbmod_wf/wu_to_uncertainties_workflow.py @@ -0,0 +1,253 @@ +import argparse +import os + +import toml +import parsl +from parsl import python_app, File +import parsl.executors + +from kbmod_wf.utilities import ( + apply_runtime_updates, + get_resource_config, + get_executors, + get_configured_logger, +) + +@python_app( + cache=True, + executors=get_executors(["local_dev_testing", "gpu"]), # TODO verify if gpu is needed + ignore_for_cache=["logging_file"], +) +def get_uncertainties(inputs=(), outputs=(), runtime_config={}, logging_file=None): + """Loads a WorkUnit and KBMOD results and calculates the uncertaintties for those results + + Parameters + ---------- + inputs : `tuple` or `list` + Order sensitive input to the Python App. + outputs : `tuple` or `list` + Order sensitive output of the Python App. + runtime_config : `dict`, optional + Runtime configuration values. Keys ``butler_config_filepath``, + ``search_config_filepath`` and ``n_workers`` will be consumed + if they exist. + logging_file : `File` or `None`, optional + Parsl File object poiting to the output logging file. + + Returns + ------- + outputs : `tuple` or `list` + Order sensitive output of the Python App. + + Inputs + ---------- + wu_file : `File` + Parsl File object pointing to the WorkUnit. + res_file : `File` + KBMOD results file corresponding to this WorkUnit. + uuids : `list` + List of UUID hex representations corresponding to results we want to + measure uncertainties for. If empty, all results will be used. + + Outputs + ------- + workunit_path : `File` + Parsl File object poiting to the resampled WorkUnit. + """ + import json + + import numpy as np + from astropy.table import Table + + from kbmod.work_unit import WorkUnit + from kbmod.trajectory_explorer import TrajectoryExplorer + + from kbmod_wf.task_impls import calc_skypos_uncerts + from kbmod_wf.utilities.logger_utilities import get_configured_logger, ErrorLogger + logger = get_configured_logger("workflow.get_uncertainties", logging_file.filepath) + + with ErrorLogger(logger): + logger.info("Starting getting uncertainties.") + + if os.path.exists(outputs[0].filepath): + logger.info("Finished step 2. 
Uncertainties exist.") + return outputs + + wu_path = inputs[0].filepath + res_path = inputs[1].filepath + uuids = inputs[2] + + # Load the WorkUnit + wu = None + try: + wu = WorkUnit.from_fits(wu_path) + logger.info(f"Loaded WorkUnit from fits") + except Exception as e: + wu_filename = os.path.basename(wu_path) + wu_dir = os.path.dirname(wu_path) + wu = WorkUnit.from_sharded_fits(wu_filename, wu_dir, lazy=False) + logger.info(f"Loaded WorkUnit from sharded fits") + + # Load results from this WorkUnit + results = Table.read(res_path) + explorer = TrajectoryExplorer(wu.im_stack) + + wcs = wu.wcs + + uuids2, pgs, startt, endt = [], [], [], [] + p1ra, p1dec, sigma_p1ra, sigma_p1dec = [], [], [], [] + p2ra, p2dec, sigma_p2ra, sigma_p2dec = [], [], [], [] + likelihoods, uncerts = [], [] + + if len(uuids) > 0: + # If the user specified a list of UUIDs, filter the results to just those UUIDs + results.table = results.table[results["uuid"].isin(uuids)] + + for r in results: + # TODO Make UUIDs required + uuid = r["uuid"] if "uuid" in r.table.colnames else -1 + samples = explorer.evaluate_around_linear_trajectory( + r["x"][0], + r["y"][0], + r["vx"][0], + r["vy"][0], + pixel_radius=10, + max_ang_offset=0.785397999997775, # np.pi/4 + ang_step=1.5*0.0174533, # deg2rad + max_vel_offset=45, + vel_step=0.55, + ) + + maxl = samples["likelihood"].max() + bestfit = samples[samples["likelihood"] == maxl] + # happens when oversampling + if len(bestfit) > 1: + bestfit = bestfit[:1] + + # Get the valid obstimes that were used this result + valid_obstimes = [] + for i in range(len(r["obs_valid"])): + if r["obs_valid"][i]: + valid_obstimes.append(wu.im_stack.get_obstime(i)) + + mjd_start = min(valid_obstimes) + mjd_end = max(valid_obstimes) + + start_coord, end_coord, uncert = calc_skypos_uncerts( + samples, + mjd_start, + mjd_end, + wcs + ) + + uuids2.append(uuid) + startt.append(mjd_start) + endt.append(mjd_end) + likelihoods.append(maxl) + p1ra.append(start_coord.ra.deg) + p1dec.append(start_coord.dec.deg) + p2ra.append(end_coord.ra.deg) + p2dec.append(end_coord.dec.deg) + sigma_p1ra.append(uncert[0,0]) + sigma_p1dec.append(uncert[1,1]) + sigma_p2ra.append(uncert[2,2]) + sigma_p2dec.append(uncert[3,3]) + uncerts.append(uncert) + + t = Table({ + "likelihood": likelihoods, + "p1ra": p1ra, + "p1dec": p1dec, + "p2ra": p2ra, + "p2dec": p2dec, + "sigma_p1ra": np.sqrt(sigma_p1ra), + "sigma_p1dec": np.sqrt(sigma_p1dec), + "sigma_p2ra": np.sqrt(sigma_p2ra), + "sigma_p2dec": np.sqrt(sigma_p2dec), + "uncertainty": uncerts, + "uuid": uuids2, + "t0": startt, + "t1": endt + }) + t.write(outputs[0].filepath, format="ascii.ecsv", overwrite=True) + logger.info("Finished step 2.") + + return outputs + + + +def workflow_runner(env=None, runtime_config={}): + """This function will load and configure Parsl, and run the workflow. 
+ + Parameters + ---------- + env : str, optional + Environment string used to define which resource configuration to use, + by default None + runtime_config : dict, optional + Dictionary of assorted runtime configuration parameters, by default {} + """ + resource_config = get_resource_config(env=env) + resource_config = apply_runtime_updates(resource_config, runtime_config) + + app_configs = runtime_config.get("apps", {}) + + dfk = parsl.load(resource_config) + if dfk: + logging_file = File(os.path.join(dfk.run_dir, "kbmod.log")) + logger = get_configured_logger("workflow.workflow_runner", logging_file.filepath) + + if runtime_config is not None: + logger.info(f"Using runtime configuration definition:\n{toml.dumps(runtime_config)}") + + logger.info("Starting workflow") + + # Get filenames from runtime config + wu_path = runtime_config.get("wu_path", None) + wu_name = os.path.basename(wu_path) + res_path = runtime_config.get("res_path", None) + uuids = runtime_config.get("uuids", []) + output_dir = runtime_config.get("output_dir", os.getcwd()) + + # create output directory if it doesn't exist + os.makedirs(output_dir, exist_ok=True) + + # run kbmod search on each reprojected WorkUnit + uncertainty_future = get_uncertainties( + inputs=[File(wu_path), File(res_path), uuids], + outputs=[File(os.path.join(output_dir, f"{wu_name}.meas"))], + runtime_config=app_configs.get("kbmod_search", {}), + logging_file=logging_file, + ) + uncertainty_future.result() + dfk.wait_for_current_tasks() + + logger.info("Workflow complete") + + parsl.clear() + + +if __name__ == "__main__": + parser = argparse.ArgumentParser() + parser.add_argument( + "--env", + type=str, + choices=["dev", "klone", "usdf"], + help="The environment to run the workflow in.", + ) + + parser.add_argument( + "--runtime-config", + type=str, + help="The complete runtime configuration filepath to use for the workflow.", + ) + + args = parser.parse_args() + + # if a runtime_config file was provided and exists, load the toml as a dict. + runtime_config = {} + if args.runtime_config is not None and os.path.exists(args.runtime_config): + with open(args.runtime_config, "r") as toml_runtime_config: + runtime_config = toml.load(toml_runtime_config) + + workflow_runner(env=args.env, runtime_config=runtime_config) diff --git a/uncertainties_runtime_config.toml b/uncertainties_runtime_config.toml new file mode 100644 index 00000000..db8a2bd5 --- /dev/null +++ b/uncertainties_runtime_config.toml @@ -0,0 +1,24 @@ +# Example parsl config associated with + + +# All values set here will be applied to the resource configuration prior to +# calling parsl.load(config). Even if the key does't exist in the resource +# config, it will be added with the value defined here. +[resource_config_modifiers] +checkpoint_mode = 'task_exit' + +# Values in the apps.XXX section will be passed as a dictionary to the corresponding +# app. e.g. apps.create_uri_manifest will be passed to the create_uri_manifest app. +[apps.get_uncertainties] + +# The path of a resampled workunit to process +wu_path = "" # TODO update + +# Path of a KBMOD results file produced from a run on the resampled workunit +res_path = "" # TODO Update + +# If the KBMOD results has a uuid column only, get uncertainties for these uuids. Otherwise, get_uncertainties for all provided results. 
+# uuids = [] # Add if needed + +# The output directory to write the file with uncertainties to +output_dir = "" # TODO Update From 6da225d52ba2b6cf03dbb290350d546b3c4e74cd Mon Sep 17 00:00:00 2001 From: Wilson Beebe Date: Fri, 21 Mar 2025 14:43:04 -0700 Subject: [PATCH 02/14] Update instructions --- uncertainties_runtime_config.toml | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/uncertainties_runtime_config.toml b/uncertainties_runtime_config.toml index db8a2bd5..b86e50da 100644 --- a/uncertainties_runtime_config.toml +++ b/uncertainties_runtime_config.toml @@ -1,4 +1,6 @@ -# Example parsl config associated with +# Example parsl config associated with wu_to_uncertainties_workflow.python_app +# To run on USDF, run: +# python /[YOUR_BASE_PATH]/kbmod-wf/src/kbmod_wf/multi_night_workflow.py --runtime_config=[PATH OF THIS RUNTIME CONFIG] --env=USDF # All values set here will be applied to the resource configuration prior to From fbe9555b4f813c1bbd736d9ca201f6afac647909 Mon Sep 17 00:00:00 2001 From: Wilson Beebe Date: Tue, 25 Mar 2025 14:29:29 -0700 Subject: [PATCH 03/14] Fix uncertainty workflow import --- src/kbmod_wf/task_impls/__init__.py | 2 ++ src/kbmod_wf/task_impls/uncertainty_propagation.py | 0 src/kbmod_wf/wu_to_uncertainties_workflow.py | 2 ++ uncertainties_runtime_config.toml | 3 +-- 4 files changed, 5 insertions(+), 2 deletions(-) create mode 100644 src/kbmod_wf/task_impls/uncertainty_propagation.py diff --git a/src/kbmod_wf/task_impls/__init__.py b/src/kbmod_wf/task_impls/__init__.py index eef1ad26..9da9be25 100644 --- a/src/kbmod_wf/task_impls/__init__.py +++ b/src/kbmod_wf/task_impls/__init__.py @@ -2,6 +2,8 @@ from .kbmod_search import kbmod_search from .uri_to_ic import uri_to_ic +from .uncertainty_propagation import * + __all__ = [ "ic_to_wu", "kbmod_search", diff --git a/src/kbmod_wf/task_impls/uncertainty_propagation.py b/src/kbmod_wf/task_impls/uncertainty_propagation.py new file mode 100644 index 00000000..e69de29b diff --git a/src/kbmod_wf/wu_to_uncertainties_workflow.py b/src/kbmod_wf/wu_to_uncertainties_workflow.py index 7d43e5af..45dc0103 100644 --- a/src/kbmod_wf/wu_to_uncertainties_workflow.py +++ b/src/kbmod_wf/wu_to_uncertainties_workflow.py @@ -202,6 +202,8 @@ def workflow_runner(env=None, runtime_config={}): logger.info("Starting workflow") + runtime_config=app_configs.get("get_uncertainties", {}) + # Get filenames from runtime config wu_path = runtime_config.get("wu_path", None) wu_name = os.path.basename(wu_path) diff --git a/uncertainties_runtime_config.toml b/uncertainties_runtime_config.toml index b86e50da..ec3f8cd5 100644 --- a/uncertainties_runtime_config.toml +++ b/uncertainties_runtime_config.toml @@ -1,7 +1,6 @@ # Example parsl config associated with wu_to_uncertainties_workflow.python_app # To run on USDF, run: -# python /[YOUR_BASE_PATH]/kbmod-wf/src/kbmod_wf/multi_night_workflow.py --runtime_config=[PATH OF THIS RUNTIME CONFIG] --env=USDF - +# python /[YOUR_BASE_PATH]/kbmod-wf/src/kbmod_wf/wu_to_uncertainties_workflow.py --runtime-config=[PATH OF THIS RUNTIME CONFIG] --env=usdf # All values set here will be applied to the resource configuration prior to # calling parsl.load(config). 
Even if the key does't exist in the resource From fae703b32dd147c4f82ae8b3374960346bc4e02b Mon Sep 17 00:00:00 2001 From: Wilson Beebe Date: Tue, 1 Apr 2025 10:08:42 -0700 Subject: [PATCH 04/14] Switch WorkUnit.heliocentric_distance to WorkUnit.barycentric_distance (#54) (#55) --- .../task_impls/reproject_multi_chip_multi_night_from_uris.py | 2 +- src/kbmod_wf/task_impls/reproject_multi_chip_multi_night_wu.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/kbmod_wf/task_impls/reproject_multi_chip_multi_night_from_uris.py b/src/kbmod_wf/task_impls/reproject_multi_chip_multi_night_from_uris.py index db82feb1..e165990f 100644 --- a/src/kbmod_wf/task_impls/reproject_multi_chip_multi_night_from_uris.py +++ b/src/kbmod_wf/task_impls/reproject_multi_chip_multi_night_from_uris.py @@ -134,7 +134,7 @@ def reproject_workunit(self): self.logger.debug(f"Required {elapsed}[s] to transform WCS objects to EBD..") wu.org_img_meta["ebd_wcs"] = ebd_per_image_wcs - wu.heliocentric_distance = self.guess_dist + wu.barycentric_distance = self.guess_dist wu.org_img_meta["geocentric_distance"] = geocentric_dists # Reproject to a common WCS using the WCS for our patch diff --git a/src/kbmod_wf/task_impls/reproject_multi_chip_multi_night_wu.py b/src/kbmod_wf/task_impls/reproject_multi_chip_multi_night_wu.py index bfb2b042..1585a5ff 100644 --- a/src/kbmod_wf/task_impls/reproject_multi_chip_multi_night_wu.py +++ b/src/kbmod_wf/task_impls/reproject_multi_chip_multi_night_wu.py @@ -114,7 +114,7 @@ def reproject_workunit(self): self.logger.debug(f"Required {elapsed}[s] to transform WCS objects to EBD..") wu.org_img_meta["ebd_wcs"] = ebd_per_image_wcs - wu.heliocentric_distance = self.guess_dist + wu.barycentric_distance = self.guess_dist wu.org_img_meta["geocentric_distance"] = geocentric_dists # Reproject to a common WCS using the WCS for our patch From f7bd5bd89d85c14166f7236f0515b753628d6adb Mon Sep 17 00:00:00 2001 From: Colin Orion Chandler Date: Tue, 8 Apr 2025 14:03:03 -0700 Subject: [PATCH 05/14] updated usdf config (#56) Updated the configuration as needed --- src/kbmod_wf/resource_configs/usdf_configuration.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/src/kbmod_wf/resource_configs/usdf_configuration.py b/src/kbmod_wf/resource_configs/usdf_configuration.py index e7fec5ed..a6e89a6d 100644 --- a/src/kbmod_wf/resource_configs/usdf_configuration.py +++ b/src/kbmod_wf/resource_configs/usdf_configuration.py @@ -12,7 +12,7 @@ "gpu_max": "08:00:00", } -base_path = "/sdf/data/rubin/user/wbeebe/parsl/workflow_output" +base_path = f"{os.environ['HOME']}/rubin-user/parsl/workflow_output" account_name = "rubin:commissioning" @@ -39,7 +39,7 @@ def usdf_resource_config(): parallelism=1, nodes_per_block=1, cores_per_node=1, # perhaps should be 8??? 
- mem_per_node=256, # In GB + mem_per_node=256, # In GB; milano has a 256 Gb cap exclusive=False, walltime=walltimes["compute_bigmem"], scheduler_options="#SBATCH --export=ALL", # Add other options as needed @@ -51,7 +51,7 @@ def usdf_resource_config(): label="large_mem", max_workers=1, provider=SlurmProvider( - partition="milano", + partition="ampere", # or ada?; note: milano has a 256 Gb cap account=account_name, min_blocks=0, max_blocks=2, @@ -71,7 +71,7 @@ def usdf_resource_config(): label="sharded_reproject", max_workers=1, provider=SlurmProvider( - partition="milano", + partition="ampere", # or ada?; note: milano has a 256 Gb cap account=account_name, min_blocks=0, max_blocks=2, @@ -91,7 +91,7 @@ def usdf_resource_config(): label="gpu", max_workers=1, provider=SlurmProvider( - partition="ada", + partition="ampere", # or ada account=account_name, min_blocks=0, max_blocks=2, @@ -111,7 +111,7 @@ def usdf_resource_config(): label="large_gpu", max_workers=1, provider=SlurmProvider( - partition="turing", # TODO verify this + partition="ampere", # or ada; was turing, but we do not have access account=account_name, min_blocks=0, max_blocks=2, From 72cc308bfe907f234cbf287439dd51c54f9bbcbf Mon Sep 17 00:00:00 2001 From: Colin Orion Chandler Date: Tue, 8 Apr 2025 14:44:31 -0700 Subject: [PATCH 06/14] Increased max block size for the various partitions. (#58) Co-authored-by: Colin Orion Chandler --- src/kbmod_wf/resource_configs/usdf_configuration.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/src/kbmod_wf/resource_configs/usdf_configuration.py b/src/kbmod_wf/resource_configs/usdf_configuration.py index a6e89a6d..a1baf185 100644 --- a/src/kbmod_wf/resource_configs/usdf_configuration.py +++ b/src/kbmod_wf/resource_configs/usdf_configuration.py @@ -34,7 +34,7 @@ def usdf_resource_config(): partition="milano", account=account_name, min_blocks=0, - max_blocks=4, + max_blocks=64, init_blocks=0, parallelism=1, nodes_per_block=1, @@ -54,7 +54,7 @@ def usdf_resource_config(): partition="ampere", # or ada?; note: milano has a 256 Gb cap account=account_name, min_blocks=0, - max_blocks=2, + max_blocks=12, init_blocks=0, parallelism=1, nodes_per_block=1, @@ -74,7 +74,7 @@ def usdf_resource_config(): partition="ampere", # or ada?; note: milano has a 256 Gb cap account=account_name, min_blocks=0, - max_blocks=2, + max_blocks=12, init_blocks=0, parallelism=1, nodes_per_block=1, @@ -94,7 +94,7 @@ def usdf_resource_config(): partition="ampere", # or ada account=account_name, min_blocks=0, - max_blocks=2, + max_blocks=8, init_blocks=0, parallelism=1, nodes_per_block=1, @@ -114,7 +114,7 @@ def usdf_resource_config(): partition="ampere", # or ada; was turing, but we do not have access account=account_name, min_blocks=0, - max_blocks=2, + max_blocks=8, init_blocks=0, parallelism=1, nodes_per_block=1, From 5bf19e98d30d8e20362f48561c7ada392f677cd8 Mon Sep 17 00:00:00 2001 From: Wilson Beebe Date: Wed, 9 Apr 2025 18:44:08 -0700 Subject: [PATCH 07/14] Make Parsl Robust to Workflow Exceptions (#59) * Remove 42.0 au as a default guess distance * Catch exceptions from kbmod search futures * When reprojecting WorkUnits, use overwrite=True by default --- src/kbmod_wf/multi_night_workflow.py | 16 +++++++++++----- .../reproject_multi_chip_multi_night_wu.py | 2 +- 2 files changed, 12 insertions(+), 6 deletions(-) diff --git a/src/kbmod_wf/multi_night_workflow.py b/src/kbmod_wf/multi_night_workflow.py index 5c080c99..bf36b1d6 100644 --- a/src/kbmod_wf/multi_night_workflow.py +++ 
b/src/kbmod_wf/multi_night_workflow.py @@ -84,15 +84,15 @@ def workflow_runner(env=None, runtime_config={}): reproject_futures = [] repro_wu_filenames = [] runtime_config = app_configs.get("reproject_wu", {}) + if "helio_guess_dists" not in runtime_config: + raise ValueError("No 'helio_guess_dists' were provided in the runtime config for reprojection.") + with open(create_manifest_future.result(), "r") as f: for line in f: collection_file = File(line.strip()) wu_filename = line + ".wu" # Get the requested heliocentric guess distances (in AU) for reflex correction. - # If none are provided, default to 42.0 AU. - distances = ( - runtime_config["helio_guess_dists"] if "helio_guess_dists" in runtime_config else [42.0] - ) + distances = runtime_config["helio_guess_dists"] for dist in distances: output_filename = wu_filename + f".{dist}.repro" repro_wu_filenames.append(output_filename) @@ -118,7 +118,13 @@ def workflow_runner(env=None, runtime_config={}): ) ) - [f.result() for f in search_futures] + for f in search_futures: + # Apply a blocking call to ensure that the workflow does not exit before all futures are completed. + # We use a try-catch so that any single future cannot crash the parent process. + try: + f.result() + except Exception as e: + logger.error(f"Error occurred while processing a future: {e}") logger.info("Workflow complete") diff --git a/src/kbmod_wf/task_impls/reproject_multi_chip_multi_night_wu.py b/src/kbmod_wf/task_impls/reproject_multi_chip_multi_night_wu.py index 1585a5ff..03903636 100644 --- a/src/kbmod_wf/task_impls/reproject_multi_chip_multi_night_wu.py +++ b/src/kbmod_wf/task_impls/reproject_multi_chip_multi_night_wu.py @@ -72,7 +72,7 @@ def __init__( self.logger = logger kbmod._logging.basicConfig(level=self.logger.level) - self.overwrite = self.runtime_config.get("overwrite", False) + self.overwrite = self.runtime_config.get("overwrite", True) self.search_config = self.runtime_config.get("search_config", None) # Default to 8 workers if not in the config. 
Value must be 0 Date: Fri, 11 Apr 2025 14:19:00 -0700 Subject: [PATCH 08/14] Add option to cleanup sharded WorkUnit after search --- src/kbmod_wf/task_impls/kbmod_search.py | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/src/kbmod_wf/task_impls/kbmod_search.py b/src/kbmod_wf/task_impls/kbmod_search.py index c2552e19..e5f561d9 100644 --- a/src/kbmod_wf/task_impls/kbmod_search.py +++ b/src/kbmod_wf/task_impls/kbmod_search.py @@ -51,6 +51,7 @@ def __init__( self.logger = logger self.search_config_filepath = self.runtime_config.get("search_config_filepath", None) + self.cleanup_wu = self.runtime_config.get("cleanup_wu", False) self.results_directory = os.path.dirname(self.result_filepath) def run_search(self): @@ -92,4 +93,21 @@ def run_search(self): self.logger.info(f"Writing results to output file: {self.result_filepath}") res.write_table(self.result_filepath) + self.logger.info("Results written to file") + if self.cleanup_wu: + self.logger.info(f"Cleaning up sharded WorkUnit {self.input_wu_filepath} with {len(wu)}") + # Delete the head WorkUnit file at self.input_wu_filepath + try: + os.remove(self.input_wu_filepath) + except Exception as e: + self.logger.warning(f"Failed to remove {self.input_wu_filepath}: {e}") + # Delete the shards for this WorkUnit + for i in range(len(wu)): + shard_path = os.path.join(directory_containing_shards, f"{i}_{wu_filename}") + try: + os.remove(shard_path) + except Exception as e: + self.logger.warning(f"Failed to remove WorkUnit shard {shard_path}: {e}") + self.logger.info(f"Successfully removed WorkUnit {self.input_wu_filepath}") + return self.result_filepath From a7fb13e289eaf026b0d91d30c9310c3099dfc84a Mon Sep 17 00:00:00 2001 From: Wilson Beebe Date: Fri, 11 Apr 2025 14:32:53 -0700 Subject: [PATCH 09/14] Updated files --- src/kbmod_wf/task_impls/kbmod_search.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/src/kbmod_wf/task_impls/kbmod_search.py b/src/kbmod_wf/task_impls/kbmod_search.py index e5f561d9..1d2c629e 100644 --- a/src/kbmod_wf/task_impls/kbmod_search.py +++ b/src/kbmod_wf/task_impls/kbmod_search.py @@ -92,16 +92,17 @@ def run_search(self): self.logger.info(f"Writing results to output file: {self.result_filepath}") res.write_table(self.result_filepath) - self.logger.info("Results written to file") + if self.cleanup_wu: self.logger.info(f"Cleaning up sharded WorkUnit {self.input_wu_filepath} with {len(wu)}") - # Delete the head WorkUnit file at self.input_wu_filepath + # Delete the head filefor the WorkUnit try: os.remove(self.input_wu_filepath) except Exception as e: self.logger.warning(f"Failed to remove {self.input_wu_filepath}: {e}") - # Delete the shards for this WorkUnit + + # Delete the individual shards for this WorkUnit, one existing for each image. 
for i in range(len(wu)): shard_path = os.path.join(directory_containing_shards, f"{i}_{wu_filename}") try: From 4be91a045c985dd8a771ca5a908aa6b3220f86df Mon Sep 17 00:00:00 2001 From: Wilson Beebe Date: Mon, 14 Apr 2025 13:44:10 -0700 Subject: [PATCH 10/14] Remove date from Parsl workflow run directory (#61) --- src/kbmod_wf/resource_configs/usdf_configuration.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/kbmod_wf/resource_configs/usdf_configuration.py b/src/kbmod_wf/resource_configs/usdf_configuration.py index a1baf185..28df76f6 100644 --- a/src/kbmod_wf/resource_configs/usdf_configuration.py +++ b/src/kbmod_wf/resource_configs/usdf_configuration.py @@ -22,9 +22,9 @@ def usdf_resource_config(): app_cache=True, checkpoint_mode="task_exit", checkpoint_files=get_all_checkpoints( - os.path.join(base_path, "kbmod/workflow/run_logs", datetime.date.today().isoformat()) + os.path.join(base_path, "kbmod/workflow/run_logs") ), - run_dir=os.path.join(base_path, "kbmod/workflow/run_logs", datetime.date.today().isoformat()), + run_dir=os.path.join(base_path, "kbmod/workflow/run_logs"), retries=1, executors=[ HighThroughputExecutor( From af4f8ad3aec42c92cd1403dfd3e1e2249a76baf6 Mon Sep 17 00:00:00 2001 From: Colin Orion Chandler Date: Wed, 23 Apr 2025 11:06:08 -0700 Subject: [PATCH 11/14] Added parsl config scripts for USDF (#63) Co-authored-by: Colin Orion Chandler --- scripts/generic_parent_parsl_sbatch.sh | 63 ++++++++++ scripts/generic_runtime_config.toml | 69 +++++++++++ scripts/generic_search_config.yaml | 59 +++++++++ scripts/parsl_configurator.py | 112 ++++++++++++++++++ .../resource_configs/usdf_configuration.py | 50 +++++--- 5 files changed, 337 insertions(+), 16 deletions(-) create mode 100644 scripts/generic_parent_parsl_sbatch.sh create mode 100644 scripts/generic_runtime_config.toml create mode 100644 scripts/generic_search_config.yaml create mode 100644 scripts/parsl_configurator.py diff --git a/scripts/generic_parent_parsl_sbatch.sh b/scripts/generic_parent_parsl_sbatch.sh new file mode 100644 index 00000000..03d6fc9b --- /dev/null +++ b/scripts/generic_parent_parsl_sbatch.sh @@ -0,0 +1,63 @@ +#!/bin/bash +#SBATCH --job-name=parslParent +#SBATCH --time=48:00:00 +#SBATCH --ntasks=1 +#SBATCH --cpus-per-task=1 +#SBATCH --mem=16G +#SBATCH --gpus=0 +#SBATCH --account=rubin:commissioning +#SBATCH --partition=roma +#SBATCH --output=parsl_parent_%j.out +#SBATCH --comment="____comment____" + +# defaults should look like this, and we should plan on installing kbmod and kbmod-wf in the stack directory for sanity +rubindir="/sdf/home/c/colinc/rubin-user" +stackdir="$rubindir"/"lsst_stack_v28_0_1" +# kbmodwfdir="$stackdir"/"kbmod-wf" +kbmodwfdir="$rubindir"/"parsl/kbmod-wf" + +export GPUNODE="ampere" +if [ $# -gt 0 ];then + if [ $(echo $1 | grep -c "ada") -gt 0 ];then + export GPUNODE="ada" +# stackdir="$rubindir"/"lsst_stack_v29_0_0" + stackdir="$rubindir"/"lsst_stack_v28_0_1_ada" + kbmodwfdir="$stackdir"/"kbmod-wf" + else + export GPUNODE="ampere" + stackdir="$rubindir"/"lsst_stack_v28_0_1" + kbmodwfdir="$rubindir"/"parsl/kbmod-wf" + fi +fi + +echo "" +echo "GPUNODE is $GPUNODE" +echo "stackdir is $stackdir" +echo "kbmodwfdir is $kbmodwfdir" +echo "" + +sd="$(pwd)" + +date + +echo "$(date) hostname: $(hostname)" + +hostnamectl +nvidia-smi + +echo "" +echo "$(date) -- Loading LSST stack environment..." +time source "$stackdir"/"loadLSST.bash" + +echo "$(date) -- Running setup lsst_distrib next..." 
+time setup "lsst_distrib" + +nvcc --version +gcc --version + +# python "$kbmodwfdir"/"src/kbmod_wf/multi_night_workflow.py" --runtime-config="$rubindir"/"parsl/staging/39.0_20X20_shards/runtime_config_39.0.toml" --env="usdf" + +echo "Command:" +echo "python $kbmodwfdir/src/kbmod_wf/multi_night_workflow.py --runtime-config=____tomlfile____ --env=usdf" + +python "$kbmodwfdir/src/kbmod_wf/multi_night_workflow.py" --runtime-config="____tomlfile____" --env="usdf" diff --git a/scripts/generic_runtime_config.toml b/scripts/generic_runtime_config.toml new file mode 100644 index 00000000..b89c520a --- /dev/null +++ b/scripts/generic_runtime_config.toml @@ -0,0 +1,69 @@ +# All values set here will be applied to the resource configuration prior to +# calling parsl.load(config). Even if the key does't exist in the resource +# config, it will be added with the value defined here. +# +# Values in the apps.XXX section will be passed as a dictionary to the corresponding +# app. e.g. apps.create_uri_manifest will be passed to the create_uri_manifest app. + + +[resource_config_modifiers] +checkpoint_mode = 'task_exit' + + + +[apps.create_manifest] +# The path to the staging directory, which contains the .collection files +#staging_directory = "/sdf/home/c/colinc/rubin-user/parsl/staging/39.0_20X20_shards" + +staging_directory = "____basedir____" +output_directory = "____basedir____/output" +file_pattern = "*.collection" + +#helio_guess_dists = [39.0] +helio_guess_dists = [____reflexdist____] + + + +[apps.ic_to_wu] +# The path to the KBMOD search config file +# e.g. "/gscratch/dirac/kbmod/workflow/kbmod_search_config.yaml" +search_config_filepath = "____basedir____/search_config.yaml" + +# The path to the butler config file that instantiate a butler to retrieve images +#butler_config_filepath = "/repo/main" +butler_config_filepath = "____butlerpath____" + +# Remove a previously created WU file if it exists +overwrite = false + +helio_guess_dists = [____reflexdist____] + + + +[apps.reproject_wu] +# butler_config_filepath = "/repo/main" +butler_config_filepath = "____butlerpath____" + +search_config_filepath = "____basedir____/search_config.yaml" + +# Number of processors to use for parallelizing the reprojection +# n_workers = 32 +n_workers = ____nworkers____ + +# The name of the observation site to use for reflex correction +# observation_site = "Rubin" +observation_site = "____sitename____" + +helio_guess_dists = [____reflexdist____] + + + +[apps.kbmod_search] +# The path to the KBMOD search config yaml file +search_config_filepath = "____basedir____/search_config.yaml" + +helio_guess_dists = [____reflexdist____] + +# remove sharded WorkUnit files when done 4/11/2025 COC/WSB +#cleanup_wu = true +cleanup_wu = ____cleanupwu____ diff --git a/scripts/generic_search_config.yaml b/scripts/generic_search_config.yaml new file mode 100644 index 00000000..60fbc524 --- /dev/null +++ b/scripts/generic_search_config.yaml @@ -0,0 +1,59 @@ +center_thresh: 0.0 +chunk_size: 1000000 +clip_negative: false +cluster_eps: 120.0 +cluster_type: nn_start_end +cluster_v_scale: 1.0 +coadds: [] +debug: true +do_clustering: true +do_mask: true +do_stamp_filter: false +encode_num_bytes: -1 +generator_config: + angle_units: degree + angles: + - 90 + - -90 + - 64 + given_ecliptic: null + name: EclipticCenteredSearch + velocities: + - 25.0 + - 225.0 + - 64 + velocity_units: pix / d +im_filepath: null +legacy_filename: null +lh_level: 5.0 +max_lh: 10000.0 +mom_lims: +- 35.5 +- 35.5 +- 2.0 +- 0.3 +- 0.3 +num_obs: 7 +peak_offset: +- 2.0 +- 
2.0 +psf_val: 1.4 +res_filepath: /sdf/home/c/colinc/rubin-user/20X20_patch250060 +result_filename: /sdf/home/c/colinc/rubin-user/20X20_patch250060/full_results.ecsv +results_per_pixel: 4 +save_all_stamps: False +sigmaG_lims: +- 25 +- 75 +stamp_radius: 50 +coadds: +- sum +- mean +- median +- weighted +stamp_type: sum +track_filtered: false +x_pixel_bounds: null +x_pixel_buffer: null +y_pixel_bounds: null +y_pixel_buffer: null diff --git a/scripts/parsl_configurator.py b/scripts/parsl_configurator.py new file mode 100644 index 00000000..1ed30a00 --- /dev/null +++ b/scripts/parsl_configurator.py @@ -0,0 +1,112 @@ +# 4/21/2025 COC + +import os + +def make_toml_file(basedir, generic_toml_path, site_name="Rubin", disable_cleanup=False, nworkers=32, repo_path="/repo/main", reflex_distances=None): + # + if reflex_distances == None: + print(f'WARNING: default reflex distances hardcoded to [39.0]') + reflex_distances = [39.0] + lookup_dict = {} + lookup_dict["____basedir____"] = basedir + lookup_dict["____butlerpath____"] = repo_path + lookup_dict["____reflexdist____"] = ",".join([str(float(i)) for i in reflex_distances]) + lookup_dict["____nworkers____"] = str(nworkers) + lookup_dict["____sitename____"] = site_name + if disable_cleanup == True: + lookup_dict["____cleanupwu____"] = "false" + else: + lookup_dict["____cleanupwu____"] = "true" + # + outfile = f"{basedir}/runtime_config.toml" + with open(outfile, "w") as f: + with open(generic_toml_path, "r") as g: + for line in g: + line = line.strip() + if line.startswith("#") or len(line.strip()) == 0: # comment or blank line + print(line, file=f) + continue + found_key = False + for key in lookup_dict: + if key in line: + print(line.replace(key, lookup_dict[key]), file=f) + found_key = True + break + if found_key == True: + continue + print(line, file=f) # catch-all + print(f"Finished writing {outfile} to disk.") + + +def make_sbatch_file(generic_sbatch_file, basedir, comment=None): + """ + Make accompanying Parsl parent sbatch file. + """ + outfile = f"{basedir}/parent_parsl_sbatch.sh" + tomlfile = f"{basedir}/runtime_config.toml" + + lookup_dict = {} + lookup_dict["____basedir____"] = basedir + lookup_dict["____tomlfile____"] = tomlfile + + if comment == None: + lookup_dict["____comment____"] = os.path.basename(basedir) + else: + lookup_dict["____comment____"] = comment + + with open(outfile, "w") as f: + with open(generic_sbatch_file, "r") as g: + for line in g: +# if line.startswith("#") or len(line) == 0: +# print(line, file=f) +# continue + found_key = False + for key in lookup_dict: + if key in line: + print(line.replace(key, lookup_dict[key]), file=f) + found_key = True + break + if found_key == True: + continue + print(line, file=f) + print(f'Wrote {outfile} to disk.') + return outfile + +def make_yaml_file(generic_yaml_file, basedir): + yaml_outfile = f"{basedir}/search_config.yaml" + with open(generic_yaml_file, 'r') as g: + with open(yaml_outfile, 'w') as f: + for line in g: + print(line, file=f) + print(f'Wrote {yaml_outfile} to disk.') + return yaml_outfile + + +if __name__ == '__main__': + import argparse + parser = argparse.ArgumentParser(description='Tool to generate .toml files for Parsl Workflow.') # 4/21/2025 COC + parser.add_argument('--basedir', dest='basedir', help='path to staging directory containing ImageCollections. 
Default: current working directory.', type=str, default=os.getcwd()) + parser.add_argument('--reflex-distances', dest='reflex_distances', help='reflex-correction distances; default is None, which results in the directory being crawled and the first reflex-distance found in an ImageCollection is used. There should only be one distance, but multiple are supported.', type=float, default=None, nargs='+') + parser.add_argument('--site-name', dest='site_name', help='Observatory site name. Default: "Rubin"', type=str, default="Rubin") + parser.add_argument('--disable-cleanup', dest='disable_cleanup', help='disable deletion of reprojected WorkUnits after successful run. Default: False', type=bool, default=False) + parser.add_argument('--nworkers', dest='nworkers', help='number of workers (cores) to assume. Default: 32', type=int, default=32) + parser.add_argument('--repo-path', dest='repo_path', help='Butler repository path. Default: /repo/main', type=str, default="/repo/main") + parser.add_argument('--generic-toml-path', dest='generic_toml_path', help='Path to a special generic TOML file. Default: ~/generic_runtime_config.toml', type=str, default=f"{os.environ['HOME']}/generic_runtime_config.toml") + parser.add_argument('--generic-sbatch-path', dest='generic_sbatch_path', help='Path to a special generic sbatch file. Default: ~/generic_parent_parsl_sbatch.sh', type=str, default=f"{os.environ['HOME']}/generic_parent_parsl_sbatch.sh") + parser.add_argument('--generic-yaml-path', dest='generic_yaml_path', help='Path to a special generic kbmod search config yaml file. Default: ~/generic_search_config.yaml', type=str, default=f"{os.environ['HOME']}/generic_search_config.yaml") + + # + args = parser.parse_args() + # + # TODO add checks for input files here + # + make_toml_file(basedir=args.basedir, + generic_toml_path=args.generic_toml_path, + site_name=args.site_name, + disable_cleanup=args.disable_cleanup, + nworkers=args.nworkers, + repo_path=args.repo_path, + reflex_distances=args.reflex_distances + ) + make_sbatch_file(generic_sbatch_file=args.generic_sbatch_path, basedir=args.basedir) + make_yaml_file(generic_yaml_file=args.generic_yaml_path, basedir=args.basedir) \ No newline at end of file diff --git a/src/kbmod_wf/resource_configs/usdf_configuration.py b/src/kbmod_wf/resource_configs/usdf_configuration.py index 28df76f6..7ff87a0f 100644 --- a/src/kbmod_wf/resource_configs/usdf_configuration.py +++ b/src/kbmod_wf/resource_configs/usdf_configuration.py @@ -1,9 +1,27 @@ +# Resource notes +# Milano - 480 Gb max RAM - Rubin main nodes for processing +# Roma - 480 Gb max RAM +# Ampere - A100 - 40 Gb GPU - ~896 Gb max RAM +# Ada - L40S - 46 Gb GPU - ~512 Gb max RAM + import os import datetime from parsl import Config from parsl.executors import HighThroughputExecutor from parsl.providers import LocalProvider, SlurmProvider from parsl.utils import get_all_checkpoints +import platform + +nodename_map = {"sdfada":"ada", "sdfampere":"ampere", "sdfroma":"roma", "sdfmilano":"milano"} +max_ram_dict = {"ada":350, "ampere":896, "roma":480, "milano":480} + +gpu_partition = "ampere" +cpu_partition = "milano" + +if "GPUNODE" in os.environ: + gpu_partition = os.environ["GPUNODE"].lower() + print(f"Set gpu_partition to {gpu_partition} via environment variable GPUNODE.") + walltimes = { "compute_bigmem": "01:00:00", @@ -31,7 +49,7 @@ def usdf_resource_config(): label="small_cpu", max_workers=1, provider=SlurmProvider( - partition="milano", + partition=cpu_partition, account=account_name, min_blocks=0, max_blocks=64, 
@@ -39,7 +57,7 @@ def usdf_resource_config(): parallelism=1, nodes_per_block=1, cores_per_node=1, # perhaps should be 8??? - mem_per_node=256, # In GB; milano has a 256 Gb cap + mem_per_node=256, # In GB; milano, roma have a ~480 Gb cap 4/16/2025 COC exclusive=False, walltime=walltimes["compute_bigmem"], scheduler_options="#SBATCH --export=ALL", # Add other options as needed @@ -51,15 +69,15 @@ def usdf_resource_config(): label="large_mem", max_workers=1, provider=SlurmProvider( - partition="ampere", # or ada?; note: milano has a 256 Gb cap + partition="roma", # or ada?; note: milano, roma have a ~480 Gb cap 4/16/2025 COC account=account_name, min_blocks=0, - max_blocks=12, + max_blocks=20, # 12 to 20 4/16/2025 COC init_blocks=0, parallelism=1, nodes_per_block=1, cores_per_node=32, - mem_per_node=512, + mem_per_node=max_ram_dict["roma"], exclusive=False, walltime=walltimes["large_mem"], scheduler_options="#SBATCH --export=ALL", # Add other options as needed @@ -71,15 +89,15 @@ def usdf_resource_config(): label="sharded_reproject", max_workers=1, provider=SlurmProvider( - partition="ampere", # or ada?; note: milano has a 256 Gb cap + partition=cpu_partition, # or ada?; see resource notes at top account=account_name, min_blocks=0, - max_blocks=12, + max_blocks=20, # 12 to 20 4/16/2025 COC init_blocks=0, parallelism=1, nodes_per_block=1, cores_per_node=32, - mem_per_node=512, # ~2-4 GB per core + mem_per_node=max_ram_dict[cpu_partition], # ~2-4 GB per core exclusive=False, walltime=walltimes["sharded_reproject"], scheduler_options="#SBATCH --export=ALL", # Add other options as needed @@ -91,19 +109,19 @@ def usdf_resource_config(): label="gpu", max_workers=1, provider=SlurmProvider( - partition="ampere", # or ada + partition=gpu_partition, # or ada account=account_name, min_blocks=0, - max_blocks=8, + max_blocks=8, # 8 to 24 4/16/2025 COC to 12 4/20/2025 COC to 8 4/22/2025 COC init_blocks=0, parallelism=1, nodes_per_block=1, cores_per_node=2, # perhaps should be 8??? - mem_per_node=512, # In GB + mem_per_node=max_ram_dict[gpu_partition], # In GB; 512 OOMs with 20X20s 4/16/2025 COC exclusive=False, walltime=walltimes["gpu_max"], # Command to run before starting worker - i.e. conda activate - worker_init="", + worker_init="hostname;hostnamectl;nvidia-smi", scheduler_options="#SBATCH --gpus=1\n#SBATCH --export=ALL", ), ), @@ -111,19 +129,19 @@ def usdf_resource_config(): label="large_gpu", max_workers=1, provider=SlurmProvider( - partition="ampere", # or ada; was turing, but we do not have access + partition=gpu_partition, # or ada; was turing, but we do not have access account=account_name, min_blocks=0, - max_blocks=8, + max_blocks=8, # 8 to 24 4/16/2025 COC to 12 4/20/2025 to 8 4/25/2025 COC init_blocks=0, parallelism=1, nodes_per_block=1, cores_per_node=2, # perhaps should be 8??? - mem_per_node=512, # In GB + mem_per_node=max_ram_dict[gpu_partition], # In GB; 512G OOM with 20X20 4/16/2025 COC exclusive=False, walltime=walltimes["gpu_max"], # Command to run before starting worker - i.e. 
conda activate - worker_init="", + worker_init="hostname;hostnamectl;nvidia-smi", scheduler_options="#SBATCH --gpus=1\n#SBATCH --export=ALL", ), ), From dc4a8ed447cfa8f700c57e1e070a718bd07ed8fb Mon Sep 17 00:00:00 2001 From: Colin Orion Chandler Date: Thu, 24 Apr 2025 17:06:22 -0700 Subject: [PATCH 12/14] automatic detection of usdf; new Parsl fixes (#64) * automatic detection of usdf; new Parsl fixes * automated correct location of generic files * fixed Klone config to match new Parsl --------- Co-authored-by: Colin Orion Chandler Co-authored-by: Colin Orion Chandler --- scripts/parsl_configurator.py | 11 ++-- .../resource_configs/klone_configuration.py | 8 +-- .../resource_configs/usdf_configuration.py | 51 ++++++++++++++----- .../utilities/configuration_utilities.py | 6 ++- 4 files changed, 52 insertions(+), 24 deletions(-) diff --git a/scripts/parsl_configurator.py b/scripts/parsl_configurator.py index 1ed30a00..e7e02a7b 100644 --- a/scripts/parsl_configurator.py +++ b/scripts/parsl_configurator.py @@ -1,6 +1,9 @@ # 4/21/2025 COC import os +import inspect +import os +script_directory = os.path.dirname(os.path.abspath(inspect.getfile(inspect.currentframe()))) def make_toml_file(basedir, generic_toml_path, site_name="Rubin", disable_cleanup=False, nworkers=32, repo_path="/repo/main", reflex_distances=None): # @@ -91,9 +94,9 @@ def make_yaml_file(generic_yaml_file, basedir): parser.add_argument('--disable-cleanup', dest='disable_cleanup', help='disable deletion of reprojected WorkUnits after successful run. Default: False', type=bool, default=False) parser.add_argument('--nworkers', dest='nworkers', help='number of workers (cores) to assume. Default: 32', type=int, default=32) parser.add_argument('--repo-path', dest='repo_path', help='Butler repository path. Default: /repo/main', type=str, default="/repo/main") - parser.add_argument('--generic-toml-path', dest='generic_toml_path', help='Path to a special generic TOML file. Default: ~/generic_runtime_config.toml', type=str, default=f"{os.environ['HOME']}/generic_runtime_config.toml") - parser.add_argument('--generic-sbatch-path', dest='generic_sbatch_path', help='Path to a special generic sbatch file. Default: ~/generic_parent_parsl_sbatch.sh', type=str, default=f"{os.environ['HOME']}/generic_parent_parsl_sbatch.sh") - parser.add_argument('--generic-yaml-path', dest='generic_yaml_path', help='Path to a special generic kbmod search config yaml file. Default: ~/generic_search_config.yaml', type=str, default=f"{os.environ['HOME']}/generic_search_config.yaml") + parser.add_argument('--generic-toml-path', dest='generic_toml_path', help='Path to a special generic TOML file. Default: ~/generic_runtime_config.toml', type=str, default=f"{script_directory}/generic_runtime_config.toml") + parser.add_argument('--generic-sbatch-path', dest='generic_sbatch_path', help='Path to a special generic sbatch file. Default: ~/generic_parent_parsl_sbatch.sh', type=str, default=f"{script_directory}/generic_parent_parsl_sbatch.sh") + parser.add_argument('--generic-yaml-path', dest='generic_yaml_path', help='Path to a special generic kbmod search config yaml file. 
Default: ~/generic_search_config.yaml', type=str, default=f"{script_directory}/generic_search_config.yaml") # args = parser.parse_args() @@ -109,4 +112,4 @@ def make_yaml_file(generic_yaml_file, basedir): reflex_distances=args.reflex_distances ) make_sbatch_file(generic_sbatch_file=args.generic_sbatch_path, basedir=args.basedir) - make_yaml_file(generic_yaml_file=args.generic_yaml_path, basedir=args.basedir) \ No newline at end of file + make_yaml_file(generic_yaml_file=args.generic_yaml_path, basedir=args.basedir) diff --git a/src/kbmod_wf/resource_configs/klone_configuration.py b/src/kbmod_wf/resource_configs/klone_configuration.py index 953622b7..8150a8df 100644 --- a/src/kbmod_wf/resource_configs/klone_configuration.py +++ b/src/kbmod_wf/resource_configs/klone_configuration.py @@ -25,7 +25,7 @@ def klone_resource_config(): executors=[ HighThroughputExecutor( label="small_cpu", - max_workers=1, + max_workers_per_node=1, provider=SlurmProvider( partition="ckpt-g2", account="astro", @@ -44,7 +44,7 @@ def klone_resource_config(): ), HighThroughputExecutor( label="large_mem", - max_workers=1, + max_workers_per_node=1, provider=SlurmProvider( partition="ckpt-g2", account="astro", @@ -63,7 +63,7 @@ def klone_resource_config(): ), HighThroughputExecutor( label="sharded_reproject", - max_workers=1, + max_workers_per_node=1, provider=SlurmProvider( partition="ckpt-g2", account="astro", @@ -82,7 +82,7 @@ def klone_resource_config(): ), HighThroughputExecutor( label="gpu", - max_workers=1, + max_workers_per_node=1, provider=SlurmProvider( partition="ckpt-g2", account="escience", diff --git a/src/kbmod_wf/resource_configs/usdf_configuration.py b/src/kbmod_wf/resource_configs/usdf_configuration.py index 7ff87a0f..5e72b759 100644 --- a/src/kbmod_wf/resource_configs/usdf_configuration.py +++ b/src/kbmod_wf/resource_configs/usdf_configuration.py @@ -4,17 +4,29 @@ # Ampere - A100 - 40 Gb GPU - ~896 Gb max RAM # Ada - L40S - 46 Gb GPU - ~512 Gb max RAM +# +# removed max_workers from executors 4/23/2025 COC +# + import os import datetime from parsl import Config from parsl.executors import HighThroughputExecutor from parsl.providers import LocalProvider, SlurmProvider from parsl.utils import get_all_checkpoints +from parsl.monitoring.monitoring import MonitoringHub # COC +from parsl.addresses import address_by_hostname # COC +import logging # COC + import platform nodename_map = {"sdfada":"ada", "sdfampere":"ampere", "sdfroma":"roma", "sdfmilano":"milano"} -max_ram_dict = {"ada":350, "ampere":896, "roma":480, "milano":480} - +max_ram_dict = {"ada":70, # 351 Gb total, with 5 GPUs total on the one node, leaves 70 Gb per task + "ampere":220, # 896 per each of the two nodes we can access, each with 4 GPUs + "roma":480, + "milano":480 +} +max_block_dict = {"ada":5, "ampere":8} gpu_partition = "ampere" cpu_partition = "milano" @@ -47,7 +59,8 @@ def usdf_resource_config(): executors=[ HighThroughputExecutor( label="small_cpu", - max_workers=1, + # max_workers=1, + max_workers_per_node=1, provider=SlurmProvider( partition=cpu_partition, account=account_name, @@ -67,7 +80,8 @@ def usdf_resource_config(): ), HighThroughputExecutor( label="large_mem", - max_workers=1, + # max_workers=1, + max_workers_per_node=1, provider=SlurmProvider( partition="roma", # or ada?; note: milano, roma have a ~480 Gb cap 4/16/2025 COC account=account_name, @@ -87,7 +101,8 @@ def usdf_resource_config(): ), HighThroughputExecutor( label="sharded_reproject", - max_workers=1, + # max_workers=1, + max_workers_per_node=1, 
provider=SlurmProvider( partition=cpu_partition, # or ada?; see resource notes at top account=account_name, @@ -102,46 +117,48 @@ def usdf_resource_config(): walltime=walltimes["sharded_reproject"], scheduler_options="#SBATCH --export=ALL", # Add other options as needed # Command to run before starting worker - i.e. conda activate - worker_init="", + worker_init="env | grep SLURM", ), ), HighThroughputExecutor( label="gpu", - max_workers=1, + # max_workers=1, + max_workers_per_node=1, provider=SlurmProvider( partition=gpu_partition, # or ada account=account_name, min_blocks=0, - max_blocks=8, # 8 to 24 4/16/2025 COC to 12 4/20/2025 COC to 8 4/22/2025 COC + max_blocks=max_block_dict[gpu_partition], # 8 to 24 4/16/2025 COC to 12 4/20/2025 COC to 8 4/22/2025 COC init_blocks=0, parallelism=1, nodes_per_block=1, - cores_per_node=2, # perhaps should be 8??? + cores_per_node=4, # perhaps should be 8??? mem_per_node=max_ram_dict[gpu_partition], # In GB; 512 OOMs with 20X20s 4/16/2025 COC exclusive=False, walltime=walltimes["gpu_max"], # Command to run before starting worker - i.e. conda activate - worker_init="hostname;hostnamectl;nvidia-smi", + worker_init="hostname;hostnamectl;nvidia-smi;env | grep SLURM", scheduler_options="#SBATCH --gpus=1\n#SBATCH --export=ALL", ), ), HighThroughputExecutor( label="large_gpu", - max_workers=1, + # max_workers=1, + max_workers_per_node=1, provider=SlurmProvider( partition=gpu_partition, # or ada; was turing, but we do not have access account=account_name, min_blocks=0, - max_blocks=8, # 8 to 24 4/16/2025 COC to 12 4/20/2025 to 8 4/25/2025 COC + max_blocks=max_block_dict[gpu_partition], # 8 to 24 4/16/2025 COC to 12 4/20/2025 to 8 4/25/2025 COC init_blocks=0, parallelism=1, nodes_per_block=1, - cores_per_node=2, # perhaps should be 8??? + cores_per_node=4, # perhaps should be 8??? mem_per_node=max_ram_dict[gpu_partition], # In GB; 512G OOM with 20X20 4/16/2025 COC exclusive=False, walltime=walltimes["gpu_max"], # Command to run before starting worker - i.e. conda activate - worker_init="hostname;hostnamectl;nvidia-smi", + worker_init="hostname;hostnamectl;nvidia-smi;env | grep SLURM", scheduler_options="#SBATCH --gpus=1\n#SBATCH --export=ALL", ), ), @@ -153,4 +170,10 @@ def usdf_resource_config(): ), ), ], + monitoring=MonitoringHub( + hub_address=address_by_hostname(), + hub_port=55055, + monitoring_debug=True, + resource_monitoring_interval=10, + ), ) diff --git a/src/kbmod_wf/utilities/configuration_utilities.py b/src/kbmod_wf/utilities/configuration_utilities.py index fcb9adaf..3730d9fd 100644 --- a/src/kbmod_wf/utilities/configuration_utilities.py +++ b/src/kbmod_wf/utilities/configuration_utilities.py @@ -26,9 +26,11 @@ def get_resource_config(env: Literal["dev", "klone", "usdf"] | None = None): ValueError If an unknown environment is provided, raise a ValueError. """ - + print(f"env was {env}") if env is None: - if platform.system().lower() == "darwin": + if platform.node().startswith("sdf"): + config = usdf_resource_config() + elif platform.system().lower() == "darwin": config = dev_resource_config() elif is_running_on_wsl(): config = dev_resource_config() From 353b91b9b76a0d291e4eaf3c0b05dcba549ffcff Mon Sep 17 00:00:00 2001 From: Colin Orion Chandler Date: Mon, 5 May 2025 16:09:02 -0700 Subject: [PATCH 13/14] One node to rule them all (#65) * Adding whole-node approach for GPU work. 
* Rest of the files --------- Co-authored-by: Colin Orion Chandler --- scripts/generic_parent_parsl_sbatch.sh | 2 +- scripts/generic_search_config.yaml | 1 + scripts/gpu_worker_launcher.sh | 14 ++ scripts/parsl_configurator.py | 43 +++++- .../resource_configs/usdf_configuration.py | 137 +++++++----------- src/kbmod_wf/task_impls/kbmod_search.py | 3 + src/kbmod_wf/utilities/logger_utilities.py | 10 +- 7 files changed, 109 insertions(+), 101 deletions(-) create mode 100755 scripts/gpu_worker_launcher.sh diff --git a/scripts/generic_parent_parsl_sbatch.sh b/scripts/generic_parent_parsl_sbatch.sh index 03d6fc9b..b44da5d4 100644 --- a/scripts/generic_parent_parsl_sbatch.sh +++ b/scripts/generic_parent_parsl_sbatch.sh @@ -1,6 +1,6 @@ #!/bin/bash #SBATCH --job-name=parslParent -#SBATCH --time=48:00:00 +#SBATCH --time=72:00:00 #SBATCH --ntasks=1 #SBATCH --cpus-per-task=1 #SBATCH --mem=16G diff --git a/scripts/generic_search_config.yaml b/scripts/generic_search_config.yaml index 60fbc524..a49cf28d 100644 --- a/scripts/generic_search_config.yaml +++ b/scripts/generic_search_config.yaml @@ -10,6 +10,7 @@ do_clustering: true do_mask: true do_stamp_filter: false encode_num_bytes: -1 +gpu_filter: true generator_config: angle_units: degree angles: diff --git a/scripts/gpu_worker_launcher.sh b/scripts/gpu_worker_launcher.sh new file mode 100755 index 00000000..a0eecf6e --- /dev/null +++ b/scripts/gpu_worker_launcher.sh @@ -0,0 +1,14 @@ +#!/bin/bash + +# This will be called by each Parsl worker + +# Choose GPU based on slot/index in the host +WORKER_ID=$1 +export CUDA_VISIBLE_DEVICES=$WORKER_ID + +# Print for debugging +echo "Launching worker $WORKER_ID on GPU $CUDA_VISIBLE_DEVICES" + +# Shift args and exec actual worker +shift +exec "$@" diff --git a/scripts/parsl_configurator.py b/scripts/parsl_configurator.py index e7e02a7b..c7600367 100644 --- a/scripts/parsl_configurator.py +++ b/scripts/parsl_configurator.py @@ -1,10 +1,30 @@ # 4/21/2025 COC import os -import inspect -import os +import inspect +import os +import shutil +import glob + script_directory = os.path.dirname(os.path.abspath(inspect.getfile(inspect.currentframe()))) +def get_reflex_distance(basedir, mode="collection"): + """Determine reflex distance, either by filename or by .collection content. 
5/5/2025 COC""" + reflex_distance = None + first_ic_path = glob.glob(f"{basedir}/*.collection")[0] + if mode == "collection": + print(f"Determining reflex distances via first row of first collection in {basedir}...") + import kbmod + ic = kbmod.ImageCollection.read(first_ic_path) + reflex_distance = ic["helio_guess_dist"][0] + else: + print("Using default mode for get_reflex_distance: collection name") + reflex_distance = os.path.basename(first_ic_path).split("_")[1] + reflex_distance = float(reflex_distance) + print(f"Using reflex_distance {reflex_distance}") + return reflex_distance + + def make_toml_file(basedir, generic_toml_path, site_name="Rubin", disable_cleanup=False, nworkers=32, repo_path="/repo/main", reflex_distances=None): # if reflex_distances == None: @@ -47,19 +67,20 @@ def make_sbatch_file(generic_sbatch_file, basedir, comment=None): """ outfile = f"{basedir}/parent_parsl_sbatch.sh" tomlfile = f"{basedir}/runtime_config.toml" - + # lookup_dict = {} lookup_dict["____basedir____"] = basedir lookup_dict["____tomlfile____"] = tomlfile - + # if comment == None: lookup_dict["____comment____"] = os.path.basename(basedir) else: lookup_dict["____comment____"] = comment - + # with open(outfile, "w") as f: with open(generic_sbatch_file, "r") as g: for line in g: + line = line.rstrip("\n") # if line.startswith("#") or len(line) == 0: # print(line, file=f) # continue @@ -80,6 +101,7 @@ def make_yaml_file(generic_yaml_file, basedir): with open(generic_yaml_file, 'r') as g: with open(yaml_outfile, 'w') as f: for line in g: + line = line.rstrip("\n") print(line, file=f) print(f'Wrote {yaml_outfile} to disk.') return yaml_outfile @@ -97,19 +119,26 @@ def make_yaml_file(generic_yaml_file, basedir): parser.add_argument('--generic-toml-path', dest='generic_toml_path', help='Path to a special generic TOML file. Default: ~/generic_runtime_config.toml', type=str, default=f"{script_directory}/generic_runtime_config.toml") parser.add_argument('--generic-sbatch-path', dest='generic_sbatch_path', help='Path to a special generic sbatch file. Default: ~/generic_parent_parsl_sbatch.sh', type=str, default=f"{script_directory}/generic_parent_parsl_sbatch.sh") parser.add_argument('--generic-yaml-path', dest='generic_yaml_path', help='Path to a special generic kbmod search config yaml file. 
Default: ~/generic_search_config.yaml', type=str, default=f"{script_directory}/generic_search_config.yaml") - # args = parser.parse_args() # # TODO add checks for input files here # + reflex_distances = args.reflex_distances + if reflex_distances == None or reflex_distances == []: + reflex_distances = [get_reflex_distance(basedir=args.basedir)] make_toml_file(basedir=args.basedir, generic_toml_path=args.generic_toml_path, site_name=args.site_name, disable_cleanup=args.disable_cleanup, nworkers=args.nworkers, repo_path=args.repo_path, - reflex_distances=args.reflex_distances + reflex_distances=reflex_distances ) make_sbatch_file(generic_sbatch_file=args.generic_sbatch_path, basedir=args.basedir) make_yaml_file(generic_yaml_file=args.generic_yaml_path, basedir=args.basedir) + # + # Copy the GPU Worker script + shutil.copy(os.path.join(script_directory, "gpu_worker_launcher.sh"), args.basedir) + # + print(f"Finished!") diff --git a/src/kbmod_wf/resource_configs/usdf_configuration.py b/src/kbmod_wf/resource_configs/usdf_configuration.py index 5e72b759..dcbf6b28 100644 --- a/src/kbmod_wf/resource_configs/usdf_configuration.py +++ b/src/kbmod_wf/resource_configs/usdf_configuration.py @@ -5,8 +5,8 @@ # Ada - L40S - 46 Gb GPU - ~512 Gb max RAM # -# removed max_workers from executors 4/23/2025 COC # +# trying 10X10s so lowering memory by 50% to 240 on reproejct phase import os import datetime @@ -17,29 +17,39 @@ from parsl.monitoring.monitoring import MonitoringHub # COC from parsl.addresses import address_by_hostname # COC import logging # COC - +import numpy as np import platform nodename_map = {"sdfada":"ada", "sdfampere":"ampere", "sdfroma":"roma", "sdfmilano":"milano"} -max_ram_dict = {"ada":70, # 351 Gb total, with 5 GPUs total on the one node, leaves 70 Gb per task - "ampere":220, # 896 per each of the two nodes we can access, each with 4 GPUs - "roma":480, - "milano":480 + +slurm_cmd_timeout = 60 # default is 10 and that is timing out for sacct -X 5/1/2025 COC + +max_ram_dict = {"ada":350, # 351 Gb total, with 5 GPUs total on the one node, leaves 70 Gb per task + "ampere":952, # 896 per each of the two nodes we can access, each with 4 GPUs + "roma":140, # 240 to 140 + "milano":140 # 240 to 140 } -max_block_dict = {"ada":5, "ampere":8} +max_block_dict = {"ada":1, "ampere":2} +gpus_per_node_dict = {"ada":5, "ampere":4} +max_nodes_dict = {"ada":1, "ampere":2} +cpus_per_node_dict = {"ada":30, "ampere":112} # {"ada":6, "ampere":28} # ada cap is 36, ampere ≥100 +cores_per_worker_dict = {"ada":6, "ampere":28} +monitor_port_dict = {"ada":55056, "ampere":55066} + + gpu_partition = "ampere" -cpu_partition = "milano" +cpus_for_gpus_dict = {"ampere":"roma", "ada":"milano"} +cpu_partition = cpus_for_gpus_dict[gpu_partition] + if "GPUNODE" in os.environ: - gpu_partition = os.environ["GPUNODE"].lower() - print(f"Set gpu_partition to {gpu_partition} via environment variable GPUNODE.") + gpu_partition = os.environ["GPUNODE"].lower() + print(f"Set gpu_partition to {gpu_partition} via environment variable GPUNODE.") walltimes = { - "compute_bigmem": "01:00:00", - "large_mem": "04:00:00", "sharded_reproject": "04:00:00", - "gpu_max": "08:00:00", + "gpu": "12:00:00", } base_path = f"{os.environ['HOME']}/rubin-user/parsl/workflow_output" @@ -57,52 +67,10 @@ def usdf_resource_config(): run_dir=os.path.join(base_path, "kbmod/workflow/run_logs"), retries=1, executors=[ - HighThroughputExecutor( - label="small_cpu", - # max_workers=1, - max_workers_per_node=1, - provider=SlurmProvider( - partition=cpu_partition, - 
-                    account=account_name,
-                    min_blocks=0,
-                    max_blocks=64,
-                    init_blocks=0,
-                    parallelism=1,
-                    nodes_per_block=1,
-                    cores_per_node=1, # perhaps should be 8???
-                    mem_per_node=256, # In GB; milano, roma have a ~480 Gb cap 4/16/2025 COC
-                    exclusive=False,
-                    walltime=walltimes["compute_bigmem"],
-                    scheduler_options="#SBATCH --export=ALL", # Add other options as needed
-                    # Command to run before starting worker - i.e. conda activate
-                    worker_init="",
-                ),
-            ),
-            HighThroughputExecutor(
-                label="large_mem",
-                # max_workers=1,
-                max_workers_per_node=1,
-                provider=SlurmProvider(
-                    partition="roma", # or ada?; note: milano, roma have a ~480 Gb cap 4/16/2025 COC
-                    account=account_name,
-                    min_blocks=0,
-                    max_blocks=20, # 12 to 20 4/16/2025 COC
-                    init_blocks=0,
-                    parallelism=1,
-                    nodes_per_block=1,
-                    cores_per_node=32,
-                    mem_per_node=max_ram_dict["roma"],
-                    exclusive=False,
-                    walltime=walltimes["large_mem"],
-                    scheduler_options="#SBATCH --export=ALL", # Add other options as needed
-                    # Command to run before starting worker - i.e. conda activate
-                    worker_init="",
-                ),
-            ),
             HighThroughputExecutor(
                 label="sharded_reproject",
                 # max_workers=1,
-                max_workers_per_node=1,
+                max_workers_per_node=1,
                 provider=SlurmProvider(
                     partition=cpu_partition, # or ada?; see resource notes at top
                     account=account_name,
@@ -118,48 +86,40 @@ def usdf_resource_config():
                     scheduler_options="#SBATCH --export=ALL", # Add other options as needed
                     # Command to run before starting worker - i.e. conda activate
                     worker_init="env | grep SLURM",
+                    cmd_timeout=slurm_cmd_timeout,
                 ),
             ),
             HighThroughputExecutor(
                 label="gpu",
-                # max_workers=1,
-                max_workers_per_node=1,
+                max_workers_per_node=gpus_per_node_dict[gpu_partition], # was 1
+                cores_per_worker=cores_per_worker_dict[gpu_partition],
+                mem_per_worker=int(np.floor(max_ram_dict[gpu_partition]/gpus_per_node_dict[gpu_partition])),
                 provider=SlurmProvider(
                     partition=gpu_partition, # or ada
                     account=account_name,
-                    min_blocks=0,
+                    min_blocks=max_nodes_dict[gpu_partition], # was 0
+                    init_blocks=max_nodes_dict[gpu_partition], # added 4/29/2025 COC
                     max_blocks=max_block_dict[gpu_partition], # 8 to 24 4/16/2025 COC to 12 4/20/2025 COC to 8 4/22/2025 COC
-                    init_blocks=0,
                     parallelism=1,
                     nodes_per_block=1,
-                    cores_per_node=4, # perhaps should be 8???
+                    cores_per_node=cpus_per_node_dict[gpu_partition],
                     mem_per_node=max_ram_dict[gpu_partition], # In GB; 512 OOMs with 20X20s 4/16/2025 COC
-                    exclusive=False,
-                    walltime=walltimes["gpu_max"],
-                    # Command to run before starting worker - i.e. conda activate
-                    worker_init="hostname;hostnamectl;nvidia-smi;env | grep SLURM",
-                    scheduler_options="#SBATCH --gpus=1\n#SBATCH --export=ALL",
-                ),
-            ),
-            HighThroughputExecutor(
-                label="large_gpu",
-                # max_workers=1,
-                max_workers_per_node=1,
-                provider=SlurmProvider(
-                    partition=gpu_partition, # or ada; was turing, but we do not have access
-                    account=account_name,
-                    min_blocks=0,
-                    max_blocks=max_block_dict[gpu_partition], # 8 to 24 4/16/2025 COC to 12 4/20/2025 to 8 4/25/2025 COC
-                    init_blocks=0,
-                    parallelism=1,
-                    nodes_per_block=1,
-                    cores_per_node=4, # perhaps should be 8???
-                    mem_per_node=max_ram_dict[gpu_partition], # In GB; 512G OOM with 20X20 4/16/2025 COC
-                    exclusive=False,
-                    walltime=walltimes["gpu_max"],
-                    # Command to run before starting worker - i.e. conda activate
-                    worker_init="hostname;hostnamectl;nvidia-smi;env | grep SLURM",
-                    scheduler_options="#SBATCH --gpus=1\n#SBATCH --export=ALL",
+                    # exclusive=False, # disabled
+                    walltime=walltimes["gpu"],
+                    cmd_timeout=slurm_cmd_timeout,
+                    worker_init="""
+export CUDA_VISIBLE_DEVICES=$((${PARSL_WORKER_RANK:-0} % 4))
+echo Assigned GPU $CUDA_VISIBLE_DEVICES to worker $PARSL_WORKER_RANK
+hostname
+hostnamectl
+nvidia-smi
+env
+""",
+                    scheduler_options=f"""
+#SBATCH --gres=gpu:{gpus_per_node_dict[gpu_partition]}
+#SBATCH --exclusive
+""",
+#"#SBATCH --gpus=1\n#SBATCH --export=ALL",
                 ),
             ),
             HighThroughputExecutor(
@@ -172,8 +132,9 @@ def usdf_resource_config():
         ],
         monitoring=MonitoringHub(
             hub_address=address_by_hostname(),
-            hub_port=55055,
+            hub_port=monitor_port_dict[gpu_partition],
             monitoring_debug=True,
             resource_monitoring_interval=10,
         ),
     )
+
diff --git a/src/kbmod_wf/task_impls/kbmod_search.py b/src/kbmod_wf/task_impls/kbmod_search.py
index 1d2c629e..837b847f 100644
--- a/src/kbmod_wf/task_impls/kbmod_search.py
+++ b/src/kbmod_wf/task_impls/kbmod_search.py
@@ -27,6 +27,9 @@ def kbmod_search(
     str
         The fully resolved filepath of the results file.
     """
+    worker_rank = os.environ["PARSL_WORKER_RANK"]
+    os.environ["CUDA_VISIBLE_DEVICES"] = worker_rank
+    print(f"Set worker_rank to {worker_rank}")
     kbmod_searcher = KBMODSearcher(
         wu_filepath=wu_filepath,
         result_filepath=result_filepath,
diff --git a/src/kbmod_wf/utilities/logger_utilities.py b/src/kbmod_wf/utilities/logger_utilities.py
index 85780969..8c06be11 100644
--- a/src/kbmod_wf/utilities/logger_utilities.py
+++ b/src/kbmod_wf/utilities/logger_utilities.py
@@ -17,31 +17,31 @@
     },
     "handlers": {
         "stdout": {
-            "level": "INFO",
+            "level": "DEBUG",
             "formatter": "standard",
             "class": "logging.StreamHandler",
             "stream": "ext://sys.stdout",
         },
         "stderr": {
-            "level": "INFO",
+            "level": "DEBUG",
             "formatter": "standard",
             "class": "logging.StreamHandler",
             "stream": "ext://sys.stderr",
         },
         "file": {
-            "level": "INFO",
+            "level": "DEBUG",
             "formatter": "standard",
             "class": "logging.FileHandler",
             "filename": "parsl.log",
         },
     },
     "loggers": {
-        "task": {"level": "INFO", "handlers": ["file", "stdout"], "propagate": False},
+        "task": {"level": "DEBUG", "handlers": ["file", "stdout"], "propagate": False},
         "task.create_manifest": {},
         "task.ic_to_wu": {},
         "task.reproject_wu": {},
         "task.kbmod_search": {},
-        "kbmod": {"level": "INFO", "handlers": ["file", "stdout"], "propagate": False},
+        "kbmod": {"level": "DEBUG", "handlers": ["file", "stdout"], "propagate": False},
     },
 }
 """Default logging configuration for Parsl."""

From 6d08418a1397d80570919c43720ff0b8b92a2048 Mon Sep 17 00:00:00 2001
From: Wilson Beebe
Date: Wed, 14 May 2025 14:55:55 -0700
Subject: [PATCH 14/14] Add flag to disorder/fuzz the obstimes of a WorkUnit (#66)

---
 src/kbmod_wf/task_impls/kbmod_search.py | 9 ++++++++-
 1 file changed, 8 insertions(+), 1 deletion(-)

diff --git a/src/kbmod_wf/task_impls/kbmod_search.py b/src/kbmod_wf/task_impls/kbmod_search.py
index 837b847f..29e10c62 100644
--- a/src/kbmod_wf/task_impls/kbmod_search.py
+++ b/src/kbmod_wf/task_impls/kbmod_search.py
@@ -57,6 +57,10 @@ def __init__(
         self.cleanup_wu = self.runtime_config.get("cleanup_wu", False)
         self.results_directory = os.path.dirname(self.result_filepath)
 
+        # Whether or not to randomize timestamp ordering to create bad searches
+        # Useful for testing and ML training purposes.
+        self.disordered_search = self.runtime_config.get("disordered_search", False)
+
     def run_search(self):
         # Check that KBMOD has access to a GPU before starting the search.
         if not kbmod.search.HAS_GPU:
@@ -77,6 +81,9 @@ def run_search(self):
 
         config = wu.config
 
+        if self.disordered_search:
+            wu.disorder_obstimes()
+
         # Modify the work unit results to be what is specified in command line args
         base_filename, _ = os.path.splitext(os.path.basename(self.result_filepath))
         input_parameters = {
@@ -96,7 +103,7 @@ def run_search(self):
         self.logger.info(f"Writing results to output file: {self.result_filepath}")
         res.write_table(self.result_filepath)
         self.logger.info("Results written to file")
-
+
         if self.cleanup_wu:
             self.logger.info(f"Cleaning up sharded WorkUnit {self.input_wu_filepath} with {len(wu)}")
             # Delete the head file for the WorkUnit
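# --- Illustrative sketch, not part of the patch above ---
# The new "disordered_search" flag is read from the task's runtime_config, so it would
# presumably be toggled from the workflow's runtime TOML (e.g. disordered_search = true).
# WorkUnit.disorder_obstimes() is provided by KBMOD and its implementation is not shown
# in this diff; the helper below only sketches the general idea (shuffling the per-image
# observation times so real trajectories no longer line up), which is what makes the
# resulting searches useful as known-bad cases for testing and ML training. The function
# and variable names here are assumptions for illustration only.
import numpy as np

def shuffle_obstimes(obstimes, seed=None):
    """Return a shuffled copy of a list/array of observation times (MJD)."""
    rng = np.random.default_rng(seed)
    shuffled = np.array(obstimes, dtype=float)
    rng.shuffle(shuffled)
    return shuffled

# Example: the flag arriving through the runtime configuration dictionary.
runtime_config = {"disordered_search": True}
if runtime_config.get("disordered_search", False):
    print(shuffle_obstimes([60000.1, 60000.2, 60001.3], seed=42))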