JuliaLang / Distributed.jl

Create and control multiple Julia processes remotely for distributed computing. Ships as a Julia stdlib.
https://docs.julialang.org/en/v1/stdlib/Distributed/
MIT License

Different remote copying behaviour with one process vs multiple #41

Open iamed2 opened 7 years ago

iamed2 commented 7 years ago
using Base.Test

function main()
    @testset begin
        rng = MersenneTwister(3742)
        numbers = pmap(1:1000) do _
            rand(rng)
        end

        @test all(numbers .== numbers[1])

        spawn_numbers = [(@fetch rand(rng)) for i = 1:1000]

        @test all(spawn_numbers .== spawn_numbers[1])

        pfor_numbers = @parallel vcat for i = 1:1000
            [rand(rng)]
        end

        @test all(pfor_numbers .== pfor_numbers[1])
    end
end

main()

There are three cases here, comparing the behaviour of the different parallel methods, but I really only care about the @fetch/@spawn case.

ericdavies@whitacre> julia -p 1 ~/rng_parallel_test.jl
test set: Test Failed
  Expression: all(pfor_numbers .== pfor_numbers[1])
 in record(::Base.Test.DefaultTestSet, ::Base.Test.Fail) at test.jl:431
 in do_test(::Base.Test.Returned, ::Expr) at test.jl:281
 in macro expansion at rng_parallel_test.jl:20 [inlined]
 in macro expansion at test.jl:674 [inlined]
 in main() at rng_parallel_test.jl:4
 in include_from_node1(::String) at loading.jl:488
 in include_from_node1(::String) at sys.dylib:?
 in process_options(::Base.JLOptions) at client.jl:265
 in _start() at client.jl:321
 in _start() at sys.dylib:?
Test Summary: | Pass  Fail  Total
  test set    |    2     1      3
ERROR: LoadError: Some tests did not pass: 2 passed, 1 failed, 0 errored, 0 broken.
 in finish(::Base.Test.DefaultTestSet) at test.jl:498
 in macro expansion at test.jl:681 [inlined]
 in main() at rng_parallel_test.jl:4
 in include_from_node1(::String) at loading.jl:488
 in include_from_node1(::String) at sys.dylib:?
 in process_options(::Base.JLOptions) at client.jl:265
 in _start() at client.jl:321
 in _start() at sys.dylib:?
while loading /Users/ericdavies/rng_parallel_test.jl, in expression starting on line 24
ericdavies@whitacre> julia ~/rng_parallel_test.jl
test set: Test Failed
  Expression: all(numbers .== numbers[1])
 in record(::Base.Test.DefaultTestSet, ::Base.Test.Fail) at test.jl:431
 in do_test(::Base.Test.Returned, ::Expr) at test.jl:281
 in macro expansion at rng_parallel_test.jl:10 [inlined]
 in macro expansion at test.jl:674 [inlined]
 in main() at rng_parallel_test.jl:4
 in include_from_node1(::String) at loading.jl:488
 in include_from_node1(::String) at sys.dylib:?
 in process_options(::Base.JLOptions) at client.jl:265
 in _start() at client.jl:321
 in _start() at sys.dylib:?
test set: Test Failed
  Expression: all(spawn_numbers .== spawn_numbers[1])
 in record(::Base.Test.DefaultTestSet, ::Base.Test.Fail) at test.jl:431
 in do_test(::Base.Test.Returned, ::Expr) at test.jl:281
 in macro expansion at rng_parallel_test.jl:14 [inlined]
 in macro expansion at test.jl:674 [inlined]
 in main() at rng_parallel_test.jl:4
 in include_from_node1(::String) at loading.jl:488
 in include_from_node1(::String) at sys.dylib:?
 in process_options(::Base.JLOptions) at client.jl:265
 in _start() at client.jl:321
 in _start() at sys.dylib:?
test set: Test Failed
  Expression: all(pfor_numbers .== pfor_numbers[1])
 in record(::Base.Test.DefaultTestSet, ::Base.Test.Fail) at test.jl:431
 in do_test(::Base.Test.Returned, ::Expr) at test.jl:281
 in macro expansion at rng_parallel_test.jl:20 [inlined]
 in macro expansion at test.jl:674 [inlined]
 in main() at rng_parallel_test.jl:4
 in include_from_node1(::String) at loading.jl:488
 in include_from_node1(::String) at sys.dylib:?
 in process_options(::Base.JLOptions) at client.jl:265
 in _start() at client.jl:321
 in _start() at sys.dylib:?
Test Summary: | Fail  Total
  test set    |    3      3
ERROR: LoadError: Some tests did not pass: 0 passed, 3 failed, 0 errored, 0 broken.
 in finish(::Base.Test.DefaultTestSet) at test.jl:498
 in macro expansion at test.jl:681 [inlined]
 in main() at rng_parallel_test.jl:4
 in include_from_node1(::String) at loading.jl:488
 in include_from_node1(::String) at sys.dylib:?
 in process_options(::Base.JLOptions) at client.jl:265
 in _start() at client.jl:321
 in _start() at sys.dylib:?
while loading /Users/ericdavies/rng_parallel_test.jl, in expression starting on line 24

When there are one or more additional processes, rng is copied. When there is only one process, it is not.

The @parallel for case fails both times because of the use of CachingPool, which may also be used for pmap in the future (see https://github.com/JuliaLang/julia/issues/21946).

Ideally I would like the behaviours to be consistent (though I don't need them to be deterministic).
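
For readers on a current Julia release, where Base.Test and @parallel no longer exist, the @fetch case can be reduced to the sketch below. It assumes the Distributed and Random stdlibs; the function name spawn_samples is made up for illustration. It only reports what it observes and does not assert which behaviour is correct.

```julia
using Distributed, Random

# Hypothetical helper: draw `n` numbers through @fetch, capturing a local RNG.
function spawn_samples(n::Integer)
    rng = MersenneTwister(3742)   # local mutable captured by the closure below
    return [(@fetch rand(rng)) for _ in 1:n]
end

samples = spawn_samples(5)

# As reported in this thread: with worker processes the closure (and `rng`)
# is serialized for each call, so every sample matches the first one; with a
# single process the call runs locally and `rng` advances on every call.
all_equal = all(==(first(samples)), samples)
println("nprocs = ", nprocs(), ", all samples equal: ", all_equal)
```

Running the same file with and without -p 1 should show the two outcomes described above.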

amitmurthy commented 7 years ago

This would be a tricky situation to address if the RNG is being serialized/deserialized every time.

On 0.6, global const rng = MersenneTwister(3742) would ensure consistency with/without additional workers.

iamed2 commented 7 years ago

The behaviour we're aiming for in our use case is actually to have the RNG copied each time, but that's outside this issue. You can replace the RNG with any local mutable for the purposes of this issue.

amitmurthy commented 7 years ago

Mutables are not synchronized across processes in any manner. There is no cluster-wide shared state for mutables referenced in closures. A common data store external to the Julia processes can address such a requirement. So can keeping the data on one of the processes and having workers do an atomic update-and-fetch against that data store process.
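
The second option, a single process that owns the data, could look roughly like the sketch below. All names here (STORE, STORE_LOCK, update_and_fetch!, next_ticket) are hypothetical, and process 1 is assumed to be the owner. On a real cluster these definitions would also have to be loaded on every process, for example with @everywhere.

```julia
using Distributed

# Hypothetical store; only the copy living on process 1 is ever used.
const STORE = Ref(0)
const STORE_LOCK = ReentrantLock()

# Runs on the owning process: update the value and return the new one
# while holding the lock, so concurrent requests cannot interleave.
function update_and_fetch!(delta::Int)
    lock(STORE_LOCK) do
        STORE[] += delta
        STORE[]
    end
end

# Called from any process: ask process 1 to perform the update.
next_ticket() = remotecall_fetch(update_and_fetch!, 1, 1)

tickets = [next_ticket() for _ in 1:3]
```

Because every update goes through one process, callers see a single shared state no matter how many workers exist, at the price of a round trip per update.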

iamed2 commented 7 years ago

But does that mean that mutating mutables captured with @spawn has undefined behaviour and might or might not mutate the same variable in other @spawn calls?

The docs currently say (emphasis mine):

Note that although parallel for loops look like serial for loops, their behavior is dramatically different. In particular, the iterations do not happen in a specified order, and writes to variables or arrays will not be globally visible since iterations run on different processes. Any variables used inside the parallel loop will be copied and broadcast to each process.

This is the only mention of the behaviour of captured mutables.

I think one of these should happen:

  1. Mutables are either copied for each @spawn call, even if the only worker is the host, or cached and shared among processes based on some trait such as:
    remote_move(::Type{MyType}) = Copy()
    remote_move(::Type{MyOtherType}) = Cache()
  2. Mutables are always copied for each @spawn call, even if the only worker is the host
  3. The cache-for-each-worker behaviour is used for @spawn and pmap and documented in @spawn and pmap docstrings as well as in the Parallel Computing section of the manual
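
A minimal sketch of how the trait in option 1 might dispatch is shown below. Only remote_move, Copy, Cache, MyType and MyOtherType come from the proposal above; MoveStyle, the fallback method and prepare_for_call are assumptions added for illustration, and nothing like this exists in Distributed today.

```julia
# Hypothetical trait types for the proposal above.
abstract type MoveStyle end
struct Copy <: MoveStyle end    # send a fresh copy with every call
struct Cache <: MoveStyle end   # reuse one instance across calls

mutable struct MyType
    x::Int
end

mutable struct MyOtherType
    x::Int
end

remote_move(::Type) = Copy()                 # assumed default
remote_move(::Type{MyType}) = Copy()
remote_move(::Type{MyOtherType}) = Cache()

# Hypothetical hook a remote call could apply to each captured value.
prepare_for_call(x) = prepare_for_call(remote_move(typeof(x)), x)
prepare_for_call(::Copy, x) = deepcopy(x)    # caller's object is never mutated
prepare_for_call(::Cache, x) = x             # same object is shared

a = MyType(1)
b = MyOtherType(1)
copied = prepare_for_call(a)
cached = prepare_for_call(b)
```

Here copied is a distinct object from a, while cached is the very same object as b.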

amitmurthy commented 7 years ago

For performance reasons, remote calls executing locally short-circuit the entire serialization-deserialization cycle of the closure. I get your view about wanting consistent results between nprocs=1 and nprocs>1 in absolutely all cases.

The easier implementation to ensure similar behavior in all cases is to have remote calls to myid() go through a loopback connection, but that comes at the cost of inefficiency in remote calls to the local process.
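
The difference between the short-circuit and a loopback can be imitated without any connection by putting the arguments through a serialize/deserialize round trip. The sketch below uses the Serialization stdlib as a stand-in for a real loopback; roundtrip and call_local are hypothetical helpers, not part of Distributed.

```julia
using Random, Serialization

# Serialize and immediately deserialize a value, as a loopback would.
function roundtrip(x)
    io = IOBuffer()
    serialize(io, x)
    seekstart(io)
    return deserialize(io)
end

# Hypothetical stand-in for a remote call that targets the local process.
# loopback=false mimics the short-circuit: arguments are passed as-is.
# loopback=true mimics the loopback: every argument is copied via roundtrip.
call_local(f, args...; loopback::Bool=false) =
    loopback ? f(map(roundtrip, args)...) : f(args...)

rng = MersenneTwister(3742)

# Each call works on a fresh copy, so `rng` itself is never advanced.
looped = [call_local(rand, rng; loopback=true) for _ in 1:5]

# Each call mutates the same `rng`, so successive values differ.
direct = [call_local(rand, rng) for _ in 1:5]
```

The extra serialization work on every call is the inefficiency mentioned above.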