JuliaParallel / Dagger.jl

A framework for out-of-core and parallel execution
Other
640 stars 67 forks source link

[Distributed] Mixed env libuv error #324

Open krynju opened 2 years ago

krynju commented 2 years ago

Another error found while benchmarking; this one is also fairly rare. Recording it here for later, as it will probably be useful when we look at stability.

@@@ STARTING CONFIG: julia -J /tmp/jlsysimage/sysimage.so -p3 -t4 scripts/dtable_innerjoin_unique.jl 500000000 10000000 10000 4
  Activating project at `/home/dtable`
      From worker 2:      Activating project at `/home/dtable`
      From worker 3:      Activating project at `/home/dtable`
      From worker 4:      Activating project at `/home/dtable`
@@@ TABLESIZE:       8000.0 MB
@@@ SAVING TO:       results/dtable_bench1640877700.csv
@@@ STARTED:         innerjoin_r_unique : 2021-12-30T15:21:48.174
      From worker 3:    ┌ Error: Error on 3 while connecting to peer 2, exiting
      From worker 3:    │   exception =
      From worker 3:    │    IOError: connect: address not available (EADDRNOTAVAIL)
      From worker 3:    │    Stacktrace:
      From worker 3:    │      [1] uv_error
      From worker 3:    │        @ ./libuv.jl:97 [inlined]
      From worker 3:    │      [2] connect!(sock::Sockets.TCPSocket, host::Sockets.IPv4, port::UInt16)
      From worker 3:    │        @ Sockets /usr/local/julia/share/julia/stdlib/v1.8/Sockets/src/Sockets.jl:505
      From worker 3:    │      [3] connect
      From worker 3:    │        @ /usr/local/julia/share/julia/stdlib/v1.8/Sockets/src/Sockets.jl:563 [inlined]
      From worker 3:    │      [4] connect_to_worker(host::String, port::Int64)
      From worker 3:    │        @ Distributed /usr/local/julia/share/julia/stdlib/v1.8/Distributed/src/managers.jl:651
      From worker 3:    │      [5] connect_w2w(pid::Int64, config::WorkerConfig)
      From worker 3:    │        @ Distributed /usr/local/julia/share/julia/stdlib/v1.8/Distributed/src/managers.jl:599
      From worker 3:    │      [6] connect(manager::Distributed.DefaultClusterManager, pid::Int64, config::WorkerConfig)
      From worker 3:    │        @ Distributed /usr/local/julia/share/julia/stdlib/v1.8/Distributed/src/managers.jl:531
      From worker 3:    │      [7] connect_to_peer(manager::Distributed.DefaultClusterManager, rpid::Int64, wconfig::WorkerConfig)
      From worker 3:    │        @ Distributed /usr/local/julia/share/julia/stdlib/v1.8/Distributed/src/process_messages.jl:355
      From worker 3:    │      [8] (::Distributed.var"#127#129"{Int64, WorkerConfig})()
      From worker 3:    │        @ Distributed /usr/local/julia/share/julia/stdlib/v1.8/Distributed/src/process_messages.jl:342
      From worker 3:    │      [9] exec_conn_func(w::Distributed.Worker)
      From worker 3:    │        @ Distributed /usr/local/julia/share/julia/stdlib/v1.8/Distributed/src/cluster.jl:187
      From worker 3:    │     [10] (::Distributed.var"#19#22"{Distributed.Worker})()
      From worker 3:    │        @ Distributed ./task.jl:466
      From worker 3:    └ @ Distributed /tmp/juliabuild/julia/usr/share/julia/stdlib/v1.8/Distributed/src/process_messages.jl:362
Error in sending dynamic result:
IOError: stream is closed or unusable
Stacktrace:
  [1] check_open
    @ ./stream.jl:388 [inlined]
  [2] uv_write_async(s::Sockets.TCPSocket, p::Ptr{UInt8}, n::UInt64)
    @ Base ./stream.jl:1069
  [3] uv_write(s::Sockets.TCPSocket, p::Ptr{UInt8}, n::UInt64)
    @ Base ./stream.jl:1032
  [4] uv_write
    @ ./stream.jl:1028 [inlined]
  [5] flush(s::Sockets.TCPSocket)
    @ Base ./stream.jl:1124
  [6] send_msg_(w::Distributed.Worker, header::Distributed.MsgHeader, msg::Distributed.CallMsg{:call_fetch}, now::Bool)
    @ Distributed /usr/local/julia/share/julia/stdlib/v1.8/Distributed/src/messages.jl:187
  [7] send_msg
    @ /usr/local/julia/share/julia/stdlib/v1.8/Distributed/src/messages.jl:122 [inlined]
  [8] remotecall_fetch(::Function, ::Distributed.Worker, ::Distributed.RRID, ::Vararg{Any}; kwargs::Base.Pairs{Symbol, Union{}, Tuple{}, NamedTuple{(), Tuple{}}})
    @ Distributed /usr/local/julia/share/julia/stdlib/v1.8/Distributed/src/remotecall.jl:460
  [9] remotecall_fetch(::Function, ::Distributed.Worker, ::Distributed.RRID, ::Vararg{Any})
    @ Distributed /usr/local/julia/share/julia/stdlib/v1.8/Distributed/src/remotecall.jl:457
 [10] remotecall_fetch(::Function, ::Int64, ::Distributed.RRID, ::Vararg{Any}; kwargs::Base.Pairs{Symbol, Union{}, Tuple{}, NamedTuple{(), Tuple{}}})
    @ Distributed /usr/local/julia/share/julia/stdlib/v1.8/Distributed/src/remotecall.jl:492
 [11] remotecall_fetch(::Function, ::Int64, ::Distributed.RRID, ::Vararg{Any})
    @ Distributed /usr/local/julia/share/julia/stdlib/v1.8/Distributed/src/remotecall.jl:492
 [12] call_on_owner(::Function, ::RemoteChannel{Channel{Any}}, ::Int64, ::Vararg{Any})
    @ Distributed /usr/local/julia/share/julia/stdlib/v1.8/Distributed/src/remotecall.jl:565
 [13] put!
    @ /usr/local/julia/share/julia/stdlib/v1.8/Distributed/src/remotecall.jl:722 [inlined]
 [14] macro expansion
    @ ~/.julia/packages/Dagger/JDbcB/src/sch/dynamic.jl:75 [inlined]
 [15] (::Dagger.Sch.var"#38#42"{Context, Dagger.Sch.ComputeState, Task, RemoteChannel{Channel{Any}}, RemoteChannel{Channel{Any}}})()
    @ Dagger.Sch ./task.jl:466
Error in enqueued work:
IOError: stream is closed or unusableError in enqueued work:
no process with id 3 existsError in eager scheduler:
TaskFailedException

    nested task error: no process with id 3 exists
    Stacktrace:
     [1] error(s::String)
       @ Base ./error.jl:33
     [2] worker_from_id(pg::Distributed.ProcessGroup, i::Int64)
       @ Distributed /usr/local/julia/share/julia/stdlib/v1.8/Distributed/src/cluster.jl:1129
     [3] worker_from_id
       @ /usr/local/julia/share/julia/stdlib/v1.8/Distributed/src/cluster.jl:1121 [inlined]
     [4] remote_do(::Function, ::Int64, ::Dagger.NoOpLog, ::Vararg{Any}; kwargs::Base.Pairs{Symbol, Union{}, Tuple{}, NamedTuple{(), Tuple{}}})
       @ Distributed /usr/local/julia/share/julia/stdlib/v1.8/Distributed/src/remotecall.jl:557
     [5] remote_do(::Function, ::Int64, ::Dagger.NoOpLog, ::Vararg{Any})
       @ Distributed /usr/local/julia/share/julia/stdlib/v1.8/Distributed/src/remotecall.jl:557
     [6] (::Dagger.Sch.var"#117#119"{Context, Set{Dagger.Chunk}, Int64})()
       @ Dagger.Sch ./task.jl:466
Stacktrace:
 [1] sync_end(c::Channel{Any})
   @ Base ./task.jl:424
 [2] macro expansion
   @ ./task.jl:443 [inlined]
 [3] evict_all_chunks!(ctx::Context, to_evict::Set{Dagger.Chunk})
   @ Dagger.Sch ~/.julia/packages/Dagger/JDbcB/src/sch/Sch.jl:783
 [4] finish_task!(ctx::Context, state::Dagger.Sch.ComputeState, node::Thunk, thunk_failed::Bool)
   @ Dagger.Sch ~/.julia/packages/Dagger/JDbcB/src/sch/Sch.jl:778
 [5] (::Dagger.Sch.var"#90#96"{Context, Dagger.Sch.ComputeState, OSProc, NamedTuple{(:pressure, :loadavg, :threadtime, :transfer_rate), Tuple{UInt64, Tuple{Float64, Float64, Float64}, UInt64, UInt64}}, Dagger.Chunk{NamedTuple{(:a1, :a2, :a3, :a4, :a5), NTuple{5, SentinelArrays.ChainedVector{Int32, Vector{Int32}}}}, MemPool.DRef, Dagger.ThreadProc, AnyScope}, Int64, Dagger.ThreadProc, Int64})()
   @ Dagger.Sch ~/.julia/packages/Dagger/JDbcB/src/sch/Sch.jl:451
 [6] lock(f::Dagger.Sch.var"#90#96"{Context, Dagger.Sch.ComputeState, OSProc, NamedTuple{(:pressure, :loadavg, :threadtime, :transfer_rate), Tuple{UInt64, Tuple{Float64, Float64, Float64}, UInt64, UInt64}}, Dagger.Chunk{NamedTuple{(:a1, :a2, :a3, :a4, :a5), NTuple{5, SentinelArrays.ChainedVector{Int32, Vector{Int32}}}}, MemPool.DRef, Dagger.ThreadProc, AnyScope}, Int64, Dagger.ThreadProc, Int64}, l::ReentrantLock)
   @ Base ./lock.jl:183
 [7] compute_dag(ctx::Context, d::Thunk; options::Dagger.Sch.SchedulerOptions)
   @ Dagger.Sch ~/.julia/packages/Dagger/JDbcB/src/sch/Sch.jl:407
 [8] compute(ctx::Context, d::Thunk; options::Dagger.Sch.SchedulerOptions)
   @ Dagger ~/.julia/packages/Dagger/JDbcB/src/compute.jl:31
 [9] (::Dagger.Sch.var"#61#62"{Context})()
   @ Dagger.Sch ./task.jl:466
Worker 3 terminated.
┌ Warning: Worker 3 died, rescheduling work
└ @ Dagger.Sch ~/.julia/packages/Dagger/JDbcB/src/sch/Sch.jl:411
UNHANDLED TASK ERROR: IOError: read: connection reset by peer (ECONNRESET)
Stacktrace:
  [1] wait_readnb(x::Sockets.TCPSocket, nb::Int64)
    @ Base ./stream.jl:410
  [2] (::Base.var"#wait_locked#660")(s::Sockets.TCPSocket, buf::IOBuffer, nb::Int64)
    @ Base ./stream.jl:944
  [3] unsafe_read(s::Sockets.TCPSocket, p::Ptr{UInt8}, nb::UInt64)
    @ Base ./stream.jl:950
  [4] unsafe_read
    @ ./io.jl:751 [inlined]
  [5] unsafe_read(s::Sockets.TCPSocket, p::Base.RefValue{NTuple{4, Int64}}, n::Int64)
    @ Base ./io.jl:750
  [6] read!
    @ ./io.jl:752 [inlined]
  [7] deserialize_hdr_raw
    @ /usr/local/julia/share/julia/stdlib/v1.8/Distributed/src/messages.jl:167 [inlined]
  [8] message_handler_loop(r_stream::Sockets.TCPSocket, w_stream::Sockets.TCPSocket, incoming::Bool)
    @ Distributed /usr/local/julia/share/julia/stdlib/v1.8/Distributed/src/process_messages.jl:165
  [9] process_tcp_streams(r_stream::Sockets.TCPSocket, w_stream::Sockets.TCPSocket, incoming::Bool)
    @ Distributed /usr/local/julia/share/julia/stdlib/v1.8/Distributed/src/process_messages.jl:126
 [10] (::Distributed.var"#109#110"{Sockets.TCPSocket, Sockets.TCPSocket, Bool})()
    @ Distributed ./task.jl:466
Error in eager listener:
KeyError: key 29 not found
Stacktrace:
  [1] getindex
    @ ./dict.jl:498 [inlined]
  [2] #57
    @ ~/.julia/packages/Dagger/JDbcB/src/sch/dynamic.jl:189 [inlined]
  [3] map
    @ ./tuple.jl:223 [inlined]
  [4] _add_thunk!(ctx::Context, state::Dagger.Sch.ComputeState, task::Task, tid::Int64, ::Tuple{Dagger.var"#593#595", Tuple{Dagger.Sch.ThunkID, DTable, Base.Pairs{Symbol, Any, Tuple{Symbol, Symbol}, NamedTuple{(:on, :r_unique), Tuple{Symbol, Bool}}}}, Base.Pairs{Symbol, Union{}, Tuple{}, NamedTuple{(), Tuple{}}}, Dagger.ThunkFuture, MemPool.DRef})
    @ Dagger.Sch ~/.julia/packages/Dagger/JDbcB/src/sch/dynamic.jl:189
  [5] #invokelatest#2
    @ ./essentials.jl:731 [inlined]
  [6] invokelatest
    @ ./essentials.jl:729 [inlined]
  [7] #39
    @ ~/.julia/packages/Dagger/JDbcB/src/sch/dynamic.jl:69 [inlined]
  [8] lock(f::Dagger.Sch.var"#39#43"{Tuple{Dagger.var"#593#595", Tuple{Dagger.Sch.ThunkID, DTable, Base.Pairs{Symbol, Any, Tuple{Symbol, Symbol}, NamedTuple{(:on, :r_unique), Tuple{Symbol, Bool}}}}, Base.Pairs{Symbol, Union{}, Tuple{}, NamedTuple{(), Tuple{}}}, Dagger.ThunkFuture, MemPool.DRef}, typeof(Dagger.Sch._add_thunk!), Context, Dagger.Sch.ComputeState, Task}, l::ReentrantLock)
    @ Base ./lock.jl:183
  [9] macro expansion
    @ ~/.julia/packages/Dagger/JDbcB/src/sch/dynamic.jl:68 [inlined]
 [10] (::Dagger.Sch.var"#38#42"{Context, Dagger.Sch.ComputeState, Task, RemoteChannel{Channel{Any}}, RemoteChannel{Channel{Any}}})()
    @ Dagger.Sch ./task.jl:466
Stacktrace:
 [1] exec!(::Function, ::Dagger.Sch.SchedulerHandle, ::Function, ::Vararg{Any})
   @ Dagger.Sch ~/.julia/packages/Dagger/JDbcB/src/sch/dynamic.jl:112
 [2] add_thunk!(::Function, ::Dagger.Sch.SchedulerHandle, ::Dagger.Sch.ThunkID, ::Vararg{Any}; future::Dagger.ThunkFuture, ref::MemPool.DRef, kwargs::Base.Pairs            done
{Symbol, Union{}, Tuple{}, NamedTuple{(), Tuple{}}})
   @ Dagger.Sch ~/.julia/packages/Dagger/JDbcB/src/sch/dynamic.jl:185
 [3] eager_thunk()
   @ Dagger.Sch ~/.julia/packages/Dagger/JDbcB/src/sch/eager.jl:87
 [4] (::Dagger.var"#80#81"{typeof(Dagger.Sch.eager_thunk), Tuple{}, NamedTuple{(:sch_uid, :sch_handle, :processor, :utilization), Tuple{UInt64, Dagger.Sch.SchedulerHandle, Dagger.ThreadProc, UInt64}}})()
   @ Dagger ~/.julia/packages/Dagger/JDbcB/src/processor.jl:165
      From worker 4:    ProcessExitedException(3)
      From worker 4:    Stacktrace:
      From worker 4:      [1] worker_from_id(pg::Distributed.ProcessGroup, i::Int64)
      From worker 4:        @ Distributed /usr/local/julia/share/julia/stdlib/v1.8/Distributed/src/cluster.jl:1124
      From worker 4:      [2] worker_from_id
      From worker 4:        @ /usr/local/julia/share/julia/stdlib/v1.8/Distributed/src/cluster.jl:1121 [inlined]
      From worker 4:      [3] #remotecall_fetch#168
      From worker 4:        @ /usr/local/julia/share/julia/stdlib/v1.8/Distributed/src/remotecall.jl:492 [inlined]
      From worker 4:      [4] remotecall_fetch
      From worker 4:        @ /usr/local/julia/share/julia/stdlib/v1.8/Distributed/src/remotecall.jl:492 [inlined]
      From worker 4:      [5] #68
      From worker 4:        @ ~/.julia/packages/Dagger/JDbcB/src/processor.jl:98 [inlined]
      From worker 4:      [6] get!(default::Dagger.var"#68#69"{Int64}, h::Dict{Int64, Vector{Dagger.Processor}}, key::Int64)
      From worker 4:        @ Base ./dict.jl:481
      From worker 4:      [7] OSProc
      From worker 4:        @ ~/.julia/packages/Dagger/JDbcB/src/processor.jl:97 [inlined]
      From worker 4:      [8] evict_chunks!(log_sink::Dagger.NoOpLog, chunks::Set{Dagger.Chunk})
      From worker 4:        @ Dagger.Sch ~/.julia/packages/Dagger/JDbcB/src/sch/Sch.jl:789
      From worker 4:      [9] invokelatest(::Any, ::Any, ::Vararg{Any}; kwargs::Base.Pairs{Symbol, Union{}, Tuple{}, NamedTuple{(), Tuple{}}})
      From worker 4:        @ Base ./essentials.jl:731
      From worker 4:     [10] invokelatest(::Any, ::Any, ::Vararg{Any})
      From worker 4:        @ Base ./essentials.jl:729
      From worker 4:     [11] (::Distributed.var"#124#126"{Distributed.RemoteDoMsg})()
      From worker 4:        @ Distributed /usr/local/julia/share/julia/stdlib/v1.8/Distributed/src/process_messages.jl:301
      From worker 4:     [12] run_work_thunk(thunk::Distributed.var"#124#126"{Distributed.RemoteDoMsg}, print_error::Bool)
      From worker 4:        @ Distributed /usr/local/julia/share/julia/stdlib/v1.8/Distributed/src/process_messages.jl:63
      From worker 4:     [13] (::Distributed.var"#123#125"{Distributed.RemoteDoMsg})()
jpsamaroo commented 2 years ago

It seems that various failures when using Distributed will cause a cascade of failures in Dagger. We should try to wrap every call to a Distributed API with a "rollback" function that attempts to recover from the failure, or at least gracefully kills the scheduler and all hanging fetch/wait calls.