2 changes: 2 additions & 0 deletions src/Dagger.jl
@@ -60,6 +60,8 @@ include("scopes.jl")
include("utils/scopes.jl")
include("chunks.jl")
include("utils/signature.jl")
include("thunkid.jl")
include("utils/lfucache.jl")
include("options.jl")
include("dtask.jl")
include("cancellation.jl")
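The two new files are not shown in this diff. Based on the call sites elsewhere in this PR (`ThunkID(task.uid, task.thunk_ref)` in dtask.jl; `unwrap_weak_checked(something(syncdep.thunk))` in compute.jl), src/thunkid.jl plausibly defines something like the following sketch; every name beyond those call sites is an assumption:

# Hypothetical sketch of src/thunkid.jl, inferred from this PR's call sites.
struct ThunkID
    uid::UInt64
    ref::Any  # assumed: holds task.thunk_ref when defined, else nothing
end

# A syncdep entry carries either a weak reference to a local Thunk or a
# stable ThunkID for a task whose Thunk lives remotely.
struct ThunkSyncdep
    thunk::Union{WeakThunk,Nothing}  # unwrapped via unwrap_weak_checked
    id::Union{ThunkID,Nothing}
end
ThunkSyncdep(t::Thunk) = ThunkSyncdep(WeakThunk(t), nothing)
ThunkSyncdep(id::ThunkID) = ThunkSyncdep(nothing, id)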
5 changes: 3 additions & 2 deletions src/cancellation.jl
@@ -129,8 +129,9 @@ function _cancel!(state, tid, force, graceful, halt_sch)
wids = unique(map(root_worker_id, values(state.running_on)))
for wid in wids
remotecall_fetch(wid, tid, sch_uid, force) do _tid, sch_uid, force
Dagger.Sch.proc_states(sch_uid) do states
for (proc, state) in states
states = Dagger.Sch.proc_states(sch_uid)
MemPool.lock_read(states.lock) do
for (proc, state) in states.dict
istate = state.state
any_cancelled = false
@lock istate.queue begin
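`proc_states` now hands back the states container itself rather than invoking a callback; callers take the read lock explicitly. A minimal sketch of the new access pattern, assuming the container exposes the `lock` and `dict` fields used above:

# Sketch: take the read lock, then walk the per-processor state dict.
states = Dagger.Sch.proc_states(sch_uid)
MemPool.lock_read(states.lock) do
    for (proc, state) in states.dict
        # read-only inspection of each processor's state; mutating
        # entries would require a write lock instead
    end
end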
22 changes: 10 additions & 12 deletions src/compute.jl
@@ -48,7 +48,7 @@ end
Find the set of direct dependents for each task.
"""
function dependents(node::Thunk)
deps = Dict{Union{Thunk,Chunk}, Set{Thunk}}()
deps = Dict{Thunk, Set{Thunk}}()
visited = Set{Thunk}()
to_visit = Set{Thunk}()
push!(to_visit, node)
@@ -58,13 +58,11 @@ function dependents(node::Thunk)
if !haskey(deps, next)
deps[next] = Set{Thunk}()
end
for inp in next.options.syncdeps
if istask(inp) || (inp isa Chunk)
s = get!(()->Set{Thunk}(), deps, inp)
push!(s, next)
if istask(inp) && !(inp in visited)
push!(to_visit, inp)
end
for inp in Iterators.map(syncdep->unwrap_weak_checked(something(syncdep.thunk)), next.options.syncdeps)
s = get!(()->Set{Thunk}(), deps, inp)
push!(s, next)
if istask(inp) && !(inp in visited)
push!(to_visit, inp)
end
end
push!(visited, next)
@@ -73,14 +71,14 @@ end
end

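Since `syncdeps` now holds `ThunkSyncdep` wrappers instead of raw `Thunk`/`Chunk` objects, graph traversals recover each parent `Thunk` by unwrapping the weak reference. The pattern in isolation (a sketch; `unwrap_weak_checked` throws if the weakly-held Thunk was already collected):

for syncdep in next.options.syncdeps
    # `something` asserts that a local weak ref exists; the result is a Thunk
    parent = unwrap_weak_checked(something(syncdep.thunk))
end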
"""
noffspring(dpents::Dict{Union{Thunk,Chunk}, Set{Thunk}}) -> Dict{Thunk, Int}
noffspring(dpents::Dict{Thunk, Set{Thunk}}) -> Dict{Thunk, Int}

Recursively find the number of tasks dependent on each task in the DAG.
Takes a Dict as returned by [`dependents`](@ref).
"""
function noffspring(dpents::Dict{Union{Thunk,Chunk}, Set{Thunk}})
function noffspring(dpents::Dict{Thunk, Set{Thunk}})
noff = Dict{Thunk,Int}()
to_visit = collect(filter(istask, keys(dpents)))
to_visit = collect(keys(dpents))
while !isempty(to_visit)
next = popfirst!(to_visit)
haskey(noff, next) && continue
@@ -126,7 +124,7 @@ function order(node::Thunk, ndeps)
haskey(output, next) && continue
s += 1
output[next] = s
parents = collect(filter(istask, next.options.syncdeps))
parents = collect(Iterators.map(syncdep->unwrap_weak_checked(something(syncdep.thunk)), next.options.syncdeps))
if !isempty(parents)
# If parents is empty, sort! should be a no-op, but raises an ambiguity error
# when InlineStrings.jl is loaded (at least, version 1.1.0), because InlineStrings
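To make the semantics concrete, the transitive-dependent count that `noffspring` produces can be reproduced on a toy diamond DAG in plain Julia (illustration only, not Dagger API; Dagger's implementation memoizes counts across nodes rather than recomputing them):

# Diamond DAG: a -> {b, c}, b -> {d}, c -> {d}.
dpents = Dict(:a => Set([:b, :c]), :b => Set([:d]), :c => Set([:d]), :d => Set{Symbol}())
function offspring(dpents, node)
    seen = Set{Symbol}()
    stack = collect(dpents[node])
    while !isempty(stack)
        n = pop!(stack)
        n in seen && continue
        push!(seen, n)
        append!(stack, collect(dpents[n]))
    end
    return seen
end
noff = Dict(n => length(offspring(dpents, n)) for n in keys(dpents))
# noff[:a] == 3, noff[:b] == noff[:c] == 1, noff[:d] == 0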
45 changes: 29 additions & 16 deletions src/datadeps.jl
@@ -396,7 +396,7 @@ function _get_write_deps!(state::DataDepsState{DataDepsAliasingState}, ainfo::Ab
other_task, other_write_num = other_task_write_num
write_num == other_write_num && continue
@dagdebug nothing :spawn_datadeps "Sync with writer via $ainfo -> $other_ainfo"
push!(syncdeps, other_task)
push!(syncdeps, ThunkSyncdep(other_task))
end
end
function _get_read_deps!(state::DataDepsState{DataDepsAliasingState}, ainfo::AbstractAliasing, task, write_num, syncdeps)
Expand All @@ -408,7 +408,7 @@ function _get_read_deps!(state::DataDepsState{DataDepsAliasingState}, ainfo::Abs
for (other_task, other_write_num) in other_tasks
write_num == other_write_num && continue
@dagdebug nothing :spawn_datadeps "Sync with reader via $ainfo -> $other_ainfo"
push!(syncdeps, other_task)
push!(syncdeps, ThunkSyncdep(other_task))
end
end
end
@@ -427,14 +427,14 @@ function _get_write_deps!(state::DataDepsState{DataDepsNonAliasingState}, arg, t
if other_task_write_num !== nothing
other_task, other_write_num = other_task_write_num
if write_num != other_write_num
push!(syncdeps, other_task)
push!(syncdeps, ThunkSyncdep(other_task))
end
end
end
function _get_read_deps!(state::DataDepsState{DataDepsNonAliasingState}, arg, task, write_num, syncdeps)
for (other_task, other_write_num) in state.alias_state.args_readers[arg]
if write_num != other_write_num
push!(syncdeps, other_task)
push!(syncdeps, ThunkSyncdep(other_task))
end
end
end
@@ -590,6 +590,7 @@ function distribute_tasks!(queue::DataDepsTaskQueue)
write_num = 1
proc_idx = 1
pressures = Dict{Processor,Int}()
proc_to_scope_lfu = BasicLFUCache{Processor,AbstractScope}(1024)
for (spec, task) in queue.seen_tasks[task_order]
# Populate all task dependencies
populate_task_info!(state, spec, task)
@@ -723,9 +724,20 @@ function distribute_tasks!(queue::DataDepsTaskQueue)
end
@assert our_proc in all_procs
our_space = only(memory_spaces(our_proc))
our_procs = filter(proc->proc in all_procs, collect(processors(our_space)))
task_scope = @something(spec.options.scope, AnyScope())
our_scope = constrain(UnionScope(map(ExactScope, our_procs)...), task_scope)

# Find the scope for this task (and its copies)
task_scope = @something(spec.options.compute_scope, spec.options.scope, DefaultScope())
if task_scope == scope
# Optimize for the common case: cache the proc=>scope mapping
our_scope = get!(proc_to_scope_lfu, our_proc) do
our_procs = filter(proc->proc in all_procs, collect(processors(our_space)))
return constrain(UnionScope(map(ExactScope, our_procs)...), scope)
end
else
# Use the provided scope and constrain it to the available processors
our_procs = filter(proc->proc in all_procs, collect(processors(our_space)))
our_scope = constrain(UnionScope(map(ExactScope, our_procs)...), task_scope)
end
if our_scope isa InvalidScope
throw(Sch.SchedulingException("Scopes are not compatible: $(our_scope.x), $(our_scope.y)"))
end
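The common case, where a task inherits the region's `scope`, now hits a small LFU cache keyed on the chosen processor instead of rebuilding and re-constraining a `UnionScope` for every task. The cached lookup in isolation (a sketch of the pattern above; `BasicLFUCache` is the helper moved into `src/utils/lfucache.jl` later in this diff):

# Memoize the processor => constrained-scope mapping.
proc_to_scope_lfu = BasicLFUCache{Processor,AbstractScope}(1024)
our_scope = get!(proc_to_scope_lfu, our_proc) do
    # computed at most once per frequently-used processor
    procs = filter(p -> p in all_procs, collect(processors(our_space)))
    constrain(UnionScope(map(ExactScope, procs)...), scope)
end

Task-specific scopes skip the cache, since caching them would evict the entries that actually repeat.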
@@ -769,10 +781,10 @@ function distribute_tasks!(queue::DataDepsTaskQueue)
generate_slot!(state, data_space, arg)
end
copy_to_scope = our_scope
copy_to_syncdeps = Set{Any}()
copy_to_syncdeps = Set{ThunkSyncdep}()
get_write_deps!(state, ainfo, task, write_num, copy_to_syncdeps)
@dagdebug nothing :spawn_datadeps "($(repr(value(f))))[$idx][$dep_mod] $(length(copy_to_syncdeps)) syncdeps"
copy_to = Dagger.@spawn scope=copy_to_scope syncdeps=copy_to_syncdeps meta=true Dagger.move!(dep_mod, our_space, data_space, arg_remote, arg_local)
copy_to = Dagger.@spawn scope=copy_to_scope exec_scope=copy_to_scope syncdeps=copy_to_syncdeps meta=true Dagger.move!(dep_mod, our_space, data_space, arg_remote, arg_local)
add_writer!(state, ainfo, copy_to, write_num)

astate.data_locality[ainfo] = our_space
@@ -790,10 +802,10 @@ function distribute_tasks!(queue::DataDepsTaskQueue)
generate_slot!(state, data_space, arg)
end
copy_to_scope = our_scope
copy_to_syncdeps = Set{Any}()
copy_to_syncdeps = Set{ThunkSyncdep}()
get_write_deps!(state, arg, task, write_num, copy_to_syncdeps)
@dagdebug nothing :spawn_datadeps "($(repr(value(f))))[$idx] $(length(copy_to_syncdeps)) syncdeps"
copy_to = Dagger.@spawn scope=copy_to_scope syncdeps=copy_to_syncdeps Dagger.move!(identity, our_space, data_space, arg_remote, arg_local)
copy_to = Dagger.@spawn scope=copy_to_scope exec_scope=copy_to_scope syncdeps=copy_to_syncdeps Dagger.move!(identity, our_space, data_space, arg_remote, arg_local)
add_writer!(state, arg, copy_to, write_num)

astate.data_locality[arg] = our_space
@@ -820,7 +832,7 @@ function distribute_tasks!(queue::DataDepsTaskQueue)

# Calculate this task's syncdeps
if spec.options.syncdeps === nothing
spec.options.syncdeps = Set{Any}()
spec.options.syncdeps = Set{ThunkSyncdep}()
end
syncdeps = spec.options.syncdeps
for (idx, (_, arg)) in enumerate(task_args)
@@ -853,6 +865,7 @@ function distribute_tasks!(queue::DataDepsTaskQueue)

# Launch user's task
spec.options.scope = our_scope
spec.options.exec_scope = our_scope
enqueue!(upper_queue, spec=>task)

# Update read/write tracking for arguments
@@ -947,10 +960,10 @@ function distribute_tasks!(queue::DataDepsTaskQueue)
@assert arg_remote !== arg_local
data_local_proc = first(processors(data_local_space))
copy_from_scope = UnionScope(map(ExactScope, collect(processors(data_local_space)))...)
copy_from_syncdeps = Set()
copy_from_syncdeps = Set{ThunkSyncdep}()
get_write_deps!(state, ainfo, nothing, write_num, copy_from_syncdeps)
@dagdebug nothing :spawn_datadeps "$(length(copy_from_syncdeps)) syncdeps"
copy_from = Dagger.@spawn scope=copy_from_scope syncdeps=copy_from_syncdeps meta=true Dagger.move!(dep_mod, data_local_space, data_remote_space, arg_local, arg_remote)
copy_from = Dagger.@spawn scope=copy_from_scope exec_scope=copy_from_scope syncdeps=copy_from_syncdeps meta=true Dagger.move!(dep_mod, data_local_space, data_remote_space, arg_local, arg_remote)
else
@dagdebug nothing :spawn_datadeps "[$dep_mod] Skipped copy-from (local): $data_remote_space"
end
@@ -980,10 +993,10 @@ function distribute_tasks!(queue::DataDepsTaskQueue)
@assert arg_remote !== arg_local
data_local_proc = first(processors(data_local_space))
copy_from_scope = ExactScope(data_local_proc)
copy_from_syncdeps = Set()
copy_from_syncdeps = Set{ThunkSyncdep}()
get_write_deps!(state, arg, nothing, write_num, copy_from_syncdeps)
@dagdebug nothing :spawn_datadeps "$(length(copy_from_syncdeps)) syncdeps"
copy_from = Dagger.@spawn scope=copy_from_scope syncdeps=copy_from_syncdeps meta=true Dagger.move!(identity, data_local_space, data_remote_space, arg_local, arg_remote)
copy_from = Dagger.@spawn scope=copy_from_scope exec_scope=copy_from_scope syncdeps=copy_from_syncdeps meta=true Dagger.move!(identity, data_local_space, data_remote_space, arg_local, arg_remote)
else
@dagdebug nothing :spawn_datadeps "Skipped copy-from (local): $data_remote_space"
end
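Each copy task, and the user task itself, now passes `exec_scope` alongside `scope`, handing the scheduler a pre-resolved execution scope so it can skip scope computation. From user code the option rides through `Dagger.@spawn` like any other (a sketch; `exec_scope` is introduced by this PR):

proc_scope = Dagger.ExactScope(Dagger.ThreadProc(1, 1))
t = Dagger.@spawn scope=proc_scope exec_scope=proc_scope 1 + 2
fetch(t)  # == 3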
9 changes: 6 additions & 3 deletions src/dtask.jl
@@ -106,12 +106,15 @@ function Base.show(io::IO, t::DTask)
print(io, "DTask ($status)")
end
istask(t::DTask) = true
function Base.convert(::Type{ThunkSyncdep}, task::Dagger.DTask)
return ThunkSyncdep(ThunkID(task.uid, isdefined(task, :thunk_ref) ? task.thunk_ref : nothing))
end
ThunkSyncdep(task::DTask) = convert(ThunkSyncdep, task)

const EAGER_ID_COUNTER = Threads.Atomic{UInt64}(1)
function eager_next_id()
if myid() == 1
Threads.atomic_add!(EAGER_ID_COUNTER, one(UInt64))
return UInt64(next_id())
else
remotecall_fetch(eager_next_id, 1)
return remotecall_fetch(eager_next_id, 1)::UInt64
end
end
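With this `convert` method, a `DTask` pushed into a `Set{ThunkSyncdep}` converts implicitly (Dict/Set insertion converts elements to the element type); explicit wrapping also works. A sketch:

t = Dagger.@spawn 1 + 1
deps = Set{Dagger.ThunkSyncdep}()
push!(deps, Dagger.ThunkSyncdep(t))  # explicit construction
push!(deps, t)                       # implicit, via convert(ThunkSyncdep, t)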
26 changes: 19 additions & 7 deletions src/memory-spaces.jl
@@ -134,6 +134,7 @@ equivalent_structure(x::AliasingWrapper, y::AliasingWrapper) =
x.hash == y.hash || equivalent_structure(x.inner, y.inner)
Base.hash(x::AliasingWrapper, h::UInt64) = hash(x.hash, h)
Base.isequal(x::AliasingWrapper, y::AliasingWrapper) = x.hash == y.hash
Base.:(==)(x::AliasingWrapper, y::AliasingWrapper) = x.hash == y.hash
will_alias(x::AliasingWrapper, y::AliasingWrapper) =
will_alias(x.inner, y.inner)

Expand Down Expand Up @@ -225,6 +226,8 @@ struct ContiguousAliasing{S} <: AbstractAliasing
span::MemorySpan{S}
end
memory_spans(a::ContiguousAliasing{S}) where S = MemorySpan{S}[a.span]
will_alias(x::ContiguousAliasing{S}, y::ContiguousAliasing{S}) where S =
will_alias(x.span, y.span)
struct IteratedAliasing{T} <: AbstractAliasing
x::T
end
@@ -284,20 +287,29 @@ function aliasing(x::SubArray{T,N,A}) where {T,N,A<:Array}
return UnknownAliasing()
end
end
function will_alias(x::StridedAliasing{T,N,S}, y::StridedAliasing{T,N,S}) where {T,N,S}
function will_alias(x::StridedAliasing{T1,N1,S1}, y::StridedAliasing{T2,N2,S2}) where {T1,T2,N1,N2,S1,S2}
# Check if the base pointers are the same
# FIXME: Conservatively incorrect via `unsafe_wrap` and friends
if x.base_ptr != y.base_ptr
return false
end

for dim in 1:N
if ((x.base_inds[dim].stop) < (y.base_inds[dim].start) || (y.base_inds[dim].stop) < (x.base_inds[dim].start))
return false
if T1 === T2 && N1 == N2 && may_alias(x.base_ptr.space, y.base_ptr.space)
# Check if the base indices overlap
for dim in 1:N1
if ((x.base_inds[dim].stop) < (y.base_inds[dim].start) || (y.base_inds[dim].stop) < (x.base_inds[dim].start))
return false
end
end
return true
else
return invoke(will_alias, Tuple{Any, Any}, x, y)
end

return true
end
will_alias(x::StridedAliasing{T,N,S}, y::ContiguousAliasing{S}) where {T,N,S} =
x.base_ptr == y.span.ptr
will_alias(x::ContiguousAliasing{S}, y::StridedAliasing{T,N,S}) where {T,N,S} =
will_alias(y, x)
# FIXME: Upgrade Contiguous/StridedAlising to same number of dims

struct TriangularAliasing{T,S} <: AbstractAliasing
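The loosened signature lets strided views of different element types or ranks be compared: the fast per-dimension interval check runs only when types, ranks, and memory spaces line up, and everything else falls back to the generic span-based method via `invoke`. Expected behavior on same-typed views of one parent array (a sketch, assuming both views take the `StridedAliasing` path in `aliasing`):

A = rand(4, 4)
a = Dagger.aliasing(view(A, 1:2, 1:2))
b = Dagger.aliasing(view(A, 2:3, 2:3))  # overlaps `a` at index (2, 2)
c = Dagger.aliasing(view(A, 3:4, 3:4))  # disjoint from `a`
Dagger.will_alias(a, b)  # true
Dagger.will_alias(a, c)  # false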
38 changes: 13 additions & 25 deletions src/options.jl
@@ -10,6 +10,7 @@ Stores per-task options to be passed to the scheduler.
- `processor::Processor`: The processor associated with this task's function. Generally ignored by the scheduler.
- `compute_scope::AbstractScope`: The execution scope of the task, which determines where the task can be scheduled and executed. `scope` is another name for this option.
- `result_scope::AbstractScope`: The data scope of the task's result, which determines where the task's result can be accessed from.
- `exec_scope::AbstractScope`: The fully-resolved execution scope of the task, which determines where the task can be scheduled and executed. When the scope is already known, setting this lets the scheduler skip computing it.
- `single::Int=0`: (Deprecated) Force task onto worker with specified id. `0` disables this option.
- `proclist=nothing`: (Deprecated) Force task to use one or more processors that are instances/subtypes of a contained type. Alternatively, a function can be supplied, and the function will be called with a processor as the sole argument and should return a `Bool` result to indicate whether or not to use the given processor. `nothing` enables all default processors.
- `get_result::Bool=false`: Whether the worker should store the result directly (`true`) or as a `Chunk` (`false`)
@@ -37,13 +38,14 @@ Base.@kwdef mutable struct Options
scope::Union{AbstractScope,Nothing} = nothing
compute_scope::Union{AbstractScope,Nothing} = scope
result_scope::Union{AbstractScope,Nothing} = nothing
exec_scope::Union{AbstractScope,Nothing} = nothing
single::Union{Int,Nothing} = nothing
proclist = nothing

get_result::Union{Bool,Nothing} = nothing
meta::Union{Bool,Nothing} = nothing

syncdeps::Union{Set{Any},Nothing} = nothing
syncdeps::Union{Set{ThunkSyncdep},Nothing} = nothing

time_util::Union{Dict{Type,Any},Nothing} = nothing
alloc_util::Union{Dict{Type,UInt64},Nothing} = nothing
@@ -101,6 +103,15 @@ _set_option!(options::Base.Pairs, field, value) = error("Cannot set option in Ba
end
return ex
end
function Base.setproperty!(options::Options, field::Symbol, value)
if field == :scope || field == :compute_scope || field == :result_scope
# If the scope is changed, we need to clear the exec_scope as it is no longer valid
setfield!(options, :exec_scope, nothing)
end
fidx = findfirst(==(field), fieldnames(Options))
ftype = fieldtypes(Options)[fidx]
return setfield!(options, field, convert(ftype, value))
end

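The hook keeps a cached `exec_scope` from going stale: assigning any of the three scope options resets it before the write lands. In effect (a sketch):

opts = Dagger.Options()
opts.exec_scope = Dagger.ExactScope(Dagger.ThreadProc(1, 1))
opts.scope = Dagger.AnyScope()  # scope write triggers the hook
opts.exec_scope === nothing     # true: the cached scope was cleared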
"""
populate_defaults!(opts::Options, sig::Vector{DataType}) -> Options
@@ -113,6 +124,7 @@ function populate_defaults!(opts::Options, sig)
maybe_default!(opts, Val{:processor}(), sig)
maybe_default!(opts, Val{:compute_scope}(), sig)
maybe_default!(opts, Val{:result_scope}(), sig)
maybe_default!(opts, Val{:exec_scope}(), sig)
maybe_default!(opts, Val{:single}(), sig)
maybe_default!(opts, Val{:proclist}(), sig)
maybe_default!(opts, Val{:get_result}(), sig)
@@ -143,30 +155,6 @@ function maybe_default!(opts::Options, ::Val{opt}, sig::Signature) where opt
end
end

struct BasicLFUCache{K,V}
cache::Dict{K,V}
freq::Dict{K,Int}
max_size::Int

BasicLFUCache{K,V}(max_size::Int) where {K,V} = new(Dict{K,V}(), Dict{K,Int}(), max_size)
end
function Base.get!(f, cache::BasicLFUCache{K,V}, key::K) where {K,V}
if haskey(cache.cache, key)
cache.freq[key] += 1
return cache.cache[key]
end
val = f()::V
cache.cache[key] = val
cache.freq[key] = 1
if length(cache.cache) > cache.max_size
# Find the least frequently used key
_, lfu_key::K = findmin(cache.freq)
delete!(cache.cache, lfu_key)
delete!(cache.freq, lfu_key)
end
return val
end
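`BasicLFUCache` moves out of options.jl into the new `src/utils/lfucache.jl` (included near the top of this diff) so datadeps.jl can reuse it. Its eviction behavior, assuming the implementation is unchanged by the move (note the sketch peeks at the internal `cache` field):

lfu = BasicLFUCache{Symbol,Int}(2)
get!(() -> 1, lfu, :a); get!(() -> 1, lfu, :a)  # :a now has frequency 2
get!(() -> 2, lfu, :b)                          # :b has frequency 1
get!(() -> 3, lfu, :c)                          # size exceeds 2: a freq-1 key is evicted
haskey(lfu.cache, :a)  # true -- :a always survives, having the highest frequency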

const SIGNATURE_DEFAULT_CACHE = TaskLocalValue{BasicLFUCache{Tuple{UInt,Symbol},Any}}(()->BasicLFUCache{Tuple{UInt,Symbol},Any}(256))

# SchedulerOptions integration
4 changes: 2 additions & 2 deletions src/queue.jl
@@ -45,9 +45,9 @@ end
function _add_prev_deps!(queue::InOrderTaskQueue, spec::DTaskSpec)
# Add previously-enqueued task(s) to this task's syncdeps
opts = spec.options
syncdeps = opts.syncdeps = @something(opts.syncdeps, Set())
syncdeps = opts.syncdeps = @something(opts.syncdeps, Set{ThunkSyncdep}())
for task in queue.prev_tasks
push!(syncdeps, task)
push!(syncdeps, ThunkSyncdep(task))
end
end
function enqueue!(queue::InOrderTaskQueue, spec::Pair{DTaskSpec,DTask})
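For context, this path serves in-order task queues, where every newly-enqueued task syncs on its predecessors. A sketch, assuming `Dagger.spawn_sequential` is the user-facing entry point for this queue:

Dagger.spawn_sequential() do
    a = Dagger.@spawn println("first")
    b = Dagger.@spawn println("second")  # receives `a` as a ThunkSyncdep via _add_prev_deps!
end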