lvermue opened 5 years ago
@lvermue Thank you for the PR! The execution time improvement looks significant.
Could you write tests for the new functions?
(Struck through because of the following question)
@lvermue Is just writing something like the following code insufficient?
import itertools
from fastdtw import fastdtw
from joblib import Parallel, delayed
import numpy as np
X = np.random.randint(1, 40, size=(100, 100))
# DTW for every ordered pair, i.e. the full 100x100 product including the diagonal
results = Parallel(n_jobs=-1)(delayed(fastdtw)(X[i], X[j]) for i, j in itertools.product(range(100), repeat=2))
distance_mat = np.array([r[0] for r in results]).reshape(100, 100)
@slaypni There are two main aspects to this:
1. It does not calculate the diagonal of the distance matrix: the self-distances are zero by definition, so the proposed module skips them, while the simple script computes every ordered pair.
2. Because of how Python parallelization works, the data transferred to a worker has to be serialized, which causes overhead. The simple example requires this serialization and de-serialization step for each pairwise comparison, which adds up to a lot of overhead. The proposed module instead checks how many workers are available and cuts all tasks into even chunks, which are transferred to the workers only once: each worker receives its data once, computes all of its tasks, and returns the results. The effect can be shown by profiling both approaches:
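For illustration, here is a minimal sketch of that chunked dispatch using joblib (the real implementation lives in fastdtw._fastdtw; the helper names below are hypothetical and only illustrate the chunking idea):
import itertools
from fastdtw import fastdtw
from joblib import Parallel, delayed
import numpy as np

def _worker(X, pairs):
    # Called once per worker: computes all assigned pairs locally, so X is
    # serialized once per worker instead of once per pairwise comparison.
    return [fastdtw(X[i], X[j])[0] for i, j in pairs]

def chunked_distances(X, n_jobs):
    pairs = list(itertools.combinations(range(len(X)), 2))  # upper triangle only
    chunks = np.array_split(pairs, n_jobs)  # one even chunk per worker
    per_chunk = Parallel(n_jobs=n_jobs)(delayed(_worker)(X, chunk) for chunk in chunks)
    return [d for chunk in per_chunk for d in chunk]  # flatten in pair order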
Simple script
import itertools
from time import time
from fastdtw import fastdtw
from joblib import Parallel, delayed
import numpy as np
n = 1000
X = np.random.randint(1, 40, size=(n, 40))
start = time()
results = Parallel(n_jobs=-1)(delayed(fastdtw)(X[i], X[j]) for i, j in itertools.product(range(n), repeat=2))
distance_mat = np.array([r[0] for r in results]).reshape(n, n)
print('It took {:.0f} seconds'.format(time()-start))
# It took 82 seconds
The profiling
Name | Call Count | Time (ms) | Own Time (ms)
---|---|---|---
test_parallel_script.py | 1 | 82177 | 3 |
call | 1 | 81111 | 0 |
retrieve | 1 | 81028 | 3027 |
wrap_future_result | 5966 | 77949 | 8 |
result | 5966 | 77941 | 48 |
wait | 5113 | 77874 | 29 |
method 'acquire' of '_thread.lock' objects | 10268 | 77825 | 77825 |
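For reference, per-call statistics like those above can be reproduced with the standard-library profiler (the original table appears to come from an IDE profiler; run_benchmark below is a hypothetical stand-in for the simple script):
import cProfile
import pstats

def run_benchmark():
    ...  # place the simple script from above in here

cProfile.run('run_benchmark()', 'simple.prof')  # write stats to a file
pstats.Stats('simple.prof').sort_stats('cumulative').print_stats(10)  # top 10 calls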
Method as proposed
import numpy as np
from scipy.spatial.distance import euclidean
from time import time
from fastdtw import fastdtw_parallel, get_path
# Reusing the X matrix from above (n = 1000 series of length 40)
start = time()
# Same machine with 20 cores
distance_matrix, path_list = fastdtw_parallel(X, n_jobs=-1)
print('It took {:.0f} seconds'.format(time()-start))
# It took 24 seconds
The profiling
Name | Call Count | Time (ms) | Own Time (ms)
---|---|---|---
test_parallel.py | 1 | 24683 | 0 |
fastdtw._fastdtw.fastdtw_parallel | 1 | 24485 | 91 |
call | 1 | 24161 | 0 |
method 'acquire' of '_thread.lock' objects | 34 | 24019 | 24019 |
retrieve | 1 | 24019 | 0 |
wait | 6 | 24019 | 0 |
result | 20 | 24018 | 0 |
wrap_future_result | 20 | 24018 | 0 |
As the two profiles show, the proposed module calls each worker only once, which makes it about 3.5 times faster (24 s vs. 82 s) than the simple script.
@lvermue As you mentioned, the simple script could roughly halve its execution time by replacing itertools.product with itertools.combinations to cut the unnecessary pairs. Even in that case, the simple version takes 39 s, which is still about 60% longer than the proposed version (see the sketch below).
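A minimal sketch of that combinations variant, filling the symmetric entries afterwards (the diagonal stays zero):
import itertools
from fastdtw import fastdtw
from joblib import Parallel, delayed
import numpy as np

n = 1000
X = np.random.randint(1, 40, size=(n, 40))
pairs = list(itertools.combinations(range(n), 2))  # upper triangle, no diagonal
results = Parallel(n_jobs=-1)(delayed(fastdtw)(X[i], X[j]) for i, j in pairs)
distance_mat = np.zeros((n, n))
for (i, j), (dist, _path) in zip(pairs, results):
    distance_mat[i, j] = distance_mat[j, i] = dist  # distance matrix is symmetric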
So I think the proposed version is good for computing the distance matrix, but I would also like some changes to its code structure.
Glancing at the diff, I noticed repeated code patterns that look redundant, so it would be nicer to consolidate them.
Also, computing a distance matrix is a bit outside the scope of this package; still, it would be nice to have a convenient function for it. So I would like the function to live under fastdtw.util, where such utilities can be placed, rather than directly under fastdtw.
Taking those points into account, I would prefer something like the following distmat to be implemented instead of dtw_parallel and fastdtw_parallel:
from functools import partial
from fastdtw import fastdtw
from fastdtw.util import distmat
dists, paths = distmat(partial(fastdtw, radius=3), X)
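A possible shape for such a distmat utility (a sketch only; a real version would reuse the chunked parallel dispatch discussed above rather than this sequential loop):
import itertools
import numpy as np

def distmat(dist_fn, X):
    # Apply dist_fn, e.g. partial(fastdtw, radius=3), to every unordered pair
    # of rows of X; dist_fn is expected to return (distance, path) like fastdtw.
    n = len(X)
    dists = np.zeros((n, n))
    paths = {}
    for i, j in itertools.combinations(range(n), 2):
        d, path = dist_fn(X[i], X[j])
        dists[i, j] = dists[j, i] = d  # symmetric matrix, zero diagonal
        paths[i, j] = path
    return dists, paths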
Full parallelization was added to the package using the joblib library. Now N x M matrices, i.e. N time series with M time points each, can be processed in parallel. To handle series of different lengths, the missing time points can be padded with np.nan values; see the example below.
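For example, two series of different lengths could be packed into one matrix like this (a sketch; it assumes the new functions treat the np.nan entries as padding):
import numpy as np

a = np.array([1.0, 2.0, 3.0, 4.0, 5.0])
b = np.array([2.0, 3.0, 4.0])
M = max(len(a), len(b))          # common length for the padded matrix
X = np.full((2, M), np.nan)      # missing time points become np.nan
X[0, :len(a)] = a
X[1, :len(b)] = b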
The changes were tested on a machine with 20 cores, leading to the following results:
[Benchmark plots: "Single core" and "Parallel"]
Examples of how to use the new functions were added to the README.rst file and to the docstrings of the respective functions.