Unidata / MetPy

MetPy is a collection of tools in Python for reading, visualizing and performing calculations with weather data.
https://unidata.github.io/MetPy/
BSD 3-Clause "New" or "Revised" License

[BUG] Metpy-Dask Integration to Calculate Wetbulb Temperature #1601

Open lennijusten opened 3 years ago

lennijusten commented 3 years ago

Hello, I'm using Dask to chunk xarrays and pass them into the metpy.calc.wet_bulb_temperature function and am getting a cryptic error message that I believe is due to something breaking internally in the wetbulb function. I understand from Unidata/MetPy#1479 that Dask integration is an ongoing project, so my intention is just to document this issue.

import metpy.calc
import xarray as xr
from dask.distributed import Client, LocalCluster

# Setup adaptive dask client
cluster = LocalCluster(dashboard_address=':8901', memory_limit='5GB', threads_per_worker=1)
cluster.adapt(minimum=1, maximum=10)  # scale between 1 and 10 workers
client = Client(cluster)

# Load pressure, temperature, and water vapor mixing ratio
def init_data(chunks_dict):
    ds_press = xr.open_dataset('cesmLE1/atm/proc/tseries/monthly/PS/b.e11.B20TRC5CNBDRD.f09_g16.001.cam.h0.PS.185001-200512.nc', chunks=chunks_dict)
    ds_temp = xr.open_dataset('cesmLE1/atm/proc/tseries/daily/TREFHT/b.e11.BRCP85C5CNBDRD.f09_g16.001.cam.h1.TREFHT.20810101-21001231.nc', chunks=chunks_dict)
    ds_qbot = xr.open_dataset('cesmLE1/atm/proc/tseries/daily/QBOT/b.e11.BRCP85C5CNBDRD.f09_g16.001.cam.h1.QBOT.20810101-21001231.nc', chunks=chunks_dict)

    return ds_temp, ds_press, ds_qbot

temp, press, qbot = init_data(chunks_dict={'lat': 20, 'lon': 20})  # partition into 20x20 lat/lon chunks

# Get time mean of pressure
press_mean = press.mean('time')
press_mean['PS'] = press_mean.PS.assign_attrs(units='Pa')

# Calculate vapor pressure and then dewpoint
vapor_pressure = metpy.calc.vapor_pressure(press_mean['PS'], qbot['QBOT'])  
dewpoint = metpy.calc.dewpoint(vapor_pressure)

# Calculate wetbulb
wetbulb = metpy.calc.wet_bulb_temperature(press_mean['PS'], temp['TREFHT'], dewpoint)

The Dask cluster works fine when calculating dewpoint and for the first stages of metpy.calc.wet_bulb_temperature (opening the datasets, broadcasting, etc.), but then progress freezes at the step shown in the screenshot and the following error is returned:

[Screenshot: Dask progress dashboard]
/home1/ljusten/miniconda3/envs/climate36/lib/python3.6/site-packages/dask/array/core.py:1395: FutureWarning: The `numpy.place` function is not implemented by Dask array. You may want to use the da.map_blocks function or something similar to silence this warning. Your code may stop working in a future release.
  FutureWarning,
[the FutureWarning above is repeated twice more]
distributed.nanny - WARNING - Worker process still alive after 3 seconds, killing
[the warning above is repeated a further eight times]
Task exception was never retrieved
future: <Task finished coro=<connect.<locals>._() done, defined at /home1/ljusten/miniconda3/envs/climate36/lib/python3.6/site-packages/distributed/comm/core.py:288> exception=CommClosedError()>
Traceback (most recent call last):
  File "/home1/ljusten/miniconda3/envs/climate36/lib/python3.6/site-packages/distributed/comm/core.py", line 297, in _
    handshake = await asyncio.wait_for(comm.read(), 1)
  File "/home1/ljusten/miniconda3/envs/climate36/lib/python3.6/asyncio/tasks.py", line 351, in wait_for
    yield from waiter
concurrent.futures._base.CancelledError

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/home1/ljusten/miniconda3/envs/climate36/lib/python3.6/site-packages/distributed/comm/core.py", line 304, in _
    raise CommClosedError() from e
distributed.comm.core.CommClosedError
[the "Task exception was never retrieved" traceback above is repeated twice more]

The process actually keeps running after this, but the Dask interface shows that none of the workers are being used.
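For anyone hitting this in the meantime, a possible workaround for this kind of hang is to push the calculation through xarray.apply_ufunc with dask='parallelized', so each chunk reaches MetPy as a plain numpy array and the numpy.place/nditer code path never sees a Dask array. This is only a sketch under assumptions: the helper _wet_bulb_numpy is hypothetical, the units (PS in Pa, TREFHT in kelvin, dewpoint in degC) are assumed from the CESM data and metpy.calc.dewpoint output, and it does not make the underlying point-by-point loop itself any faster.

import numpy as np
import xarray as xr
import metpy.calc
from metpy.units import units

def _wet_bulb_numpy(pressure, temperature, dewpoint):
    # Each Dask chunk arrives here as a plain numpy array:
    # attach units, run MetPy, then strip units again.
    wb = metpy.calc.wet_bulb_temperature(pressure * units.Pa,
                                         temperature * units.kelvin,
                                         dewpoint * units.degC)
    return wb.to('kelvin').magnitude

wetbulb = xr.apply_ufunc(
    _wet_bulb_numpy,
    press_mean['PS'], temp['TREFHT'], dewpoint,
    dask='parallelized',       # run the function chunk-by-chunk on workers
    output_dtypes=[float],
)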

Relevant packages in my Conda environment:

# Name                    Version                   Build  Channel
ca-certificates           2020.10.14                    0  
certifi                   2020.11.8        py36h06a4308_0  
dask                      2.30.0                     py_0  
dask-core                 2.30.0                     py_0  
dask-labextension         3.0.0                      py_0    conda-forge
jupyter                   1.0.0                    py36_7  
jupyter-server-proxy      1.5.0                      py_0    conda-forge
jupyter_client            6.1.7                      py_0  
jupyter_console           6.2.0                      py_0  
jupyter_core              4.6.3                    py36_0  
jupyterlab                2.2.9                      py_0    conda-forge
jupyterlab_pygments       0.1.2                      py_0  
jupyterlab_server         1.2.0                      py_0  
libnetcdf                 4.7.3                hb80b6cc_0  
metpy                     1.0.0rc2.post81+ge63b136c          pypi_0    pypi
netcdf4                   1.5.3            py36hbf33ddf_0  
nodejs                    10.13.0              he6710b0_0  
numpy                     1.19.2           py36h54aff64_0  
pint                      0.16.1                   pypi_0    pypi  
python                    3.6.12               hcff3b4d_2  
scipy                     1.5.2            py36h0b6359f_0  
xarray                    0.16.1                     py_0  
aurelgriesser commented 3 years ago

Can I just check quickly - MetPy v1.0.1 doesn't use Dask for wet bulb temperature calculations yet, does it? I ask because none of my workers do anything when I use it (passing in chunked xarray DataArrays).

dopplershift commented 2 years ago

@aurelgriesser Due to the current implementation of moist_lapse, wet_bulb_temperature iterates point-by-point through the arrays using nditer; there are any number of places in that chain of calls where the Dask arrays could end up converted into plain numpy arrays. We definitely want to improve this in the somewhat near future.
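For illustration (this is not MetPy's actual code), the pattern below shows why an nditer-style scalar loop defeats Dask: np.nditer needs concrete memory, so the lazy array gets materialized up front on the client, after which the workers have nothing left to do.

import numpy as np
import dask.array as da

lazy = da.ones((4, 4), chunks=(2, 2))  # lazy, chunked array

# np.nditer cannot consume a Dask array lazily; converting to numpy
# forces the whole array to be computed before the loop even starts.
materialized = np.asarray(lazy)
for value in np.nditer(materialized):
    pass  # per-point work (e.g. an ODE solve in moist_lapse) happens here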

lennijusten commented 2 years ago

> Can I just check quickly - MetPy v1.0.1 doesn't use Dask for wet bulb temperature calculations yet, does it? I ask because none of my workers do anything when I use it (passing in chunked xarray DataArrays).

@aurelgriesser To speed up the wetbulb calculations, I ended up precomputing wetbulb over a grid of reasonable temperatures, pressures, and dewpoints. The result is a 3D volume of calculated wetbulb temperatures that can be queried for any combination of temperature, pressure, and dewpoint.

Here is a link to that project if you are interested: https://github.com/lennijusten/Wetbulb-Temp/tree/main/Reference%20Grid
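A minimal sketch of that reference-grid idea (this is not the code from the linked repo; the grid ranges and resolutions are assumptions, and scipy's RegularGridInterpolator is one possible choice for the lookup step):

import numpy as np
import metpy.calc
from metpy.units import units
from scipy.interpolate import RegularGridInterpolator

# Coarse axes spanning the values expected in the data (assumed ranges)
p_grid = np.linspace(50000.0, 105000.0, 23)  # Pa
t_grid = np.linspace(230.0, 320.0, 46)       # K
td_grid = np.linspace(220.0, 310.0, 46)      # K

# Precompute wetbulb once over the full 3D grid. Combinations with
# dewpoint above temperature are unphysical and should be masked in practice.
P, T, Td = np.meshgrid(p_grid, t_grid, td_grid, indexing='ij')
wb = metpy.calc.wet_bulb_temperature(P * units.Pa,
                                     T * units.kelvin,
                                     Td * units.kelvin).to('kelvin').magnitude

# Interpolate into the precomputed volume instead of re-running the
# slow point-by-point calculation for every data point.
lookup = RegularGridInterpolator((p_grid, t_grid, td_grid), wb)
print(lookup([[100000.0, 300.0, 290.0]]))  # wetbulb at 1000 hPa, T=300 K, Td=290 K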