JuliaParallel / Dagger.jl

A framework for out-of-core and parallel execution

`@spawn` does not work with nested function call #480

Closed schlichtanders closed 2 months ago

schlichtanders commented 5 months ago

While trying to work around https://github.com/JuliaParallel/Dagger.jl/issues/478, I came up with the following version. It still fails, but for reasons related to using one `@spawn` result inside another `@spawn` call, namely `@spawn lock(() -> fit!(agg(), data), lck)`.

using Dagger: @spawn, @shard
using Distributed
# add two further julia processes which could run on other machines
addprocs(2, exeflags="--threads=2")
# Distributed.@everywhere execute code on all machines
@everywhere using Dagger, DataFrames, OnlineStats
# Dagger uses both Threads and Machines as processes
Dagger.all_processors()

# let's distribute some calculations
aggregators = [Mean, Variance, Extrema]
df = DataFrame()
# @sync waits until all enclosed calls to @spawn are ready
@sync for i in 1:1000
    data = @spawn rand(10000)
    # This creates a lock per worker. If the task is run on
    # a worker, the correct lock is automatically picked up.
    # Needed for multi-threading access to data.
    lck = @shard ReentrantLock()
    for agg in aggregators
        res = @spawn lock(() -> fit!(agg(), data), lck)
        push!(df, (i=i, aggregator=nameof(agg), result=res))
    end
end

df.result .= fetch.(df.result)   

This results in the following error:

julia> df.result .= fetch.(df.result)
ERROR: ThunkFailedException:
Root Exception Type: RemoteException
Root Exception:
On worker 2:
MethodError: no method matching iterate(::Dagger.EagerThunk)

Closest candidates are:
iterate(::DataStructures.TrieIterator)
@ DataStructures ~/.julia/packages/DataStructures/t9DKl/src/trie.jl:112
iterate(::DataStructures.TrieIterator, ::Any)
@ DataStructures ~/.julia/packages/DataStructures/t9DKl/src/trie.jl:112
iterate(::Pkg.Types.Manifest, ::Int64)
@ Pkg ~/.julia/juliaup/julia-1.10.2+0.x64.linux.gnu/share/julia/stdlib/v1.10/Pkg/src/Types.jl:314
...

Stacktrace:
[1] fit!
@ ~/.julia/packages/OnlineStatsBase/FMY19/src/OnlineStatsBase.jl:137
[2] #3
@ ./REPL[8]:5
[3] lock
@ ./lock.jl:229
[4] #invokelatest#2
@ ./essentials.jl:892 [inlined]
[5] invokelatest
@ ./essentials.jl:889 [inlined]
[6] #41
@ ~/.julia/packages/Dagger/Tx54v/src/threadproc.jl:20
Stacktrace:
[1] wait
@ ./task.jl:352 [inlined]
[2] fetch
@ ./task.jl:372 [inlined]
[3] #execute!#40
@ ~/.julia/packages/Dagger/Tx54v/src/threadproc.jl:26
[4] execute!
@ ~/.julia/packages/Dagger/Tx54v/src/threadproc.jl:13
[5] #167
@ ~/.julia/packages/Dagger/Tx54v/src/sch/Sch.jl:1611 [inlined]
[6] #21
@ ~/.julia/packages/Dagger/Tx54v/src/options.jl:18 [inlined]
[7] #1
@ ~/.julia/packages/ScopedValues/Kvcrb/src/ScopedValues.jl:163
[8] with_logstate
@ ./logging.jl:515
[9] with_logger
@ ./logging.jl:627 [inlined]
[10] enter_scope
@ ~/.julia/packages/ScopedValues/Kvcrb/src/payloadlogger.jl:17 [inlined]
[11] with
@ ~/.julia/packages/ScopedValues/Kvcrb/src/ScopedValues.jl:162
[12] with_options
@ ~/.julia/packages/Dagger/Tx54v/src/options.jl:17
[13] do_task
@ ~/.julia/packages/Dagger/Tx54v/src/sch/Sch.jl:1609
[14] #143
@ ~/.julia/packages/Dagger/Tx54v/src/sch/Sch.jl:1302
This Thunk:  Thunk(id=9, lock(#3, Dagger.Shard(Dict{Dagger.Processor, Dagger.Chunk}(OSProc(1) => Dagger.Chunk{ReentrantLock, MemPool.DRef, OSProc, ProcessScope}(ReentrantLock, UnitDomain(), MemPool.DRef(1, 6, 0x0000000000000060), OSProc(1), ProcessScope: worker == 1, false), OSProc(2) => Dagger.Chunk{ReentrantLock, MemPool.DRef, OSProc, ProcessScope}(ReentrantLock, UnitDomain(), MemPool.DRef(2, 1, 0x0000000000000060), OSProc(2), ProcessScope: worker == 2, false), OSProc(3) => Dagger.Chunk{ReentrantLock, MemPool.DRef, OSProc, ProcessScope}(ReentrantLock, UnitDomain(), MemPool.DRef(3, 0, 0x0000000000000060), OSProc(3), ProcessScope: worker == 3, false)))))
Stacktrace:
[1] fetch(t::Dagger.ThunkFuture; proc::OSProc, raw::Bool)
@ Dagger ~/.julia/packages/Dagger/Tx54v/src/eager_thunk.jl:16
[2] fetch
@ ~/.julia/packages/Dagger/Tx54v/src/eager_thunk.jl:11 [inlined]
[3] #fetch#73
@ ~/.julia/packages/Dagger/Tx54v/src/eager_thunk.jl:58 [inlined]
[4] fetch
@ ~/.julia/packages/Dagger/Tx54v/src/eager_thunk.jl:54 [inlined]
[5] _broadcast_getindex_evalf
@ ./broadcast.jl:709 [inlined]
[6] _broadcast_getindex
@ ./broadcast.jl:682 [inlined]
[7] getindex
@ ./broadcast.jl:636 [inlined]
[8] copy
@ ./broadcast.jl:942 [inlined]
[9] materialize
@ ./broadcast.jl:903 [inlined]
[10] copyto!(lazydf::DataFrames.LazyNewColDataFrame{…}, bc::Base.Broadcast.Broadcasted{…})
@ DataFrames ~/.julia/packages/DataFrames/58MUJ/src/other/broadcasting.jl:207
[11] materialize!
@ ./broadcast.jl:914 [inlined]
[12] materialize!(dest::DataFrames.LazyNewColDataFrame{…}, bc::Base.Broadcast.Broadcasted{…})
@ Base.Broadcast ./broadcast.jl:911
[13] top-level scope
@ REPL[9]:1
Some type information was truncated. Use `show(err)` to see complete types.

This might relate to https://github.com/JuliaParallel/Dagger.jl/issues/357, but I am unsure.

jpsamaroo commented 5 months ago

This is not exactly a bug per se, as we've only ever unwrapped EagerThunk dependencies when they are passed as explicit arguments to Dagger.@spawn.
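
To illustrate the distinction (a minimal sketch, not code from this thread; single process, no Distributed setup):

using Dagger: @spawn

data = @spawn rand(100)        # `data` is an EagerThunk, not a Vector

# Explicit argument: the scheduler unwraps `data` before calling `sum`,
# so `sum` receives the actual Vector{Float64}.
ok = @spawn sum(data)

# Captured inside a closure: the closure carries the EagerThunk itself,
# so `sum` is called on the thunk and hits the same MethodError as above.
bad = @spawn lock(() -> sum(data), ReentrantLock())

fetch(ok)    # works
fetch(bad)   # throws ThunkFailedException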

Yet, it is also desirable to allow something like this, where the EagerThunk is embedded within a closure. I have a branch where I've worked on this, but it never materialized, since we need to both pull such implicit arguments out of the closure and put the resolved values back into the same closure, and I couldn't figure out how to do that.

Anyway, I agree that this is worth doing, but it will take some time to implement, and it's not my top priority. In the meantime, either use Dagger.spawn or pass the EagerThunk as an explicit argument.
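
For example, applied to the snippet from the original report, the explicit-argument workaround could look like this (a sketch, not from the original comment; the helper name `locked_fit` is hypothetical and assumes the earlier `@everywhere using Dagger, DataFrames, OnlineStats` has run):

@everywhere function locked_fit(agg_type, data, lck)
    # `data` arrives here already unwrapped to a Vector, because it was
    # passed as an explicit argument to @spawn; `lck` is the per-worker
    # lock resolved from the @shard.
    lock(() -> fit!(agg_type(), data), lck)
end

# In the loop body, replace the failing line with:
res = @spawn locked_fit(agg, data, lck)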