JuliaLang / julia

The Julia Programming Language
https://julialang.org/
MIT License

addprocs problem with multiple nodes in 0.6.1 #24722

Closed alkorang closed 6 years ago

alkorang commented 7 years ago

I set up a cluster with multiple nodes. It works perfectly with version 0.6.0, but not with version 0.6.1.

               _
   _       _ _(_)_     |  A fresh approach to technical computing
  (_)     | (_) (_)    |  Documentation: https://docs.julialang.org
   _ _   _| |_  __ _   |  Type "?help" for help.
  | | | | | | |/ _` |  |
  | | |_| | | | (_| |  |  Version 0.6.1 (2017-10-24 22:15 UTC)
 _/ |\__'_|_|_|\__'_|  |  Official http://julialang.org/ release
|__/                   |  x86_64-pc-linux-gnu

julia> versioninfo()
Julia Version 0.6.1
Commit 0d7248e2ff (2017-10-24 22:15 UTC)
Platform Info:
  OS: Linux (x86_64-pc-linux-gnu)
  CPU: Intel(R) Xeon(R) CPU           E5405  @ 2.00GHz
  WORD_SIZE: 64
  BLAS: libopenblas (USE64BITINT DYNAMIC_ARCH NO_AFFINITY Penryn)
  LAPACK: libopenblas64_
  LIBM: libopenlibm
  LLVM: libLLVM-3.9.1 (ORCJIT, penryn)

julia> addprocs([("node2", 1)])
ERROR: connect: host is unreachable (EHOSTUNREACH)
try_yieldto(::Base.##296#297{Task}, ::Task) at ./event.jl:189
wait() at ./event.jl:234
wait(::Condition) at ./event.jl:27
stream_wait(::TCPSocket, ::Condition, ::Vararg{Condition,N} where N) at ./stream.jl:42
wait_connected(::TCPSocket) at ./stream.jl:258
connect at ./stream.jl:983 [inlined]
connect_to_worker(::SubString{String}, ::UInt16) at ./distributed/managers.jl:493
connect(::Base.Distributed.SSHManager, ::Int64, ::WorkerConfig) at ./distributed/managers.jl:431
create_worker(::Base.Distributed.SSHManager, ::WorkerConfig) at ./distributed/cluster.jl:443
setup_launched_worker(::Base.Distributed.SSHManager, ::WorkerConfig, ::Array{Int64,1}) at ./distributed/cluster.jl:389
(::Base.Distributed.##33#36{Base.Distributed.SSHManager,WorkerConfig,Array{Int64,1}})() at ./task.jl:335
Stacktrace:
 [1] sync_end() at ./task.jl:287
 [2] macro expansion at ./task.jl:303 [inlined]
 [3] #addprocs_locked#30(::Array{Any,1}, ::Function, ::Base.Distributed.SSHManager) at ./distributed/cluster.jl:344
 [4] (::Base.Distributed.#kw##addprocs_locked)(::Array{Any,1}, ::Base.Distributed.#addprocs_locked, ::Base.Distributed.SSHManager) at ./<missing>:0
 [5] #addprocs#29(::Array{Any,1}, ::Function, ::Base.Distributed.SSHManager) at ./distributed/cluster.jl:319
 [6] (::Base.Distributed.#kw##addprocs)(::Array{Any,1}, ::Base.Distributed.#addprocs, ::Base.Distributed.SSHManager) at ./<missing>:0
 [7] #addprocs#239(::Bool, ::Cmd, ::Int64, ::Array{Any,1}, ::Function, ::Array{Tuple{String,Int64},1}) at ./distributed/managers.jl:114
 [8] addprocs(::Array{Tuple{String,Int64},1}) at ./distributed/managers.jl:113

julia> Master process (id 1) could not connect within 60.0 seconds.
exiting.
julia> 

julia> addprocs([("node2", 1)];tunnel=true)
1-element Array{Int64,1}:
 3

julia> addprocs([("node3", 1)];tunnel=true)
ERROR: connect: host is unreachable (EHOSTUNREACH)
Stacktrace:
 [1] try_yieldto(::Base.##296#297{Task}, ::Task) at ./event.jl:189
 [2] wait() at ./event.jl:234
 [3] wait(::Condition) at ./event.jl:27
 [4] stream_wait(::TCPSocket, ::Condition, ::Vararg{Condition,N} where N) at ./stream.jl:42
 [5] wait_connected(::TCPSocket) at ./stream.jl:258
 [6] connect at ./stream.jl:983 [inlined]
 [7] connect_to_worker(::SubString{String}, ::UInt16) at ./distributed/managers.jl:493
 [8] connect_w2w(::Int64, ::WorkerConfig) at ./distributed/managers.jl:452
 [9] connect(::Base.Distributed.DefaultClusterManager, ::Int64, ::WorkerConfig) at ./distributed/managers.jl:386
 [10] connect_to_peer(::Base.Distributed.DefaultClusterManager, ::Int64, ::WorkerConfig) at ./distributed/process_messages.jl:329
 [11] (::Base.Distributed.##117#118{WorkerConfig,Int64})() at ./task.jl:335
Error [connect: host is unreachable (EHOSTUNREACH)] on 4 while connecting to peer 3. Exiting.
Worker 4 terminated.
ERROR (unhandled task failure): Version read failed. Connection closed by peer.
Stacktrace:
 [1] process_hdr(::TCPSocket, ::Bool) at ./distributed/process_messages.jl:257
 [2] message_handler_loop(::TCPSocket, ::TCPSocket, ::Bool) at ./distributed/process_messages.jl:143
 [3] process_tcp_streams(::TCPSocket, ::TCPSocket, ::Bool) at ./distributed/process_messages.jl:118
 [4] (::Base.Distributed.##99#100{TCPSocket,TCPSocket,Bool})() at ./event.jl:73

First I tried the default options, which did not work. Then I tried the tunnel=true option, which makes it possible to connect to one node, but not to more than one node.

I then tried the same with version 0.6.0, and it worked perfectly.

               _
   _       _ _(_)_     |  A fresh approach to technical computing
  (_)     | (_) (_)    |  Documentation: https://docs.julialang.org
   _ _   _| |_  __ _   |  Type "?help" for help.
  | | | | | | |/ _` |  |
  | | |_| | | | (_| |  |  Version 0.6.0 (2017-06-19 13:05 UTC)
 _/ |\__'_|_|_|\__'_|  |  Official http://julialang.org/ release
|__/                   |  x86_64-pc-linux-gnu

julia> versioninfo()
Julia Version 0.6.0
Commit 9036443 (2017-06-19 13:05 UTC)
Platform Info:
  OS: Linux (x86_64-pc-linux-gnu)
  CPU: Intel(R) Xeon(R) CPU           E5405  @ 2.00GHz
  WORD_SIZE: 64
  BLAS: libopenblas (USE64BITINT DYNAMIC_ARCH NO_AFFINITY Penryn)
  LAPACK: libopenblas64_
  LIBM: libopenlibm
  LLVM: libLLVM-3.9.1 (ORCJIT, penryn)

julia> addprocs([("node2", 1)])
1-element Array{Int64,1}:
 2

julia> addprocs([("node3", 1)])
1-element Array{Int64,1}:
 3

julia> addprocs([("node2", 1)];tunnel=true)
1-element Array{Int64,1}:
 4

julia> addprocs([("node3", 1)];tunnel=true)
1-element Array{Int64,1}:
 5

julia>

ararslan commented 7 years ago

Can you build the aa/backports-0.6.2 branch and see if that works for you?

amitmurthy commented 7 years ago

I think the backport of this https://github.com/JuliaLang/julia/pull/21818 onto 0.6 may be the cause of this behavior.

The workers are now listening on a system-selected ephemeral port, which may not be accessible from the master node. Does addprocs([("node2 <ip-of-node2>:9009", 1)]) work? Can you test with all ports open between the master node and workers?
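To illustrate the difference between a system-selected ephemeral port and a hinted port, here is a minimal sketch. It assumes Julia 1.x, where the socket functions live in the Sockets standard library (on 0.6 they are in Base, so the `using` line is not needed), and it binds to 127.0.0.1 only so that it can run on a single machine.

```julia
using Sockets  # Julia 1.x; on 0.6 these functions are exported from Base

# A port hint of 0 lets the operating system pick a free ephemeral port.
eph_port, eph_server = listenany(IPv4("127.0.0.1"), 0)

# A non-zero hint starts the search at that port and moves upward if it is busy.
hinted_port, hinted_server = listenany(IPv4("127.0.0.1"), 9009)

println("ephemeral port: ", Int(eph_port))
println("hinted port:    ", Int(hinted_port))

close(eph_server)
close(hinted_server)
```

If a firewall only allows a known range starting at 9009, the second form is the one that stays inside it.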

fredrikekre commented 7 years ago

xref: https://discourse.julialang.org/t/addprocs-with-ssh-does-not-work-on-0-6-1/7253

alkorang commented 7 years ago

@ararslan Thank you I will try it.

amitmurthy commented 7 years ago

Thanks. Please test with all ports open between all nodes in the cluster as the workers connect to each other too.

alkorang commented 7 years ago

@amitmurthy Thank you for your comments. I opened ports starting from 9009. It works fine with ("node2 <ip-of-node2>:9009", 1) and ("node2 <ip-of-node2>:9010", 1), but an error occurs with ("node2 <ip-of-node2>:9009", 2).

julia> addprocs([("node2 <ip-of-node2>:9009", 2)])
ERROR: listen: address already in use (EADDRINUSE)
Stacktrace:
 [1] uv_error at ./libuv.jl:68 [inlined]
 [2] #listen#347(::Int64, ::Function, ::Base.TCPServer) at ./stream.jl:933
 [3] (::Base.#kw##listen)(::Array{Any,1}, ::Base.#listen, ::Base.TCPServer) at ./<missing>:0
 [4] #listen#354(::Int64, ::Function, ::Base.InetAddr{IPv4}) at ./socket.jl:773
 [5] (::Base.#kw##listen)(::Array{Any,1}, ::Base.#listen, ::Base.InetAddr{IPv4}) at ./<missing>:0
 [6] start_worker(::Base.PipeEndpoint, ::String) at ./distributed/cluster.jl:159
 [7] process_options(::Base.JLOptions) at ./client.jl:262
 [8] _start() at ./client.jl:371
ERROR: On worker 2:
Unable to read host:port string from worker. Launch command exited with error?
read_worker_host_port at ./distributed/cluster.jl:236
launch_additional at ./distributed/cluster.jl:533
#106 at ./distributed/process_messages.jl:268 [inlined]
run_work_thunk at ./distributed/process_messages.jl:56
macro expansion at ./distributed/process_messages.jl:268 [inlined]
#105 at ./event.jl:73
#remotecall_fetch#141(::Array{Any,1}, ::Function, ::Function, ::Base.Distributed.Worker, ::Int64, ::Vararg{Any,N} where N) at ./distributed/remotecall.jl:354
remotecall_fetch(::Function, ::Base.Distributed.Worker, ::Int64, ::Vararg{Any,N} where N) at ./distributed/remotecall.jl:346
#remotecall_fetch#144(::Array{Any,1}, ::Function, ::Function, ::Int64, ::Int64, ::Vararg{Any,N} where N) at ./distributed/remotecall.jl:367
macro expansion at ./distributed/cluster.jl:413 [inlined]
macro expansion at ./task.jl:302 [inlined]
launch_n_additional_processes(::Base.Distributed.SSHManager, ::Int64, ::WorkerConfig, ::Int64, ::Array{Int64,1}) at ./distributed/cluster.jl:408
setup_launched_worker(::Base.Distributed.SSHManager, ::WorkerConfig, ::Array{Int64,1}) at ./distributed/cluster.jl:402
(::Base.Distributed.##33#36{Base.Distributed.SSHManager,WorkerConfig,Array{Int64,1}})() at ./task.jl:335
Stacktrace:
 [1] sync_end() at ./task.jl:287
 [2] macro expansion at ./task.jl:303 [inlined]
 [3] #addprocs_locked#30(::Array{Any,1}, ::Function, ::Base.Distributed.SSHManager) at ./distributed/cluster.jl:344
 [4] (::Base.Distributed.#kw##addprocs_locked)(::Array{Any,1}, ::Base.Distributed.#addprocs_locked, ::Base.Distributed.SSHManager) at ./<missing>:0
 [5] #addprocs#29(::Array{Any,1}, ::Function, ::Base.Distributed.SSHManager) at ./distributed/cluster.jl:319
 [6] (::Base.Distributed.#kw##addprocs)(::Array{Any,1}, ::Base.Distributed.#addprocs, ::Base.Distributed.SSHManager) at ./<missing>:0
 [7] #addprocs#239(::Bool, ::Cmd, ::Int64, ::Array{Any,1}, ::Function, ::Array{Tuple{String,Int64},1}) at ./distributed/managers.jl:114
 [8] addprocs(::Array{Tuple{String,Int64},1}) at ./distributed/managers.jl:113

julia>

amitmurthy commented 7 years ago

That is fine. The "address already in use" error is expected with the way you tested above: 2 workers cannot both bind to 9009 on the same host. Can you test by opening all ports between all nodes of the cluster (and master) and a regular addprocs?
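Since two workers on one host cannot share a bind port, one way to request several fixed-port workers is to give each its own machine spec. The sketch below is only an illustration: `fixed_port_specs` is a hypothetical helper, and `10.0.0.2` is a placeholder standing in for `<ip-of-node2>`. It assumes the `host bind_addr:port` machine-spec form used earlier in this thread.

```julia
# Hypothetical helper: build one (machine_spec, 1) tuple per worker,
# giving each worker on the host a distinct bind port.
function fixed_port_specs(host::AbstractString, bind_addr::AbstractString,
                          base_port::Integer, nworkers::Integer)
    return [("$(host) $(bind_addr):$(base_port + i)", 1) for i in 0:nworkers-1]
end

# "10.0.0.2" is a placeholder address, not a value from this thread.
specs = fixed_port_specs("node2", "10.0.0.2", 9009, 2)
println(specs)

# On the master node, the result could then be passed on as addprocs(specs).
```

This only builds the specs; whether the workers can actually be reached still depends on those ports being open between all nodes.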

alkorang commented 7 years ago

Can you test by opening all ports between all nodes of the cluster (and master) and a regular addprocs?

I don't have permission for the network right now, so I created a cluster on AWS.

julia> addprocs([("node2", 1)])
1-element Array{Int64,1}:
 2

julia> addprocs([("node3", 1)])
1-element Array{Int64,1}:
 3

julia> addprocs([("node2", 1)];tunnel=true)
1-element Array{Int64,1}:
 4

julia> addprocs([("node3", 1)];tunnel=true)
1-element Array{Int64,1}:
 5

julia> versioninfo()
Julia Version 0.6.1
Commit 0d7248e2ff (2017-10-24 22:15 UTC)
Platform Info:
  OS: Linux (x86_64-pc-linux-gnu)
  CPU: Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
  WORD_SIZE: 64
  BLAS: libopenblas (USE64BITINT DYNAMIC_ARCH NO_AFFINITY Sandybridge)
  LAPACK: libopenblas64_
  LIBM: libopenlibm
  LLVM: libLLVM-3.9.1 (ORCJIT, ivybridge)

julia> 

...and it worked fine.

One of my co-workers said the problem could be FreeIPA in our cluster. Are there any known problems with FreeIPA?

amitmurthy commented 7 years ago

No idea about FreeIPA.

I am wondering if cluster setups usually block connections to the ephemeral port range. If so, we should address the issue on master too.
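For anyone who wants to check which ports count as ephemeral on a Linux node, the kernel exposes the range in `/proc/sys/net/ipv4/ip_local_port_range`. The following is a rough sketch assuming Julia 1.x (on 0.6, `readstring(path)` replaces `read(path, String)`). The helper names are hypothetical, and the range differs between systems.

```julia
# Parse the "low<whitespace>high" format used by ip_local_port_range.
parse_port_range(text::AbstractString) = Tuple(parse.(Int, split(text)))

# Linux only: return (low, high), or nothing if the file does not exist.
function ephemeral_port_range(path = "/proc/sys/net/ipv4/ip_local_port_range")
    isfile(path) || return nothing
    return parse_port_range(read(path, String))
end

# True if `port` lies inside the inclusive (low, high) range.
in_range(port::Integer, limits) = limits[1] <= port <= limits[2]

limits = ephemeral_port_range()
if limits !== nothing
    println("ephemeral range on this machine: ", limits)
    println("is 9009 inside it? ", in_range(9009, limits))
end
```

Comparing this range with the firewall rules on the cluster would show whether a worker listening on a system-selected port can be reached at all.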

usefulhyun commented 7 years ago

https://discourse.julialang.org/t/addprocs-with-ssh-does-not-work-on-0-6-1/7253/3

I use an IPA system, and all of my remote hosts are connected via IPA. My program works well in version 0.6.0, but it does not work in version 0.6.1, and I do not know why. I wonder what changed between the two versions. addprocs(["hostname"]) no longer works. addprocs(["hostname"], tunnel=true) works, but not when I add more than one SSH host.

usefulhyun commented 7 years ago

And I tried

addprocs(1, restrict=false)
addprocs(["remotehost"], tunnel=true)  # this line generates error

and

addprocs(["remotehost"], tunnel=true)
addprocs( 1 ) # this line also generates the same error as the above.

Both cases create errors in version 0.6.1.

amitmurthy commented 7 years ago

It does appear that the cluster environments in question block connections to ports in the ephemeral port range. Can you check with your sysadmin?

Or you could try the following:

With Julia 0.6.1:

On one terminal open a ssh session to node2 and run the following:

julia> p,h = listenany(IPv4("0.0.0.0"), 0)
(0xd068, Base.TCPServer(RawFD(21) active))

julia> Int(p)
53352

julia> @schedule begin
         l = listen(p)
         while true
           a = accept(l)
           println("GOT CONNECTION!")
         end
       end    
Task (runnable) @0x000000011c0cbcd0

In another (local) terminal, try connecting to the port printed above (in my case it was 53352; it will be different for you).

julia> connect(node2, 53352)
TCPSocket(RawFD(21) open, 0 bytes waiting)

It should fail. Repeat the same exercise with the listen port changed to 9009. It should work.
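The connection check from the local terminal can also be wrapped in a small probe that returns true or false instead of throwing. This is a sketch assuming Julia 1.x; `can_connect` is a hypothetical helper, and the demonstration at the end uses a listener on 127.0.0.1 only so that it can run on one machine.

```julia
using Sockets  # Julia 1.x; on 0.6 these functions are exported from Base

# Hypothetical helper: true if a TCP connection to host:port can be opened.
# Any error (unreachable host, refused connection, DNS failure) counts as false.
# Note that an unreachable host may block until the OS gives up.
function can_connect(host, port::Integer)
    try
        sock = connect(host, port)
        close(sock)
        return true
    catch
        return false
    end
end

# Local demonstration against a listener in the same process.
port, server = listenany(IPv4("127.0.0.1"), 0)
reachable = can_connect(IPv4("127.0.0.1"), Int(port))
close(server)
println("local listener reachable: ", reachable)
```

On the master node, calling `can_connect("node2", p)` once with the ephemeral port printed on node2 and once with 9009 would give the comparison described above.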

amitmurthy commented 7 years ago

See https://github.com/JuliaLang/julia/issues/24722#issuecomment-346657148 for the cause. We are planning to revert this behavior in 0.6.2

ararslan commented 7 years ago

Amit's fix for this has now been incorporated into my backport branch. It would be great if you could build aa/backports-0.6.2 and let me know if that fixes the issue for you.

alkorang commented 7 years ago

@ararslan

It would be great if you could build aa/backports-0.6.2 and let me know if that fixes the issue for you.

I tried to build aa/backports-0.6.2 on CentOS 7.4, but the build failed. The build succeeded on Debian 9.2, but the cluster runs CentOS, so the libstdc++ version does not match and the binary fails to run. I have searched for a way to build a generic Linux binary but have not found one. Could you give me some guidance?

alkorang commented 7 years ago

@amitmurthy

On one terminal open a ssh session to node2 and run the following:

The error occurs with your code on node2:

julia> p,h = listenany(IPv4("0.0.0.0"), 0)
(0x9b3e, Base.TCPServer(RawFD(20) active))

julia> Int(p)
39742

julia> @schedule begin
                l = listen(p)
                while true
                  a = accept(l)
                  println("GOT CONNECTION!")
                end
              end
ERROR (unhandled task failure): listen: address already in use (EADDRINUSE)
Stacktrace:
 [1] uv_error at ./libuv.jl:68 [inlined]
 [2] #listen#347(::Int64, ::Function, ::Base.TCPServer) at ./stream.jl:933
 [3] (::Base.#kw##listen)(::Array{Any,1}, ::Base.#listen, ::Base.TCPServer) at ./<missing>:0
 [4] #listen#354(::Int64, ::Function, ::Base.InetAddr{IPv4}) at ./socket.jl:773
 [5] (::Base.#kw##listen)(::Array{Any,1}, ::Base.#listen, ::Base.InetAddr{IPv4}) at ./<missing>:0 (repeats 2 times)
 [6] listen(::UInt16) at ./socket.jl:776
 [7] macro expansion at ./REPL[3]:2 [inlined]
 [8] (::##1#2)() at ./event.jl:73
Task (failed) @0x00007f2c01a531f0
listen: address already in use (EADDRINUSE)
uv_error at ./libuv.jl:68 [inlined]
#listen#347(::Int64, ::Function, ::Base.TCPServer) at ./stream.jl:933
(::Base.#kw##listen)(::Array{Any,1}, ::Base.#listen, ::Base.TCPServer) at ./<missing>:0
#listen#354(::Int64, ::Function, ::Base.InetAddr{IPv4}) at ./socket.jl:773
(::Base.#kw##listen)(::Array{Any,1}, ::Base.#listen, ::Base.InetAddr{IPv4}) at ./<missing>:0 (repeats 2 times)
listen(::UInt16) at ./socket.jl:776
macro expansion at ./REPL[3]:2 [inlined]
(::##1#2)() at ./event.jl:73

julia>

So I changed the port number to 9009, but I still get the same error.

julia> p,h = listenany(IPv4("0.0.0.0"), 9009)
(0x2331, Base.TCPServer(RawFD(20) active))

julia> Int(p)
9009

julia> @schedule begin
                l = listen(p)
                while true
                  a = accept(l)
                  println("GOT CONNECTION!")
                end
              end
ERROR (unhandled task failure): listen: address already in use (EADDRINUSE)
Stacktrace:
 [1] uv_error at ./libuv.jl:68 [inlined]
 [2] #listen#347(::Int64, ::Function, ::Base.TCPServer) at ./stream.jl:933
 [3] (::Base.#kw##listen)(::Array{Any,1}, ::Base.#listen, ::Base.TCPServer) at ./<missing>:0
 [4] #listen#354(::Int64, ::Function, ::Base.InetAddr{IPv4}) at ./socket.jl:773
 [5] (::Base.#kw##listen)(::Array{Any,1}, ::Base.#listen, ::Base.InetAddr{IPv4}) at ./<missing>:0 (repeats 2 times)
 [6] listen(::UInt16) at ./socket.jl:776
 [7] macro expansion at ./REPL[3]:2 [inlined]
 [8] (::##1#2)() at ./event.jl:73
Task (failed) @0x00007f89433771f0
listen: address already in use (EADDRINUSE)
uv_error at ./libuv.jl:68 [inlined]
#listen#347(::Int64, ::Function, ::Base.TCPServer) at ./stream.jl:933
(::Base.#kw##listen)(::Array{Any,1}, ::Base.#listen, ::Base.TCPServer) at ./<missing>:0
#listen#354(::Int64, ::Function, ::Base.InetAddr{IPv4}) at ./socket.jl:773
(::Base.#kw##listen)(::Array{Any,1}, ::Base.#listen, ::Base.InetAddr{IPv4}) at ./<missing>:0 (repeats 2 times)
listen(::UInt16) at ./socket.jl:776
macro expansion at ./REPL[3]:2 [inlined]
(::##1#2)() at ./event.jl:73

julia>

Yet addprocs([("node2 <ip-of-node2>:9009", 1)]) works fine.

ararslan commented 7 years ago

@alkorang Try this test binary. That's a generic Linux build of my backport branch.

Note: That binary is NOT intended for general use. It is for testing purposes ONLY.

amitmurthy commented 7 years ago

Sorry, the code block should be

p,s = listenany(IPv4("0.0.0.0"), 0)
Int(p)
@schedule begin
   while true
     accept(s)
     println("GOT CONNECTION!")
   end
end

i.e., remove the additional listen call.
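Before testing across nodes, the corrected listener can be sanity-checked inside a single process, which separates mistakes in the script from network problems. The sketch below assumes Julia 1.x, where `@async` replaces the 0.6 `@schedule` macro and the socket functions come from the Sockets standard library. It accepts a single connection over 127.0.0.1 rather than looping forever.

```julia
using Sockets  # Julia 1.x; on 0.6 these functions are exported from Base

# listenany already returns a listening server, so no extra listen call is needed.
port, server = listenany(IPv4("127.0.0.1"), 0)

# The channel lets the main task wait until the accept task has seen a client.
accepted = Channel{Bool}(1)
@async begin
    sock = accept(server)
    println("GOT CONNECTION!")
    put!(accepted, true)
    close(sock)
end

# Connect from the same process, then wait for the accept task to report back.
client = connect(IPv4("127.0.0.1"), Int(port))
got_connection = take!(accepted)

close(client)
close(server)
```

If this works locally but the same port cannot be reached from another node, the listener code is fine and the network path is the likely cause.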

alkorang commented 7 years ago

@ararslan

Try this test binary. That's a generic Linux build of my backport branch.

The same error occurs as when I opened this issue.

alkorang commented 7 years ago

@amitmurthy I tried the code with a random port and 9009.

With a random port, node2:

julia> p,s = listenany(IPv4("0.0.0.0"), 0)
(0x9a4a, Base.TCPServer(RawFD(20) active))

julia> Int(p)
39498

julia> @schedule begin
          while true
            accept(s)
            println("GOT CONNECTION!")
          end
       end
Task (runnable) @0x00007fbbf82471f0

julia>

node1:

julia> connect("node2", 39498)
ERROR: connect: host is unreachable (EHOSTUNREACH)
Stacktrace:
 [1] try_yieldto(::Base.##296#297{Task}, ::Task) at ./event.jl:189
 [2] wait() at ./event.jl:234
 [3] wait(::Condition) at ./event.jl:27
 [4] stream_wait(::TCPSocket, ::Condition, ::Vararg{Condition,N} where N) at ./stream.jl:42
 [5] wait_connected(::TCPSocket) at ./stream.jl:258
 [6] connect at ./stream.jl:983 [inlined]
 [7] connect(::String, ::Int64) at ./socket.jl:741

julia> 

With port 9009, node2:

julia> p,s = listenany(IPv4("0.0.0.0"), 9009)
(0x2331, Base.TCPServer(RawFD(20) active))

julia> Int(p)
9009

julia> @schedule begin
          while true
            accept(s)
            println("GOT CONNECTION!")
          end
       end
Task (runnable) @0x00007f3bbc9cf1f0

julia> GOT CONNECTION!
julia>

node1:

julia> connect("node2", 9009)
TCPSocket(RawFD(20) open, 0 bytes waiting)

julia>

alkorang commented 6 years ago

Reverted in 0.6.2