pySTEPS / pysteps

Python framework for short-term ensemble prediction systems.
https://pysteps.github.io/
BSD 3-Clause "New" or "Revised" License
441 stars 160 forks source link

fft_method pyfftw causes unexpected noise additions when using multi-threading #337

Closed RubenImhoff closed 4 months ago

RubenImhoff commented 1 year ago

If I run an ensemble using multiple cores (so, dask will parallellize the ensemble members over the cores), it seems that the ensemble order is lost, resulting in weird transitions (due to noise from a different member that ends up in that member). See for instance (these are three 15-min instance in the forecast): 2023-08-17 15_21_02-P_fc in 202308170600_blended_nowcast_olddask

2023-08-17 15_21_19-P_fc in 202308170600_blended_nowcast_olddask

2023-08-17 15_21_30-P_fc in 202308170600_blended_nowcast_olddask

Or maybe even clearer: MicrosoftTeams-image

If I run on 1 core, this problem does not occur, so it gives the impression that this has to do with parallellizing using dask. After having contact with @mpvginde, I could not reproduce the error with the Belgian test case that we have (the figures above are with our Dutch data and setup). Only difference between our setups turned out to be the setting that 'fft_method' = pyfftw instead of numpy in the Belgian setup. After changing this, the problems disappeared when running multi-threaded. This gives the impression that pyfftw should not be used for nowcasting and blended forecasting when using more than 1 worker/thread. Is this a familiar issue to you and is there anything we can do about it?

RubenImhoff commented 9 months ago

Update on this possible bug: I see that it still comes back in the STEPS nowcasts (not the blending anymore) with both pyfftw and numpy as fft_methods. Seems that in the newer Dask versions, we have to more explicitly pin the activities to specific cores and keep track of it. For the nowcast loop this still goes well multi-threaded, provided that numpy is used as method, but for all other multi-threaded processes (cascade decomposition, noise initialization, etc.) it seems to go wrong. We should either fix those options to one worker or find a different solution, I'm afraid.

dnerini commented 9 months ago

Hi @RubenImhoff thanks for the update. Would it be possible to get a better idea of the changes you are suggesting? If I understand well, one option would be to explicitly set some arguments for dask, right?

RubenImhoff commented 9 months ago

Hi @dnerini, of course. The simple solution is to only use one worker (thread) for the parts where it goes wrong. I have tested it by fixing num_workers to 1 in the steps.py code in the nowcasting module, except for the nowcasting main loop:

precip_forecast = nowcast_main_loop(
        precip,
        velocity,
        state,
        timesteps,
        extrap_method,
        _update,
        extrap_kwargs=extrap_kwargs,
        velocity_pert_gen=velocity_perturbators,
        params=params,
        ensemble=True,
        num_ensemble_members=n_ens_members,
        callback=callback,
        return_output=return_output,
        num_workers=num_ensemble_workers,
        measure_time=measure_time,
    )

where num_workers can be > 1.

I think ideally, we would make full use of dask. In that case, we would have to pin the work to specific cores or so. I believe that is possible in Dask too, but I have no experience with it. Maybe you do or @pulkkins?

mpvginde commented 6 months ago

Hi everyone, I think I found the issue: In the _update helper function in nowcasts.steps the numpy-array holding the final forecasted precipation: precip_forecast_out is created from a list which is initialized at the start of the _update function: https://github.com/pySTEPS/pysteps/blob/be8eea432239f77bdfb7edf51a9bfaa94d012752/pysteps/nowcasts/steps.py#L708-L709

The workers inside the _update function then append their result to this list: https://github.com/pySTEPS/pysteps/blob/be8eea432239f77bdfb7edf51a9bfaa94d012752/pysteps/nowcasts/steps.py#L822-L831

Finally the list is converted to a numpy array: https://github.com/pySTEPS/pysteps/blob/be8eea432239f77bdfb7edf51a9bfaa94d012752/pysteps/nowcasts/steps.py#L833-L846

When dask is not used their is no concern since the workers will always be triggered in the same order. But when dask is used the order in which the workers are triggered is quite random. This will cause that during the _update the ensemble members might get shuffled.

I have proposed a fix in PR #347.

@RubenImhoff could you check if this solves your issues?

RubenImhoff commented 4 months ago

After some testing, I can confirm that #347 fixes the issue. :)