jbusecke / xmovie

A simple way of creating movies from xarray objects
https://xmovie.readthedocs.io/
MIT License
251 stars 38 forks source link

Using dask to save frames in parallel #6

Closed jbusecke closed 3 years ago

jbusecke commented 5 years ago

I am opening a separate issue for this functionality (previously mentioned in #2).

Currently the most time intensive step in creating a movie/gif is the rendering of each frame, which is done in serial.

In an earlier test version I implemented a parallelization of the printing using dask. The speedup was very nice, but I encountered quite strange errors with some matplotlib elements: E.g. the colorbar label would shift around and other odd things that looked less than premium.

I would like to try to implement this again with the substantially refactored code. The proposed changes would replace the default save_frames with save_frames_parallel if a dask array is passed, so that mov.save(moviename.mp4) would work in parallel out of the box and serial rendering could be forced with something like mov.save(moviename.mp4, render_parallel=False).

jbusecke commented 5 years ago

I have implemented an alternative way to save out the frames using dask:

    def dask_frame_wrapper(self, tt, odir=None):
        fig = self.render_frame(tt)
        frame_save(
            fig, tt, odir=odir, frame_pattern=self.frame_pattern, dpi=self.dpi
        )

    def save_frames_parallel(self, odir, partition_size=5, progress=False):
        """Save movie frames out to file.
        Parameters
        ----------
        odir : path
            path to output directory
        progress : bool
            Show progress bar. Requires tqmd.
        """
        frame_range = range(len(self.data[self.framedim].data))
        frame_bag = db.from_sequence(
            frame_range, partition_size=partition_size
        )
        mapped_frame_bag = frame_bag.map(self.dask_frame_wrapper, odir=odir)
        if progress:
            with ProgressBar():
                mapped_frame_bag.compute(processes=False)
        else:
            mapped_frame_bag.compute(processes=False)

The speedup is very nice (needs ~1/4 of the time), but I am still getting absolutely strange plotting behavior. This gif is rendered using the serial (simple for loop) method and it looks completely normal: dask_test_serial But when the frames are rendered in parallel the colorbar ticklabels and the label go bananas: dask_test_parallel

Does anyone out there have an idea what could be happening? Is there a better way to execute figure plotting using dask? CC: @dcherian @rabernat

dcherian commented 5 years ago

I couldn't reproduce with:

import xarray as xr
import dask.bag as db
from dask.diagnostics import ProgressBar

da = xr.DataArray(np.random.randn(50, 60, 10)).chunk({'dim_2': 1})
da.attrs['long_name'] = "what's in a name?"
da.attrs['units'] = 'units'
da.load()

def make_plot(tt, data):
    hfig = plt.figure()
    data.isel(dim_2=tt).plot(vmin=-2, vmax=4, cmap=mpl.cm.RdBu_r, extend='both')
    hfig.savefig(f"images/{tt}.png")
    del hfig

frame_range = range(len(da['dim_2'].data))
frame_bag = db.from_sequence(frame_range, partition_size=2)
mapped_frame_bag = frame_bag.map(make_plot, data=da)

mapped_frame_bag.compute()

maybe it's a cartopy thing?

I have a few questions on your approach to making embarrassingly-parallel movies:

  1. Are you doing this with dask arrays or numpy arrays? If I remove the da.load() line in the above example, things don't work. Is that expected?

  2. What would your approach be to calling a plot or render function on every slice along one axis of a dask array. E.g. if my dask array has a dimension time and I want a frame per time step.

jbusecke commented 5 years ago

Thanks a lot. This is helpful and encouraging. I have been working on more 'presets', and I think before merging this feature, I will implement more basic presets, so I can confidently test several presets (#12) and see if only the ones using cartopy (or just certain projections) fail. I think the default preset should be exactly what xarray would plot, the rotating globe should be optional (I just implemented first, because I had it available and it looks cool hehe).

  1. Hmm that is strange. I intended it the way that it would not matter, but I am not testing with dask arrays currently. I will add datasets with dask arrays for sure (#11). The one difference I see is that in xmovie the data variable is set in the Movie initialization and then pulled from the class, but that should not really matter if I am not missing something.

  2. Movie has a kwarg framedim, which defaults to 'time' which is passed to each 'preset' plot function. You can however also just write your own function and assign the passed frame to any dimension you like. I try to keep things as flexible as possible so that custom functions (e.g.func(ds, fig, timestamp, **kwargs)) get access to the full datastructure and the only input that is definitely needed (the frame - or timestamp). I have experimented with these options quite succesfully but I realize that the most urgent step at this point is to start a thorough documentation (#13), explaining in detail the options. Do you agree?

jbusecke commented 5 years ago

Also, random question: Why is parallelization with dask always embarrassing 🤣

dcherian commented 5 years ago

This is what the graph looks like with a dask array in the above example, clearly there's no benefit: I don't know how to get rid of that finalize step: mapped_frame_bag.visualize(), image

Also, random question: Why is parallelization with dask always embarrassing 🤣

Haha, it's not always I don't think. I think it's used for problems that are easy to parallelize because each step is independent of the other; so the problem tends to scale well with compute power. (This is my amateur opinion). In MATLAB this kind of thing is easily solved by parfor.

dcherian commented 5 years ago

I have a solution here: https://github.com/ncar-hackathons/scientific-computing/issues/6. It basically abuses map_blocks to work on dask array chunks in parallel and passes on dims, coords, attrs so that I can reconstruct an xarray datarray out of the numpy-fied chunk that map_blocks provides.

The graph looks parallel! image

and because the chunk is xarray-fied you get all the default plotting features like automatic labelling: image

Also Matt Long pointed out that since matplotlib is not thread-safe, you need to prevent dask from using multithreading. Otherwise I think you get the weird behaviour you see.

This is the line we use on Cheyenne to make that happen: cluster = dask_jobqueue.PBSCluster(cores=18, processes=18,).

jbusecke commented 5 years ago

Sorry I havent had time to get back to this in a while. This looks pretty nice though! Thanks for putting this together.

Just to clarify, my original approach did work and also speed things up (Id have to check the dask graph to make sure it is as nicely parallel as this approach), it just produced these super weird glitches.

The one thing that I could think of here is that this approach basically limits us to a single datarray, right? Or can we pass a full dataset? I had some plans to be able to pass a full dataset and overlay different plotting methods for several data_variables in that dataset. I will definitely return to this shortly, just have a review due and some other not so fun stuff for today/tomorrow.

dcherian commented 5 years ago

Hmm.. yes probably limited to a dataset. Did your original approach work with dask arrays? I had no luck using both dask.bag and dask.array.

dcherian commented 5 years ago

Actually map_blocks lets you pass multiple dask arrays, so wrapper and animate could be extended to do that...

jbusecke commented 5 years ago

Oh then thats really cool! If we can work it out so that the internally constructed xarray object is equivalent to ds.sel(framedim=index), then things should work pretty smoothly.

For reference, this is what I have been working with... it mostly differs from your example in the sense that its all organized in the Movie class. Not sure if/how that would affect things.

def save_frames_serial(self, odir, progress=False):
        """Save movie frames as picture files.
        Parameters
        ----------
        odir : path
            path to output directory
        progress : bool
            Show progress bar. Requires tqmd.
        """
        # create range of frames
        frame_range = range(len(self.data[self.framedim].data))
        if tqdm_avail and progress:
            frame_range = tqdm(frame_range)
        elif ~tqdm_avail and progress:
            warnings.warn("Cant show progess bar at this point. Install tqdm")

        for fi in frame_range:
            fig = self.render_frame(fi)
            frame_save(
                fig,
                fi,
                odir=odir,
                frame_pattern=self.frame_pattern,
                dpi=self.dpi,
            )

    def dask_frame_wrapper(self, tt, odir=None):
        fig = self.render_frame(tt)
        frame_save(
            fig, tt, odir=odir, frame_pattern=self.frame_pattern, dpi=self.dpi
        )

    def save_frames_parallel(self, odir, partition_size=5, progress=False):
        """Save movie frames out to file.
        Parameters
        ----------
        odir : path
            path to output directory
        progress : bool
            Show progress bar. Requires tqmd.
        """
        frame_range = range(len(self.data[self.framedim].data))
        frame_bag = db.from_sequence(
            frame_range, partition_size=partition_size
        )
        mapped_frame_bag = frame_bag.map(self.dask_frame_wrapper, odir=odir)
        if progress:
            with ProgressBar():
                mapped_frame_bag.compute(processes=False)
        else:
            mapped_frame_bag.compute(processes=False)

or could it be the (processes=False)? I remember a while back, that was somehow necessary to get it to work, and it is faster, but I am not sure if we might be able to speed this up even more.

jbusecke commented 5 years ago

I am having trouble interpreting your example graph: Why does the plotting step only show 5 steps? Shouldnt there be 10? If the plotting steps are run in parallel, I still expect a significant speedup, since that is what takes most time (at least when invoking fancy mapping etc). But I am all for going with map.blocks if we can retain the flexibility for later features, honestly. Also again thanks for working on this!

jbusecke commented 5 years ago

Urghhhh I want to work on this sooo bad now...but I need to finish this paper first hahaha.

dcherian commented 5 years ago

I think you're looking at the first graph which was me trying to run with dask.bag with 2 partitions and 5 cores (or something).

The second one has 10 in parallel (map_blocks). image

  1. Your rendering problem is probably fixed by forcing single-threaded workers.
  2. Does your approach only work with numpy arrays? That was the limitation I ran in to. dask.bag with a loaded dataset works awesomely well but craps out with a dask dataset.

If we can work it out so that the internally constructed xarray object is equivalent to ds.sel(framedim=index)

Currently this is like da.sel(framedim=index). I think we can extend it (I think) but it might be painful.

jbusecke commented 5 years ago

I tried it with dask arrays and it worked, but I will add some explicit tests for that for sure.

dcherian commented 4 years ago

Finally figured this out using the new map_blocks functionality.


def save_image(block):
    import cartopy.crs as ccrs
    import matplotlib.pyplot as plt
    if sum(block.shape) > 0:
        # workaround 1:
        # xarray passes a zero shaped array to infer what this function returns. 
        # we can't run plot, so avoid doing that
        f = plt.figure()
        ax = f.subplots(1, 1, subplot_kw={"projection": ccrs.PlateCarree()}, squeeze=True)

        # xarray plotting goodness is available here!
        block.plot(ax=ax, robust=True, vmin=5, vmax=28, cmap=mpl.cm.Spectral_r, cbar_kwargs={"extend": "both"})

        # on pangeo.io, this will need some tweaking to work with gcsfs.
        # haven't tried that. On cheyenne, it works beautifully.
        f.savefig(f"images/aqua/{block.time.values[0]}.png", dpi=180)
        plt.close(f)

    # workaround 2:
    # map_blocks expects to receive an xarray thing back.
    # Just send back one value. If we send back "block" that's like computing the whole dataset!
    return block["time"]

# I want to animate in time, so chunk so that there is 1 block per timestep.
tasks = merged.sst.chunk({"time": 1, "lat": -1, "lon": -1}).map_blocks(save_image)
tasks.compute()
jbusecke commented 4 years ago

Oh sweet! Thanks for keeping at this @dcherian. Could you put a PR together? I would love to see if this takes care of those weird rendering issues I had as well! Realistically I can maybe devote some time later this week to xmovie, but today and Mon/Tue are pretty packed already.

dhruvbalwada commented 3 years ago

did this ever get implemented in?

I am trying to making a 4500 frame movie, and don't want to wait 40 hours :P

dcherian commented 3 years ago

Map_blocks works really well

jbusecke commented 3 years ago

I've used it for other occasions and am planning to refactor this eventually, but the damn time is precious hahaha. Seems like there is more damn though, so maybe i'll get myself to do it some time soon!

tomchor commented 3 years ago

Dropping by because I'm also interested! I've been using xmovie and I love how simple it is, but waiting for matplotlib (which is very slow) to plot every single frame take a lot of time even for a relatively short video.

Are there any workarounds to use dask for this issue that I can use right now? Maybe modifying the plot_func()?

Thanks!

jbusecke commented 3 years ago

Are there any workarounds to use dask for this issue that I can use right now? Maybe modifying the plot_func()?

I don't think this would work. Xmovie still draws a full figure for each timestep, but we could use dask to do many frames in parallel (since they do not need information about each other in most cases). I had a version that was working on a branch, but it was introducing super weird artifacts in the movies. @dcherian suggested using the new xarray.map_blocks functionality and I just haven't gotten around to implementing that.

It is on the list, but unfortunately there are a few high priority things currently occupying me. I am very keen to get back to this however. Apologies for the delay

tomchor commented 3 years ago

@jbusecke thanks for the answer! Yes I saw the videos with the weird label behavior. Honestly I don't mind that behavior too much. It's a small price to pay to have something that doesn't take an hour to generate each video. Is that branch still available?

jbusecke commented 3 years ago

Its here: https://github.com/jbusecke/xmovie/tree/jbusecke_dasksave, but use at your own risk! It is quite old at this point!