From f93c7d89fd7800820c2ca158794ea3e7c552b3e7 Mon Sep 17 00:00:00 2001 From: Sergey Serebryakov Date: Mon, 20 Sep 2021 22:14:16 -0700 Subject: [PATCH] Check for wrong task CLI arguments. When users provide unrecognized task parameters on a command line (e.g., typo), MLCube configuration parser will report it and will raise a configuration error. --- mlcube/mlcube/__main__.py | 13 ++++---- mlcube/mlcube/config.py | 66 +++++++++++++++++++++++++++++++++------ 2 files changed, 64 insertions(+), 15 deletions(-) diff --git a/mlcube/mlcube/__main__.py b/mlcube/mlcube/__main__.py index e66a127d..37d5c63c 100644 --- a/mlcube/mlcube/__main__.py +++ b/mlcube/mlcube/__main__.py @@ -49,7 +49,7 @@ def parser_process(value: t.Text, state: click.parser.ParsingState): def _parse_cli_args(ctx: t.Optional[click.core.Context], mlcube: t.Text, platform: t.Optional[t.Text], - workspace: t.Optional[t.Text], + workspace: t.Optional[t.Text], tasks: t.Optional[t.List[t.Text]], resolve: bool) -> t.Tuple[t.Optional[t.Type[Runner]], DictConfig]: """ Args: @@ -57,6 +57,7 @@ def _parse_cli_args(ctx: t.Optional[click.core.Context], mlcube: t.Text, platfor mlcube: Path to MLCube root directory or mlcube.yaml file. platform: Platform to use to run this MLCube (docker, singularity, gcp, k8s etc). workspace: Workspace path to use. If not specified, default workspace inside MLCube directory is used. + tasks: List of tasks to be executed. resolve: if True, compute values in MLCube configuration. """ mlcube_inst: MLCubeDirectory = CliParser.parse_mlcube_arg(mlcube) @@ -75,7 +76,7 @@ def _parse_cli_args(ctx: t.Optional[click.core.Context], mlcube: t.Text, platfor runner_cls, runner_config = None, None mlcube_config = MLCubeConfig.create_mlcube_config( os.path.join(mlcube_inst.path, mlcube_inst.file), mlcube_cli_args, task_cli_args, runner_config, workspace, - resolve=resolve, runner_cls=runner_cls + tasks=tasks, resolve=resolve, runner_cls=runner_cls ) return runner_cls, mlcube_config @@ -132,7 +133,7 @@ def show_config(ctx: click.core.Context, mlcube: t.Text, platform: t.Text, works workspace: Workspace path to use. If not specified, default workspace inside MLCube directory is used. resolve: if True, compute values in MLCube configuration. """ - _, mlcube_config = _parse_cli_args(ctx, mlcube, platform, workspace, resolve) + _, mlcube_config = _parse_cli_args(ctx, mlcube, platform, workspace, tasks=None, resolve=resolve) print(OmegaConf.to_yaml(mlcube_config)) @@ -152,8 +153,8 @@ def run(ctx: click.core.Context, mlcube: t.Text, platform: t.Text, task: t.Text, task: Comma separated list of tasks to run. workspace: Workspace path to use. If not specified, default workspace inside MLCube directory is used. """ - runner_cls, mlcube_config = _parse_cli_args(ctx, mlcube, platform, workspace, resolve=True) - tasks: t.List[str] = CliParser.parse_list_arg(task, default='main') + tasks: t.List[t.Text] = CliParser.parse_list_arg(task, default='main') + runner_cls, mlcube_config = _parse_cli_args(ctx, mlcube, platform, workspace, tasks, resolve=True) for task in tasks: docker_runner = runner_cls(mlcube_config, task=task) docker_runner.run() @@ -162,7 +163,7 @@ def run(ctx: click.core.Context, mlcube: t.Text, platform: t.Text, task: t.Text, @cli.command(name='describe', help='Describe MLCube.') @mlcube_option def describe(mlcube: t.Text) -> None: - _, mlcube_config = _parse_cli_args(None, mlcube, None, None, resolve=True) + _, mlcube_config = _parse_cli_args(ctx=None, mlcube=mlcube, platform=None, workspace=None, tasks=None, resolve=True) print(f"MLCube") print(f" path = {mlcube_config.runtime.root}") print(f" name = {mlcube_config.name}:{mlcube_config.get('version', 'latest')}") diff --git a/mlcube/mlcube/config.py b/mlcube/mlcube/config.py index 373552e5..060a73fe 100644 --- a/mlcube/mlcube/config.py +++ b/mlcube/mlcube/config.py @@ -2,6 +2,7 @@ import logging import typing as t from omegaconf import (OmegaConf, DictConfig) +from mlcube.errors import ConfigurationError from mlcube.runner import Runner logger = logging.getLogger(__name__) @@ -48,8 +49,8 @@ def get_uri(value: t.Text) -> t.Text: @staticmethod def create_mlcube_config(mlcube_config_file: t.Text, mlcube_cli_args: t.Optional[DictConfig] = None, task_cli_args: t.Optional[t.Dict] = None, runner_config: t.Optional[DictConfig] = None, - workspace: t.Optional[t.Text] = None, resolve: bool = True, - runner_cls: t.Optional[t.Type[Runner]] = None) -> DictConfig: + workspace: t.Optional[t.Text] = None, tasks: t.Optional[t.List[t.Text]] = None, + resolve: bool = True, runner_cls: t.Optional[t.Type[Runner]] = None) -> DictConfig: """ Create MLCube mlcube merging different configs - base, global, local and cli. Args: mlcube_config_file: Path to mlcube.yaml file. @@ -57,14 +58,13 @@ def create_mlcube_config(mlcube_config_file: t.Text, mlcube_cli_args: t.Optional task_cli_args: Task parameters from command line. runner_config: MLCube runner configuration, usually comes from system settings file. workspace: Workspace path to use in this MLCube run. + tasks: List of tasks to be executed. If empty or None, consider all tasks in MLCube config. resolve: If true, compute all values (some of them may reference other parameters or environmental variables). runner_cls: A python class for the runner type specified in `runner_config`. """ if mlcube_cli_args is None: mlcube_cli_args = OmegaConf.create({}) - if task_cli_args is None: - task_cli_args = {} if runner_config is None: runner_config = OmegaConf.create({}) logger.debug("mlcube_config_file = %s", mlcube_config_file) @@ -72,6 +72,7 @@ def create_mlcube_config(mlcube_config_file: t.Text, mlcube_cli_args: t.Optional logger.debug("task_cli_args = %s", task_cli_args) logger.debug("runner_config = %s", str(runner_config)) logger.debug("workspace = %s", workspace) + logger.debug("tasks = %s", tasks) # Load MLCube configuration and maybe override parameters from command line (like -Pdocker.build_strategy=...). actual_workspace = '${runtime.root}/workspace' if workspace is None else MLCubeConfig.get_uri(workspace) @@ -99,26 +100,49 @@ def create_mlcube_config(mlcube_config_file: t.Text, mlcube_cli_args: t.Optional if runner_cls: runner_cls.CONFIG.validate(mlcube_config) + # TODO: This needs some discussion. Originally, one task was supposed to run at a time. Now, we seem to converge + # to support lists of tasks. Current implementation continues to use old format of task parameters, i.e. + # `param_name=param_value`. This may result in an unexpected behavior when many tasks run, so we should think + # about a different rule: `task_name.param_name=param_value`. This is similar to parameter specification in + # DVC. + + # We will iterate over all tasks and make them representation canonical, but will use only tasks to run to + # check if users provided unrecognized parameters. + task_cli_args = task_cli_args or dict() # Dictionary of task arguments from a CL. + tasks_to_run = tasks or list(mlcube_config.tasks.keys()) # These tasks need to run later. + overridden_parameters = set() # Set of parameters from task_cli_args that were used. for task_name in mlcube_config.tasks.keys(): [task] = MLCubeConfig.ensure_values_exist(mlcube_config.tasks, task_name, dict) [parameters] = MLCubeConfig.ensure_values_exist(task, 'parameters', dict) [inputs, outputs] = MLCubeConfig.ensure_values_exist(parameters, ['inputs', 'outputs'], dict) - MLCubeConfig.check_parameters(inputs, task_cli_args) - MLCubeConfig.check_parameters(outputs, task_cli_args) + overridden_inputs = MLCubeConfig.check_parameters(inputs, task_cli_args) + overridden_outputs = MLCubeConfig.check_parameters(outputs, task_cli_args) + + if task_name in tasks_to_run: + overridden_parameters.update(overridden_inputs) + overridden_parameters.update(overridden_outputs) + + unknown_task_cli_args = set([name for name in task_cli_args if name not in overridden_parameters]) + if unknown_task_cli_args: + MLCubeConfig.report_unknown_task_cli_args(task_cli_args, unknown_task_cli_args) + raise ConfigurationError(f"Unknown task CLI arguments: {unknown_task_cli_args}") if resolve: OmegaConf.resolve(mlcube_config) return mlcube_config @staticmethod - def check_parameters(parameters: DictConfig, task_cli_args: t.Dict) -> None: + def check_parameters(parameters: DictConfig, task_cli_args: t.Dict) -> t.Set: """ Check that task parameters are defined according to MLCube schema. Args: parameters: Task parameters (`inputs` or `outputs`). - task_cli_args: Task parameters from command line. + task_cli_args: Task parameters that a user provided on a command line. + Returns: + Set of parameters that were overridden using task parameters on a command line. This function does not set `type` of parameters (if not present) in all cases. """ + overridden_parameters = set() for name in parameters.keys(): # The `_param_name` is anyway there, so check it's not None. [param_def] = MLCubeConfig.ensure_values_exist(parameters, name, dict) @@ -137,7 +161,9 @@ def check_parameters(parameters: DictConfig, task_cli_args: t.Dict) -> None: if param_def.type == ParameterType.UNKNOWN and param_def.default.endswith(os.sep): param_def.type = ParameterType.DIRECTORY # See if there is value on a command line - param_def.default = task_cli_args.get(name, param_def.default) + if name in task_cli_args: + param_def.default = task_cli_args.get(name) + overridden_parameters.add(name) # Check again parameter type. Users in certain number of cases will not be providing final slash on a # command line for directories, so we tried to infer types above using default values. Just in case, see # if we can do the same with user-provided values. @@ -150,3 +176,25 @@ def check_parameters(parameters: DictConfig, task_cli_args: t.Dict) -> None: # It probably does not make too much sense to see, let's say, if an input parameter exists and set its # type at this moment, because MLCube can run on remote hosts. + return overridden_parameters + + @staticmethod + def check_task_cli_args(tasks: DictConfig, task_cli_args: t.Dict) -> None: + """Find any unknown task CLI arguments and report error. + Args: + tasks: Dictionary of task definitions. + task_cli_args: Dictionary of task parameters that a user provided on a command line. + Raises: + ConfigurationError if at least one task CLI argument is not recognized. + """ + + @staticmethod + def report_unknown_task_cli_args(task_cli_args: t.Dict, unknown_task_cli_args: t.Set) -> None: + """Task CLI argument (s) has not been recognized, report this error. + Args: + task_cli_args: Dictionary of all task CLI arguments. + unknown_task_cli_args: Arguments that have not been used (recognized). + """ + print("The following task CLI arguments have not been used:") + for arg in unknown_task_cli_args: + print(f"\t{arg} = {task_cli_args[arg]}")