JuliaParallel / Dagger.jl

A framework for out-of-core and parallel execution
Other
621 stars 67 forks source link

BUG `@spawn` macro does not support do syntax #479

Closed schlichtanders closed 2 months ago

schlichtanders commented 5 months ago

I just tried to work around concurrency bugs by introducing locks, but apparently I am doing something wrong.

# Minimal reproducer: spawn Dagger tasks that fit OnlineStats aggregators while holding a lock.
using Dagger: @spawn
using Distributed
# Start two additional Julia worker processes (these could run on other machines),
# each launched with two threads.
addprocs(2, exeflags="--threads=2")
# Distributed.@everywhere executes the code on all processes, so the packages are loaded everywhere.
@everywhere using Dagger, DataFrames, OnlineStats
# Dagger uses both threads and worker processes as processors; list them.
Dagger.all_processors()

# Distribute some calculations: fit every aggregator to freshly generated random data.
aggregators = [Mean, Variance, Extrema] 
df = DataFrame()
# @sync waits until all enclosed calls to @spawn are finished.
@sync for i in 1:1000
    data = @spawn rand(10000)
    lck = @shard ReentrantLock()  # intended to guard multi-threaded access to data
    for agg in aggregators
        # This statement triggers the reported
        # "MethodError: objects of type Nothing are not callable".
        # The macro expansion shown later in this report indicates that `@spawn` only
        # consumes the call `lock(lck)` and leaves the `do` block outside the spawned call.
        # NOTE(review): a possible workaround is to move the locked section into a named
        # function defined with @everywhere and spawn it with plain call syntax,
        # passing `lck`, `agg` and `data` as arguments — untested here, confirm against
        # the installed Dagger version.
        res = @spawn lock(lck) do
            fit!(agg(), data)
        end
        # Record one row per (iteration, aggregator); `result` holds the spawned task for now.
        push!(df, (i=i, aggregator=nameof(agg), result=res))
    end
end

# Fetch every spawned result and store the values back into the `result` column.
df.result .= fetch.(df.result)   

This fails with

ERROR: MethodError: objects of type Nothing are not callable
Stacktrace:
[1] macro expansion
@ REPL[13]:400 [inlined]
[2] macro expansion
@ task.jl:479 [inlined]
[3] top-level scope
@ REPL[13]:1

Looking at dump vs @macroexpand, it is apparent that @spawn does not behave normally here.

julia> @macroexpand(
@spawn lock(lck) do
hi
end)
:(begin
#= /home/ssahm/.julia/packages/Dagger/Tx54v/src/thunk.jl:400 =#
let var"#39#args" = (lck,)
#= /home/ssahm/.julia/packages/Dagger/Tx54v/src/thunk.jl:401 =#
var"#40###result#231" = (Dagger.spawn)(lock, (Dagger.Options)(; ), var"#39#args"...; )
#= /home/ssahm/.julia/packages/Dagger/Tx54v/src/thunk.jl:402 =#
if $(Expr(:islocal, Symbol("##sync#48")))
#= /home/ssahm/.julia/packages/Dagger/Tx54v/src/thunk.jl:403 =#
Dagger.put!(var"##sync#48", Dagger.schedule(Dagger.Task((()->begin
#= /home/ssahm/.julia/packages/Dagger/Tx54v/src/thunk.jl:403 =#
Dagger.wait(var"#40###result#231")
end))))
end
#= /home/ssahm/.julia/packages/Dagger/Tx54v/src/thunk.jl:405 =#
var"#40###result#231"
end
end do
#= REPL[14]:3 =#
hi
end)

julia> dump(:(
@spawn lock(lck) do
hi
end))
Expr
head: Symbol macrocall
args: Array{Any}((3,))
1: Symbol @spawn
2: LineNumberNode
line: Int64 2
file: Symbol REPL[15]
3: Expr
head: Symbol do
args: Array{Any}((2,))
1: Expr
head: Symbol call
args: Array{Any}((2,))
1: Symbol lock
2: Symbol lck
2: Expr
head: Symbol ->
args: Array{Any}((2,))
1: Expr
head: Symbol tuple
args: Array{Any}((0,))
2: Expr
head: Symbol block
args: Array{Any}((2,))
1: LineNumberNode
2: Symbol hi

I am using Julia 1.10.1 and Dagger 0.18.8