JuliaLang / Distributed.jl

Create and control multiple Julia processes remotely for distributed computing. Ships as a Julia stdlib.
https://docs.julialang.org/en/v1/stdlib/Distributed/
MIT License
29 stars 11 forks source link

Ctrl-C when master process is waiting for crashed workers #29

Open alyst opened 9 years ago

alyst commented 9 years ago

When the master Julia process is waiting in take!() on its own RemoteRef and the worker processes have all thrown exceptions, pressing Ctrl-C in the REPL results in

ERROR (unhandled task failure): InterruptException:
 in task_done_hook at ./task.jl:175

but doesn't bring the master out of the waiting state.

alyst commented 9 years ago

A somewhat related question: would it be possible to modify take!() to throw a RemoteException if all (or any?) of the processes it waits for have crashed? Otherwise the tasks have to make sure that, before they let an exception escape, they put something into the channel the master is waiting on.

amitmurthy commented 9 years ago

Can you post your code that results in this behavior?

alyst commented 9 years ago

It was not as simple as I've described. In very simple examples Julia behaves normally. However, here's a stripped-down version of the master/slave parallel optimizer from robertfeldt/BlackBoxOptim.jl#25. Unfortunately it's not the most minimal test, but at least it should demonstrate the problem. The master gets stuck in step!() waiting for the slave that has thrown an exception. When I run this code from the REPL, pressing Ctrl+C while it is waiting in take!() gives the above exception. When run from the command line, it works fine.

# Start one additional worker process; the module below is loaded on every
# process via @everywhere.
addprocs(1)

@everywhere module TestTake

# Channel types used to pass candidates between the workers and the master:
# a channel of Float64 values, and a RemoteRef to such a channel.

typealias WorkerChannel Channel{Float64}
typealias WorkerChannelRef RemoteRef{WorkerChannel}

# State held by the master for a set of worker processes.
# Fields are filled positionally by the outer constructor, so their order matters.
type ParallelPopulationOptimizer
  # IDs of worker processes
  worker_procs::Vector{Int}
  # one reference per worker to the result of its `@spawnat ID run_worker()` call
  final_fitnesses::Vector{RemoteRef{Channel{Any}}}
  # inbound channel of candidates from all workers
  from_workers::WorkerChannelRef
  # outgoing channels, one per worker
  to_workers::Vector{WorkerChannelRef}
  # flag that all workers have started
  is_started::RemoteRef{Channel{Bool}}
end

# Number of worker processes tracked by `ppopt` (the length of `worker_procs`).
function nworkers(ppopt::ParallelPopulationOptimizer)
  return length(ppopt.worker_procs)
end

# Outer constructor of the parallel population optimizer.
# It only creates the channels and picks the worker processes; the worker
# tasks themselves are spawned later by `setup!`.
#
# NWorkers                   -- maximum number of worker processes to use: the first
#                               NWorkers entries of workers() are taken (all of them
#                               when fewer are available)
# ArchiveCapacity            -- not used by this constructor; kept so that existing
#                               positional calls keep working
# ToWorkerChannelCapacity    -- capacity of each per-worker outgoing channel
# FromWorkersChannelCapacity -- capacity of the shared inbound channel
function ParallelPopulationOptimizer(NWorkers::Int = 2,
    ArchiveCapacity::Int = 10,
    ToWorkerChannelCapacity::Int = 1000,
    FromWorkersChannelCapacity::Int = 10000)
  # take the first NWorkers workers; NWorkers used to be ignored, so every
  # available worker was used regardless of the requested count
  AllWorkers = workers()
  Workers = AllWorkers[1:min(NWorkers, length(AllWorkers))]
  ParallelPopulationOptimizer(Workers,
       # filled in by setup!, one entry per worker
       Vector{RemoteRef{Channel{Any}}}(length(Workers)),
       # inbound channel, created on the calling process
       RemoteRef(() -> WorkerChannel(FromWorkersChannelCapacity)),
       # one outgoing channel per worker, created on that worker's process
       WorkerChannelRef[RemoteRef(() -> WorkerChannel(ToWorkerChannelCapacity), id) for id in Workers],
       # start flag, put by step! on its first call
       RemoteRef(() -> Channel{Bool}(1)))
end

# Spawn `run_worker` on every worker process of `ppopt`, store the resulting
# references in `ppopt.final_fitnesses`, and block until each worker has
# announced itself. Returns `ppopt`.
function setup!(ppopt::ParallelPopulationOptimizer)
  info("Initializing parallel workers...")
  # each worker puts its index into this channel once it is running
  workers_ready = RemoteRef(() -> Channel{Int}(nworkers(ppopt))) # FIXME do we need to wait for the worker?
  @assert !isready(ppopt.is_started)
  for (i, procid) in enumerate(ppopt.worker_procs)
    info("  Spawning worker #$i at process #$procid...")
    ppopt.final_fitnesses[i] = @spawnat procid run_worker(i, workers_ready, ppopt.is_started)
  end
  info("Waiting for the workers to be ready...")
  # exactly one announcement is expected per worker
  for k in 1:nworkers(ppopt)
    worker_id = take!(workers_ready)
    info("  Worker #$worker_id is ready")
  end
  info("All workers ready")
  return ppopt
end

# One master-side step: on the first call set the `is_started` flag, then
# block until a candidate can be taken from `from_workers`.
function step!(ppopt::ParallelPopulationOptimizer)
  # the flag is only unset on the first iteration
  isready(ppopt.is_started) || put!(ppopt.is_started, true)
  info("take!...")
  take!(ppopt.from_workers) # the received candidate is not used here
  info("take!")
end

# Worker entry point; `setup!` runs it on a worker process via @spawnat.
# It announces itself on `worker_ready`, waits for the `is_started` flag and
# then unconditionally raises an error, without ever putting anything into a
# channel the master takes from.
#
# id           -- index of this worker, reported back through `worker_ready`
# worker_ready -- channel the worker puts its `id` into once it is running
# is_started   -- flag the worker waits on before doing anything else
function run_worker(id::Int,
                    worker_ready::RemoteRef{Channel{Int}},
                    is_started::RemoteRef{Channel{Bool}} )
  put!(worker_ready, id) # announce this worker to the master
  fetch(is_started) # wait until the master is started
  try
    error("just error") # always throws
  catch e
    rethrow(e) # propagate the error as-is; nothing is put into any channel first
  end
end

end

# Reproduction driver: build the optimizer, start the workers, run one step.
# The worker raises an error without putting anything on `from_workers`, so
# the take!() inside step!() has nothing to receive.
ppopt = TestTake.ParallelPopulationOptimizer()
TestTake.setup!(ppopt)
TestTake.step!(ppopt)