vatlab / sos

SoS workflow system for daily data analysis
http://vatlab.github.io/sos-docs
BSD 3-Clause "New" or "Revised" License

SoS performance dealing with a large number of files #874

Closed. gaow closed this issue 6 years ago.

gaow commented 6 years ago

This bothers me when I do some very simple simulations:

[1]
output: [f"performance_test/{x+1}.out" for x in range(500)]
run:
  mkdir -p performance_test
  touch performance_test/{1..500}.out

[2]
r = [x for x in range(500)]
input: group_by = 1, paired_with = 'r'
output: [f"performance_test/{x+1}.rds" for x in range(500)], group_by = 1
task: concurrent = True
R: expand = '${ }'
  x = rnorm(${_r[0]})
  saveRDS(x, ${_output:r})

If you run this script, you'll see it halt for a second or two at the end of every batch of completed jobs. I understand that things like signature checks are going on. As a result, a simple simulation that takes < 10 sec as a plain for loop can take > 700 sec with SoS -- the overhead takes far longer than the actual computation. I remember it used to be 10 sec vs > 100 sec before last summer. I guess that as signature checking became stricter and more careful about race conditions, the whole process got a lot slower. Is there still room for optimization?

BoPeng commented 6 years ago

I can put my numbers here, but what are your numbers for:

  1. time sos run test -J20
    real    5m38.811s
    user    28m20.086s
    sys 11m22.295s

    without -J20 (using the default of 4 on my 8-core system)

    real    7m6.809s
    user    20m12.248s
    sys 7m23.454s

    with trunk_size=20

    real    1m59.531s
    user    5m4.720s
    sys 1m22.205s
  2. time sos run test for the script without task
    real    3m38.740s
    user    2m51.102s
    sys 0m37.695s
  3. A shell script that calls R scripts?
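
(For reference, the trunk_size runs above only change the task line of step [2] of the test script; trunk_size is an existing task option that groups tasks into trunks executed sequentially within each worker. A minimal sketch:

task: trunk_size = 20, concurrent = True   # 500 tasks -> 25 trunks of 20
)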
gaow commented 6 years ago

Okay, here are my stats:

  1. time sos run test -J20
real    3m5.234s
user    17m18.540s
sys 1m27.200s
  2. with trunk_size=20, using -J20 -s force:
real    0m39.243s
user    3m36.968s
sys 0m25.344s
  3. without task, using -s force:
real    1m47.409s
user    1m28.336s
sys 0m9.592s
  4. with 500 sequential Rscript commands in bash:
time for i in `ls -t .sos/default_2_*.R | head -500`; do Rscript $i; done
real    0m52.769s
user    0m37.412s
sys 0m5.676s
  5. within an R loop, just for the sake of argument against using pipelines:
real    0m0.751s
user    0m0.176s
sys 0m0.028s

I wonder what your take is on this. I did not know trunk_size could help that much, though, and from that I can now see that file signatures were not the problem, because with and without trunk_size the number of signatures to process is the same. A lot of times when I run top, all I see is sos using the full CPU resources rather than the actual computation.

BoPeng commented 6 years ago

This basically shows high overhead in the dispatching of sos tasks. Tasks used to be dispatched in parallel by a few threads, but that caused a nasty bug because env is shared by these threads, so submitting one task affected the status of other pending tasks. Now the task engine has to run a for loop, with sleep in between, to submit jobs one by one.

I suppose some optimization can be done in this area.

gaow commented 6 years ago

I see ... I guess it might be particularly frustrating to people who work on methods development, which often means sending lots of jobs out to test different models on the same dataset. I'm in a collaboration on an RNA project that tries to use CNN-based classifiers, which requires lots of runs to tune, and we want to keep track of all the outcomes ... Anyway, a perhaps easier optimization is to automatically decide trunk_size when it is not specified. It seems to save lots of time when properly configured, but a difference in behavior that relies on a not-so-obvious option is less elegant than optimizing it automatically.

BoPeng commented 6 years ago

The pause between the completion of one batch of tasks and the start of the next is for checking task status. Basically, after the tasks are completed (print completed), the task engine has its own timing for checking task status: it calls a sos status command (a separate process) every 2 seconds (on the local host). This is the only way to check the status of remote tasks, but we could optimize it for local tasks.
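
Conceptually, the local polling loop behaves something like this (an illustrative Python sketch, not SoS's actual engine code; it assumes sos status prints one "ID ... status" line per task):

import subprocess
import time

def poll_until_done(task_ids, interval=2):
    # illustrative only: poll task status by shelling out, as the engine does
    pending = set(task_ids)
    while pending:
        # each check spawns a separate `sos status` process
        out = subprocess.run(['sos', 'status'] + sorted(pending),
                             capture_output=True, text=True).stdout
        done = {line.split()[0] for line in out.splitlines()
                if line.strip().endswith('completed')}  # assumed output format
        pending -= done
        if pending:
            time.sleep(interval)  # the 2-second pause observed between batches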

gaow commented 6 years ago

Ah, that's consistent with my impression of the 2-sec pause. Thanks for clarifying! I guess it is completely acceptable for remote tasks to pause every 2 sec. From my observation, people who work on methods development tend to use local hosts more, because they often try lots of things on very light input data (vs. only a few heavy jobs with large data on a cluster for real data analysis). For local tasks it would be great if we could optimize this!

BoPeng commented 6 years ago

I tried adjusting the code (e.g. reducing the waiting time, shortcutting status checks) but do not see any obvious improvement. Basically, the separation of task execution and monitoring comes at a cost that is hard to reduce. Unless we use a more efficient communication method (a message queue such as Redis for RQ, or Redis and others for Celery) to exchange information, I doubt we can do much about it.

If you have to use task and do not want the overhead of truly independent tasks, the only option I can see is to bring back the old task code, with something like

task: inline=True

With this method, the tasks would be executed inside SoS. They would not have task IDs or any of the status machinery, but they would be executed in parallel.

gaow commented 6 years ago

I see ... so what's the drawback of using the old task code on localhost, other than having to maintain two mechanisms? (I understand you'd hate that, but as you said it looks like the only way out.) When you say "truly independent", do you mean tasks are still determined and distributed via the DAG, but executed with something like cmd_run inside SoS rather than sent to external sos calls? If there are no bad side effects, does it make sense to make this the default for localhost?

BoPeng commented 6 years ago

Basically tasks are executed and monitored outside of SoS with their own signatures. That is to say,

  1. When you submit a task (e.g. to a remote host), you can check its status from another machine, or from Jupyter.
  2. Tasks can be translated to be executed on a remote host.
  3. Tasks can be executed in parallel (because they are independent).
  4. Tasks can be started, killed, and restarted outside of SoS (sos execute, sos kill, etc.) after they are created.
  5. Tasks will continue to execute even if the main sos process is killed.
  6. Tasks will not be re-executed if you try to re-run them from a modified or a different workflow, as long as the tasks themselves are unchanged.

An inline task would basically be the old task execution method, which has none of the above benefits. It essentially creates a worker pool to execute the tasks. There would be no signatures and no external monitoring, and the tasks would be killed when the main process is killed.

BoPeng commented 6 years ago

Technically speaking, internally-executed tasks could also have external task files and could also maintain signatures, so 1, 3, 4, and 6 could still hold. So the only differences would be 2 and 5, because:

  1. External tasks are executed by sos execute and monitored by sos status outside of sos.
  2. Internal tasks can be executed directly in separate processes. Because sos creates the processes, there is no need to run sos status to check their status, although things would go wrong if the tasks were killed externally.
gaow commented 6 years ago

Ha, I was about to post a similar comment saying that in a localhost-only context only 5 would truly hurt (I fail to see how 2 hurts, because that case goes remote anyway). But for 5: if we are talking about computations in the same SoS script, it will still properly build a DAG so that no re-run is needed for steps that have been executed before? I think that is still the case, as far as I recall of the old behavior.

It'd be more practical (and less puzzling and frustrating) for beginners, or for people who work solely on desktops, to be able to run the old task model, if maintaining two mechanisms is not too bad.

BoPeng commented 6 years ago

Not quite sure about the if part. The old internal behavior can co-exist with the task model well because it has nothing external (no task file, no signature), so what you gain is only a way to run part of the step in parallel. If we implement task files and signatures, things can get complicated in cases such as the same task already running externally, or the task being killed externally; basically we would have to check the status of tasks, which can again be inefficient.

gaow commented 6 years ago

so what you gain is only a way to run part of the step in parallel.

Sorry, I think this is where my main confusion is. Since this seems to be the most obvious practical concern about not using task files and signatures, I'd like to clarify: suppose we have an outcome-oriented workflow that properly builds a full DAG; do I still get only part of the steps running in parallel on localhost? I agree that maybe bringing the code back is good enough; anything beyond that will add lots of overhead.

A practical use case would be prototyping on localhost with inline tasks to get responses quickly, then using the actual task model for computations on larger data with a lot more computational resources.

gaow commented 6 years ago

Another possibility, as I mentioned earlier, is to use an automatic trunk_size. I understand it would make the workflow less portable, but at least on a single machine the automatically chosen trunk_size would be consistent, so we would be set for one (or, for some people, the only) computational environment.

BoPeng commented 6 years ago

We are talking about two levels of parallelism: the DAG level and the step level. At the step level, currently only tasks can be run in parallel, as external tasks. At the DAG level, independent paths are executed in parallel, and steps wait for the completion of their tasks, external or internal.

However, one of the motivations for the current implementation is that sos uses the same workers for both steps and tasks, so -j8 will have at most 8 workers across steps and tasks (steps waiting for tasks do not count). In the internal model of parallelism, sos does not know how many processes are used within each step, so up to 64 workers could be used if 8 steps each run 8 internal tasks...

BoPeng commented 6 years ago

Automatic trunk size sounds like a good idea. It basically assumes that tasks from the same step will take roughly the same time to complete, groups the tasks, and sends the groups to a small number of workers. The main benefit is that the engine checks the status of a much smaller number of tasks, and tasks within each group can be executed sequentially without status checks in between.

The only problems are error handling (#772) and checking the status of individual tasks.
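
For what it's worth, the grouping itself is simple. A sketch in plain Python (auto_trunk is a hypothetical helper, not an SoS API, and it assumes tasks of roughly equal cost):

def auto_trunk(task_ids, n_workers):
    # split tasks into n_workers trunks; each trunk runs sequentially on one worker
    size = max(1, (len(task_ids) + n_workers - 1) // n_workers)  # ceiling division
    return [task_ids[i:i + size] for i in range(0, len(task_ids), size)]

# 500 tasks on 4 workers -> 4 trunks of 125; only 4 statuses to poll instead of 500
trunks = auto_trunk([f'task_{i}' for i in range(500)], 4)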

gaow commented 6 years ago

Ahh, I see, it is -j vs -J. Sorry, it should have occurred to me ... In a localhost environment a limit of, say, 64 workers should not be too bad compared to the overhead of running external tasks. I think localhost is in practice a type of machine different enough to warrant a separate implementation.

#772 looks like a very practical issue that can be quite inconvenient in research mode ... so the idea of auto trunking is now less appealing to me unless there is a way around that.

BoPeng commented 6 years ago

A way out is actually not that hard. Since the main problem was the frequent checking of task status in order to submit new tasks, we could send groups of tasks to the executor and let it run them one by one. In this way we know the number of concurrent jobs (groups), and the executor can execute the tasks more efficiently.

gaow commented 6 years ago

Oh, so it is auto trunking, but happening at a different level? It sounds like an efficient solution!

BoPeng commented 6 years ago

The patch makes use of sos execute ID ID ID ... to submit multiple tasks together, so that tasks are submitted in batches (controlled by the maximum number of jobs) while being executed sequentially within each batch. The status checking code is untouched, but it no longer affects performance nearly as much, because sos only needs to submit tasks once or twice, instead of 700/max_running_jobs times.

My numbers (-J4, without trunk_size) are reduced from

real    7m6.809s
user    20m12.248s
sys 7m23.454s

to

real    1m58.839s
user    6m17.964s
sys 1m58.265s

with the patch.

gaow commented 6 years ago

Great! I have just checked, but the improvement on my end seems less significant than on yours. It reduced from the numbers above (-J20, no trunk_size):

real    3m5.234s
user    17m18.540s
sys 1m27.200s

to

real    2m1.415s
user    13m45.260s
sys 1m6.136s

So there is certainly improvement, though still not as fast as with trunk_size.

The status checking code is untouched, but it no longer affects performance nearly as much

Does that mean the checks are still sos status ID (not ID ID ID ...)? Not sure if that hurts. Using top I now see more R processes showing up (at any given time) than before, yet there are still a lot of sos processes that can occupy all the CPUs for a while.

BoPeng commented 6 years ago

sos status ID ID ID applies to all running tasks. What I meant was that previously, assuming -J4, we would:

  1. submit IDs as four sos execute ID.
  2. sos status ID ID ID ID
  3. submit another four.

Right now, we

  1. submit all IDs as four sos execute ID ID ID ID ID ... each with 700/4 tasks.
  2. sos status all IDs
  3. No need to submit more IDs. (In practice, not all tasks were ready at step 1, so there can be one or more batches, but that is about it.)
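
In subprocess terms, the change amounts to something like this (an illustrative Python sketch of the new submission scheme, not the actual engine code):

import subprocess

def submit_in_batches(task_ids, j=4):
    # after the patch: j `sos execute` processes, each given len(task_ids)/j
    # IDs to run sequentially, instead of one process per task plus frequent
    # status checks to decide when to submit the next one
    size = (len(task_ids) + j - 1) // j          # ceiling division
    procs = [subprocess.Popen(['sos', 'execute'] + task_ids[i:i + size])
             for i in range(0, len(task_ids), size)]
    for p in procs:
        p.wait()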
gaow commented 6 years ago

Thanks for the clarification. I see that this is the best we can do with the new task model without trunks. My concern is: don't we want an upper limit? I believe command lines can't be arbitrarily long. I worry about what happens if there are, say, 5000 tasks in one step.

BoPeng commented 6 years ago

In my case -J20 is much worse

real    5m32.041s
user    28m7.867s
sys 12m0.910s

than -J4

real    1m58.839s
user    6m17.964s
sys 1m58.265s

because my system cannot actually afford 20 concurrent tasks, so there is a lot of competition for resources.

BoPeng commented 6 years ago

1000 task IDs lead to a command line of roughly 33k characters, well under the 131k system limit.
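
For anyone who wants to check their own system, a quick sanity check (the 33-characters-per-task figure assumes a 32-character task ID plus a separator; the actual limit varies by platform):

import os

n_tasks, chars_per_id = 1000, 33          # 32-char task ID + separator (assumed)
print(n_tasks * chars_per_id)             # ~33k characters of `sos execute` arguments
print(os.sysconf('SC_ARG_MAX'))           # the system's command-line limit (131k above; varies by OS)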

gaow commented 6 years ago

because my system cannot actually afford 20 concurrent tasks, so there is a lot of competition for resources.

It is a good thing that we have a reasonable default for -J.

So this fix is in fact an improvement to the current task model, localhost or not. Does it still warrant bringing the old task code back for localhost, fully realizing its limitations? In that case one would use inline + -J 1 -j 20, and the runs/results would not be shared across scripts.

BoPeng commented 6 years ago

Do we still need the inline option? It is only useful for a large number of tiny tasks ...

gaow commented 6 years ago

It is only useful for a large number of tiny tasks ...

Agreed. It may also depend on how people organize their projects, but I see value in it. It will matter for methods projects where people want to try a lot of settings. For me, even real data analysis related to RNA-expression mapping / feature-selection studies, where I first break the data into > 30K genes and then apply all sorts of methods, creates a situation with a large number of tiny tasks; and it is often restricted to my 40-thread desktop so that I can get quick feedback.

gaow commented 6 years ago

However, this would add another parameter to the task option list, and it is a bit difficult to get the idea across to novices. I was thinking about some combination of parameters that triggers it by default, so we do not have to expose it in the interface; that would also make SoS kind of self-adaptive.

BoPeng commented 6 years ago

It can be an option to input. That is to say, we currently process input groups sequentially, but if there are no dependencies, we can say

input: group_by=..., concurrent=True

and the entire set of input groups will be executed in parallel. This will not change the definition of a task (as external) and will help the general execution of input groups.
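
Applied to the test script, step [2] would then need no task at all. A sketch, assuming the option lands as proposed:

[2]
r = [x for x in range(500)]
input: group_by = 1, paired_with = 'r', concurrent = True
output: [f"performance_test/{x+1}.rds" for x in range(500)], group_by = 1
R: expand = '${ }'
  x = rnorm(${_r[0]})
  saveRDS(x, ${_output:r})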

gaow commented 6 years ago

I see; so in such a computing environment we'd just drop task altogether, right? Then -J will not have any effect and -j will be what we configure.

if there are no dependencies

Well, at least this is what I expect for input. I had not even thought about input dependencies (operations on a later input group depending on a previous one) -- an interesting thought that is good to be aware of!

BoPeng commented 6 years ago

Yes, because task: inline=True would just allow parallel execution of task statements inside SoS, which makes them non-tasks, while input: concurrent=True solves the problem in a more general way. Note that step_output would be unavailable to input groups in this case.

gaow commented 6 years ago

Note that step_output would be unavailable to input groups in this case.

Well, it sounds like a corner case that could bother us if not properly handled (do you foresee it happening a lot?) ... but other than that, I'd say I'm all in for concurrent = True in input :)

BoPeng commented 6 years ago

step_output is by design the output of the entire step. I would even say it was incorrect to make it available to the step itself, because an input group is not supposed to know the output of other groups. We ended up doing something very complicated with this variable (accumulating _output, and adjusting step_output if an input group fails), and it would have been easier if we simply provided step_output only to other steps.
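
In terms of the test script, the distinction is (comments only; the semantics as described above):

[2]
input: group_by = 1, paired_with = 'r'
output: [f"performance_test/{x+1}.rds" for x in range(500)], group_by = 1
# _output: the single .rds file of the current input group
# step_output: accumulates the outputs of all 500 groups; by design it is
# what downstream steps should consume, not something each group needs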

BoPeng commented 6 years ago

With concurrent=True,

real    0m58.640s
user    3m14.922s
sys 0m46.841s
gaow commented 6 years ago

Great! but I'm having this issue:

[GW] time sos run performance.sos -j8 -J 1 -s force

INFO: output:   [file_target('performance_test/1.out'), ...] (500 items)
INFO: Executing default_2: 
INFO: input:   [file_target('performance_test/1.out'), ...] (500 items)
INFO: Waiting on another process for step default_2
Waiting for another process for output [file_target('performance_test/99.rds')] (this message repeated many times)

it hangs ... I think it has something to do with a premature stop I caused earlier with Ctrl-C, not with -J or -j.

BoPeng commented 6 years ago

Remove .sos and ~/.sos/.runtime and try again?

gaow commented 6 years ago

That worked; I had actually been trying to remove things from .sos. Here is what I did:

33502  01/02/18 13:53:15 rm -rf .sos/*
33504  01/02/18 13:53:20 time sos run performance.sos  -s force
33507  01/02/18 13:53:47 rm -rf ~/.sos/*.status
33508  01/02/18 13:53:49 time sos run performance.sos  -s force
33509  01/02/18 13:54:33 rm -rf ~/.sos/.runtime/
33510  01/02/18 13:54:35 time sos run performance.sos  -s force
33511  01/02/18 13:54:50 rm -rf .sos
33512  01/02/18 13:54:52 time sos run performance.sos  -s force

Only the last command worked! Now the performance:

real    0m33.727s
user    1m53.940s
sys 0m12.240s

This is very satisfying.

So, other than the hang issue, and without reading the patch in detail: what if I also put in task: concurrent=True? Will it automatically ignore input: concurrent = True? Or does it not really matter?

BoPeng commented 6 years ago

Tasks are not allowed in concurrent input groups.

gaow commented 6 years ago

Yes, I understand; I see now that an exception will be raised. It is perhaps too much to ask, but when switching between local and remote execution, users will have to significantly modify their scripts. I wonder if it makes sense to sweep this under the rug for them.

BoPeng commented 6 years ago

Do you mean ignoring input: concurrent=True for steps with tasks? It is possible to have both options, but the complicated cases (e.g. one True, the other False) do not make much sense, because tasks are usually the big trunk of an input group, so input: concurrent=True/False + task would be the same as input: + task.

BoPeng commented 6 years ago

What I meant was that

input:
task:

will submit tasks sequentially, which will then be executed in parallel, and

input: concurrent=True
task:

would make no difference, because it would only speed up task submission (on paper).

gaow commented 6 years ago

I understand and agree with your point. I was thinking of making switching back/forth a bit easier.

the complicated cases (e.g. one True, the other False) do not make much sense

I'd say maybe we only raise an exception in that case? I.e. (see the sketch after this list):

  1. Given a possibly existing input: concurrent, check the newly added task to see whether concurrent is defined.
  2. If concurrent appears in both task and input, check whether the True/False values agree; raise an exception if they do not.
  3. If task does not define concurrent, use the value from input and drop it from input, if applicable; otherwise stick to what task specifies and ignore whatever is in input.
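
A sketch of this reconciliation in plain Python (resolve_concurrent is a hypothetical helper; the arguments stand for the parsed option values, with None meaning unset):

def resolve_concurrent(input_concurrent, task_concurrent):
    # rule 3: task does not define concurrent, so inherit from input
    if task_concurrent is None:
        return input_concurrent
    # rule 2: both defined but disagreeing -> raise an exception
    if input_concurrent is not None and input_concurrent != task_concurrent:
        raise ValueError('conflicting concurrent= settings on input: and task:')
    # otherwise task's setting wins and input's is ignored
    return task_concurrent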

I would also not complain if we keep the current behavior. After all, I was the one who insisted on making them separate cases from the start :)

BoPeng commented 6 years ago

It is not only about the concurrent options, but also about cases like

input: concurrent=True
statement 1
task:
statement 2

This is currently not allowed because of complications in logic and implementation (the concurrent input groups, now running in separate processes, would need to be clever enough to handle tasks).

Since input: concurrent=True is the only new feature (new behavior), my proposal is to ignore this option when a task is present, because the task will be executed in parallel anyway. If you want to run the tasks internally, you will have to remove the task line.

gaow commented 6 years ago

Since input: concurrent=True is the only new feature (new behavior), my proposal is to ignore this option when a task is present.

I think this is reasonable.

BTW, this new input: concurrent = True takes 23 secs with -j 8, compared to 13 secs when I run 8 Rscript commands in parallel; I think that is very good indeed. xxhash64 did not help at all, perhaps because the files are too small anyway.

gaow commented 6 years ago

Sorry, I just want to double-check the defaults here: we have -J N by default, where N is half the number of CPU threads. How about -j -- shall we also default it to N? Will the combination -j N -J N create problems?

BoPeng commented 6 years ago

-j is for execution inside SoS (DAG and input groups); -J is for execution outside of SoS (the task queue). Currently -J has a default of N = nCPU/2, while -j has a default of 4, which was set more or less arbitrarily. I suppose it makes sense to also set -j to N. -j and -J are more or less independent, and I have not had a case with both internal and external parallelism on the same machine (I always use a cluster).

gaow commented 6 years ago

I have not had a case with both internal and external parallelism on the same machine (I always use a cluster).

Indeed, I think this justifies setting -j N by default.