discoproject / disco

a Map/Reduce framework for distributed computing
http://discoproject.org
BSD 3-Clause "New" or "Revised" License
1.63k stars 241 forks source link

Tasks will wait for a single node which is busy, while there are other nodes that are idle #597

Open pooya opened 9 years ago

pooya commented 9 years ago

From the thread: https://groups.google.com/forum/#!topic/disco-dev/pIynGbq1qMk

The scheduler decides to run some tasks on a node and keeps them in the waiting stage. However, there might be other nodes that are free and can run that task. The scheduler should be improved to avoid such early assignments. Anothe solution might be using a task-stealing mechanism to let some nodes steal the tasks of other nodes when they are idle.

erikdubbelboer commented 9 years ago

We're seeing a similar thing with our setup of 2 nodes where one node has twice as many workers as the other while being a similar machine. The node with twice as many workers finishes it's tasks much quicker and then just waits for the other node.

I'm not familiar with erlang at all but I wouldn't mind spending some time to dive into it and try to fix this if someone could quickly point me in the right direction.

pooya commented 9 years ago

In master/src/job_coordinator.erl. At the beginning of a stage, the start_next_stage function is called which gets all of the outputs of the previous stage and calls do_submit_tasks and then do_submit_tasks_in which will schedule the task. We have a mechanism for limiting the number of tasks (job_event:can_run_task) which doesn't do much now. I think the best solution is to modify this policy to avoid scheduling all of the tasks all the time and keeping them in the pending state. However, any such policy modifications needs a good amount of testing in different scenarios to maximize the usage of the cluster. Hopefully, I can get to this toward this weekend. But I cannot promise to merge anything in the main branch because I have to test a lot of different cases.

sloweater commented 9 years ago

I had the same issue and "fixed" this by changing master/src/pipeline_utils.erl line 118. This is in the function group_outputs(split, Outputs). I changed pick_local_host(Reps) to none. Making this change lets the scheduler select a host to run on, rather than forcing the task to run on the host that gets randomly selected in this function.

This does mean the task can now get scheduled on a host that does not have the input local (although the scheduler prefers to put it on a host that does, if none of the locals are open it puts the task on any other free host). I don't think everyone would like that behavior so this is not a general solution. But I want to put it out there for anyone looking for a "dirty" answer right now. For my use this was good enough and better than waiting on one busy node.

pooya commented 9 years ago

@sloweater interesting. Are you sure you had the same issue?

sloweater commented 9 years ago

It was the same behavior described by ErikDubbelboer above, where tasks in the first stage were waiting to run on a node with fewer workers than the others. Oh, I was using the pipeline worker rather than the map-reduce, that probably makes a difference, didn't think about that before.

tigerite commented 9 years ago

Remember that the Map-Reduce paradigm is about bring computation to the data.

So if the majority of all your data is only exists on one node, then jobs must wait until workers are available on that node.

So you must fine tune the distribution of your data uniformly across the DDFS nodes.

Strategies to help distribute your data: 1) At the beginning of your job cycle, it helps if your DDFS nodes have the same amount of free space.

2) Replicate your data across more than one DDFS nodes. Test this out: if your cluster has 8 nodes, replicate each chunk of data across 8 nodes.

3) Use the disco apis to create your own data distribution algorithm. The disco api allows you to select which nodes you want the data to be distributed to and how many replicas.

For example, if your processing a data stream in real-time, create a data distribution algorithm that selects a DDFS node based on the number of available workers that are not busy.

Cheers!

erikdubbelboer commented 9 years ago

@tigerite We have this issue even though all data is available on all nodes so I'm afraid your suggestion doesn't fix the real problem.

tigerite commented 9 years ago

Hi Erik

I'm not doubting the disco job scheduler needs to be smarter.

The problem is that if you tell disco to process X chunks of data, it will pre-schedule X tasks to run on the available nodes by pre-selecting the input for each of these X job tasks. If the chunks of data are uniformly distributed or replicated across the nodes, there should be the same amount of tasks schedule for each node.

The issue is that each task will not complete in the same amount of time. So on nodes where the tasks complete quickly, they will become idle. On nodes where the tasks complete slower, it will take longer to complete all the tasks assigned to that node.

The disco scheduler needs to be updated to not assign all tasks by pre-selecting the inputs at the beginning of a job.

For example, if there are N available workers in the cluster, the disco scheduler should only schedule N tasks. Then, once a worker is free again, the scheduler can schedule a task to that free worker.

For example, say you have 3 DDFS/Disco nodes with 2 workers each. In total, 6 workers.

Now say your data set has 24 chunks and is spread out uniformly across the 3 DDFS nodes. So each node has 8 chunks each.

If your process tells disco to process all 24 chunks in one call ... by using one tag that points to all 24 chunks ... disco will or should assign 8 tasks to each node. (Further investigation indicates this will not always be the case...I believe its random?).

Now because each node only has 2 workers, only 2 task can be executed at any given time. So initially, there will be 3 x 2 = 6 tasks executing in parallel.

If each task completes in exactly the same amount time, when the first set of 6 tasks completes, the next 6 tasks gets executed.

But in reality, task completion varies. So some nodes will complete processing the tasks assign to them before other nodes.

The way I attempt to get around this issue is to use the disco APIs to schedule tasks based on the availability of workers as I have described. More specifically, my algorithm is as follows:

  1. If there are N workers and M disco nodes, I schedule N+M tasks.
  2. When C tasks complete, I schedule (N+M) - C new tasks.

Please note, that this helps to reduce the problem with idle workers but does not completely solve it. You will still get idle workers.

I hope this helps.