opendatacube / datacube-wps

Web Processing Service running on opendatacube

Intermittent test failure (datacube geometry hashing for dask?) #123

Closed benjimin closed 2 years ago

benjimin commented 2 years ago

Intermittent build test failure, where test_api.py::test_wit elicits a UnicodeDecodeError (invalid start byte for UTF-8) from pyproj.

The trace passes through witprocess cal_area, the dask core task executor, and the datacube geometry __hash__ and to_wkt methods.

Previously noted in https://github.com/opendatacube/datacube-wps/issues/122#issuecomment-921424699

benjimin commented 2 years ago
```
datacube_wps/processes/witprocess.py:60: in process_data
    re_wit = cal_area(aggregated)
datacube_wps/processes/witprocess.py:212: in cal_area
    re = pd.merge(re, ((aggregated.TCW > wet_threshold).astype('int')
/env/lib/python3.8/site-packages/xarray/core/dataarray.py:929: in load
    ds = self._to_temp_dataset().load(**kwargs)
/env/lib/python3.8/site-packages/xarray/core/dataset.py:865: in load
    evaluated_data = da.compute(*lazy_data.values(), **kwargs)
/env/lib/python3.8/site-packages/dask/base.py:570: in compute
    results = schedule(dsk, keys, **kwargs)
/env/lib/python3.8/site-packages/dask/threaded.py:79: in get
    results = get_async(
/env/lib/python3.8/site-packages/dask/local.py:517: in get_async
    raise_exception(exc, tb)
...
/env/lib/python3.8/site-packages/dask/core.py:122: in _execute_task
    elif not ishashable(arg):
/env/lib/python3.8/site-packages/dask/core.py:20: in ishashable
    hash(x)
/env/lib/python3.8/site-packages/datacube/utils/geometry/_base.py:1069: in __hash__
    return hash((*self.shape, self.crs, self.affine))
/env/lib/python3.8/site-packages/datacube/utils/geometry/_base.py:259: in __hash__
    return hash(self.to_wkt())
/env/lib/python3.8/site-packages/datacube/utils/geometry/_base.py:190: in to_wkt
    return self._crs.to_wkt(pretty=pretty, version=version)
pyproj/_crs.pyx:457: in pyproj._crs.Base.to_wkt
pyproj/_crs.pyx:120: in pyproj._crs._to_wkt
pyproj/_crs.pyx:24: in pyproj._crs.cstrdecode
UnicodeDecodeError: 'utf-8' codec can't decode byte 0xc1 in position 3: invalid start byte
```
Full error log:

```
=================================== FAILURES ===================================
___________________________________ test_wit ___________________________________

    def test_wit():
        catalog = read_process_catalog("datacube-wps-config.yaml")
        wit_proc = [entry for entry in catalog if isinstance(entry, WIT)][0]
        poly = Geometry(
            {
                "type": "Polygon",
                "coordinates": [
                    [
                        (147.28271484375003, -35.89238773935897),
                        (147.03277587890628, -35.663990911348115),
                        (146.65237426757815, -35.90684930677119),
                        (147.09182739257815, -36.15894422111004),
                        (147.28271484375003, -35.89238773935897),
                    ]
                ],
            },
            crs=CRS("EPSG:4326"),
        )
>       results = wit_proc.query_handler(time=("2019-03-05", "2019-07-10"), feature=poly)

tests/test_api.py:88:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
datacube_wps/processes/__init__.py:65: in log_wrapper
    result = func(*args, **kwargs)
datacube_wps/processes/__init__.py:520: in query_handler
    df = self.process_data(data, {"time": time, "feature": feature, **parameters})
datacube_wps/processes/__init__.py:65: in log_wrapper
    result = func(*args, **kwargs)
datacube_wps/processes/witprocess.py:60: in process_data
    re_wit = cal_area(aggregated)
datacube_wps/processes/witprocess.py:212: in cal_area
    re = pd.merge(re, ((aggregated.TCW > wet_threshold).astype('int')
/env/lib/python3.8/site-packages/xarray/core/dataarray.py:929: in load
    ds = self._to_temp_dataset().load(**kwargs)
/env/lib/python3.8/site-packages/xarray/core/dataset.py:865: in load
    evaluated_data = da.compute(*lazy_data.values(), **kwargs)
/env/lib/python3.8/site-packages/dask/base.py:570: in compute
    results = schedule(dsk, keys, **kwargs)
/env/lib/python3.8/site-packages/dask/threaded.py:79: in get
    results = get_async(
/env/lib/python3.8/site-packages/dask/local.py:517: in get_async
    raise_exception(exc, tb)
/env/lib/python3.8/site-packages/dask/local.py:325: in reraise
    raise exc
/env/lib/python3.8/site-packages/dask/local.py:223: in execute_task
    result = _execute_task(task, data)
/env/lib/python3.8/site-packages/dask/core.py:121: in _execute_task
    return func(*(_execute_task(a, cache) for a in args))
/env/lib/python3.8/site-packages/dask/core.py:121: in <genexpr>
    return func(*(_execute_task(a, cache) for a in args))
/env/lib/python3.8/site-packages/dask/core.py:122: in _execute_task
    elif not ishashable(arg):
/env/lib/python3.8/site-packages/dask/core.py:20: in ishashable
    hash(x)
/env/lib/python3.8/site-packages/datacube/utils/geometry/_base.py:1069: in __hash__
    return hash((*self.shape, self.crs, self.affine))
/env/lib/python3.8/site-packages/datacube/utils/geometry/_base.py:259: in __hash__
    return hash(self.to_wkt())
/env/lib/python3.8/site-packages/datacube/utils/geometry/_base.py:190: in to_wkt
    return self._crs.to_wkt(pretty=pretty, version=version)
pyproj/_crs.pyx:457: in pyproj._crs.Base.to_wkt
    ???
pyproj/_crs.pyx:120: in pyproj._crs._to_wkt
    ???
pyproj/_crs.pyx:24: in pyproj._crs.cstrdecode
    ???
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

cstr = b'`4F\xc1\xbe\x7f'

    def pystrdecode(cstr):
        """
        Decode a string to a python string.
        """
        try:
>           return cstr.decode("utf-8")
E           UnicodeDecodeError: 'utf-8' codec can't decode byte 0xc1 in position 3: invalid start byte

/env/lib/python3.8/site-packages/pyproj/compat.py:23: UnicodeDecodeError
----------------------------- Captured stdout call -----------------------------
mask all touch True
query_handler self:
query_handler time: ('2019-03-05', '2019-07-10')
query_handler feature: Geometry({'type': 'Polygon', 'coordinates': (((147.28271484375003, -35.89238773935897), (147.03277587890628, -35.663990911348115), (146.65237426757815, -35.90684930677119), (147.09182739257815, -36.15894422111004), (147.28271484375003, -35.89238773935897)),)}, CRS('EPSG:4326'))
process_data self:
process_data data:
Dimensions:      (time: 8, y: 1841, x: 1890)
Coordinates:
  * time         (time) datetime64[ns] 2019-03-09T23:56:30.301619 ... 2019-06...
  * y            (y) float64 -3.976e+06 -3.976e+06 ... -4.031e+06 -4.031e+06
  * x            (x) float64 1.319e+06 1.319e+06 ... 1.376e+06 1.376e+06
    spatial_ref  int32 3577
Data variables:
    bs           (time, y, x) uint8 dask.array
    pv           (time, y, x) uint8 dask.array
    npv          (time, y, x) uint8 dask.array
    TCW          (time, y, x) float32 dask.array
    water        (time, y, x) int16 dask.array
Attributes:
    crs:      EPSG:3577
process_data parameters: {'time': ('2019-03-05', '2019-07-10'), 'feature': Geometry(POLYGON ((147.28271484375 -35.89238773935897, 147.0327758789063 -35.66399091134812, 146.6523742675782 -35.90684930677119, 147.0918273925782 -36.15894422111004, 147.28271484375 -35.89238773935897)), EPSG:4326)}
feature in wit Geometry({'type': 'Polygon', 'coordinates': (((147.28271484375003, -35.89238773935897), (147.03277587890628, -35.663990911348115), (146.65237426757815, -35.90684930677119), (147.09182739257815, -36.15894422111004), (147.28271484375003, -35.89238773935897)),)}, CRS('EPSG:4326'))
geobox of data GeoBox(Geometry({'type': 'Polygon', 'coordinates': (((146.62347849895943, -35.70259916046566), (146.69357389976432, -36.19700984174693), (147.31910391574698, -36.1370963930901), (147.24608026203015, -35.64305237905194), (146.62347849895943, -35.70259916046566)),)}, CRS('EPSG:4326')))
polygon area 1744720
```
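The hashing chain in the traceback can be sketched with simplified stand-ins. `StandInCRS` and `StandInGeoBox` are hypothetical classes that only mirror the two `__hash__` frames from datacube's `_base.py` (the GeoBox hashes `(*shape, crs, affine)`, and the CRS hashes `to_wkt()`); `ishashable` copies the try/except logic of `dask.core.ishashable`. The point it illustrates: every worker thread that checks a task argument ends up calling `to_wkt()` on the one shared CRS object, which in the real code is a call into pyproj.

```python
import threading
from concurrent.futures import ThreadPoolExecutor


class StandInCRS:
    """Hypothetical stand-in for datacube's CRS wrapper (not the real class).

    The real __hash__ returns hash(self.to_wkt()), and to_wkt calls into
    pyproj's C layer. Here to_wkt only records which thread called it.
    """

    def __init__(self, wkt: str) -> None:
        self._wkt = wkt
        self.to_wkt_threads = []
        self._lock = threading.Lock()  # protects the bookkeeping list only

    def to_wkt(self) -> str:
        with self._lock:
            self.to_wkt_threads.append(threading.get_ident())
        return self._wkt

    def __hash__(self) -> int:
        return hash(self.to_wkt())


class StandInGeoBox:
    """Hypothetical stand-in mirroring GeoBox.__hash__ from the traceback."""

    def __init__(self, shape, crs, affine) -> None:
        self.shape = shape
        self.crs = crs
        self.affine = affine

    def __hash__(self) -> int:
        # Hashing the tuple hashes the CRS, which calls to_wkt().
        return hash((*self.shape, self.crs, self.affine))


def ishashable(x) -> bool:
    """Same logic as dask.core.ishashable: try hash(), treat TypeError as unhashable."""
    try:
        hash(x)
        return True
    except TypeError:
        return False


# One GeoBox (and therefore one CRS) shared by every task in the graph.
crs = StandInCRS("EPSG:3577 (placeholder WKT)")
geobox = StandInGeoBox((1841, 1890), crs, (1.0, 0.0, 0.0, 0.0, -1.0, 0.0))  # placeholder affine

# A threaded scheduler checks task arguments from its worker threads.
with ThreadPoolExecutor(max_workers=4) as pool:
    results = list(pool.map(ishashable, [geobox] * 20))

print(len(crs.to_wkt_threads))  # 20: one to_wkt() call per hash, from whichever thread ran it
```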
benjimin commented 2 years ago

This is intermittent and definitely not deterministic. On different occurrences, the same error reports a different byte sequence:

```
cstr = b' \xf16\\\t\x7f'
...
UnicodeDecodeError: 'utf-8' codec can't decode byte 0xf1 in position 1: invalid continuation byte
```

I'm not sure whether this is caused by some kind of thread race in what dask is doing, or whether it simply means there is memory corruption (such as a buffer overflow).
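One quick check on the corruption reading: decoding the two reported byte strings on their own reproduces exactly the reported errors at the reported positions. Neither looks like a damaged WKT string (byte 0xc1 can never occur in valid UTF-8), which is at least consistent with pyproj being handed memory that had been freed or overwritten, though it does not by itself distinguish a thread race from other corruption. `decode_failure` is a hypothetical helper for illustration.

```python
def decode_failure(raw: bytes):
    """Return (position, reason) of the first UTF-8 decoding error, or None."""
    try:
        raw.decode("utf-8")
    except UnicodeDecodeError as exc:
        return exc.start, exc.reason
    return None


# Byte strings copied from the two failing runs reported in this issue.
first = b'`4F\xc1\xbe\x7f'
second = b' \xf16\\\t\x7f'

print(decode_failure(first))         # (3, 'invalid start byte')
print(decode_failure(second))        # (1, 'invalid continuation byte')
print(decode_failure(b"EPSG:3577"))  # None: plain ASCII decodes fine
```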

woodcockr commented 2 years ago

I’ve not looked at the code, but PROJ CRS objects are not thread-safe, and your error looks very much like that problem: you can’t create CRS objects and then pass them into a Dask worker. You will need to modify the code to create the CRS object IN the thread that uses it. I usually pass in the CRS EPSG code and then create the CRS in the code running on the worker.
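A minimal sketch of that advice, assuming the tasks only need an EPSG code (for example 3577, the data CRS in the log above): pass the plain integer into each task and build the CRS inside the worker thread, caching one object per thread. `PerThreadCRS`, `fake_crs_factory` and `area_task` are hypothetical names; in real code the factory would presumably be `pyproj.CRS.from_epsg`, which is swapped out here so the sketch runs without pyproj.

```python
import threading
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Callable


class PerThreadCRS:
    """Lazily build one CRS object per (thread, EPSG code); never share across threads."""

    def __init__(self, factory: Callable[[int], Any]) -> None:
        # Assumption: in real code `factory` would be pyproj.CRS.from_epsg.
        self._factory = factory
        self._local = threading.local()  # each thread sees its own attributes

    def get(self, epsg: int) -> Any:
        cache = getattr(self._local, "cache", None)
        if cache is None:
            cache = self._local.cache = {}
        if epsg not in cache:
            cache[epsg] = self._factory(epsg)  # built inside the calling thread
        return cache[epsg]


def fake_crs_factory(epsg: int) -> dict:
    # Hypothetical stand-in for pyproj.CRS.from_epsg that records the building thread.
    return {"epsg": epsg, "owner": threading.get_ident()}


crs_cache = PerThreadCRS(fake_crs_factory)


def area_task(epsg: int, chunk_index: int) -> bool:
    """Hypothetical task: receives only a plain int, never a shared CRS object."""
    crs = crs_cache.get(epsg)
    # ... per-chunk work using `crs` would go here ...
    return crs["owner"] == threading.get_ident()


with ThreadPoolExecutor(max_workers=4) as pool:
    results = list(pool.map(area_task, [3577] * 100, range(100)))

print(all(results))  # True: every task used a CRS built by its own thread
```

Passing the integer also keeps the task graph free of objects whose hashing calls into PROJ, which is the path the traceback above went through.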

benjimin commented 2 years ago

It seems this was a known symptom of pyproj not being thread-safe (and had impacted other dask applications).

It looks like this was supposed to be mostly fixed in pyproj 3.1.0 earlier this year.

...and it looks like the build was using pyproj 2.6.1.
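One way to stop an old pyproj from silently reappearing in the build is a fail-fast version guard, for example called from a `conftest.py`. This is a hypothetical sketch: `MIN_THREADSAFE_PYPROJ` and the helper names are illustrative, the 3.1.0 threshold is the one mentioned in this thread, and `importlib.metadata` needs Python 3.8+ (the build above runs 3.8).

```python
from importlib import metadata
from typing import Tuple

# Release that this issue thread reports as (mostly) fixing pyproj thread safety.
MIN_THREADSAFE_PYPROJ = (3, 1, 0)


def parse_release(version: str) -> Tuple[int, ...]:
    """Parse the leading numeric components, e.g. '3.2.1' -> (3, 2, 1).

    Deliberately simple: parsing stops at the first non-numeric component, so
    '3.1.0rc1' becomes (3, 1), which compares below (3, 1, 0). Use
    packaging.version for full PEP 440 handling.
    """
    parts = []
    for piece in version.split("."):
        if not piece.isdigit():
            break
        parts.append(int(piece))
    return tuple(parts)


def pyproj_is_new_enough(version: str) -> bool:
    return parse_release(version) >= MIN_THREADSAFE_PYPROJ


def check_installed_pyproj() -> None:
    """Fail fast if the installed pyproj predates 3.1.0 (no-op if pyproj is absent)."""
    try:
        installed = metadata.version("pyproj")
    except metadata.PackageNotFoundError:
        return
    if not pyproj_is_new_enough(installed):
        raise RuntimeError(
            f"pyproj {installed} predates 3.1.0; sharing CRS objects across "
            "dask threads is known to be unsafe in these versions"
        )


print(pyproj_is_new_enough("2.6.1"))  # False
print(pyproj_is_new_enough("3.2.1"))  # True
```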

benjimin commented 2 years ago

Now, how to verify a fix of an intermittent problem...

I estimate the fault was previously occurring in around 10% of builds. So we could probably force it to occur by wrapping the pytest invocation in a bash for loop (say, 50 repeats), if further investigation is warranted.
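The arithmetic behind the 50-repeat suggestion, assuming the 10% estimate holds and runs are independent: the chance of seeing at least one failure in n runs is 1 - (1 - p)^n. The sketch below computes that and expresses the repeat loop in Python rather than bash; the helper names are hypothetical, and the test target is the one from this issue.

```python
import math
import subprocess
import sys
from typing import Sequence


def p_at_least_one_failure(p_fail: float, runs: int) -> float:
    """Chance that a fault with per-run probability p_fail shows up at least once."""
    return 1.0 - (1.0 - p_fail) ** runs


def runs_needed(p_fail: float, confidence: float) -> int:
    """Smallest number of runs for the fault to appear with at least `confidence` probability."""
    return math.ceil(math.log(1.0 - confidence) / math.log(1.0 - p_fail))


def count_failures(cmd: Sequence[str], runs: int) -> int:
    """Run `cmd` repeatedly and count non-zero exit codes."""
    failures = 0
    for _ in range(runs):
        if subprocess.run(list(cmd)).returncode != 0:
            failures += 1
    return failures


# The repeat loop from the comment above would be count_failures(PYTEST_CMD, 50).
PYTEST_CMD = [sys.executable, "-m", "pytest", "tests/test_api.py::test_wit"]

print(round(p_at_least_one_failure(0.10, 50), 4))  # 0.9948
print(runs_needed(0.10, 0.99))                      # 44
```

Read the other way, if 50 consecutive runs pass after the upgrade, a fault still occurring 10% of the time would have had roughly a 99.5% chance of showing up, which is reasonable (though not conclusive) evidence of a fix.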

Tentatively closing as an upstream issue; the build is now using pyproj 3.2.1.