JuliaParallel / Dagger.jl

A framework for out-of-core and parallel execution
Other
629 stars 67 forks source link

`Cannot serialize a Thunk` when processing an error with Distributed #430

Closed StevenWhitaker closed 11 months ago

StevenWhitaker commented 12 months ago

I get the error mentioned in the title with the following example. See also JuliaParallel/Dagger.jl#431 and JuliaParallel/DTables.jl#52, which may be related. I'm submitting this issue here because I'm guessing it's not specific to just DTables.jl (but I could be wrong).

Contents of mwe.jl:

using Distributed, DelimitedFiles
nworkers = 1
addprocs(nworkers - nprocs() + 1)

@everywhere using CSV, DTables, DataFrames

file = tempname() * ".csv"
writedlm(file, [1, 2])

# See issue #431 for the purpose of this next line.
DTable(x -> CSV.File(x), [file]; tabletype = DataFrame)

remotecall_fetch(2, file) do f
    d = DTable(x -> CSV.File(x), [f]; tabletype = DataFrame)
    getproperty(d, Symbol("1"))
end

rm(file)

Results:

julia> include("mwe.jl")
Error in eager scheduler:
ArgumentError: Cannot serialize a Thunk
Stacktrace:
  [1] serialize(io::Distributed.ClusterSerializer{Sockets.TCPSocket}, t::Dagger.Thunk)
    @ Dagger ~/.julia/packages/Dagger/xGAvM/src/thunk.jl:116
  [2] serialize_any(s::Distributed.ClusterSerializer{Sockets.TCPSocket}, x::Any)
    @ Serialization ~/programs/julia/julia-1.9.3/share/julia/stdlib/v1.9/Serialization/src/Serialization.jl:678
  [3] serialize(s::Distributed.ClusterSerializer{Sockets.TCPSocket}, x::Any)
    @ Serialization ~/programs/julia/julia-1.9.3/share/julia/stdlib/v1.9/Serialization/src/Serialization.jl:657
  [4] serialize_any(s::Distributed.ClusterSerializer{Sockets.TCPSocket}, x::Any)
    @ Serialization ~/programs/julia/julia-1.9.3/share/julia/stdlib/v1.9/Serialization/src/Serialization.jl:678
  [5] serialize(s::Distributed.ClusterSerializer{Sockets.TCPSocket}, x::Any)
    @ Serialization ~/programs/julia/julia-1.9.3/share/julia/stdlib/v1.9/Serialization/src/Serialization.jl:657
  [6] serialize_any(s::Distributed.ClusterSerializer{Sockets.TCPSocket}, x::Any)
    @ Serialization ~/programs/julia/julia-1.9.3/share/julia/stdlib/v1.9/Serialization/src/Serialization.jl:678
  [7] serialize
    @ ~/programs/julia/julia-1.9.3/share/julia/stdlib/v1.9/Serialization/src/Serialization.jl:657 [inlined]
  [8] serialize(s::Distributed.ClusterSerializer{Sockets.TCPSocket}, t::Tuple{Bool, Dagger.ThunkFailedException{RemoteException}})
    @ Serialization ~/programs/julia/julia-1.9.3/share/julia/stdlib/v1.9/Serialization/src/Serialization.jl:205
  [9] serialize(s::Distributed.ClusterSerializer{Sockets.TCPSocket}, t::Tuple{Distributed.RRID, Tuple{Bool, Dagger.ThunkFailedException{RemoteException}}, Int64})
    @ Serialization ~/programs/julia/julia-1.9.3/share/julia/stdlib/v1.9/Serialization/src/Serialization.jl:205
 [10] serialize_msg(s::Distributed.ClusterSerializer{Sockets.TCPSocket}, o::Distributed.CallMsg{:call_fetch})
    @ Distributed ~/programs/julia/julia-1.9.3/share/julia/stdlib/v1.9/Distributed/src/messages.jl:78
 [11] #invokelatest#2
    @ ./essentials.jl:819 [inlined]
 [12] invokelatest
    @ ./essentials.jl:816 [inlined]
 [13] send_msg_(w::Distributed.Worker, header::Distributed.MsgHeader, msg::Distributed.CallMsg{:call_fetch}, now::Bool)
    @ Distributed ~/programs/julia/julia-1.9.3/share/julia/stdlib/v1.9/Distributed/src/messages.jl:181
 [14] send_msg
    @ ~/programs/julia/julia-1.9.3/share/julia/stdlib/v1.9/Distributed/src/messages.jl:122 [inlined]
 [15] remotecall_fetch(::Function, ::Distributed.Worker, ::Distributed.RRID, ::Vararg{Any}; kwargs::Base.Pairs{Symbol, Union{}, Tuple{}, NamedTuple{(), Tuple{}}})
    @ Distributed ~/programs/julia/julia-1.9.3/share/julia/stdlib/v1.9/Distributed/src/remotecall.jl:460
 [16] remotecall_fetch(::Function, ::Distributed.Worker, ::Distributed.RRID, ::Vararg{Any})
    @ Distributed ~/programs/julia/julia-1.9.3/share/julia/stdlib/v1.9/Distributed/src/remotecall.jl:454
 [17] remotecall_fetch(::Function, ::Int64, ::Distributed.RRID, ::Vararg{Any}; kwargs::Base.Pairs{Symbol, Union{}, Tuple{}, NamedTuple{(), Tuple{}}})
    @ Distributed ~/programs/julia/julia-1.9.3/share/julia/stdlib/v1.9/Distributed/src/remotecall.jl:492
 [18] remotecall_fetch(::Function, ::Int64, ::Distributed.RRID, ::Vararg{Any})
    @ Distributed ~/programs/julia/julia-1.9.3/share/julia/stdlib/v1.9/Distributed/src/remotecall.jl:492
 [19] call_on_owner(::Function, ::Future, ::Tuple{Bool, Dagger.ThunkFailedException{RemoteException}}, ::Vararg{Any})
    @ Distributed ~/programs/julia/julia-1.9.3/share/julia/stdlib/v1.9/Distributed/src/remotecall.jl:565
 [20] macro expansion
    @ ~/programs/julia/julia-1.9.3/share/julia/stdlib/v1.9/Distributed/src/remotecall.jl:681 [inlined]
 [21] macro expansion
    @ ./lock.jl:267 [inlined]
 [22] put!(r::Future, v::Tuple{Bool, Dagger.ThunkFailedException{RemoteException}})
    @ Distributed ~/programs/julia/julia-1.9.3/share/julia/stdlib/v1.9/Distributed/src/remotecall.jl:680
 [23] #put!#73
    @ ~/.julia/packages/Dagger/xGAvM/src/eager_thunk.jl:24 [inlined]
 [24] fill_registered_futures!(state::Dagger.Sch.ComputeState, node::Dagger.Thunk, failed::Bool)
    @ Dagger.Sch ~/.julia/packages/Dagger/xGAvM/src/sch/util.jl:37
 [25] finish_failed!(state::Dagger.Sch.ComputeState, thunk::Dagger.Thunk, origin::Dagger.Thunk)
    @ Dagger.Sch ~/.julia/packages/Dagger/xGAvM/src/sch/util.jl:149
 [26] set_failed!(state::Dagger.Sch.ComputeState, origin::Dagger.Thunk, thunk::Dagger.Thunk)
    @ Dagger.Sch ~/.julia/packages/Dagger/xGAvM/src/sch/util.jl:146
 [27] set_failed!
    @ ~/.julia/packages/Dagger/xGAvM/src/sch/util.jl:143 [inlined]
 [28] finish_task!(ctx::Dagger.Context, state::Dagger.Sch.ComputeState, node::Dagger.Thunk, thunk_failed::Bool)
    @ Dagger.Sch ~/.julia/packages/Dagger/xGAvM/src/sch/Sch.jl:898
 [29] (::Dagger.Sch.var"#91#92"{Dagger.Context, Dagger.Sch.ComputeState, Dagger.OSProc, NamedTuple{(:time_pressure, :storage_pressure, :storage_capacity, :loadavg, :threadtime, :gc_allocd, :transfer_rate), Tuple{UInt64, UInt64, UInt64, Tuple{Float64, Float64, Float64}, UInt64, Int64, UInt64}}, RemoteException, Int64, Dagger.ThreadProc, Int64})()
    @ Dagger.Sch ~/.julia/packages/Dagger/xGAvM/src/sch/Sch.jl:574
 [30] lock(f::Dagger.Sch.var"#91#92"{Dagger.Context, Dagger.Sch.ComputeState, Dagger.OSProc, NamedTuple{(:time_pressure, :storage_pressure, :storage_capacity, :loadavg, :threadtime, :gc_allocd, :transfer_rate), Tuple{UInt64, UInt64, UInt64, Tuple{Float64, Float64, Float64}, UInt64, Int64, UInt64}}, RemoteException, Int64, Dagger.ThreadProc, Int64}, l::ReentrantLock)
    @ Base ./lock.jl:229
 [31] scheduler_run(ctx::Dagger.Context, state::Dagger.Sch.ComputeState, d::Dagger.Thunk, options::Dagger.Sch.SchedulerOptions)
    @ Dagger.Sch ~/.julia/packages/Dagger/xGAvM/src/sch/Sch.jl:525
 [32] compute_dag(ctx::Dagger.Context, d::Dagger.Thunk; options::Dagger.Sch.SchedulerOptions)
    @ Dagger.Sch ~/.julia/packages/Dagger/xGAvM/src/sch/Sch.jl:449
 [33] compute_dag
    @ ~/.julia/packages/Dagger/xGAvM/src/sch/Sch.jl:414 [inlined]
 [34] compute(ctx::Dagger.Context, d::Dagger.Thunk; options::Dagger.Sch.SchedulerOptions)
    @ Dagger ~/.julia/packages/Dagger/xGAvM/src/compute.jl:23
 [35] compute
    @ ~/.julia/packages/Dagger/xGAvM/src/compute.jl:22 [inlined]
 [36] macro expansion
    @ ~/.julia/packages/Dagger/xGAvM/src/sch/eager.jl:28 [inlined]
 [37] (::Dagger.Sch.var"#50#51"{Dagger.Context})()
    @ Dagger.Sch ./threadingconstructs.jl:410
ERROR: LoadError: On worker 2:
SchedulingException (Scheduler exited)
Stacktrace:
  [1] #fetch#70
    @ ~/.julia/packages/Dagger/xGAvM/src/eager_thunk.jl:16
  [2] fetch
    @ ~/.julia/packages/Dagger/xGAvM/src/eager_thunk.jl:11 [inlined]
  [3] #fetch#75
    @ ~/.julia/packages/Dagger/xGAvM/src/eager_thunk.jl:58 [inlined]
  [4] fetch
    @ ~/.julia/packages/Dagger/xGAvM/src/eager_thunk.jl:54 [inlined]
  [5] DTableColumn
    @ ~/.julia/packages/DTables/bA4g3/src/table/dtable_column.jl:21
  [6] DTableColumn
    @ ~/.julia/packages/DTables/bA4g3/src/table/dtable_column.jl:35
  [7] DTableColumn
    @ ~/.julia/packages/DTables/bA4g3/src/table/dtable_column.jl:37 [inlined]
  [8] getproperty
    @ ~/.julia/packages/DTables/bA4g3/src/table/dtable.jl:291 [inlined]
  [9] #7
    @ ~/tmp/mwe.jl:15
 [10] #invokelatest#2
    @ ./essentials.jl:819
 [11] invokelatest
    @ ./essentials.jl:816
 [12] #110
    @ ~/programs/julia/julia-1.9.3/share/julia/stdlib/v1.9/Distributed/src/process_messages.jl:285
 [13] run_work_thunk
    @ ~/programs/julia/julia-1.9.3/share/julia/stdlib/v1.9/Distributed/src/process_messages.jl:70
 [14] macro expansion
    @ ~/programs/julia/julia-1.9.3/share/julia/stdlib/v1.9/Distributed/src/process_messages.jl:285 [inlined]
 [15] #109
    @ ./task.jl:514
Stacktrace:
 [1] remotecall_fetch(f::Function, w::Distributed.Worker, args::String; kwargs::Base.Pairs{Symbol, Union{}, Tuple{}, NamedTuple{(), Tuple{}}})
   @ Distributed ~/programs/julia/julia-1.9.3/share/julia/stdlib/v1.9/Distributed/src/remotecall.jl:465
 [2] remotecall_fetch(f::Function, w::Distributed.Worker, args::String)
   @ Distributed ~/programs/julia/julia-1.9.3/share/julia/stdlib/v1.9/Distributed/src/remotecall.jl:454
 [3] #remotecall_fetch#162
   @ ~/programs/julia/julia-1.9.3/share/julia/stdlib/v1.9/Distributed/src/remotecall.jl:492 [inlined]
 [4] remotecall_fetch(f::Function, id::Int64, args::String)
   @ Distributed ~/programs/julia/julia-1.9.3/share/julia/stdlib/v1.9/Distributed/src/remotecall.jl:492
 [5] top-level scope
   @ ~/tmp/mwe.jl:13
 [6] include(fname::String)
   @ Base.MainInclude ./client.jl:478
 [7] top-level scope
   @ REPL[1]:1
in expression starting at /home/steven/tmp/mwe.jl:13

(tmp) pkg> st
Status `~/tmp/Project.toml`
  [336ed68f] CSV v0.10.11
  [20c56dc6] DTables v0.4.1
  [a93c6f00] DataFrames v1.6.1
  [8bb1440f] DelimitedFiles v1.9.1
  [8ba89e20] Distributed

Note that if I change line 543 in `src/sch/Sch.jl` from `thunk_failed = true` to `throw(res)`, I can see the actual error. (See JuliaParallel/DTables.jl#52 for the actual error.)

Let me know if I'm missing anything or if I'm doing something wrong.

jpsamaroo commented 12 months ago

Ahh, looks like the ThunkFailedException that we throw when reporting a failed task captures a scheduler-internal Thunk object, which it definitely should not be doing. I'll figure out a workaround. Thanks for reporting this!

bolognam commented 11 months ago

Julian, do you think https://github.com/JuliaParallel/Dagger.jl/issues/435 is related to this issue?

jpsamaroo commented 11 months ago

That appears to possibly be an unrelated Dagger bug - give me some time to investigate.