dask / dask

Parallel computing with task scheduling
https://dask.org
BSD 3-Clause "New" or "Revised" License
12.22k stars 1.69k forks source link

Add NumPy's kron for Dask Arrays #3657

Open jakirkham opened 6 years ago

jakirkham commented 6 years ago

Would be nice to have a Dask Array implementation of NumPy's kron. A generalization of outer products for two tensors.

jakirkham commented 6 years ago

NumPy implements kron using outer and reshaping. Would be possible to do the same thing for Dask Arrays. That said, expect there are better options. Suggestions welcome.

mrocklin commented 5 years ago

Is this doable with atop or slicing syntax do you think?

Marking this as good first issue, but it may not be. I think that this requires someone to think about it a bit, and then look at the dask/array/core.py::atop function and see if there is a nice way to implement it.

jakirkham commented 5 years ago

Avoiding reshaping is probably better. It may be possible with atop, but agree it requires a bit of thought.

mathdugre commented 5 years ago

Hi, is this issue solved? I would like to work on it

jakirkham commented 5 years ago

It is still open. Please feel free to work on it if it is of interest. Would be happy to review a PR. 😄

Madhu94 commented 4 years ago

@mathdugre Please do let me know if you are working on this, I'd like to give it a try

mathdugre commented 4 years ago

@Madhu94 I didn't get the time to work on it, feel free to do so :)

Madhu94 commented 4 years ago

thanks! Starting on it.

gitkarma commented 4 years ago

@Madhu94 any progress on it. I was also looking for the feature

Madhu94 commented 4 years ago

@gitkarma No I could not find the time. Still interested in exploring this, but if anyone else wants to push this through in the interests of time, please take it up

eric-czech commented 4 years ago

Is this rechunking into 1x1 chunks for one of the arguments likely to be as bad or worse than reshaping? I can't figure out how to do it otherwise with blockwise, though this seems like it could be on the right track:

x = da.arange(36).reshape(9, 4).rechunk((1, 1))
y = da.ones(36).reshape(9, 4).rechunk((3, 2))
z = da.blockwise(
    np.multiply, 'ij', x, 'ij', y, 'xy', concatenate=True, dtype='f8',
    adjust_chunks={'i': y.shape[0], 'j': y.shape[1]}
)
z.shape # -> (81, 16)
# Compute on kron inputs to silence "Not implemented by Dask ..." warning
assert np.all(np.kron(x.compute(), y.compute()) == z.compute())
z.compute()
array([
       [ 0.,  0.,  0., ...,  3.,  3.,  3.],
       [ 0.,  0.,  0., ...,  3.,  3.,  3.],
       [ 0.,  0.,  0., ...,  3.,  3.,  3.],
       ...,
       [32., 32., 32., ..., 35., 35., 35.],
       [32., 32., 32., ..., 35., 35., 35.],
       [32., 32., 32., ..., 35., 35., 35.]
])

Separately, is this a bug or expected behavior?

x = da.arange(36).reshape(9, 4).rechunk((1, 1))
y = da.ones(36).reshape(9, 4).rechunk((3, 2))
z = da.blockwise(
    np.multiply, 'ij', x, 'ij', y, 'xy', concatenate=True, dtype='f8',
    # Leave this out to show that inferred shape is wrong
    # adjust_chunks={'i': x.shape[0], 'j': y.shape[1]}
)
print(z.shape) # --> (9, 4) 
print(z.compute().shape) # --> (81, 16)
# And this still works:
assert np.all(np.kron(x.compute(), y.compute()) == z.compute())
jakirkham commented 4 years ago

Thanks for sharing Eric! 😄

Yeah that's a good question. Rechunking is generally expensive. However reshaping will result in some smearing and remixing of chunks, which comes with its own cost. Probably requires some benchmarking to know for sure.

That said, there might be ways to tweak what you came up with to improve things further before measuring timings. 🙂

eric-czech commented 4 years ago

This might be an improvement:

x = da.arange(36).reshape(9, 4).rechunk((3, 2))
y = da.ones(36).reshape(9, 4).rechunk((3, 2))
z = da.blockwise(
    np.kron, 'ab', x, 'ij', y, 'ij', concatenate=True, dtype='f8',
    new_axes={'a': x.shape[0] * y.shape[0], 'b': x.shape[1] * y.shape[1]}
)
assert np.all(np.kron(x, y) == z.compute())
assert z.shape == z.compute().shape
z.compute()
array([[ 0.,  0.,  0., ...,  3.,  3.,  3.],
       [ 0.,  0.,  0., ...,  3.,  3.,  3.],
       [ 0.,  0.,  0., ...,  3.,  3.,  3.],
       ...,
       [32., 32., 32., ..., 35., 35., 35.],
       [32., 32., 32., ..., 35., 35., 35.],
       [32., 32., 32., ..., 35., 35., 35.]])

I don't think it's what I want though since passing concatenate=False gives blocks in lists and building one giant result as the concatenation of all kronecker products would be a bad idea. I'll keep experimenting ..

jakirkham commented 4 years ago

Thanks for the continued exploration here Eric! 😄

Agree this is looking better. 🙂

Yeah concatenate=True does come with some overhead, but it can be a little tricky to work with concatenate=False. One thing that can help here is writing a kernel function that wraps np.kron while working with the nested list/array structure. 😉

eric-czech commented 4 years ago

Thanks @jakirkham! With some guidance from @mrocklin I tried building a custom graph for this and benchmarked it a bit, in this gist, against the first blockwise version above as well as an implementation with da.block, which is as simple as this:

x = da.arange(36).reshape(9, 4).rechunk((3, 2))
y = da.ones(36, dtype=x.dtype).reshape(9, 4).rechunk((3, 2))
z = da.block([
        [x[i, j] * y for j in range(x.shape[1])]
        for i in range(x.shape[0])
])

I found that the original blockwise version (w/ 1x1 rechunking) was fastest followed by my custom graph -- the da.block specification was a distant 3rd. It isn't a thorough benchmark but all of them are at least 60x slower than np.kron and 100x slower than an element-wise multiplication (via Dask) involving the same number of FLOPs so unfortunately I don't think any of these are very good. I may circle back after learning more about how to profile what's going on effectively, but I'll leave it at this for now unless anybody sees obvious avenues for improvement.

jakirkham commented 4 years ago

Thanks for the update Eric!

This more recent implementation reminds me of issue ( https://github.com/dask/dask/issues/2946 ) with da.repeat. I’m wondering if solving that would be a useful building block here. Or possibly illuminate another method to approach this issue with.

mrocklin commented 4 years ago

@eric-czech in your case I recommend trying the method you have above with da.block but where x is a Numpy array rather than a Dask array.

eric-czech commented 4 years ago

That ended up being about 2x slower than the Dask-only version (updated gist).

mrocklin commented 4 years ago

Here is my run through of your code:

In [1]: import dask.array as da, numpy as np

In [2]: def kron_v2(x, y):^M
   ...:     return da.block([^M
   ...:         [x[i, j] * y for j in range(x.shape[1])]^M
   ...:         for i in range(x.shape[0])^M
   ...:     ])
   ...:

In [3]: xl = da.arange(50*20).reshape(50, 20).rechunk((25, 4)).persist()

In [4]: yl = da.ones(xl.size, dtype=xl.dtype).reshape(xl.shape).rechunk(xl.chunksize).persist()

In [5]: %time zl = kron_v2(xl, yl)
Wall time: 616 ms

In [6]: %time _ = zl.compute()
Wall time: 1.79 s

In [7]: xl = xl.compute()

In [8]: %time zl = kron_v2(xl, yl)
Wall time: 419 ms

In [9]: %time _ = zl.compute()
Wall time: 1.66 s

In [10]: len(zl.__dask_graph__())
Out[10]: 30010

In [11]: from dask.utils import format_time

In [12]: format_time(1.66 / len(zl.__dask_graph__()))
Out[12]: '55.31 us'
Copy-pasteable version here ```python import dask.array as da, numpy as np def kron_v2(x, y): return da.block([ [x[i, j] * y for j in range(x.shape[1])] for i in range(x.shape[0]) ]) xl = da.arange(50*20).reshape(50, 20).rechunk((25, 4)).persist() yl = da.ones(xl.size, dtype=xl.dtype).reshape(xl.shape).rechunk(xl.chunksize).persist() %time zl = kron_v2(xl, yl) %time _ = zl.compute() xl = xl.compute() %time zl = kron_v2(xl, yl) %time _ = zl.compute() len(zl.__dask_graph__()) from dask.utils import format_time format_time(1.66 / len(zl.__dask_graph__())) ```

My guess is that we're bound based on the task throughput of the scheduler. 50us/task is pretty good for us. If we wanted to go faster than this then we would probably want to start bundling a few chunks together at once, perhaps by calling np.kron within the kron_v2 function, rather than using *, and pulling off chunks of the input numpy array. To do this well we would also want to look at the chunk size, and the ideal chunk size (see dask.config.get("array.chunk-size") and choose blocks of x accordingly.

mrocklin commented 4 years ago

Or maybe the answer is just to rechunk y before blocking. That might be simpler. Mostly we want to avoid the situation where we have very many very small chunks. This creates unpleasant execution overhead.

eric-czech commented 4 years ago

Hey @mrocklin, thanks for taking a deeper look! Knowing 50us/task is helpful as is that rechunking is an approach you would take to reduce the number of tasks.

I was able to get comparable times to what you had if I switch to the single-threaded scheduler, but could I get some intuition on why the distributed scheduler even with 1 worker and 1 thread per worker is >10x slower? I was thinking that it makes more sense to optimize the process for that scheduler -- would you agree? Or is it likely that optimizing any process for a local scheduler will translate well (i.e. we still just want to avoid small chunks -- nothing more complicated)?

Thanks again for taking a look.

mrocklin commented 4 years ago

The distributed scheduler has higher per-task overheads than the single-node scheduler, often in the few-hundred microseconds per task. Because of this, we want to keep task compute times well above the millisecond range (possibly much higher, depending on the scale that you're looking for)

This is made easier if we keep chunk sizes large-ish, and also try hard to fuse operations (another goal of yours, I believe).

This example has chunksizes of (25, 4), which is small for us. In practice chunk sizes tend to be in the 100MB range. See best practice docs here:

mrocklin commented 4 years ago

So for this particular example we can significantly reduce compute time by reducing the chunk count of yl

In [5]: yl = yl.rechunk("auto").persist()                                                                

In [6]: %time zl = kron_v2(xl, yl) 
   ...: %time _ = zl.compute()                                                                           
CPU times: user 642 ms, sys: 29.5 ms, total: 671 ms
Wall time: 627 ms
CPU times: user 305 ms, sys: 119 µs, total: 305 ms
Wall time: 291 ms

This is still bound by the size of xl though, which has 1000 elements, and so creates 1000 chunks, regardless of how large yl is.

Here is another, perhaps more extreme, example that shows the effect where xl is quite small, but where we apply kron repeatedly (as I think is probably also your use case)

In [14]: xl = np.array([[1, 2], [3, 4]])                                                                 
In [15]: yl = da.ones((100, 100)).persist()                                                              

In [16]: %time zl = kron_v2(xl, yl)                                                                      
CPU times: user 12.2 ms, sys: 0 ns, total: 12.2 ms
Wall time: 11.7 ms

In [17]: %time zl = kron_v2(xl, zl)  # reapply kron onto zl repeatedly
CPU times: user 4.11 ms, sys: 1 µs, total: 4.11 ms
Wall time: 3.84 ms

In [18]: %time zl = kron_v2(xl, zl)                                                                      
CPU times: user 11.1 ms, sys: 0 ns, total: 11.1 ms
Wall time: 10.6 ms

In [19]: %time zl = kron_v2(xl, zl)                                                                      
CPU times: user 12.6 ms, sys: 0 ns, total: 12.6 ms
Wall time: 12.1 ms

In [20]: zl                                                                                              
Out[20]: dask.array<concatenate, shape=(1600, 1600), dtype=float64, chunksize=(100, 100), chunktype=numpy.ndarray>

In [21]: %time zl = kron_v2(xl, zl)  # with larger graphs on the right, graph construction times start to increase                
CPU times: user 18.1 ms, sys: 0 ns, total: 18.1 ms
Wall time: 17.6 ms

In [22]: %time zl = kron_v2(xl, zl)                                                                      
CPU times: user 43.1 ms, sys: 92 µs, total: 43.2 ms
Wall time: 42.3 ms

In [23]: %time zl = kron_v2(xl, zl)                                                                      
CPU times: user 111 ms, sys: 4.04 ms, total: 115 ms
Wall time: 114 ms

In [24]: zl                                                                                              
Out[24]: dask.array<concatenate, shape=(12800, 12800), dtype=float64, chunksize=(100, 100), chunktype=numpy.ndarray>

In [25]: zl = zl.rechunk("auto")  # but fortunately 100x100 is still quite small, so let's rechunk

In [26]: %time zl = kron_v2(xl, zl) # and it's decently fast again
CPU times: user 12.6 ms, sys: 14 µs, total: 12.6 ms
Wall time: 11.7 ms

In [27]: %time zl = kron_v2(xl, zl)                                                                      
CPU times: user 13.9 ms, sys: 0 ns, total: 13.9 ms
Wall time: 13 ms

In [28]: zl                                                                                              
Out[28]: dask.array<concatenate, shape=(51200, 51200), dtype=float64, chunksize=(4000, 4000), chunktype=numpy.ndarray>

In [29]: %time zl.sum().compute()                                                                        
CPU times: user 42.5 s, sys: 21.2 s, total: 1min 3s
Wall time: 12.1 s
Out[29]: 10000000000000.0
eric-czech commented 4 years ago

Hey @mrocklin sorry for the delay.

That much makes sense, and I have one last performance question then. A more representative version of the use case I have would look like this:

import dask.array as da, numpy as np
import dask
dask.config.set(scheduler='single-threaded')

def kron_v2(x, y):
    return da.block([
        [x[i, j] * y for j in range(x.shape[1])]
        for i in range(x.shape[0])
    ])

# Multiply tiny 2x2 by giant tall and skinny matrix
x = np.ones((2, 2))
y = da.ones((1000000, 1000)).rechunk('auto').persist() # 1M x 1k
y.chunksize, y.numblocks # 125MB chunks
# > ((15625, 1000), (64, 1))
z = kron_v2(x, y)

%time _ = z.compute()
# CPU times: user 4.52 s, sys: 3.91 s, total: 8.43 s
# Wall time: 8.43 s

len(z.__dask_graph__())
# 576

And vs the distributed scheduler:

from dask.distributed import Client
Client(n_workers=1, threads_per_worker=1, processes=True)

%time _ = z.compute()
# CPU times: user 32.5 s, sys: 48.5 s, total: 1min 20s
# Wall time: 1min 22s

In short, I see that it takes 1min 22s vs 8s using the distributed and single node scheduler, respectively, when operating on a small number (64) of large chunks.

If I'm understanding you correctly, then the overhead from using the distributed scheduler for this should be something like 576 times a few hundred microseconds right? Even if the overhead was a millisecond per-task that would still only explain a minute portion of the difference. Could there be something else going on here?

mrocklin commented 4 years ago

I was just responding to this :) it's fun seeing github change live so many times :)

Yeah, the overhead you're running into here is likely moving the data from the remote worker into your client process. Recall that .compute() produces a concrete numpy array in local memory, so you're moving those various numpy arrays that are spread around your workers to one worker (in your case this is already true), then concatenating them into a single numpy array, and then moving that numpy array back over to your ipython/jupyter/python session.

Your output array is 32GB, which for 83s comes out to a bandwidth of about 385 MB/s, which isn't terrible for commodity network hardware and TCP.

The right way to test this is probably

%%time
z = z.persist()  # watch out, this is asynchronous

from dask.distributed import wait
wait(z)
eric-czech commented 4 years ago

Ah interesting, so even on a single host (this one isn't actually remote), the communication is still going through the network interface. That makes sense, thanks!

hammer commented 4 years ago

even on a single host (this one isn't actually remote), the communication is still going through the network interface.

Why? Can’t you just pass a file handle to the other local process?

mrocklin commented 4 years ago

Yeah, you have some options here. If you use processes=False then we'll just shuttle things across an in-memory Python queue.

If you wanted to get fancy and you're on Linux then you could try out ucx and use their shared memory transport. I would bet money that you'd run into problems with this today though (the NVIDIA folks are working hard on this, but there are a lot of lower level systems to stitch together).

In practice though you probably won't want to bring back giant results to your local session. So this comes up more often in benchmarking than in anything else.

mrocklin commented 4 years ago

Why? Can’t you just pass a file handle to the other local process?

You definitely could. You could save your data to a file (HDF5, Zarr, whatever) and then pass that file handle back.

You would have to make that choice explicitly though. Dask isn't going to do a high level analysis of your code and say "Hrm, they asked me to send back this numpy array, but probably they should have stored it to disk. Let me do that instead".

Typically today we leave high level decisions like that to the user, or to systems built on top of Dask.

jakirkham commented 4 years ago

Yeah we continue to improve UCX. I think today most of the issues are particularly combinations of features UCX supports as opposed to run-of-the-mill things. This could be a handy thing to try as it supports things like shared memory, which should level the playing field when using interprocess communication on the same machine/node. If you do try this, I'd be very interested to hear about your experience 😄

If you look into intermediate storage, you might find this doc helpful.

hammer commented 4 years ago

Sorry I am not being clear and I am not close to the code. But when I hear we are making a network call to a local process I think of HDFS short-circuit reads and their use of a UNIX domain socket to make this sort of call very fast. Is that not applicable in this situation?

mrocklin commented 4 years ago

The data is in memory on one process, another process is asking for that data. Dask uses point-to-point transfers for communication rather than using a global file system. So something like short-circuit reads don't make sense.

Now, ideally for transfers on a shared machine we would use something like posix shared memory (maybe this is your point for UNIX domain sockets). This is what the UCX conversation above with @jakirkham refers to, and is a major focus of the NVIDIA Dask team (although mostly for other network transfers rather than shared memory).

Pragmatically though, this is rarely an issue, and it isn't something that I encourage you to focus on. Typically for array based workloads we just have a single process per node and use a thread pool for on-node parallelism. The cost to transfer data within a single node is typically zero for numpy-style workloads. In @eric-czech 's example above he can engage this on the single-node cluster case by setting processes=False.

hammer commented 4 years ago

The data is in memory on one process, another process is asking for that data. Dask uses point-to-point transfers for communication rather than using a global file system. So something like short-circuit reads don't make sense.

I don't think I've explained clearly the motivation for short-circuit local reads. The JIRA for this feature has more details: HDFS-347. There is no global file system involved; this feature is invoked when an HDFS client process happens to be on the same node as the DataNode process which has the data the client wants to read.

In this setting, there are two processes on the same server which are going to exchange a large volume of data by inter-process communication over a socket. UNIX provides a standardized way to bypass the network stack in this setting called a UNIX domain socket.

While it's not trivial to make use of this optimization while also respecting access control policies, it is generally far less invasive to a codebase that's already using sockets for IPC to use UNIX domain sockets than to try to implement a shared memory approach.

mrocklin commented 4 years ago

Ah, my apologies. When people talk about short-circuit reads with HDFS I mostly think of reading local files rather than peer-to-peer communication of in-memory data. My understanding of the HDFS internals is not super deep.

Googling about it seems like it's not terribly hard to convince Tornado (our standard networking stack) to use UNIX domain sockets. It would be a fun experiment.

it is generally far less invasive to a codebase that's already using sockets for IPC to use UNIX domain sockets than to try to implement a shared memory approach.

The fun thing about UCX is that it's motivated by NVLink and Infiniband work (which NVIDIA needs desparately), but shared memory comes along for free (hypothetically).

But yeah UNIX domain sockets should be easy enough to play with. I've raised a separate issue here: https://github.com/dask/distributed/issues/3630

I still think though that this isn't likely to be your main bottleneck for these workloads. It would be great to improve (and presumably fairly doable), but it isn't where I would guess profiles would point first.

gitkarma commented 4 years ago

@mrocklin Can kron_v2 implementation used on single dimension array ?

mrocklin commented 4 years ago

Thanks for the comment @gitkarma I think that probably you wanted to direct that question to @eric-czech ? He is the author of the kron implementations here. I don't know much about them.