Cloud-Drift / clouddrift

CloudDrift accelerates the use of Lagrangian data for atmospheric, oceanic, and climate sciences.
https://clouddrift.org/
MIT License
34 stars 8 forks source link

fail to "vectorize" velocity_from_position #68

Open selipot opened 1 year ago

selipot commented 1 year ago

I am attempting to apply velocity_from_position to xarray.DataArrays of lon, lat, and time. I have been following a tutorial for a similar situation. With the following ds Dataset:

ds.info()
xarray.Dataset {
dimensions:
    trajectory = 593297 ;
    obs = 1440 ;

variables:
    float64 time(trajectory, obs) ;
    float32 lat(trajectory, obs) ;
    float32 lon(trajectory, obs) ;
    int32 obs(obs) ;
    int64 trajectory(trajectory) ;
}

I can easily do:

u,v = velocity_from_position(ds.lon.isel(trajectory=0),ds.lat.isel(trajectory=0),ds.time.isel(trajectory=0))

or

u2,v2 = xr.apply_ufunc(
    velocity_from_position,
    ds.lon.isel(trajectory=0),
    ds.lat.isel(trajectory=0),
    ds.time.isel(trajectory=0),
    input_core_dims=[["obs"], ["obs"], ["obs"]],
    output_core_dims=[["obs"], ["obs"]],
    dask="allowed",
)

but the following fails:

u2,v2 = xr.apply_ufunc(
    velocity_from_position,  # first the function
    ds.lon.isel(trajectory=slice(0,10)),
    ds.lat.isel(trajectory=slice(0,10)),
    ds.time.isel(trajectory=slice(0,10)),
    input_core_dims=[["obs"], ["obs"], ["obs"]],
    output_core_dims=[["obs"],["obs"]],
    dask="allowed",
    vectorize=True,
)

and the bottom line of the error is

File ~/miniconda3/envs/research/lib/python3.10/site-packages/clouddrift/analysis.py:65, in velocity_from_position(x, y, time, coord_system, difference_scheme)
     57 # Compute dx, dy, and dt
     58 if difference_scheme == "forward":
     59 
     60     # All values except the ending boundary value are computed using the
   (...)
     63 
     64     # Time
---> 65     dt[:-1] = np.diff(time)
     66     dt[-1] = dt[-2]
     68     # Space

ValueError: could not broadcast input array from shape (10,1439) into shape (9,1440)

So yes, I get the error but I don't understand if the fix is to apply ufunc differently or make velocity_from_position more flexible?

milancurcic commented 1 year ago

It should work if you apply the function to one trajectory at a time. I think the function could be made to correctly handle positions and times as 2-d arrays.

selipot commented 1 year ago

Indeed, it does work as shown above but it should be able to be fed to xarray.apply_ufunc. So I do not know not know if it is an issue with the function or my code. Maybe @philippemiron has some insight?

milancurcic commented 1 year ago

I haven't used appy_ufunc before, but your code may very well be correct. velocity_from_position definitely does not work on 2-d arrays in its current form. We could make it so, we just need to discuss and decide whether there's a good use case for that, considering that Lagrangian data will be ragged arrays most of the time.

selipot commented 1 year ago

I am not sure that's the solution. The example linked above examines the case of a function that takes 1-d arrays as an input, like our function.

milancurcic commented 1 year ago
     64     # Time
---> 65     dt[:-1] = np.diff(time)
     66     dt[-1] = dt[-2]
     68     # Space

ValueError: could not broadcast input array from shape (10,1439) into shape (9,1440)

The issue here when using our function is that np.diff() differentiates along the last (fastest-varying) axis, whereas the slice syntax (i.e. dt[:-1]) slices along the first (slowest-varying) axis. In the case of 1-d array inputs, the first and last axes are the same. But in the case of a 2-d array inputs, we get a shape mismatch.

selipot commented 1 year ago

To be more useful and widely applicable the function should take an argument indicating the dimension along which to conduct the time derivative operation so that it is applicable to arrays of any dimension. In addition, it should also be able to handle ragged arrays of position.

philippemiron commented 1 year ago

It's been a few months since I played with apply_ufunc. If I remember correctly from the EarthCube Notebook, the functions are applied by chunk; I'm not sure it would work as expected using slice...

I believe the easiest would be to use Awkward Array and perform a simple loop:

u,v = ak.zeros_like(lon), ak.zeros_like(lat)   # with lon,lat ak.Array
for i in range(0, len(lon)):
   u[i], v[i] = velocity_from_position(lon[i], lat[i])
selipot commented 1 year ago

I'll try that. But will it be fast and use dask/parallel computing? As an example I have 5.5M 60-day long hourly trajectories ... so perhaps if I chunk per trajectories? In general we need the analysis function of clouddrift to be "vectorizable" in a seamless way for users of xarray and awkward.

milancurcic commented 1 year ago

I haven't used Dask, but this looks like should do the job: https://examples.dask.org/applications/embarrassingly-parallel.html. On its own, I don't think Philippe's for-loop snippet will run in parallel.

selipot commented 1 year ago

More developments: I found that the following works if the input DataArrays are not chunked:

u,v = xr.apply_ufunc(
    velocity_from_position,  # first the function
    ds.lon.isel(trajectory=slice(0,10000)),  # now arguments in the order expected by 'velocity_from_position'
    ds.lat.isel(trajectory=slice(0,10000)),
    ds.time.isel(trajectory=slice(0,10000)),
    #input_core_dims=[[], ['time']],
    input_core_dims=[["obs"],["obs"], ["obs"]],
    output_core_dims=[["obs"], ["obs"]],
    vectorize=True,
)

but if the dataArrays are chunked then we need the following : (note the option dask="allowed" and the .load() function)

u2,v2 = xr.apply_ufunc(
    velocity_from_position,  # first the function
    dsc.lon.isel(trajectory=slice(0,10000)).load(),  # now arguments in the order expected by 'velocity_from_position'
    dsc.lat.isel(trajectory=slice(0,10000)).load(),
    dsc.time.isel(trajectory=slice(0,10000)).load(),
    #input_core_dims=[[], ['time']],
    input_core_dims=[["obs"],["obs"], ["obs"]],
    output_core_dims=[["obs"], ["obs"]],
    dask="allowed",
    vectorize=True,
)

Anyone understand why the load function is needed?

Note that the above applies to a subset of my example dataset, 10000 out of 593297 trajectories. If I try to apply this to the entire dataset, the first option succeeds on my desktop but the chunked option fails with a local cluster, which I am finding odd ...

selipot commented 1 year ago

Note that the chunked case above works only if the size of the chunks in the trajectory dimension is 1. That is if the function is fed 1-d array arguments I think. So the way forward appears to modify our function to handle n-d array.

milancurcic commented 1 year ago

This is now implemented in the main branch. Give it a try. And I will try running directly with Dask (without the Xarray wrapper) and see if that produces the expected result.

milancurcic commented 1 year ago

FWIW, I ran velocity_from_position on inputs of shape (600, 1000) (traj, obs) using dask.delayed on my 6-core laptop. The result is the same (and correct) as if I run the function without Dask, which is a good outcome. I don't get any speed up, though, and actually the Dask variant is slightly slower. It's possible that the inputs are too small to yield any benefit from parallelism.

selipot commented 1 year ago

@milancurcic Can you share that test code please?

philippemiron commented 1 year ago

I think it should more efficient to use Dask.bag instead of delayed due to the the high number of items. See Avoid too many tasks in the Dask best practices.

milancurcic commented 1 year ago

Here's my original snippet.

from clouddrift.analysis import velocity_from_position
import dask
import numpy as np

num_obs = 1000
num_traj = 6000

lon = np.reshape(np.tile(np.linspace(-180, 180, num_obs), num_traj), (num_traj, num_obs))

lat = np.zeros((lon.shape))
for n in range(num_traj):
    lat[n] = n / num_traj * 60 # from 0 to 60N

time = np.reshape(np.tile(np.linspace(0, 1e7, num_obs), num_traj), (num_traj, num_obs))

client = dask.distributed.Client(threads_per_worker=6, n_workers=1)
velocity_from_position_parallel = dask.delayed(velocity_from_position)
res = velocity_from_position_parallel(lon, lat, time)
u, v = res.compute()

Too many tasks is not the problem here, actually the opposite. I make only one function call with dask.delayed, and that doesn't seem to be what it's made for. After reading more about dask.delayed, it seems to be made for running multiple functions concurrently (sigh). Despite this, the dask.delayed variant of velocity_from_position runs twice as fast on my laptop vs. the serial execution of the function if I increase num_traj from 600 to 6000. I don't understand why that is.

I'll experiment with other dask idioms as well as dask+xarray.

philippemiron commented 1 year ago

I initially thought you were calling the function num_traj times. I think the use case of dask would be when the number of trajectories is two high to be performed all at once.

So something like this could replace the calculation part:

n_chunks = 10

delayed_results = []
for i in range(0, n_chunks-1):
    arr = range(i*int(num_traj/n_chunks), min(num_traj, (i+1)*int(num_traj/n_chunks))) 
    res = dask.delayed(velocity_from_position)(lon[arr], 
                                               lat[arr], 
                                               time[arr])
    delayed_results.append(res)

results = dask.compute(delayed_results)

I don't have super large dataset to test this with, but maybe @selipot could give it a try?

milancurcic commented 1 year ago

You're right. I naively assumed that Dask would chunk the data for me under the hood.

philippemiron commented 1 year ago

With apply_ragged this should be possible right? cc @selipot

selipot commented 1 year ago

Yes I think this issue is old before apply_ragged was written. But I don't fully understand how apply_ragged is parallelized. No chunk or dask involved?

philippemiron commented 1 year ago

The function is executed on a different thread for each trajectory and the results are reassembled afterwards. See this section,

# parallel execution
with futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
    res = executor.map(lambda x: func(*x, **kwargs), iter)
# concatenate the outputs
res = [item if isinstance(item, Iterable) else [item] for item in res]

max_workers = None by default which is set to: Default value of max_workers is changed to min(32, os.cpu_count() + 4), according to the concurrent.futures doc. But you can also set the parameter.

def apply_ragged(
    func: callable,
    arrays: list[np.ndarray],
    rowsize: list[int],
    *args: tuple,
    max_workers: int = None,
    **kwargs: dict,
)
milancurcic commented 1 year ago

We can benchmark, but my understanding is that the concurrency (different from parallelism) used in apply_ragged will maybe parallelize some functions sometimes. Functions that do slow stuff like I/O, downloading data (and perhaps even plotting) are likely to run faster. CPU-bound stuff (numerical calculations) are likely to run similarly or slower.

I think this issue intended to ask how to run things truly in parallel, i.e. distribute the computation on different CPUs (whether via Dask, MPI, multiprocessing, or otherwise). I don't think we have a solution yet, but getting it to run with Dask is probably the lowest-hanging fruit among the possible approaches.

(and vectorization is yet a different concept from concurrency and parallelization; it's about putting multiple array elements into a long register and running one operation on all of them.)