jcus0006 / mtdcovabm

Distributed Covid 19 Agent Based Model modelled on Maltese data.

Dask Optimisation #7

Open jcus0006 opened 1 year ago

jcus0006 commented 1 year ago

Is the current implementation of passing the partial structures optimal in terms of memory consumption, or is using Dask data structures potentially better?

Why doesn't client.scatter work? I can potentially try to debug it and find out at which point it gets stuck.

I can also potentially consider other means of communicating the data.

I have asked a question about the above on Stack Overflow and the Dask Discourse, but am still awaiting feedback.
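
For context, a minimal sketch of the two options being weighed here, assuming a hypothetical itinerary_worker method and dummy data (not the actual structures in this repo):

from dask.distributed import Client

def itinerary_worker(structs, task_id):
    return task_id, len(structs)  # placeholder for the real per-task computation

if __name__ == "__main__":
    client = Client()  # local cluster, for illustration only
    partial_structs = {i: list(range(100)) for i in range(1000)}  # dummy data

    # Option A: ship the structures inside every task (repeated serialisation)
    futs_a = [client.submit(itinerary_worker, partial_structs, i) for i in range(8)]

    # Option B: scatter the structures once and pass the resulting future to
    # each task, so the data only travels to the cluster a single time
    [structs_fut] = client.scatter([partial_structs])
    futs_b = [client.submit(itinerary_worker, structs_fut, i) for i in range(8)]

    print(client.gather(futs_a + futs_b))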

jcus0006 commented 1 year ago

Tried the revised batch task allocation, to avoid queuing all tasks at one go. Ran 3 configurations (each twice): the non-recurring case, i.e. an initial batch size of 120, then allocating one new task every time another task is completed; and the recurring cases, i.e. recurring batch sizes of 120 and recurring batch sizes of 1.

The initial implementation with no batch task allocation took 1146.49 (and 1055.85) seconds. The client was using around 6.3GB of memory, the scheduler around 2.8GB, and the workers around 2.3GB each.

All tests were run on a single node with 4 workers; with the "batch" approaches, the memory situation was better.

In the non-recurring case, the run took 992.67 (and 1111.07) seconds. The workers did not use more than 2GB each (approximately 200MB less per worker than the initial implementation), and the scheduler used 1.3GB, which is 1.5GB less than the 2.8GB of the initial implementation (a significant memory improvement). The VM was using around 85% of its memory. The first 3 workers were rather busy, with CPU usage fluctuating between 20-80%. The 4th worker was not very busy, peaking at around 20%, likely because the allocation kept finding one of the first 3 workers available, leaving the last worker largely unnecessary.

The recurring batch allocation (batch size 120) took 1241.74 (and 1480) seconds. Memory usage was very similar. CPU usage fluctuated significantly and went above the 80% peak of the non-recurring case. However, there were significant gaps in the occupancy chart, corresponding to the periods when the client was syncing the results in the main process and not allocating further tasks for processing. This seems to be the slowest approach, and those gaps are likely the reason.

Also tried a recurring batch size of 1, which took 2016.54 seconds. This initially raised errors because Python has a default recursion limit of 1000, and the recursive method was calling itself again for every subsequent allocation and analysis. The first worker got the brunt of the work, peaking at around 60%, while the other 3 workers did very little, peaking at 10%. When assigning 1 task at a time, the scheduler always assigns the task to the first available worker, which in this case is the first worker, and the next task is only assigned once the first task has been returned to the main process, analysed, and synced with the data structures in main memory. By the time the second task is assigned, the first worker is already available again, so it keeps receiving the tasks and the other workers never get to do anything. This is essentially the same as running a single-process application, with some additional communication overhead introduced by Dask.

A hybrid approach may be considered, whereby we assign a batch of 120 tasks, and then assign a new task every time a result is analysed and synchronised with the main data structures (until there are no more tasks to process). The batch size should be some multiple of the number of workers, though it likely does not make sense for it to be as large as 120; e.g. 2x or 3x the number of workers is a good starting point (a rough sketch follows after the note below).

Note: 100 may be used instead of 120, as handling is now in place to ensure that the 999 recursion limit is never hit anyway. (120 was originally chosen to keep the number of calls below 999; this is no longer required.)
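
For illustration, a rough sketch of such a hybrid allocation using dask.distributed.as_completed, which avoids recursion altogether; run_task, sync_result and the task parameters are placeholders, not the actual worker code:

from dask.distributed import Client, as_completed

def run_task(params):
    return params  # placeholder for the real worker method

def sync_result(result):
    pass  # placeholder: merge the result into the main data structures

if __name__ == "__main__":
    client = Client()
    all_params = iter(range(1000))  # one entry per task
    n_workers = len(client.scheduler_info()["workers"])
    batch_size = n_workers * 2  # e.g. 2x or 3x the number of workers

    # submit the initial batch, then top up by one task per completed result
    initial = [client.submit(run_task, p) for _, p in zip(range(batch_size), all_params)]
    ac = as_completed(initial)
    for fut in ac:
        sync_result(fut.result())
        try:
            ac.add(client.submit(run_task, next(all_params)))
        except StopIteration:
            pass  # no more tasks left to submit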

jcus0006 commented 1 year ago

Using client.scatter returns this error:

distributed.comm.core.CommClosedError: in <TCP (closed) ConnectionPool.scatter local=tcp://127.0.0.1:58858 remote=tcp://127.0.1.1:35767>: Stream is closed

Using client.submit, passing the structure as a JSON string, loading it into an object inside the worker method, returning it as a response, holding on to it as a future on the client side, and then passing that future around to the itinerary worker method, causes some different issues:

  1. A considerable spike in memory usage on the workers (more than double), which is not proportional to the amount of data being shared with the workers (around 200MB + 20MB).
  2. TypeError: Could not serialize object of type Vars (this is likely happening because the futures are held on a primary worker and parts of them are then passed around to the other workers on demand; it could likely be fixed by making sure that the class contains the required implementation to be pickled, as in the sketch after this list).
  3. RuntimeError: dictionary changed size during iteration (this is likely happening when trying to share parts of the future with the different workers while the same data structures are being modified on the worker on which they primarily reside).
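
Regarding point 2, a minimal sketch of one way to make a class like Vars picklable, assuming the failure comes from an unpicklable attribute such as a lock; the real Vars class in this project may need different handling:

import pickle
import threading

class Vars:
    def __init__(self):
        self.data = {}
        self._lock = threading.Lock()  # an example of an attribute that cannot be pickled

    def __getstate__(self):
        state = self.__dict__.copy()
        state.pop("_lock", None)  # drop the unpicklable attribute before serialisation
        return state

    def __setstate__(self, state):
        self.__dict__.update(state)
        self._lock = threading.Lock()  # recreate it on the receiving side

v = Vars()
v.data["agents"] = [1, 2, 3]
restored = pickle.loads(pickle.dumps(v))
print(restored.data)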

Also tried sending the whole structure to client.submit and returning it immediately. The idea was to hold on to the returned future and pass it around. In this case the client times out trying to find the scheduler and is killed by Dask.

I probably also have to open another ticket to ask about this.

jcus0006 commented 1 year ago

Before I got your message, I had tried an approach that involves converting the structures into JSON text and passing the text as a parameter to a "load_object_from_json" worker method, which loads an object from the JSON text. I then hold on to the future and pass it around to all the subsequent tasks that call the actual worker method. This results in excessive memory bloating: the spike of more than double the memory usage is not representative of the roughly 200MB of data that I am trying to share with the workers.
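
A rough sketch of that pattern, with json as the serialisation format and a placeholder itinerary_worker (the real structures and worker method differ):

import json
from dask.distributed import Client

def load_object_from_json(json_text):
    return json.loads(json_text)  # rebuild the object on a worker

def itinerary_worker(structs, task_id):
    return task_id, len(structs)  # placeholder for the real computation

if __name__ == "__main__":
    client = Client()
    structs = {str(i): list(range(100)) for i in range(1000)}  # dummy data
    json_text = json.dumps(structs)

    # load the object once on a worker and keep only the future on the client
    structs_fut = client.submit(load_object_from_json, json_text)
    # pass the same future to every subsequent task
    futs = [client.submit(itinerary_worker, structs_fut, i) for i in range(8)]
    print(client.gather(futs))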

Using UserDict instead of dict seems to get me a step further.

When using client.scatter with the broadcast=True param, it takes ages before any tasks show up in the Task Stream, presumably because Dask is moving the data to all the workers before starting the work. This is still weird, though, because I am only passing around 200MB of data to 3 local workers. It takes so long that I always end up giving up.

When using broadcast=False, the work starts significantly faster. However, in the Task Stream I constantly see red marks, pertaining to communication between workers, and these transfers take very long (6-16 seconds).
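
For reference, the two scatter variants being compared look roughly like this (dummy data and a placeholder worker; whether broadcast=True pays off depends on the data size and the number of workers):

from dask.distributed import Client

def itinerary_worker(structs, task_id):
    return task_id  # placeholder for the real computation

if __name__ == "__main__":
    client = Client()
    structs = {str(i): list(range(100)) for i in range(1000)}  # dummy data

    # broadcast=True: copy the data to every worker up front
    # (slow start, but no worker-to-worker transfers later)
    [fut_bcast] = client.scatter([structs], broadcast=True)

    # broadcast=False (the default): place the data on one worker and let it be
    # replicated on demand (fast start, but red transfer bars in the Task Stream)
    [fut_single] = client.scatter([structs])

    futs = [client.submit(itinerary_worker, fut_single, i) for i in range(8)]
    client.gather(futs)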

When using client.submit and queuing all the tasks at once, the memory usage is excessive, such that running with 4 workers is impossible (it quickly runs out of memory). I tried an approach which assigns batches of tasks rather than all at the same time; however, this results in a lot of idle workers. When using client.map with all tasks at once, the memory usage is also a problem, because the program runs out of memory before I get to see any tasks in the Task Stream.

I also got 2 error messages when running these tests:

  1. TypeError: Could not serialize object of type Vars (this is likely happening because the futures are held on a specific worker and parts of them are then passed around to the other workers on demand; it could likely be fixed by making sure that the class contains the required implementation to be pickled).
  2. RuntimeError: dictionary changed size during iteration (this is likely happening when trying to share parts of the data passed around as futures with the different workers, as these same data structures are being modified on the worker on which they primarily reside).

All in all, it seems that passing "partitions" of the data structures back and forth for each task is the way to go. This approach doesn't pass futures as parameters to the workers; it simply keeps the data being passed back and forth for each task as small as possible.
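
A minimal sketch of that partitioning idea, where each task only receives and returns its own slice of the data; the partitioning key and names are placeholders, not the repo's actual scheme:

from dask.distributed import Client

def itinerary_worker(agents_partition, task_id):
    # placeholder: compute something for just this partition of agents
    return {agent_id: len(info) for agent_id, info in agents_partition.items()}

if __name__ == "__main__":
    client = Client()
    agents = {i: list(range(50)) for i in range(10_000)}  # dummy agent data
    n_partitions = 8

    # split the agents into roughly equal partitions on the client
    keys = list(agents)
    partitions = [{k: agents[k] for k in keys[p::n_partitions]} for p in range(n_partitions)]

    # each task gets only its own partition; results are synced back on the client
    futs = [client.submit(itinerary_worker, part, p) for p, part in enumerate(partitions)]
    results = {}
    for part_result in client.gather(futs):
        results.update(part_result)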

I will now try to understand better how I can use Dask collections to optimise further.

It may also be possible to use some combination of "delayed" with Dask collections, or even with the Python collections, e.g. passing the data structure as a lazy (delayed) object, and then passing a reference to it to another delayed function, as in the snippet below. (Note that using delayed initially did not return the best results.)

import numpy as np
import dask

x = np.array(...)    # some large array
x = dask.delayed(x)  # delay the data once, so it is serialised a single time
results = [dask.delayed(train)(x, i) for i in range(1000)]  # train is the worker function