1
+ # DTask-level cancellation
2
+
3
+ mutable struct CancelToken
4
+ @atomic cancelled:: Bool
5
+ @atomic graceful:: Bool
6
+ event:: Base.Event
7
+ end
8
+ CancelToken () = CancelToken (false , false , Base. Event ())
9
+ function cancel! (token:: CancelToken ; graceful:: Bool = true )
10
+ if ! graceful
11
+ @atomic token. graceful = false
12
+ end
13
+ @atomic token. cancelled = true
14
+ notify (token. event)
15
+ return
16
+ end
17
+ function is_cancelled (token:: CancelToken ; must_force:: Bool = false )
18
+ if token. cancelled[]
19
+ if must_force && token. graceful[]
20
+ # If we're only responding to forced cancellation, ignore graceful cancellations
21
+ return false
22
+ end
23
+ return true
24
+ end
25
+ return false
26
+ end
27
+ Base. wait (token:: CancelToken ) = wait (token. event)
28
+ # TODO : Enable this for safety
29
+ # Serialization.serialize(io::AbstractSerializer, ::CancelToken) =
30
+ # throw(ConcurrencyViolationError("Cannot serialize a CancelToken"))
31
+
32
+ const DTASK_CANCEL_TOKEN = TaskLocalValue {Union{CancelToken,Nothing}} (()-> nothing )
33
+
34
+ function clone_cancel_token_remote (orig_token:: CancelToken , wid:: Integer )
35
+ remote_token = remotecall_fetch (wid) do
36
+ return poolset (CancelToken ())
37
+ end
38
+ errormonitor_tracked (" remote cancel_token communicator" , Threads. @spawn begin
39
+ wait (orig_token)
40
+ @dagdebug nothing :cancel " Cancelling remote token on worker $wid "
41
+ MemPool. access_ref (remote_token) do remote_token
42
+ cancel! (remote_token)
43
+ end
44
+ end )
45
+ end
46
+
47
+ # Global-level cancellation
48
+
1
49
"""
2
50
cancel!(task::DTask; force::Bool=false, halt_sch::Bool=false)
3
51
@@ -48,7 +96,7 @@ function _cancel!(state, tid, force, halt_sch)
48
96
for task in state. ready
49
97
tid != = nothing && task. id != tid && continue
50
98
@dagdebug tid :cancel " Cancelling ready task"
51
- state. cache[task] = InterruptException ()
99
+ state. cache[task] = DTaskFailedException (task, task, InterruptException () )
52
100
state. errored[task] = true
53
101
Sch. set_failed! (state, task)
54
102
end
@@ -58,7 +106,7 @@ function _cancel!(state, tid, force, halt_sch)
58
106
for task in keys (state. waiting)
59
107
tid != = nothing && task. id != tid && continue
60
108
@dagdebug tid :cancel " Cancelling waiting task"
61
- state. cache[task] = InterruptException ()
109
+ state. cache[task] = DTaskFailedException (task, task, InterruptException () )
62
110
state. errored[task] = true
63
111
Sch. set_failed! (state, task)
64
112
end
@@ -80,11 +128,11 @@ function _cancel!(state, tid, force, halt_sch)
80
128
Tf === typeof (Sch. eager_thunk) && continue
81
129
istaskdone (task) && continue
82
130
any_cancelled = true
83
- @dagdebug tid :cancel " Cancelling running task ($Tf )"
84
131
if force
85
132
@dagdebug tid :cancel " Interrupting running task ($Tf )"
86
133
Threads. @spawn Base. throwto (task, InterruptException ())
87
134
else
135
+ @dagdebug tid :cancel " Cancelling running task ($Tf )"
88
136
# Tell the processor to just drop this task
89
137
task_occupancy = task_spec[4 ]
90
138
time_util = task_spec[2 ]
@@ -93,6 +141,7 @@ function _cancel!(state, tid, force, halt_sch)
93
141
push! (istate. cancelled, tid)
94
142
to_proc = istate. proc
95
143
put! (istate. return_queue, (myid (), to_proc, tid, (InterruptException (), nothing )))
144
+ cancel! (istate. cancel_tokens[tid]; graceful= false )
96
145
end
97
146
end
98
147
end
0 commit comments