vatlab / sos

SoS workflow system for daily data analysis
http://vatlab.github.io/sos-docs
BSD 3-Clause "New" or "Revised" License

Send tasks to multiple workers #1063

Open BoPeng opened 6 years ago

BoPeng commented 6 years ago

A typical work pattern for ipyparallel is to start multiple local or remote workers; ipyparallel then sends tasks to them using map, map_async, etc., and handles task monitoring and load balancing. Translated to SoS, this would mean something like

```
input: many_input_files, group_by=1
task: queue=MULTIPLE_QUEUES
task_content
```
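
For comparison, the ipyparallel pattern described above looks roughly like the following sketch (the engine setup, `process_file`, and the file list are illustrative placeholders, not SoS code):

```python
# Sketch of the ipyparallel map/map_async pattern described above.
# Assumes an ipcluster with several engines is already running;
# process_file and many_input_files are illustrative placeholders.
import ipyparallel as ipp

client = ipp.Client()                  # connect to the running engines
view = client.load_balanced_view()     # ipyparallel handles load balancing

def process_file(input_file):
    # placeholder for the per-file analysis
    return input_file

many_input_files = ["a.txt", "b.txt", "c.txt"]

# one task per input file, sent to whichever engine is free
result = view.map_async(process_file, many_input_files)
result.wait()
print(result.get())
```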

This does not work right now because SoS supports only one queue for each task. Remote tasks are executed one by one, with a task engine controlling the number of concurrent tasks on each worker (queue).

| | SoS' process queue | SoS' cluster queue | SoS' rq/celery support | ipyparallel (or celery) |
| --- | --- | --- | --- | --- |
| job dispatcher | sos | sos | sos | ipyparallel |
| job executor | process | PBS | rq | workers |
| parallelism | multiple separate processes | multiple PBS jobs | multiple workers | multiple workers |
| load balancing | none (sos controls the number of running jobs) | by PBS | by rq | by ipyparallel |
| multiple queues | no | no | no | multiple workers as seen by ipyparallel |

Basically, ipyparallel is not a workflow system. It is an (interactive) batch system more similar to rq and celery than to SoS. SoS relies on batch systems (PBS, rq, etc.) to handle multiple jobs. However, it is possible for SoS to handle multiple queues (workers) directly, so that:

  1. Users define multiple queues (remote servers)
  2. Users run `sos run -q q1 q2 q3` to submit tasks to multiple queues directly, without the need for a task queue or PBS system; SoS would then need to handle load balancing itself.

In practice, our department has a number of individual servers that are not managed by PBS, so it is possible to do this. However, a PBS system would be much better at managing resources, so I am wondering whether there is really a need for SoS to support such a scenario.
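
To make point 2 above concrete, load balancing across user-defined queues could be as simple as a least-loaded scheduler. The sketch below is purely illustrative; the class and its names are hypothetical and not an existing SoS interface:

```python
# Hypothetical sketch of dispatching tasks across several user-defined
# queues (q1, q2, q3); not part of SoS, just to illustrate the idea above.
class MultiQueueDispatcher:
    def __init__(self, queues, capacity):
        self.capacity = capacity              # max concurrent tasks per queue
        self.running = {q: 0 for q in queues} # tasks currently running per queue

    def pick_queue(self):
        # choose the queue with the fewest running tasks (least loaded)
        queue = min(self.running, key=self.running.get)
        if self.running[queue] >= self.capacity:
            return None                       # every queue is saturated
        self.running[queue] += 1
        return queue

    def task_done(self, queue):
        self.running[queue] -= 1

dispatcher = MultiQueueDispatcher(['q1', 'q2', 'q3'], capacity=4)
print(dispatcher.pick_queue())                # e.g. 'q1'
```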

gaow commented 6 years ago

What's the advantage of sending each task to multiple queues? A task is the smallest SoS unit, so how can one task run on multiple hosts?

BoPeng commented 6 years ago

Sorry, I created this ticket from my cellphone without details. I have updated the ticket.

BoPeng commented 6 years ago

Comparing the "workers" of sos and ipyparallel, I would emphasize that:

  1. sos' workers are used to execute different nodes of the DAG. The worker loads are heterogeneous, and a worker might generate more tasks.
  2. ipyparallel workers are mostly used to execute a large number of homogeneous tasks (apply, apply_async, etc.), more like the concurrent substeps within the same SoS step.
  3. sos' concurrent substeps are currently handled by a local multiprocessing pool (see the sketch below).
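
For illustration, the local substep handling in point 3 is essentially the standard multiprocessing pool pattern (a sketch only, with placeholder names, not actual SoS internals):

```python
# Sketch of the local multiprocessing pool model used for concurrent
# substeps (illustrative placeholders only, not actual SoS internals).
from multiprocessing import Pool

def run_substep(input_file):
    # placeholder for the work done by one substep
    return f"processed {input_file}"

if __name__ == '__main__':
    inputs = [f"sample_{i}.txt" for i in range(20)]
    with Pool(processes=4) as pool:
        # homogeneous substeps dispatched to a fixed pool of local workers
        results = pool.map(run_substep, inputs)
    print(results[:3])
```
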
gaow commented 6 years ago

I see, of course. Let me try to interpret the implications. One benefit of implementing the ipyparallel model would be a workload that is balanced across the DAG, not just locally. Another would be a more homogeneous job control mechanism, with the potential to natively support more systems (like nextflow?), although it might be hard to reach the level of sophistication of a system like PBS. Right?

BoPeng commented 6 years ago

Yes, the "mini-cluster" approach of nextflow means it has some built-in load balancing features and we generally avoids this problem by letting others to manage the execution of large amount of tasks.

BoPeng commented 6 years ago

Our model currently is

  1. Keep the DAG and light computation (steps) local, even when using multiprocessing.
  2. Send specialized tasks to remote hosts as processes.
  3. Send heavy computations to task queues.

Nextflow submits multi-node jobs on clusters and executes everything inside the resulting mini-cluster, so it also does the heavy computation itself. This avoids the overhead of submitting and monitoring PBS jobs and can be more efficient. SoS could do something similar by allowing its workers to reside on multiple computing nodes and letting them handle jobs (substeps) as well. We have already moved from local multiprocessing to single-node zmq messaging, so expanding the architecture to remote zmq workers would allow us to do something like that. That is to say, with something like

```
mpirun -n 8 sos run script -m
```

we can start sos on 8 nodes, where one sos instance acts as the master and treats the other sos instances as workers (instead of creating sos workers locally). We can ignore option -q (or task: queue="") in this case and submit all tasks as concurrent jobs to the workers. This can potentially be much more efficient, especially for your case (a large number of small tasks/jobs).
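
A minimal sketch of the master/worker messaging this would imply, using a zmq PUSH/PULL pair (illustrative only: the port, message format, and role selection are assumptions, not the actual SoS protocol):

```python
# Minimal sketch of a master/worker layout over zmq (illustrative only;
# the port, message format, and command-line roles are assumptions,
# not the actual SoS messaging protocol).
import sys
import zmq

def master(n_jobs):
    sock = zmq.Context().socket(zmq.PUSH)   # master pushes substeps to workers
    sock.bind("tcp://*:5557")
    for i in range(n_jobs):
        sock.send_json({"substep": i})

def worker(master_host):
    sock = zmq.Context().socket(zmq.PULL)   # each worker pulls the next substep
    sock.connect(f"tcp://{master_host}:5557")
    while True:
        job = sock.recv_json()
        print("running substep", job["substep"])

if __name__ == "__main__":
    # e.g. the rank-0 instance acts as master, the others as workers
    if sys.argv[1] == "master":
        master(100)
    else:
        worker(sys.argv[2])
```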