Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions bioblend/_tests/TestGalaxyInvocations.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,27 @@ def test_rerun_invocation(self):
history = self.gi.histories.show_history(rerun_invocation['history_id'], contents=True)
self.assertEqual(len(history), 3)

@test_util.skip_unless_galaxy('release_21.05')
def test_rerun_and_remap_invocation(self):
    """Invoke a workflow whose second input makes a job fail, then rerun the
    invocation with ``remap=True`` and check that the failed job's output is
    replaced in place, so the history ends up with all datasets in 'ok' state.
    """
    path = test_util.get_abspath(os.path.join('data', 'select_first.ga'))
    wf = self.gi.workflows.import_workflow_from_local_path(path)
    wf_inputs = {
        "0": {'src': 'hda', 'id': self.dataset_id},
        # "-1" is expected to make the 'Select first' job error out,
        # as asserted below — TODO confirm against the select_first.ga workflow
        "1": "-1",
    }
    invocation_id = self.gi.workflows.invoke_workflow(wf['id'], inputs=wf_inputs, history_id=self.history_id)['id']
    self.gi.invocations.wait_for_invocation(invocation_id)
    failed_datasets = self.gi.datasets.get_datasets(history_id=self.history_id, name='Select first on data 1')
    failed_dataset = self.gi.datasets.wait_for_dataset(failed_datasets[0]['id'], check=False)
    self.assertEqual(failed_dataset['state'], 'error')

    self.gi.invocations.rerun_invocation(invocation_id, remap=True)
    # Wait for every dataset in the history (including remapped outputs)
    # to reach a terminal state before counting successes.
    for dataset in self.gi.datasets.get_datasets(history_id=self.history_id):
        self.gi.datasets.wait_for_dataset(dataset['id'], check=False)

    # These are datasets, not jobs: name the local accordingly
    # (the original `new_ok_jobs` name was misleading).
    new_ok_datasets = self.gi.datasets.get_datasets(state='ok', history_id=self.history_id)
    self.assertEqual(len(new_ok_datasets), 3)

def _invoke_workflow(self):
dataset = {'src': 'hda', 'id': self.dataset_id}

Expand Down
28 changes: 25 additions & 3 deletions bioblend/galaxy/invocations/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ def show_invocation(self, invocation_id):
url = self._make_url(invocation_id)
return self._get(url=url)

def rerun_invocation(self, invocation_id: str, inputs_update: Optional[dict] = None,
def rerun_invocation(self, invocation_id: str, remap: bool = False, inputs_update: Optional[dict] = None,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of adding remap here, I think it would be better to add a separate method (rerun_invocation_failed_jobs() maybe?). The implementation for remap=True doesn't share any code with the current one.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can also add it as a separate method. The rationale of doing it this way was so that rerun_invocation() would work exactly the same as rerun_job() - either create an entire new job/invocation with remap=False or replace the existing failed one in place with remap=True.

params_update: Optional[dict] = None, history_id: Optional[str] = None,
history_name: Optional[str] = None, import_inputs_to_history: bool = False,
replacement_params: Optional[dict] = None, allow_tool_state_corrections: bool = False,
Expand All @@ -144,6 +144,13 @@ def rerun_invocation(self, invocation_id: str, inputs_update: Optional[dict] = N
:type invocation_id: str
:param invocation_id: Encoded workflow invocation ID to be rerun

:type remap: bool
:param remap: when ``True``, only failed jobs will be rerun. All other parameters
for this method will be ignored. Job output(s) will be remapped onto the dataset(s)
created by the original jobs; if other jobs were waiting for these jobs to finish
successfully, they will be resumed using the new outputs of the tool runs. When ``False``,
an entire new invocation will be created, using the other parameters specified.

:type inputs_update: dict
:param inputs_update: If different datasets should be used compared to the original
invocation, this should contain a mapping of workflow inputs to the new
Expand Down Expand Up @@ -190,12 +197,27 @@ def rerun_invocation(self, invocation_id: str, inputs_update: Optional[dict] = N
Default is ``False``, but when setting parameters for a subworkflow,
``True`` is required.

:rtype: dict
:return: A dict describing the new workflow invocation.
:rtype: dict if ``remap=False``, or list if ``remap=True``
:return: A dict describing the new workflow invocation, or a list of remapped jobs.

.. note::
This method can only be used with Galaxy ``release_21.01`` or later.
"""
if remap:
errored_jobs = self.gi.jobs.get_jobs(state='error', invocation_id=invocation_id)
remap_failures = 0
rerun_jobs = []
for job in errored_jobs:
try:
job = self.gi.jobs.rerun_job(job['id'], remap=True)
rerun_jobs.append(job)
except ValueError:
# should not occur, jobs from an invocation should always be remappable
remap_failures += 1
if remap_failures:
raise ValueError(f'remap was set to True, but {remap_failures} out of {len(errored_jobs)} errored jobs could not be remapped.')
return rerun_jobs

invocation_details = self.show_invocation(invocation_id)
workflow_id = invocation_details['workflow_id']
inputs = invocation_details['inputs']
Expand Down