diff --git a/bioblend/_tests/TestGalaxyInvocations.py b/bioblend/_tests/TestGalaxyInvocations.py index a7d95cd57..d10e2a985 100644 --- a/bioblend/_tests/TestGalaxyInvocations.py +++ b/bioblend/_tests/TestGalaxyInvocations.py @@ -144,6 +144,27 @@ def test_rerun_invocation(self): history = self.gi.histories.show_history(rerun_invocation['history_id'], contents=True) self.assertEqual(len(history), 3) + @test_util.skip_unless_galaxy('release_21.05') + def test_rerun_and_remap_invocation(self): + path = test_util.get_abspath(os.path.join('data', 'select_first.ga')) + wf = self.gi.workflows.import_workflow_from_local_path(path) + wf_inputs = { + "0": {'src': 'hda', 'id': self.dataset_id}, + "1": "-1", + } + invocation_id = self.gi.workflows.invoke_workflow(wf['id'], inputs=wf_inputs, history_id=self.history_id)['id'] + self.gi.invocations.wait_for_invocation(invocation_id) + failed_datasets = self.gi.datasets.get_datasets(history_id=self.history_id, name='Select first on data 1') + failed_dataset = self.gi.datasets.wait_for_dataset(failed_datasets[0]['id'], check=False) + self.assertEqual(failed_dataset['state'], 'error') + + self.gi.invocations.rerun_invocation(invocation_id, remap=True) + for dataset in self.gi.datasets.get_datasets(history_id=self.history_id): + self.gi.datasets.wait_for_dataset(dataset['id'], check=False) + + new_ok_jobs = self.gi.datasets.get_datasets(state='ok', history_id=self.history_id) + self.assertEqual(len(new_ok_jobs), 3) + def _invoke_workflow(self): dataset = {'src': 'hda', 'id': self.dataset_id} diff --git a/bioblend/galaxy/invocations/__init__.py b/bioblend/galaxy/invocations/__init__.py index b02e13c9e..6eba03906 100644 --- a/bioblend/galaxy/invocations/__init__.py +++ b/bioblend/galaxy/invocations/__init__.py @@ -132,7 +132,7 @@ def show_invocation(self, invocation_id): url = self._make_url(invocation_id) return self._get(url=url) - def rerun_invocation(self, invocation_id: str, inputs_update: Optional[dict] = None, + def rerun_invocation(self, invocation_id: str, remap: bool = False, inputs_update: Optional[dict] = None, params_update: Optional[dict] = None, history_id: Optional[str] = None, history_name: Optional[str] = None, import_inputs_to_history: bool = False, replacement_params: Optional[dict] = None, allow_tool_state_corrections: bool = False, @@ -144,6 +144,13 @@ def rerun_invocation(self, invocation_id: str, inputs_update: Optional[dict] = N :type invocation_id: str :param invocation_id: Encoded workflow invocation ID to be rerun + :type remap: bool + :param remap: when ``True``, only failed jobs will be rerun. All other parameters + for this method will be ignored. Job output(s) will be remapped onto the dataset(s) + created by the original jobs; if other jobs were waiting for these jobs to finish + successfully, they will be resumed using the new outputs of the tool runs. When ``False``, + an entire new invocation will be created, using the other parameters specified. + :type inputs_update: dict :param inputs_update: If different datasets should be used to the original invocation, this should contain a mapping of workflow inputs to the new @@ -190,12 +197,27 @@ def rerun_invocation(self, invocation_id: str, inputs_update: Optional[dict] = N Default is ``False``, but when setting parameters for a subworkflow, ``True`` is required. - :rtype: dict - :return: A dict describing the new workflow invocation. + :rtype: dict if ``remap=False``, or list if ``remap=True`` + :return: A dict describing the new workflow invocation, or a list of remapped jobs. .. note:: This method can only be used with Galaxy ``release_21.01`` or later. """ + if remap: + errored_jobs = self.gi.jobs.get_jobs(state='error', invocation_id=invocation_id) + remap_failures = 0 + rerun_jobs = [] + for job in errored_jobs: + try: + job = self.gi.jobs.rerun_job(job['id'], remap=True) + rerun_jobs.append(job) + except ValueError: + # should not occur, jobs from an invocation should always be remappable + remap_failures += 1 + if remap_failures: + raise ValueError(f'remap was set to True, but {remap_failures} out of {len(errored_jobs)} errored jobs could not be remapped.') + return rerun_jobs + invocation_details = self.show_invocation(invocation_id) workflow_id = invocation_details['workflow_id'] inputs = invocation_details['inputs']