pydata / xarray

N-D labeled arrays and datasets in Python
https://xarray.dev
Apache License 2.0

Appending data to a dataset stored in Zarr format produces PermissionError or NaN values in the final result #5511

Open josephnowak opened 3 years ago

josephnowak commented 3 years ago

What happened: I was trying to append new data to an existing Zarr file containing a time-series dataset (a financial index), and I noticed that it sometimes raises a PermissionError or that NaN values randomly appear in the result. I investigated, and the problem appears to be related to multiple threads/processes trying to write to the same chunk (probably the last chunks, which have a different size).

What you expected to happen: I would like the data to be stored correctly, or at least for a NotImplementedError to be raised if this kind of append is not supported.

Minimal Complete Verifiable Example: You will probably have to run this code many times to reproduce the errors. You will see either the PermissionError or an increase in the number of NaNs (it should always be 0).

import numpy as np
import pandas as pd
import xarray as xr

# Dummy data to recreate the problem; 308 is the number of dates in my original data
dates = pd.bdate_range('2017-09-05', '2018-11-27')[:308]
index = xr.DataArray(
    data=np.random.rand(len(dates)),
    dims=['date'],
    coords={'date': np.array(dates, np.datetime64)}
)

# Store a slice of the index in a Zarr file (new_index) using chunks with size 30
start_date = np.datetime64('2017-09-05')
end_date = np.datetime64('2018-03-13')
index.loc[start_date: end_date].to_dataset(
    name='data'
).chunk(
    {'date': 30}
).to_zarr(
    'new_index',
    mode='w'
)

# Append the rest of the data to the new_index Zarr file
start_date = np.datetime64('2018-03-14')
end_date = np.datetime64('2018-11-27')

# This section of code sometimes raises a PermissionError. Probably two or more Dask
# processes/threads are trying to write to the same chunks at the same time. I suspect
# the culprits are the last chunks, which end with a different size and have to be rewritten.
index.loc[start_date: end_date].to_dataset(
    name='data'
).chunk(
    {'date': 30}
).to_zarr(
    'new_index',
    append_dim='date'
)

# The final result can contain many NaNs even though the original dataset has none.
# This behaviour is random, so I suppose it is related to the aforementioned error.
print(xr.open_zarr('new_index')['data'].isnull().sum().compute())
print(index.isnull().sum().compute())

Anything else we need to know?:

Environment:

Output of xr.show_versions()

INSTALLED VERSIONS
------------------
commit: None
python: 3.8.5 (default, Sep 3 2020, 21:29:08) [MSC v.1916 64 bit (AMD64)]
python-bits: 64
OS: Windows
OS-release: 10
machine: AMD64
processor: Intel64 Family 6 Model 165 Stepping 2, GenuineIntel
byteorder: little
LC_ALL: None
LANG: None
LOCALE: ('es_ES', 'cp1252')
libhdf5: 1.10.4
libnetcdf: None

xarray: 0.18.2
pandas: 1.2.4
numpy: 1.20.2
scipy: 1.6.2
netCDF4: None
pydap: None
h5netcdf: None
h5py: 2.10.0
Nio: None
zarr: 2.8.3
cftime: 1.5.0
nc_time_axis: None
PseudoNetCDF: None
rasterio: None
cfgrib: None
iris: None
bottleneck: 1.3.2
dask: 2021.06.0
distributed: 2021.06.1
matplotlib: 3.3.4
cartopy: None
seaborn: 0.11.1
numbagg: None
pint: None
setuptools: 52.0.0.post20210125
pip: 21.1.2
conda: 4.10.1
pytest: 6.2.4
IPython: 7.22.0
sphinx: 4.0.1

shoyer commented 3 years ago

Thanks for the report! This does look broken, which I was able to verify by running your code.

My guess is that something in Xarray's logic for appending datasets implicitly assumes that the existing dataset has been written in complete "chunks", which is not the case here.

josephnowak commented 3 years ago

Hi (sorry if this sounds annoying), I looked a little at the code used to append data to Zarr files, and from my perspective the logic is correct: it takes into account the case where the last chunks have a different shape, because it works with the shape of the unmodified array and then resizes it to write in regions with Dask.

I ran the same code as in my previous comment, but passed a synchronizer to the to_zarr method (synchronizer=zarr.ThreadSynchronizer()), and all the problems with the NaNs and the PermissionErrors disappeared, so this looks more like a synchronization problem between Zarr and Dask.

Hope this helps to fix the bug.
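To illustrate why a synchronizer would help, here is a toy model that uses only the standard library. It is a sketch under stated assumptions, not Zarr's implementation: ToyChunkStore, its methods, and the per-chunk lock table are hypothetical names made up for this example. The assumption is that writing part of a chunk is a read-modify-write of the whole chunk, so two writers that share a chunk must take turns.

```python
import threading

CHUNK = 30  # chunk length, same as in the reproduction code


class ToyChunkStore:
    """Hypothetical chunked store: a partial write reads, modifies and rewrites a whole chunk."""

    def __init__(self, length):
        n_chunks = -(-length // CHUNK)  # ceiling division
        self.chunks = {i: [None] * CHUNK for i in range(n_chunks)}
        # One lock per chunk, playing the role of a thread synchronizer.
        self.locks = {i: threading.Lock() for i in range(n_chunks)}

    def write(self, start, values):
        stop = start + len(values)
        for ci in range(start // CHUNK, (stop - 1) // CHUNK + 1):
            lo = max(start, ci * CHUNK)
            hi = min(stop, (ci + 1) * CHUNK)
            with self.locks[ci]:  # serialize the read-modify-write of this chunk
                chunk = list(self.chunks[ci])  # read the whole chunk
                chunk[lo - ci * CHUNK:hi - ci * CHUNK] = values[lo - start:hi - start]
                self.chunks[ci] = chunk  # write the whole chunk back

    def read(self, length):
        flat = [v for ci in sorted(self.chunks) for v in self.chunks[ci]]
        return flat[:length]


store = ToyChunkStore(308)
store.write(0, [42.2] * 152)  # existing data, ends in the middle of a chunk

# Append 156 values in blocks of 30, one thread per block (like Dask tasks).
threads = [
    threading.Thread(target=store.write, args=(s, [50.3] * min(30, 308 - s)))
    for s in range(152, 308, 30)
]
for t in threads:
    t.start()
for t in threads:
    t.join()

result = store.read(308)
print(result.count(None))  # 0 with the locks in place
```

Without the `with self.locks[ci]` line, two threads could read the same chunk, each modify their own part, and the second write-back would discard the first one's values, which is consistent with the missing values I see.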

josephnowak commented 3 years ago

Hi again, I looked a little more into the behavior of Zarr and Dask and found that the problem only occurs when the lock option of the dask.array.store function is set to None or False. Below is an example:


import numpy as np
import zarr
import dask.array as da

# Write a small Zarr array filled with 42.2 (152 elements, so the last chunk is partial)
z1 = zarr.open('data/example.zarr', mode='w', shape=(152,), chunks=(30,), dtype='f4')
z1[:] = 42.2

# Resize the array to make room for the new data
z2 = zarr.open('data/example.zarr', mode='a')
z2.resize(308)

# New data to append
append_data = da.from_array(np.array([50.3] * 156), chunks=(30,))

# If you pass None or False as the lock parameter, you will get the PermissionError or some 0s
# in the final result, so I think this is the problem when Xarray writes to Zarr with Dask
# (I saw in the code that it uses lock=None by default).
# With lock=True all the problems disappear.
da.store(append_data, z2, regions=[(slice(152, 308),)], lock=None)

# The result can contain many 0s, or the call above can raise an error
print(z2[:])

Hope this helps to fix the bug.
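For reference, a small standard-library sketch of which stored chunks each Dask block touches in the example above (existing length 152, chunk size 30, 156 appended values split into blocks of 30, 30, 30, 30, 30 and 6). The helper name chunks_touched is hypothetical, made up for this illustration; it only does index arithmetic and does not call Zarr or Dask.

```python
from collections import defaultdict


def chunks_touched(start, block_sizes, chunk_size):
    """Map each stored chunk index to the ids of the blocks that write into it."""
    touched = defaultdict(list)
    offset = start
    for block_id, size in enumerate(block_sizes):
        first = offset // chunk_size
        last = (offset + size - 1) // chunk_size
        for chunk_index in range(first, last + 1):
            touched[chunk_index].append(block_id)
        offset += size
    return dict(touched)


# Block sizes produced by chunking 156 values with chunks=(30,)
blocks = [30, 30, 30, 30, 30, 6]
touched = chunks_touched(152, blocks, 30)

# Stored chunks written by more than one block, i.e. candidates for a race
shared = {ci: ids for ci, ids in touched.items() if len(ids) > 1}
print(shared)
# {6: [0, 1], 7: [1, 2], 8: [2, 3], 9: [3, 4], 10: [4, 5]}
```

Because the append starts at offset 152 rather than at a multiple of 30, every block straddles a chunk boundary, so neighbouring blocks end up writing into the same stored chunk.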

josephnowak commented 1 year ago

Hi @shoyer, sorry for bothering you with this issue again. I know it is old by now, but I ran into it again a few days ago, and I have also noticed the same problem when using the region parameter. Based on this issue I opened on Zarr (https://github.com/zarr-developers/zarr-python/issues/1414), I think it would be good to implement one of these options to solve the problem:

  1. A warning in the docs indicating that a synchronizer is needed to append or update data in a Zarr file, or that you need to manually align the chunks, based on the space missing in the last chunk, to get independent writes.

  2. Automatically align the chunks to get independent writes (which I think can produce slower writes due to the modification of the chunks).

  3. Raise an error if there is no synchronizer and the chunks are not properly aligned. I think the error could be controlled with the safe_chunks parameter that the to_zarr method offers.
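A rough sketch of the arithmetic behind options 1 to 3, using only the standard library. The function names aligned_chunks and writes_are_independent are hypothetical and are not part of Xarray, Zarr or Dask; the idea is that the first appended block fills the gap left in the last partial chunk, and that a check can detect when two blocks would write into the same stored chunk.

```python
def aligned_chunks(existing_len, append_len, chunk_size):
    """Block sizes for the appended data so that no stored chunk is shared by two blocks."""
    sizes = []
    remaining = append_len
    # The first block fills whatever is missing in the last, partially written chunk.
    gap = (-existing_len) % chunk_size
    if gap and remaining:
        first = min(gap, remaining)
        sizes.append(first)
        remaining -= first
    # Every following block starts on a chunk boundary.
    while remaining > 0:
        size = min(chunk_size, remaining)
        sizes.append(size)
        remaining -= size
    return tuple(sizes)


def writes_are_independent(existing_len, block_sizes, chunk_size):
    """Return False if any stored chunk would be written by more than one block."""
    offset = existing_len
    previous_last = -1
    for size in block_sizes:
        first = offset // chunk_size
        last = (offset + size - 1) // chunk_size
        if first <= previous_last:
            return False
        previous_last = last
        offset += size
    return True


aligned = aligned_chunks(152, 156, 30)
print(aligned)  # (28, 30, 30, 30, 30, 8)
print(writes_are_independent(152, aligned, 30))           # True
print(writes_are_independent(152, (30,) * 5 + (6,), 30))  # False
```

For option 1, the tuple could presumably be passed when rechunking the new data before the append (for example as the chunk sizes of the date dimension); for option 3, a check like writes_are_independent is the kind of condition that could trigger the error when no synchronizer is given.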

shoyer commented 1 year ago

If we can find cases where we know concurrent writes are unsafe, we can definitely start raising errors. Better to be safe than to suffer from silent data corruption!