Skip to content

Commit f551099

Browse files
authored
Deprecate trigger --on-resume option. (#6974)
Remove obsolete trigger --on-resume option.
1 parent d0de7dc commit f551099

File tree

8 files changed

+21
-117
lines changed

8 files changed

+21
-117
lines changed

changes.d/6974.feat.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Removed obsolete trigger --on-resume option.

cylc/flow/commands.py

Lines changed: 6 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -652,6 +652,10 @@ async def force_trigger_tasks(
652652
flow: List[str],
653653
flow_wait: bool = False,
654654
flow_descr: Optional[str] = None,
655+
# BACK COMPAT: on_resume
656+
# Arg no longer used but retained for older clients.
657+
# From: 8.6
658+
# Remove at: 8.7
655659
on_resume: bool = False
656660
):
657661
"""Match and trigger a group of tasks (`cylc trigger` command).
@@ -667,12 +671,6 @@ async def force_trigger_tasks(
667671
flow = back_compat_flow_all(flow) # BACK COMPAT (see func def)
668672
ids = validate.is_tasks(tasks)
669673
validate.flow_opts(flow, flow_wait)
670-
if on_resume:
671-
LOG.warning(
672-
"The --on-resume option is deprecated and will be removed "
673-
"at Cylc 8.6."
674-
)
675-
676674
yield
677675

678676
matched, unmatched = schd.pool.id_match(ids)
@@ -705,7 +703,6 @@ async def force_trigger_tasks(
705703
flow,
706704
flow_wait,
707705
flow_descr,
708-
on_resume,
709706
)
710707

711708

@@ -715,7 +712,6 @@ def _force_trigger_tasks(
715712
flow: List[str],
716713
flow_wait: bool = False,
717714
flow_descr: Optional[str] = None,
718-
on_resume: bool = False
719715
):
720716
active = schd.pool.get_itasks(group_ids)
721717
if flow or not active:
@@ -792,7 +788,7 @@ def _force_trigger_tasks(
792788
schd.pool.merge_flows(itask, flow_nums)
793789

794790
# Trigger group start task.
795-
schd.pool.queue_or_trigger(itask, on_resume)
791+
schd.pool.queue_or_trigger(itask)
796792

797793
else:
798794
active_to_remove.append(itask.tokens.task)
@@ -883,6 +879,6 @@ def _force_trigger_tasks(
883879

884880
if jtask is not None and not in_flow_prereqs:
885881
# Trigger group start task.
886-
schd.pool.queue_or_trigger(jtask, on_resume)
882+
schd.pool.queue_or_trigger(jtask)
887883

888884
schd.pool.release_runahead_tasks()

cylc/flow/network/schema.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2219,12 +2219,16 @@ class Meta:
22192219
''')
22202220
resolver = partial(mutator, command='force_trigger_tasks')
22212221

2222+
# BACK COMPAT: on_resume
2223+
# Arg no longer used but retained for older clients.
2224+
# From: 8.6
2225+
# Remove at: 8.7
22222226
class Arguments(TaskMutation.Arguments, FlowMutationArguments):
22232227
on_resume = Boolean(
22242228
default_value=False,
22252229
description=sstrip('''
2226-
If the workflow is paused, wait until it is resumed before
2227-
running the triggered task(s).
2230+
DEPRECATED: this option is no longer needed and will be
2231+
ignored by the scheduler.
22282232
''')
22292233
)
22302234

cylc/flow/scheduler.py

Lines changed: 1 addition & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -683,9 +683,6 @@ async def run_scheduler(self) -> None:
683683

684684
# If we shut down with manually triggered waiting tasks,
685685
# submit them to run now.
686-
# NOTE: this will run tasks that were triggered with
687-
# the trigger "--on-resume" option, even if the workflow
688-
# is restarted as paused. Option to be removed at 8.6.0.
689686
pre_prep_tasks = []
690687
for itask in self.pool.get_tasks():
691688
if (
@@ -1349,7 +1346,7 @@ def release_tasks_to_run(self) -> bool:
13491346
and self.reload_pending is False
13501347
):
13511348
if self.pool.tasks_to_trigger_now:
1352-
# manually triggered tasks to run now, workflow paused or not
1349+
# manually triggered tasks to run now.
13531350
pre_prep_tasks.update(self.pool.tasks_to_trigger_now)
13541351
self.pool.tasks_to_trigger_now = set()
13551352

@@ -1362,10 +1359,6 @@ def release_tasks_to_run(self) -> bool:
13621359
else:
13631360
# release queued tasks
13641361
pre_prep_tasks.update(self.pool.release_queued_tasks())
1365-
if self.pool.tasks_to_trigger_on_resume:
1366-
# and manually triggered tasks to run once workflow resumed
1367-
pre_prep_tasks.update(self.pool.tasks_to_trigger_on_resume)
1368-
self.pool.tasks_to_trigger_on_resume = set()
13691362

13701363
elif (
13711364
(

cylc/flow/scripts/trigger.py

Lines changed: 0 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -76,15 +76,13 @@
7676
$flow: [Flow!],
7777
$flowWait: Boolean,
7878
$flowDescr: String,
79-
$onResume: Boolean,
8079
) {
8180
trigger (
8281
workflows: $wFlows,
8382
tasks: $tasks,
8483
flow: $flow,
8584
flowWait: $flowWait,
8685
flowDescr: $flowDescr,
87-
onResume: $onResume,
8886
) {
8987
result
9088
}
@@ -103,17 +101,6 @@ def get_option_parser() -> COP:
103101

104102
add_flow_opts_for_trigger_and_set(parser)
105103

106-
parser.add_option(
107-
"--on-resume",
108-
help=(
109-
"If the workflow is paused, wait until it is resumed before "
110-
"running the triggered task(s). DEPRECATED - this will be "
111-
"removed at Cylc 8.6."
112-
),
113-
action="store_true",
114-
default=False,
115-
dest="on_resume"
116-
)
117104
return parser
118105

119106

@@ -131,7 +118,6 @@ async def run(options: 'Values', workflow_id: str, *tokens_list):
131118
'flow': options.flow,
132119
'flowWait': options.flow_wait,
133120
'flowDescr': options.flow_descr,
134-
'onResume': options.on_resume,
135121
}
136122
}
137123
return await pclient.async_request('graphql', mutation_kwargs)

cylc/flow/task_pool.py

Lines changed: 3 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -212,7 +212,6 @@ def __init__(
212212

213213
self.tasks_to_hold: Set[Tuple[str, 'PointBase']] = set()
214214
self.tasks_to_trigger_now: Set['TaskProxy'] = set()
215-
self.tasks_to_trigger_on_resume: Set['TaskProxy'] = set()
216215

217216
def set_stop_task(self, task_id):
218217
"""Set stop after a task."""
@@ -904,8 +903,6 @@ def remove(self, itask: 'TaskProxy', reason: Optional[str] = None) -> None:
904903
else:
905904
with suppress(KeyError):
906905
self.tasks_to_trigger_now.remove(itask)
907-
with suppress(KeyError):
908-
self.tasks_to_trigger_on_resume.remove(itask)
909906
self.tasks_removed = True
910907
self.active_tasks_changed = True
911908
if not self.active_tasks[itask.point]:
@@ -2339,7 +2336,7 @@ def _get_active_flow_nums(self) -> 'FlowNums':
23392336
or {1}
23402337
)
23412338

2342-
def queue_or_trigger(self, itask: 'TaskProxy', on_resume: bool = False):
2339+
def queue_or_trigger(self, itask: 'TaskProxy'):
23432340
"""Handle state, queues, and runahead for a manually triggered task.
23442341
23452342
Triggering a non-queued task:
@@ -2349,7 +2346,7 @@ def queue_or_trigger(self, itask: 'TaskProxy', on_resume: bool = False):
23492346
Triggering a queued task:
23502347
- run it, regardless of the queue limit
23512348
2352-
If ready, add itask to the tasks_to_trigger_(now/on_resume) lists.
2349+
If ready, add itask to the tasks_to_trigger_now list.
23532350
23542351
Assumes the task is in the pool.
23552352
@@ -2389,16 +2386,7 @@ def queue_or_trigger(self, itask: 'TaskProxy', on_resume: bool = False):
23892386
if not itask.state.is_queued:
23902387
# If not queued now, record the task as ready to run.
23912388
itask.waiting_on_job_prep = True
2392-
2393-
if on_resume:
2394-
self.tasks_to_trigger_on_resume.add(itask)
2395-
# In case previously triggered without --on-resume.
2396-
# (It should have run already, but just in case).
2397-
self.tasks_to_trigger_now.discard(itask)
2398-
else:
2399-
self.tasks_to_trigger_now.add(itask)
2400-
# In case previously triggered with --on-resume.
2401-
self.tasks_to_trigger_on_resume.discard(itask)
2389+
self.tasks_to_trigger_now.add(itask)
24022390

24032391
# Task may be set running before xtrigger is satisfied,
24042392
# if so check/spawn if xtrigger sequential.

tests/integration/test_force_trigger.py

Lines changed: 0 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -152,69 +152,6 @@ async def test_trigger_group_whilst_paused(flow, scheduler, run, complete):
152152
await complete(schd, '1/c')
153153

154154

155-
async def test_trigger_on_resume(
156-
flow: 'Fixture',
157-
scheduler: 'Fixture',
158-
start: 'Fixture',
159-
capture_submission: 'Fixture',
160-
):
161-
"""
162-
Test manual triggering on-resume option when the workflow is paused.
163-
164-
https://github.com/cylc/cylc-flow/issues/6192
165-
166-
"""
167-
id_ = flow({
168-
'scheduling': {
169-
'queues': {
170-
'default': {
171-
'limit': 1,
172-
},
173-
},
174-
'graph': {
175-
'R1': '''
176-
a => x & y & z
177-
''',
178-
},
179-
},
180-
})
181-
schd = scheduler(id_, paused_start=True)
182-
183-
# start the scheduler (but don't set the main loop running)
184-
async with start(schd):
185-
186-
# capture task submissions (prevents real submissions)
187-
submitted_tasks = capture_submission(schd)
188-
189-
# paused at start-up so no tasks should be submitted
190-
assert len(submitted_tasks) == 0
191-
192-
# manually trigger 1/x - it not should be submitted
193-
await run_cmd(
194-
force_trigger_tasks(schd, ['1/x'], ["1"], on_resume=True))
195-
schd.release_tasks_to_run()
196-
assert len(submitted_tasks) == 0
197-
198-
# manually trigger 1/y - it should not be submitted
199-
# (queue limit reached)
200-
await run_cmd(
201-
force_trigger_tasks(schd, ['1/y'], ["1"], on_resume=True))
202-
schd.release_tasks_to_run()
203-
assert len(submitted_tasks) == 0
204-
205-
# manually trigger 1/y again - it should not be submitted
206-
# (triggering a queued task runs it)
207-
await run_cmd(
208-
force_trigger_tasks(schd, ['1/y'], ["1"], on_resume=True))
209-
schd.release_tasks_to_run()
210-
assert len(submitted_tasks) == 0
211-
212-
# resume the workflow, both tasks should trigger now.
213-
schd.resume_workflow()
214-
schd.release_tasks_to_run()
215-
assert len(submitted_tasks) == 2
216-
217-
218155
async def test_trigger_group(
219156
flow, scheduler, run, complete, log_filter
220157
):

tests/integration/test_remove.py

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -519,7 +519,7 @@ async def test_reload_changed_config(flow, scheduler, run, complete):
519519

520520

521521
async def test_remove_triggered(flow, scheduler, start):
522-
"""It should remove tasks from pool and from to_trigger sets."""
522+
"""It should remove tasks from pool and from to_trigger set."""
523523
conf = {
524524
'scheduling': {
525525
'graph': {
@@ -536,15 +536,14 @@ async def test_remove_triggered(flow, scheduler, start):
536536
)
537537
assert foo in schd.pool.tasks_to_trigger_now
538538

539-
# trigger bar (on resume)
539+
# trigger bar
540540
await run_cmd(
541-
force_trigger_tasks(schd, [bar.identity], [], on_resume=True)
541+
force_trigger_tasks(schd, [bar.identity], [])
542542
)
543-
assert bar in schd.pool.tasks_to_trigger_on_resume
543+
assert bar in schd.pool.tasks_to_trigger_now
544544

545545
await run_cmd(
546546
remove_tasks(schd, [foo.identity, bar.identity], [])
547547
)
548548
assert not schd.pool.get_tasks()
549549
assert not schd.pool.tasks_to_trigger_now
550-
assert not schd.pool.tasks_to_trigger_on_resume

0 commit comments

Comments
 (0)