JuliaLang / Distributed.jl

Create and control multiple Julia processes remotely for distributed computing. Ships as a Julia stdlib.
https://docs.julialang.org/en/v1/stdlib/Distributed/
MIT License

Concurrency violation on interplay between Distributed and Base.Threads #73

Open jonas-schulze opened 3 years ago

jonas-schulze commented 3 years ago

I’m working on a distributed pipeline algorithm that uses several stages per worker process. IIRC tasks cannot hop between threads once they’ve been scheduled. Since I want my stages to potentially run in parallel, I tried to create non-sticky tasks by chaining each D.@spawnat with a T.@spawn. However, this setup keeps failing/crashing and I don’t understand why.

I boiled it down to a minimal example:

using Distributed, Base.Threads

const D = Distributed
const T = Threads

pids = addprocs(10)
wids = repeat(pids, inner=2)

conns = map(RemoteChannel, wids)
fst = first(conns)
lst = RemoteChannel()
push!(conns, lst)

@everywhere begin
    function stillepost(i, prev, next)
        message = take!(prev)
        put!(next, message)
        @info "Player $i done"
    end
end

players = []
for i in 1:length(wids)
    w = wids[i]
    c1 = conns[i]
    c2 = conns[i+1]
    p = D.@spawnat w fetch(T.@spawn stillepost(i, c1, c2))
    push!(players, p)
end

game = @async begin
    m1 = "gibberish"
    put!(fst, m1)
    m2 = take!(lst)
    @info "'$m1' turned into '$m2'; well done!"
end

wait.(players)
wait(game)

Player 2 fails with a concurrency violation:

julia> include("stillepost.jl")
[ Info: Player 1 done
ERROR: LoadError: On worker 2:
TaskFailedException:
concurrency violation detected
error at ./error.jl:33
concurrency_violation at ./condition.jl:8
assert_havelock at ./condition.jl:25 [inlined]
assert_havelock at ./condition.jl:48 [inlined]
assert_havelock at ./condition.jl:72 [inlined]
wait at ./condition.jl:102
wait_for_conn at /Users/julia/buildbot/worker/package_macos64/build/usr/share/julia/stdlib/v1.5/Distributed/src/cluster.jl:193
check_worker_state at /Users/julia/buildbot/worker/package_macos64/build/usr/share/julia/stdlib/v1.5/Distributed/src/cluster.jl:168
send_msg_ at /Users/julia/buildbot/worker/package_macos64/build/usr/share/julia/stdlib/v1.5/Distributed/src/messages.jl:176
send_msg at /Users/julia/buildbot/worker/package_macos64/build/usr/share/julia/stdlib/v1.5/Distributed/src/messages.jl:134 [inlined]
#remotecall_fetch#143 at /Users/julia/buildbot/worker/package_macos64/build/usr/share/julia/stdlib/v1.5/Distributed/src/remotecall.jl:389
remotecall_fetch at /Users/julia/buildbot/worker/package_macos64/build/usr/share/julia/stdlib/v1.5/Distributed/src/remotecall.jl:386
#remotecall_fetch#146 at /Users/julia/buildbot/worker/package_macos64/build/usr/share/julia/stdlib/v1.5/Distributed/src/remotecall.jl:421
remotecall_fetch at /Users/julia/buildbot/worker/package_macos64/build/usr/share/julia/stdlib/v1.5/Distributed/src/remotecall.jl:421
call_on_owner at /Users/julia/buildbot/worker/package_macos64/build/usr/share/julia/stdlib/v1.5/Distributed/src/remotecall.jl:494
put! at /Users/julia/buildbot/worker/package_macos64/build/usr/share/julia/stdlib/v1.5/Distributed/src/remotecall.jl:595 [inlined]
stillepost at /Users/jonas/.../stillepost.jl:18
#3 at ./threadingconstructs.jl:169
wait at ./task.jl:267 [inlined]

Am I holding it wrong?

Julia Version 1.5.1
Commit 697e782ab8 (2020-08-25 20:08 UTC)
Platform Info:
  OS: macOS (x86_64-apple-darwin19.5.0)
  CPU: Intel(R) Core(TM) i5-8259U CPU @ 2.30GHz
  WORD_SIZE: 64
  LIBM: libopenlibm
  LLVM: libLLVM-9.0.1 (ORCJIT, skylake)
Environment:
  JULIA_NUM_THREADS = 4
  JULIA_PROJECT = @.

See also this post on discourse. foobar_lv2 suggested this might be a bug in Distributed.

jonas-schulze commented 3 years ago

The error persists on version 1.5.2 and here is the output for

Julia Version 1.6.0-DEV.1029
Commit f26a8c352f (2020-09-23 23:37 UTC)

which is the current version of julia-nightly on Homebrew:

[ Info: Player 1 done
ERROR: LoadError: On worker 2:
TaskFailedException
Stacktrace:
 [1] wait
   @ ./task.jl:303 [inlined]
 [2] fetch
   @ ./task.jl:318 [inlined]
 [3] #2
   @ /Users/sabae/buildbot/worker/package_macos64/build/usr/share/julia/stdlib/v1.6/Distributed/src/macros.jl:87
 [4] #103
   @ /Users/sabae/buildbot/worker/package_macos64/build/usr/share/julia/stdlib/v1.6/Distributed/src/process_messages.jl:290
 [5] run_work_thunk
   @ /Users/sabae/buildbot/worker/package_macos64/build/usr/share/julia/stdlib/v1.6/Distributed/src/process_messages.jl:79
 [6] run_work_thunk
   @ /Users/sabae/buildbot/worker/package_macos64/build/usr/share/julia/stdlib/v1.6/Distributed/src/process_messages.jl:88
 [7] #96
   @ ./task.jl:392

    nested task error:
Stacktrace:
  [1] remotecall_fetch(::Function, ::Distributed.Worker, ::Distributed.RRID, ::Vararg{Any, N} where N; kwargs::Base.Iterators.Pairs{Union{}, Union{}, Tuple{}, NamedTuple{(), Tuple{}}})
    @ Distributed /Users/sabae/buildbot/worker/package_macos64/build/usr/share/julia/stdlib/v1.6/Distributed/src/remotecall.jl:394
  [2] remotecall_fetch(::Function, ::Distributed.Worker, ::Distributed.RRID, ::Vararg{Any, N} where N)
    @ Distributed /Users/sabae/buildbot/worker/package_macos64/build/usr/share/julia/stdlib/v1.6/Distributed/src/remotecall.jl:386
  [3] remotecall_fetch(::Function, ::Int64, ::Distributed.RRID, ::Vararg{Any, N} where N; kwargs::Base.Iterators.Pairs{Union{}, Union{}, Tuple{}, NamedTuple{(), Tuple{}}})
    @ Distributed /Users/sabae/buildbot/worker/package_macos64/build/usr/share/julia/stdlib/v1.6/Distributed/src/remotecall.jl:421
  [4] remotecall_fetch
    @ /Users/sabae/buildbot/worker/package_macos64/build/usr/share/julia/stdlib/v1.6/Distributed/src/remotecall.jl:421 [inlined]
  [5] call_on_owner
    @ /Users/sabae/buildbot/worker/package_macos64/build/usr/share/julia/stdlib/v1.6/Distributed/src/remotecall.jl:494 [inlined]
  [6] wait(r::Future)
    @ Distributed /Users/sabae/buildbot/worker/package_macos64/build/usr/share/julia/stdlib/v1.6/Distributed/src/remotecall.jl:515
  [7] _broadcast_getindex_evalf
    @ ./broadcast.jl:648 [inlined]
  [8] _broadcast_getindex
    @ ./broadcast.jl:621 [inlined]
  [9] getindex
    @ ./broadcast.jl:575 [inlined]
 [10] copyto_nonleaf!(dest::Vector{Future}, bc::Base.Broadcast.Broadcasted{Base.Broadcast.DefaultArrayStyle{1}, Tuple{Base.OneTo{Int64}}, typeof(wait), Tuple{Base.Broadcast.Extruded{Vector{Any}, Tuple{Bool}, Tuple{Int64}}}}, iter::Base.OneTo{Int64}, state::Int64, count::Int64)
    @ Base.Broadcast ./broadcast.jl:1026
 [11] copy
    @ ./broadcast.jl:880 [inlined]
 [12] materialize(bc::Base.Broadcast.Broadcasted{Base.Broadcast.DefaultArrayStyle{1}, Nothing, typeof(wait), Tuple{Vector{Any}}})
    @ Base.Broadcast ./broadcast.jl:837
 [13] top-level scope
    @ ~/.../stillepost.jl:44
in expression starting at /Users/jonas/.../stillepost.jl:44

JeffBezanson commented 3 years ago

Distributed does not yet support messages from threads other than 1. Should not be too hard to fix.

jonas-schulze commented 3 years ago

Indeed, if I funnel all the communication through thread 1, it works!

    # ...
    p = D.@spawnat w @sync begin
        funnel = Channel()
        T.@spawn stillepost(i, c1, funnel)
        stillepost(i+0.5, funnel, c2)
    end
    # ...

... but is this something we could hope to see fixed before the next LTS is released?

Why is thread 1 so special? I am not at all familiar with the code in either of the two libraries, but if you could give me some pointers, I can give it a go. 🙂

JeffBezanson commented 3 years ago

Thread 1 isn't special, this is just something that hasn't been updated for threads yet. The Condition objects used to synchronize workers need to be replaced with Threads.Condition plus locking, or possibly Threads.Event instead.
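For readers unfamiliar with the distinction, here is a minimal sketch of the "Threads.Condition plus locking" pattern mentioned above. It is not the code in Distributed: `ConnState`, `wait_connected` and `mark_connected!` are hypothetical names made up for illustration. A plain `Condition` assumes waiter and notifier run on the same thread, whereas `Threads.Condition` carries its own lock, which must be held around both `wait` and `notify`.

```julia
# Hypothetical stand-in for per-worker connection state (not Distributed's actual type).
mutable struct ConnState
    cond::Threads.Condition   # condition variable with an attached lock
    connected::Bool
end
ConnState() = ConnState(Threads.Condition(), false)

# Block until `connected` is set. `wait` releases the lock while sleeping
# and reacquires it before returning, so the flag is always read under the lock.
function wait_connected(s::ConnState)
    lock(s.cond)
    try
        while !s.connected
            wait(s.cond)
        end
    finally
        unlock(s.cond)
    end
    return nothing
end

# Set the flag and wake all waiters; `notify` also requires the lock to be held.
function mark_connected!(s::ConnState)
    lock(s.cond)
    try
        s.connected = true
        notify(s.cond)
    finally
        unlock(s.cond)
    end
    return nothing
end

s = ConnState()
waiter = Threads.@spawn wait_connected(s)   # may run on any thread
mark_connected!(s)
wait(waiter)
```

The `while` loop matters: if `mark_connected!` runs before the waiter takes the lock, the waiter sees the flag already set and never calls `wait`, so no notification is lost.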

jonas-schulze commented 3 years ago

TL;DR: There is a different issue. The above example is fixed by using addprocs(..., lazy=false).

I tried to write some tests that send and receive from all combinations of threads, processes and where the RemoteChannel is stored, but they pass (even if I increase the contention by using just a single channel). 🤔 In order to isolate the different situations, I tried to not reuse the worker processes (as a side effect even reducing contention on the channel), which for some reason caused the tests to fail (yay but not yay).

using Test
using Distributed, Base.Threads
using Base.Iterators: product

exeflags = ("--startup-file=no",
            "--check-bounds=yes",
            "--depwarn=error",
            "--threads=2")

function call_on(f, wid, tid)
  remotecall(wid) do
    t = Task(f)
    ccall(:jl_set_task_tid, Cvoid, (Any, Cint), t, tid-1)
    schedule(t)
    @assert threadid(t) == tid
    t
  end
end

# Run function on process holding the data to only serialize the result of f.
# This becomes useful for things that cannot be serialized (e.g. running tasks)
# or that would be unnecessarily big if serialized.
fetch_from_owner(f, rr) = remotecall_fetch(f∘fetch, rr.where, rr)

isdone(rr) = fetch_from_owner(istaskdone, rr)
isfailed(rr) = fetch_from_owner(istaskfailed, rr)

@testset "RemoteChannel is threadsafe" begin
  ws = ts = product(1:2, 1:2)
  timeout = 10.0
  @testset "from worker $w1 to $w2 via 1" for (w1, w2) in ws
    @testset "from thread $w1.$t1 to $w2.$t2" for (t1, t2) in ts
      procs_added = addprocs(2; exeflags)
      @everywhere procs_added using Base.Threads
      p1 = procs_added[w1]
      p2 = procs_added[w2]
      chan_id = first(procs_added)
      chan = RemoteChannel(chan_id)
      send = call_on(p1, t1) do
        put!(chan, nothing)
      end
      recv = call_on(p2, t2) do
        take!(chan)
      end
      timedwait(() -> isdone(send) && isdone(recv), timeout)
      @test isdone(send)
      @test isdone(recv)
      @test !isfailed(send)
      @test !isfailed(recv)
      rmprocs(procs_added)
    end
  end
end

Above tests fail consistently in these cases -- all errors being concurrency violations:

Test Summary:               | Pass  Fail  Error  Total
RemoteChannel is threadsafe |   48     4      4     56
  from worker 1 to 1 via 1  |   16                  16
  from worker 2 to 1 via 1  |    8            2     10
    from thread 2.1 to 1.1  |    4                   4
    from thread 2.2 to 1.1  |                 1      1
    from thread 2.1 to 1.2  |    4                   4
    from thread 2.2 to 1.2  |                 1      1
  from worker 1 to 2 via 1  |   14     2      1     17
    from thread 1.1 to 2.1  |    4                   4
    from thread 1.2 to 2.1  |    4                   4
    from thread 1.1 to 2.2  |    3     1      1      5
    from thread 1.2 to 2.2  |    3     1             4
  from worker 2 to 2 via 1  |   10     2      1     13
    from thread 2.1 to 2.1  |    4                   4
    from thread 2.2 to 2.1  |                 1      1
    from thread 2.1 to 2.2  |    4                   4
    from thread 2.2 to 2.2  |    2     2             4

Comparing the (nearly) complete outputs of those tests, I was lucky to see that some workers were not available:

$ julia-1.5 threads.jl > out1.txt
$ julia-1.5 threads.jl > out2.txt
$ diff out1.txt out2.txt
124,125c124,126
<   ProcessExitedException(29)
<   worker_from_id(::Distributed.ProcessGroup, ::Int64) at /Users/julia/buildbot/worker/package_macos64/build/usr/share/julia/stdlib/v1.5/Distributed/src/cluster.jl:1074
---
>   no process with id 29 exists
>   error(::String) at ./error.jl:33
>   worker_from_id(::Distributed.ProcessGroup, ::Int64) at /Users/julia/buildbot/worker/package_macos64/build/usr/share/julia/stdlib/v1.5/Distributed/src/cluster.jl:1079

So I tried adding the workers using addprocs(..., lazy=false) and, out of the blue, all the tests are passing. Even the original example works. @JeffBezanson do you have a clue about what could be going on here?
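A reduced sketch of that workaround, assuming two local workers: `relay` and `play` are hypothetical names, and the pipeline is shortened to two stages. With `lazy=false`, worker-to-worker connections are set up when the workers are added rather than on the first remote call between them. The first stack trace passes through `wait_for_conn`, which would be consistent with the lazy setup path being the part that trips, but that is an inference, not something confirmed in this thread.

```julia
using Distributed

# Connect all workers to each other up front instead of on first use.
pids = addprocs(2; lazy=false)

# One pipeline stage: forward a single message from `prev` to `next`.
@everywhere relay(prev, next) = put!(next, take!(prev))

function play(pids)
    # One channel per worker plus a final channel owned by the master process.
    chans = [RemoteChannel(pids[1]), RemoteChannel(pids[2]), RemoteChannel()]
    players = map(1:2) do i
        c1, c2 = chans[i], chans[i+1]
        # Same chaining as in the original example: a non-sticky task on the worker.
        @spawnat pids[i] fetch(Threads.@spawn relay(c1, c2))
    end
    put!(chans[1], "gibberish")
    msg = take!(chans[3])
    foreach(wait, players)
    return msg
end

msg = play(pids)
rmprocs(pids)
```

To exercise more than one thread per worker, pass something like `exeflags="--threads=2"` to `addprocs`, as the test script above does.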

jonas-schulze commented 3 years ago

Latest attempt is JuliaLang/julia#38405

orenbenkiki commented 3 years ago

I hit on what is probably the same issue - using channels and futures to communicate between threads in a single process. The gist https://gist.github.com/orenbenkiki/ac71f348d4915b394805656b142b33fe contains small sample code and error traces, in case this isn't exactly the same issue (specifically, there are no worker processes involved here at all - the problem is purely lack of thread safety).

I found it surprising that channels/futures are not thread safe - I think it warrants an explicit warning in the documentation until this is fixed.

KristofferC commented 3 years ago

Try again now that https://github.com/JuliaLang/julia/pull/38405 is merged?

orenbenkiki commented 3 years ago

Is that in the latest Julia 1.7 ? It isn't listed under https://github.com/JuliaLang/julia/blob/v1.7.0-beta2/NEWS.md#multi-threading-changes

Edit: It seems this was merged 30m ago? Wow, that's some timing. I suppose this means I'd have to download Julia from github and build it from source - I never tried doing that before... Is there somewhere one can download the compiled-bleeding-latest-github-version (for Linux)?

Edit2: Ah, nightly builds. I'll give it a day or two so the merged version gets there and then give it a try. Thanks!

KristofferC commented 3 years ago

It just got merged so it isn't in 1.7 (yet).

jpsamaroo commented 3 years ago

With JuliaLang/julia#38405, 3 threads, and 3 workers, the second test (https://github.com/JuliaLang/Distributed.jl/issues/73) passes for me, while the first test (https://github.com/JuliaLang/Distributed.jl/issues/73) hangs with no CPU activity.

orenbenkiki commented 3 years ago

Note I see non-deterministic results. If you run it multiple times, sometimes it passes, most often it deadlocks, and sometimes it crashes, even with the same number of threads/processes. Running it for longer (larger iterations count) increases the chance of a deadlock (unsurprisingly). This was all on Julia 1.6, mind you - I haven't tried it on the latest version yet, since the potential fix was merged less than two hours ago - waiting for the nightly build to pick it up.

vchuravy commented 3 years ago

JuliaLang/julia#38405 also only fixes a limited set of interactions, and more work is needed to ensure that Distributed.jl is fully thread-safe.

vchuravy commented 2 years ago

JuliaLang/julia#41722 reverted the first fix again.

vchuravy commented 2 years ago

Moving this off the 1.7 milestone. Making Distributed.jl thread-safe will be more work; I am hopeful that we can make progress on this for 1.8.

vtjnash commented 2 years ago

@vchuravy JuliaLang/julia#42239 seems stalled, so moving this off the v1.8 milestone

jonas-schulze commented 1 year ago

I just watched the State of Julia 2022 which claimed that Distributed would now be thread-safe. However, the snippet above (https://github.com/JuliaLang/Distributed.jl/issues/73) remains broken on 1.8.5 as well as 1.9.1 both with the following summary.

Test Summary:               | Pass  Fail  Error  Total     Time
RemoteChannel is threadsafe |   48     4      3     55  1m15.1s
  from worker 1 to 1 via 1  |   16                  16    16.4s
  from worker 2 to 1 via 1  |    8            2     10    20.1s
    from thread 2.1 to 1.1  |    4                   4     4.7s
    from thread 2.2 to 1.1  |                 1      1     5.3s
    from thread 2.1 to 1.2  |    4                   4     5.0s
    from thread 2.2 to 1.2  |                 1      1     5.1s
  from worker 1 to 2 via 1  |   14     2            16    18.8s
    from thread 1.1 to 2.1  |    4                   4     4.7s
    from thread 1.2 to 2.1  |    4                   4     4.5s
    from thread 1.1 to 2.2  |    3     1             4     4.7s
    from thread 1.2 to 2.2  |    3     1             4     4.9s
  from worker 2 to 2 via 1  |   10     2      1     13    19.8s
    from thread 2.1 to 2.1  |    4                   4     5.1s
    from thread 2.2 to 2.1  |                 1      1     5.0s
    from thread 2.1 to 2.2  |    4                   4     5.0s
    from thread 2.2 to 2.2  |    2     2             4     4.7s
ERROR: LoadError: Some tests did not pass: 48 passed, 4 failed, 3 errored, 0 broken.

Note that from thread 1.1 to 2.2 is now considered failing without an error.

CC @JeffBezanson