mars-project / mars

Mars is a tensor-based unified framework for large-scale data computation which scales numpy, pandas, scikit-learn and Python functions.
https://mars-project.readthedocs.io
Apache License 2.0

Can the implicit chunks after some built-in function (e.g. mt.repeat()) be merged for fast distributed computing? #461

Open anders0821 opened 5 years ago

anders0821 commented 5 years ago

The chunk_size of the original data can be set, and built-in functions automatically determine their output chunk sizes from their inputs' chunking. This leads to a problem. I implemented the kron function, which is absent from Mars:

def my_mt_kron(A, B):
    # pad dimensions until A.ndim == B.ndim
    while A.ndim < B.ndim:
        A = mt.expand_dims(A, 0)
    while B.ndim < A.ndim:
        B = mt.expand_dims(B, 0)

    # repeat A
    A_rep = A
    for i in range(B.ndim):
        A_rep = mt.repeat(A_rep, B.shape[i], i)

    # tile B
    B_tile = mt.tile(B, A.shape)

    return A_rep * B_tile

It is called by:

A = np.random.randn(10, 10)
B = np.random.randn(10, 10)
KAB = my_mt_kron(mt.array(A), mt.array(B))
KAB.visualize(tiled=True).view()

The computing graph is: [graph image 1]
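For reference, the repeat/tile construction above matches NumPy's np.kron on plain arrays; a quick NumPy check, independent of Mars (the helper name is mine):

```python
import numpy as np

def np_kron_via_repeat_tile(A, B):
    # Same construction as my_mt_kron, but with plain NumPy:
    # repeat A along every axis by B's shape, tile B by A's shape.
    while A.ndim < B.ndim:
        A = np.expand_dims(A, 0)
    while B.ndim < A.ndim:
        B = np.expand_dims(B, 0)
    A_rep = A
    for i in range(B.ndim):
        A_rep = np.repeat(A_rep, B.shape[i], axis=i)
    B_tile = np.tile(B, A.shape)
    return A_rep * B_tile

A = np.random.randn(10, 10)
B = np.random.randn(10, 10)
# Element-wise identical to NumPy's built-in Kronecker product.
assert np.allclose(np_kron_via_repeat_tile(A, B), np.kron(A, B))
```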

Furthermore, I tried a simpler test of the inner function mt.repeat() on its own:

A = mt.array(np.random.randn(10, 10))
A_rep = mt.repeat(mt.repeat(A, 10, 0), 10, 1)
A_rep.visualize(tiled=True).view()

The computing graph is: [graph image 2]
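The chunk explosion can be reproduced arithmetically. Assuming the output keeps the input's 10x10 per-chunk size (as the tiled graph suggests), the repeated 100x100 tensor needs a 10x10 grid of chunks:

```python
import math

def chunk_grid(shape, chunk_size):
    # Number of chunks along each axis when splitting `shape`
    # into chunks of at most `chunk_size` elements per axis.
    return tuple(math.ceil(s / c) for s, c in zip(shape, chunk_size))

# The original 10x10 tensor fits in a single 10x10 chunk.
print(chunk_grid((10, 10), (10, 10)))    # (1, 1) -> 1 chunk

# After repeat(repeat(A, 10, 0), 10, 1) the shape is 100x100,
# but each chunk stays 10x10, giving a 10x10 grid = 100 chunks.
print(chunk_grid((100, 100), (10, 10)))  # (10, 10) -> 100 chunks
```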

mt.repeat() greatly increases the number of chunks in the computing graph. I think carrying 100 chunks of 10x10 through the following computation is a waste of resources. In my kron function the number of chunks grows with the dimensions of the input data, even though I never set chunk_size on any tensor. This makes scheduling on a distributed cluster heavy. Can the implicit chunks produced by functions like mt.repeat() be merged, or rechunked whenever I like?

qinxuye commented 5 years ago

Yeah, there is indeed a function to adjust the chunk_size: it's called rechunk. I notice that this function is also missing from our docs, sorry about that; I will add it later.

For now, please refer to the function definition:

https://github.com/mars-project/mars/blob/0230753dbd4495f5c67a13682b4b7c55431a3b6d/mars/tensor/expressions/rechunk/rechunk.py#L76

Just call tensor.rechunk(new_chunk_size). Feel free to report back once you have new results.
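Applied to the repeat example above, the call would look roughly like this (a sketch against the Mars tensor API discussed in this thread; `(100, 100)` is an illustrative chunk_size that collapses the 100 implicit 10x10 chunks back into a single chunk):

```python
import numpy as np
import mars.tensor as mt

A = mt.array(np.random.randn(10, 10))
A_rep = mt.repeat(mt.repeat(A, 10, 0), 10, 1)

# Merge the 100 implicit 10x10 chunks into one 100x100 chunk
# before the downstream computation.
A_merged = A_rep.rechunk((100, 100))
A_merged.visualize(tiled=True).view()
```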