Prioritising getting a dedicated server with plenty of resources, ideally set up on the cloud.
Prof Abela asked a question about comparisons with multiprocessing. I initially thought that the difference in performance was due to using 10 processes with multiprocessing and 4 worker processes with Dask.
However, after the meeting I embarked on a journey to compare like with like.
Results for simulation in seconds:
Single node, no multiprocessing and no Dask: 385.93
Parallel, 4 processes (with multiprocessing.Pool and shared memory): 160.6
Distributed, 4 worker processes (with Dask client.submit, splitting the computation at a more fine-grained level): 1158.74
Multiprocessing with 4 workers is about 2x as fast as the single node, but the Dask run is about 7x as slow as the multiprocessing run, and 3x as slow as the single node run.
This is not promising. Dask is currently splitting the work into one task per residence, which might be too fine-grained; on the other hand, a pure split by the number of workers (num of residences / num of workers) may be too coarse. Trying to find a middle ground with batch sizes somewhere in between.
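Roughly, the kind of middle-ground batching being described (the function and batch counts here are just illustrative, not the actual simulation code):

```python
# Minimal sketch: split the residences into a configurable number of batches,
# somewhere between one task per residence and one task per worker.

def split_into_batches(residence_ids, num_batches):
    """Split residence ids into roughly num_batches chunks."""
    batch_size = max(1, len(residence_ids) // num_batches)
    return [residence_ids[i:i + batch_size]
            for i in range(0, len(residence_ids), batch_size)]

# e.g. 100k residences split into ~128 batches of ~780 residences each
batches = split_into_batches(list(range(100_000)), 128)
```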
When using dask.delayed and splitting the whole computation across all workers (i.e. num of residences / num of workers):
6gb client, 3.4gb scheduler, 2.3gb average per worker (x 4)
Eventually ran out of memory.
dask.delayed, splitting the whole computation into 128 tasks across all workers, took 940.6 seconds (which is slightly better than 1158.74).
11 concurrent tasks per process (each taking around 240 seconds), with the following resource usage:
5.9gb client, 3.4gb scheduler, 2.3gb average per worker
dask.delayed, splitting the whole computation into 512 tasks across all workers, took 1156.04 seconds, which is worse than 128 tasks.
Again 11 concurrent tasks per process (this seems constant and matches the previous case), each taking around 80 seconds, with very similar memory usage to the above 2 cases.
The latter 2 cases came close to using up all available memory (and the first case actually ran out of memory). Hence, using 2 workers is a better approach for the time being, just so there is around 4gb of breathing room.
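For reference, a minimal sketch of how such a dask.delayed run could be wired up, assuming a placeholder process_batch function for the itinerary work (not the actual simulation code):

```python
import dask
from dask import delayed
from dask.distributed import Client

client = Client(n_workers=4)  # local cluster here; the real runs used separate worker processes

def process_batch(residence_ids):
    # Placeholder for the itinerary computation on a batch of residences.
    return {rid: f"itinerary-{rid}" for rid in residence_ids}

residence_ids = list(range(100_000))
num_tasks = 128  # 128 vs 512 tasks in the timings above
batch_size = max(1, len(residence_ids) // num_tasks)
batches = [residence_ids[i:i + batch_size]
           for i in range(0, len(residence_ids), batch_size)]

# One delayed task per batch, all computed on the cluster in one go.
tasks = [delayed(process_batch)(batch) for batch in batches]
results = dask.compute(*tasks)
```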
Some more ideas:
Dask collections are not usable as-is. The base simulation model is heavily dependent on dicts for fast indexing: Dask Bags cannot be indexed like a dict, and a Dask DataFrame can be indexed like a dict but would be much slower.
Did some further testing with the previous setups which indicated that client.submit is still faster:
With dask.delayed:
4 workers, 512 tasks: 1156.04s
2 workers, 512 tasks: 1289.81s
4 workers, 256 tasks: 1032.14s
4 workers, 128 tasks: 1158.87s
With client.submit:
4 workers, 512 tasks: runs out of memory when queuing the 200th task
4 workers, 256 tasks: also fails
4 workers, 128 tasks: 955.53s / 798.73s
4 workers, 64 tasks: 938s
Did some further tests with a new chunking strategy, where "n" tasks are sent as a batch, their results are read, and then a new batch is sent. This once again creates a situation where, although the workers all hit the 100% mark (fluctuating), they don't do so continuously (there is some idle time), because of the time it takes for the client to receive and sync the results back with the main process (e.g. 1-3 seconds on every chunk). This alleviates the memory usage significantly though, and with this setup we could use more workers on the same machine; an 18.1gb peak means we could at the very least add another worker.
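A minimal sketch of this chunking strategy, with a placeholder per-residence function (process_residence) and an illustrative chunk size:

```python
from dask.distributed import Client

client = Client(n_workers=4)

def process_residence(residence_id):
    # Placeholder for the per-residence itinerary computation.
    return residence_id, f"itinerary-{residence_id}"

residence_ids = list(range(100_000))
chunk_size = 1_000  # the "n" tasks per batch; value is illustrative
all_results = []

for start in range(0, len(residence_ids), chunk_size):
    chunk = residence_ids[start:start + chunk_size]
    futures = [client.submit(process_residence, rid) for rid in chunk]
    # Blocking gather between batches: this is where the workers can go idle
    # while the client receives and syncs the results of the current chunk.
    all_results.extend(client.gather(futures))
```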
Revised this strategy to send chunks of residences to the workers instead of 1 residence per task. This gets rid of the idle time. The memory situation was similar, but the performance was still not good enough. Potentially the mapping strategy used to send a smaller list (to control memory usage) is causing problems (a slowdown on the workers). - to update with timings
Once again tried the identical strategy to multiprocessing and got a peak of 18gb memory, with 2.1gb on each worker and on the scheduler. 325-340 seconds each on the worker side, with a total time of 1585.83. - To add more logging and update with more insight.
Issue fixed. Client.submit: 4.9gb client, 2.9gb scheduler, 1.7gb x 4 workers, 280-320 secs. With 6 workers, 258.35 secs. The difference is not huge, but this is possibly affected by the fact that the 6 workers were all on the same machine, and hence were competing with each other for the same resources.
However, this is much better compared to before, when the distributed setup was consistently much slower than the single node run. The single node with no multiprocessing and no Dask had run at 385.93s. Multiprocessing had run at 160.6s (with 4 workers), but I don't think we can easily compare multiprocessing to Dask: 1) Dask adds some additional overhead to assign tasks to different workers over the network, while multiprocessing manages this communication internally; 2) with multiprocessing, while gaining some performance in the context of a single node, you cannot scale beyond that one node.
Tried with 12 workers and 3 nodes (normal batching strategy): 218.19s
Tried an experiment using client.map with batching. Since batching in this case is implemented as part and parcel of the dask.distributed framework, I figured it might work faster than my custom batching strategy.
12 workers, 3 nodes, with batches of 1000 (itinerary): 364.74s
12 workers, 3 nodes, with batches of 100 (itinerary): 351.40s
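For reference, this is roughly what the client.map experiment looks like; batch_size is an actual parameter of Client.map that submits tasks to the scheduler in batches, while the worker function and data here are placeholders:

```python
from dask.distributed import Client

client = Client(n_workers=12)  # the real run used 12 workers across 3 nodes

def itinerary_for_residence(residence_id):
    # Placeholder for the per-residence itinerary computation.
    return residence_id, f"itinerary-{residence_id}"

residence_ids = list(range(100_000))

# batch_size controls how many tasks are sent to the scheduler at a time
# (e.g. 1000 and 100 in the timings above).
futures = client.map(itinerary_for_residence, residence_ids, batch_size=1000)
results = client.gather(futures)
```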
Continued experimenting with scatter: pure scatter doesn't work (in the large simulation). Created another test harness to experiment with scatter, and it is very slow even on a simple multi-dimensional array, let alone a nested dict. However, client.submit does get further than scatter when used for the same purpose.
Was experimenting with client.submit (as a replacement for scatter) and was getting a lot of errors saying something along the lines of:
RuntimeError: dictionary changed size during iteration
Not sure whether this is being raised by the workers or by the client when reading the results from the workers.
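For context, CPython raises this error whenever a dict gains or loses keys while a loop is iterating over it; the usual fix is to iterate over a snapshot of the keys or items. A generic illustration (not the simulation code):

```python
agents = {1: "home", 2: "work", 3: "school"}

# This would raise "RuntimeError: dictionary changed size during iteration":
#     for agent_id in agents:
#         if agents[agent_id] == "work":
#             del agents[agent_id]

# Safe variant: iterate over a snapshot of the items instead.
for agent_id, state in list(agents.items()):
    if state == "work":
        del agents[agent_id]
```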
Next steps:
Status Update
Modules have to be uploaded to the workers at the beginning of the simulation. Having a pre-configured static environment would be better. (I have left this aside for now, but there are some more things to try.)
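For the module-upload part, Dask's Client.upload_file can push local .py files (or zips/eggs) to the workers; a minimal sketch, with the scheduler address and file names as placeholders:

```python
from dask.distributed import Client

client = Client("tcp://scheduler-host:8786")  # placeholder scheduler address

# Push the simulation modules to the workers so the task functions can
# import them; the file names here are illustrative.
client.upload_file("itinerary.py")
client.upload_file("contact_network.py")
```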
Implemented the "register_worker_callbacks", which transfers any static data to the workers at the beginning of the simulation and makes them available "statefully" for any subsequent worker methods.
Initially, when using multiprocessing, I was starting 10 processes, making sure any data is available locally within each process, and passing a subset of residences to be taken care of by each process. This clearly doesn't work with Dask. I have changed the itinerary to work at a finer-grained level, i.e. on a single residence. This requires smaller inputs and returns smaller outputs.
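For comparison, roughly what the original multiprocessing layout looks like (worker function and data are placeholders; the shared-memory aspect is omitted here):

```python
import multiprocessing as mp

def process_subset(residence_ids):
    # Placeholder: each process handles its whole subset of residences locally.
    return {rid: f"itinerary-{rid}" for rid in residence_ids}

if __name__ == "__main__":
    residence_ids = list(range(100_000))
    num_processes = 10
    subset_size = max(1, len(residence_ids) // num_processes)
    subsets = [residence_ids[i:i + subset_size]
               for i in range(0, len(residence_ids), subset_size)]

    # One subset per process, processed entirely within that process.
    with mp.Pool(processes=num_processes) as pool:
        partial_results = pool.map(process_subset, subsets)
```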
This allowed me to run the first true "multi-host" distributed setup. However, the 2 extra machines I had both have only 8gb RAM. When using a Linux VM on an existing host, this is hardly enough. I am going to upgrade 1 of the laptops now and I am waiting for the parts to arrive.
In the strategy mentioned in point 5, any data required by a particular Dask task is passed as a parameter to the worker method. This is kept as small as possible (the same applies to the returns); however, it still incurs some communication overhead. I considered an approach where the required data, rather than being split and passed to each task (there are more than 100k residences), is passed to the worker method before starting the tasks, so the communication overhead is only incurred once. However, this approach is causing a lot of memory-related problems. I am using the "client.scatter" option, which is supposedly perfect for this case, but it relies on serialization and deserialization to pass the data from the client to the workers through the scheduler, and it seems that it is not ideal for this structure (a nested dict).
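A sketch of the scatter-based idea, with a small placeholder standing in for the real nested-dict state: broadcast the data to every worker once and pass the resulting future to each task, instead of embedding the data in every task's arguments. Note that scatter treats a top-level dict as a mapping of separate items, so the dict is wrapped in a list here to ship it as a single object:

```python
from dask.distributed import Client

client = Client(n_workers=4)

# Placeholder for the real nested-dict simulation state.
shared_state = {rid: {"member_ids": [rid * 10 + i for i in range(4)]}
                for rid in range(1_000)}

# Serialize and broadcast the whole structure to every worker once, up front.
[state_future] = client.scatter([shared_state], broadcast=True)

def itinerary_task(residence_id, state):
    # `state` is resolved on the worker from the scattered copy, so the
    # nested dict is not re-sent with every task.
    return residence_id, len(state[residence_id]["member_ids"])

futures = [client.submit(itinerary_task, rid, state_future)
           for rid in range(1_000)]
results = client.gather(futures)
```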
I have started looking into either changing the data representation and the data structures to something which is flatter, or using Dask collections. Created this issue for potentially optimising the data structures in such a way that they become easier to move around the cluster.
If I were to change the data structures, ensuring fast data access is as important as reducing the communication overhead. It would not make sense to improve the distribution but then significantly degrade data access from the worker methods.
I started looking into Dask Collections, which seem to be a set of highly optimised data collections: Dask.Bag (used for structured and semi-structured data, e.g. dicts or lists), Dask.Array (a kind of parallel NumPy), and Dask.DataFrame (a kind of parallel Pandas). Each of these is implemented using partitions or blocks, splitting the computation across the available workers in the cluster (or threads if not within a cluster). They all feature a lot of methods that operate directly on the content of the collection itself, which does not fit the simulation. However, they work particularly well with the Dask "delayed" library, which allows more custom logic to be used (and is more applicable here).
As a starting point I might try converting my implementation to use Dask.Bag with my nested data structures, and converting my client.submit implementation to use the "delayed" library. But re-thinking the data structures to consider the other collections might also make sense (i.e. point 6).
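If that conversion goes ahead, the starting point could look roughly like this: build a Dask.Bag over the residence records and handle each partition with custom logic via map_partitions (all names and structures here are placeholders):

```python
import dask.bag as db

# Placeholder residence records; the real structures are nested dicts.
residences = [{"id": rid, "member_ids": [rid * 10 + i for i in range(4)]}
              for rid in range(100_000)]

bag = db.from_sequence(residences, npartitions=128)

def itinerary_for_partition(records):
    # Custom per-partition logic; Bag elements are plain Python objects, so
    # nested dicts can be processed here, just not indexed by key globally.
    return [(rec["id"], len(rec["member_ids"])) for rec in records]

results = bag.map_partitions(itinerary_for_partition).compute()
```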