pangeo-forge / pangeo-forge-recipes

Python library for building Pangeo Forge recipes.
https://pangeo-forge.readthedocs.io/
Apache License 2.0
118 stars 54 forks source link

`DetermineSchema` throws `UnicodeEncodeError` if en-dash in ds.attrs #586

Open jbusecke opened 10 months ago

jbusecke commented 10 months ago

@cisaacstern and I just debugged a class of failed CMIP6 recipes and came up with a relatively easy reproducer:

from pangeo_forge_recipes.patterns import pattern_from_file_sequence
from pangeo_forge_recipes.transforms import (
    OpenURLWithFSSpec, OpenWithXarray, DetermineSchema
)
import apache_beam as beam

urls = [
    'https://esgf-data1.llnl.gov/thredds/fileServer/css03_data/CMIP6/CMIP/MPI-M/MPI-ESM1-2-LR/historical/r17i1p1f1/SImon/sifb/gn/v20210901/sifb_SImon_MPI-ESM1-2-LR_historical_r17i1p1f1_gn_199001-200912.nc',
]

pattern = pattern_from_file_sequence(urls,concat_dim='time')
recipe = (
    beam.Create(pattern.items())
        | OpenURLWithFSSpec()
        | OpenWithXarray(xarray_open_kwargs={"use_cftime":True})
        | DetermineSchema(combine_dims = pattern.combine_dim_keys)
               )
with beam.Pipeline() as p:
    p | recipe

gives:

``` /srv/conda/envs/cmip6-leap-feedstock/lib/python3.9/site-packages/pangeo_forge_recipes/openers.py:53: UserWarning: Unknown file type specified without xarray engine, backend engine will be automatically selected by xarray warnings.warn( Traceback (most recent call last): File "apache_beam/runners/common.py", line 1423, in apache_beam.runners.common.DoFnRunner.process File "apache_beam/runners/common.py", line 625, in apache_beam.runners.common.SimpleInvoker.invoke_process File "apache_beam/runners/common.py", line 1607, in apache_beam.runners.common._OutputHandler.handle_process_outputs File "apache_beam/runners/common.py", line 1720, in apache_beam.runners.common._OutputHandler._write_value_to_tag File "apache_beam/runners/worker/operations.py", line 263, in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive File "apache_beam/runners/worker/operations.py", line 208, in apache_beam.runners.worker.operations.ConsumerSet.update_counters_start File "apache_beam/runners/worker/opcounters.py", line 213, in apache_beam.runners.worker.opcounters.OperationCounters.update_from File "apache_beam/runners/worker/opcounters.py", line 265, in apache_beam.runners.worker.opcounters.OperationCounters.do_sample File "apache_beam/coders/coder_impl.py", line 1495, in apache_beam.coders.coder_impl.WindowedValueCoderImpl.get_estimated_size_and_observables File "apache_beam/coders/coder_impl.py", line 1506, in apache_beam.coders.coder_impl.WindowedValueCoderImpl.get_estimated_size_and_observables File "apache_beam/coders/coder_impl.py", line 1055, in apache_beam.coders.coder_impl.AbstractComponentCoderImpl.get_estimated_size_and_observables File "apache_beam/coders/coder_impl.py", line 379, in apache_beam.coders.coder_impl.FastPrimitivesCoderImpl.get_estimated_size_and_observables File "apache_beam/coders/coder_impl.py", line 443, in apache_beam.coders.coder_impl.FastPrimitivesCoderImpl.encode_to_stream File "apache_beam/coders/coder_impl.py", line 443, in apache_beam.coders.coder_impl.FastPrimitivesCoderImpl.encode_to_stream File "apache_beam/coders/coder_impl.py", line 413, in apache_beam.coders.coder_impl.FastPrimitivesCoderImpl.encode_to_stream UnicodeEncodeError: 'utf-8' codec can't encode characters in position 61-63: surrogates not allowed During handling of the above exception, another exception occurred: Traceback (most recent call last): File "/home/jovyan/AREAS/CMIP6-PGF/reproducer.py", line 24, in p | recipe File "/srv/conda/envs/cmip6-leap-feedstock/lib/python3.9/site-packages/apache_beam/pipeline.py", line 600, in __exit__ self.result = self.run() File "/srv/conda/envs/cmip6-leap-feedstock/lib/python3.9/site-packages/apache_beam/pipeline.py", line 577, in run return self.runner.run_pipeline(self, self._options) File "/srv/conda/envs/cmip6-leap-feedstock/lib/python3.9/site-packages/apache_beam/runners/direct/direct_runner.py", line 129, in run_pipeline return runner.run_pipeline(pipeline, options) File "/srv/conda/envs/cmip6-leap-feedstock/lib/python3.9/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 202, in run_pipeline self._latest_run_result = self.run_via_runner_api( File "/srv/conda/envs/cmip6-leap-feedstock/lib/python3.9/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 224, in run_via_runner_api return self.run_stages(stage_context, stages) File "/srv/conda/envs/cmip6-leap-feedstock/lib/python3.9/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 455, in run_stages bundle_results = self._execute_bundle( File "/srv/conda/envs/cmip6-leap-feedstock/lib/python3.9/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 783, in _execute_bundle self._run_bundle( File "/srv/conda/envs/cmip6-leap-feedstock/lib/python3.9/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 1012, in _run_bundle result, splits = bundle_manager.process_bundle( File "/srv/conda/envs/cmip6-leap-feedstock/lib/python3.9/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 1348, in process_bundle result_future = self._worker_handler.control_conn.push(process_bundle_req) File "/srv/conda/envs/cmip6-leap-feedstock/lib/python3.9/site-packages/apache_beam/runners/portability/fn_api_runner/worker_handlers.py", line 379, in push response = self.worker.do_instruction(request) File "/srv/conda/envs/cmip6-leap-feedstock/lib/python3.9/site-packages/apache_beam/runners/worker/sdk_worker.py", line 625, in do_instruction return getattr(self, request_type)( File "/srv/conda/envs/cmip6-leap-feedstock/lib/python3.9/site-packages/apache_beam/runners/worker/sdk_worker.py", line 663, in process_bundle bundle_processor.process_bundle(instruction_id)) File "/srv/conda/envs/cmip6-leap-feedstock/lib/python3.9/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1040, in process_bundle input_op_by_transform_id[element.transform_id].process_encoded( File "/srv/conda/envs/cmip6-leap-feedstock/lib/python3.9/site-packages/apache_beam/runners/worker/bundle_processor.py", line 232, in process_encoded self.output(decoded_value) File "apache_beam/runners/worker/operations.py", line 568, in apache_beam.runners.worker.operations.Operation.output File "apache_beam/runners/worker/operations.py", line 570, in apache_beam.runners.worker.operations.Operation.output File "apache_beam/runners/worker/operations.py", line 261, in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive File "apache_beam/runners/worker/operations.py", line 264, in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive File "apache_beam/runners/worker/operations.py", line 951, in apache_beam.runners.worker.operations.DoOperation.process File "apache_beam/runners/worker/operations.py", line 952, in apache_beam.runners.worker.operations.DoOperation.process File "apache_beam/runners/common.py", line 1425, in apache_beam.runners.common.DoFnRunner.process File "apache_beam/runners/common.py", line 1513, in apache_beam.runners.common.DoFnRunner._reraise_augmented File "apache_beam/runners/common.py", line 1423, in apache_beam.runners.common.DoFnRunner.process File "apache_beam/runners/common.py", line 625, in apache_beam.runners.common.SimpleInvoker.invoke_process File "apache_beam/runners/common.py", line 1607, in apache_beam.runners.common._OutputHandler.handle_process_outputs File "apache_beam/runners/common.py", line 1720, in apache_beam.runners.common._OutputHandler._write_value_to_tag File "apache_beam/runners/worker/operations.py", line 264, in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive File "apache_beam/runners/worker/operations.py", line 951, in apache_beam.runners.worker.operations.DoOperation.process File "apache_beam/runners/worker/operations.py", line 952, in apache_beam.runners.worker.operations.DoOperation.process File "apache_beam/runners/common.py", line 1425, in apache_beam.runners.common.DoFnRunner.process File "apache_beam/runners/common.py", line 1513, in apache_beam.runners.common.DoFnRunner._reraise_augmented File "apache_beam/runners/common.py", line 1423, in apache_beam.runners.common.DoFnRunner.process File "apache_beam/runners/common.py", line 625, in apache_beam.runners.common.SimpleInvoker.invoke_process File "apache_beam/runners/common.py", line 1607, in apache_beam.runners.common._OutputHandler.handle_process_outputs File "apache_beam/runners/common.py", line 1720, in apache_beam.runners.common._OutputHandler._write_value_to_tag File "apache_beam/runners/worker/operations.py", line 264, in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive File "apache_beam/runners/worker/operations.py", line 951, in apache_beam.runners.worker.operations.DoOperation.process File "apache_beam/runners/worker/operations.py", line 952, in apache_beam.runners.worker.operations.DoOperation.process File "apache_beam/runners/common.py", line 1425, in apache_beam.runners.common.DoFnRunner.process File "apache_beam/runners/common.py", line 1513, in apache_beam.runners.common.DoFnRunner._reraise_augmented File "apache_beam/runners/common.py", line 1423, in apache_beam.runners.common.DoFnRunner.process File "apache_beam/runners/common.py", line 839, in apache_beam.runners.common.PerWindowInvoker.invoke_process File "apache_beam/runners/common.py", line 983, in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window File "apache_beam/runners/common.py", line 1607, in apache_beam.runners.common._OutputHandler.handle_process_outputs File "apache_beam/runners/common.py", line 1720, in apache_beam.runners.common._OutputHandler._write_value_to_tag File "apache_beam/runners/worker/operations.py", line 264, in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive File "apache_beam/runners/worker/operations.py", line 951, in apache_beam.runners.worker.operations.DoOperation.process File "apache_beam/runners/worker/operations.py", line 952, in apache_beam.runners.worker.operations.DoOperation.process File "apache_beam/runners/common.py", line 1425, in apache_beam.runners.common.DoFnRunner.process File "apache_beam/runners/common.py", line 1513, in apache_beam.runners.common.DoFnRunner._reraise_augmented File "apache_beam/runners/common.py", line 1423, in apache_beam.runners.common.DoFnRunner.process File "apache_beam/runners/common.py", line 839, in apache_beam.runners.common.PerWindowInvoker.invoke_process File "apache_beam/runners/common.py", line 983, in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window File "apache_beam/runners/common.py", line 1607, in apache_beam.runners.common._OutputHandler.handle_process_outputs File "apache_beam/runners/common.py", line 1720, in apache_beam.runners.common._OutputHandler._write_value_to_tag File "apache_beam/runners/worker/operations.py", line 264, in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive File "apache_beam/runners/worker/operations.py", line 951, in apache_beam.runners.worker.operations.DoOperation.process File "apache_beam/runners/worker/operations.py", line 952, in apache_beam.runners.worker.operations.DoOperation.process File "apache_beam/runners/common.py", line 1425, in apache_beam.runners.common.DoFnRunner.process File "apache_beam/runners/common.py", line 1533, in apache_beam.runners.common.DoFnRunner._reraise_augmented File "apache_beam/runners/common.py", line 1423, in apache_beam.runners.common.DoFnRunner.process File "apache_beam/runners/common.py", line 625, in apache_beam.runners.common.SimpleInvoker.invoke_process File "apache_beam/runners/common.py", line 1607, in apache_beam.runners.common._OutputHandler.handle_process_outputs File "apache_beam/runners/common.py", line 1720, in apache_beam.runners.common._OutputHandler._write_value_to_tag File "apache_beam/runners/worker/operations.py", line 263, in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive File "apache_beam/runners/worker/operations.py", line 208, in apache_beam.runners.worker.operations.ConsumerSet.update_counters_start File "apache_beam/runners/worker/opcounters.py", line 213, in apache_beam.runners.worker.opcounters.OperationCounters.update_from File "apache_beam/runners/worker/opcounters.py", line 265, in apache_beam.runners.worker.opcounters.OperationCounters.do_sample File "apache_beam/coders/coder_impl.py", line 1495, in apache_beam.coders.coder_impl.WindowedValueCoderImpl.get_estimated_size_and_observables File "apache_beam/coders/coder_impl.py", line 1506, in apache_beam.coders.coder_impl.WindowedValueCoderImpl.get_estimated_size_and_observables File "apache_beam/coders/coder_impl.py", line 1055, in apache_beam.coders.coder_impl.AbstractComponentCoderImpl.get_estimated_size_and_observables File "apache_beam/coders/coder_impl.py", line 379, in apache_beam.coders.coder_impl.FastPrimitivesCoderImpl.get_estimated_size_and_observables File "apache_beam/coders/coder_impl.py", line 443, in apache_beam.coders.coder_impl.FastPrimitivesCoderImpl.encode_to_stream File "apache_beam/coders/coder_impl.py", line 443, in apache_beam.coders.coder_impl.FastPrimitivesCoderImpl.encode_to_stream File "apache_beam/coders/coder_impl.py", line 413, in apache_beam.coders.coder_impl.FastPrimitivesCoderImpl.encode_to_stream UnicodeEncodeError: 'utf-8 [while running 'Create|OpenURLWithFSSpec|OpenWithXarray|DetermineSchema/DetermineSchema/Map(wrapper)']' codec can't encode characters in position 61-63: surrogates not allowed ```

We narrowed this down to some issue with the dataset attributes, more specifically the reference attribute:

'MPI-ESM: Mauritsen, T. et al. (2019), Developments in the MPI‐M Earth System Model version 1.2 (MPI‐ESM1.2) and Its Response to Increasing CO2, J. Adv. Model. Earth Syst.,11, 998-1038, doi:10.1029/2018MS001400,Mueller, W.A. et al. (2018): A high‐resolution version of the Max Planck Institute Earth System Model MPI‐ESM1.2‐HR. J. Adv. Model. EarthSyst.,10,1383–1413, doi:10.1029/2017MS001217'

There seems to be different types of dashes used here which trip up the encoding.

We concluded this needs a custom sanitizer stage in the CMIP6 recipe, but wanted to leave this issue up if other PGF users came across it.

cisaacstern commented 10 months ago

Here's a way to sanitize en-dashes:

"MPI‐M".encode("utf-8").replace(b"\xe2\x80\x90", b"-").decode()
jbusecke commented 10 months ago

The above did not work, it basically triggered the original error on a different line.

    wrapper = lambda x: [fn(x)]
  File "/home/jovyan/AREAS/CMIP6-PGF/reproducer.py", line 20, in _strip_attrs
    new_value = att_value.encode("utf-8").replace(b"\xe2\x80\x90", b"-").decode()
UnicodeEncodeError: 'utf-8 [while running 'Create|OpenURLWithFSSpec|OpenWithXarray|StripAttrs|DetermineSchema/StripAttrs/Strip Attrs']' codec can't encode characters in position 61-63: surrogates not allowed
Exception ignored in: <function File.close at 0x7f1dca683940>
Traceback (most recent call last):
  File "/srv/conda/envs/cmip6-leap-feedstock/lib/python3.9/site-packages/h5netcdf/core.py", line 1200, in close
  File "/srv/conda/envs/cmip6-leap-feedstock/lib/python3.9/site-packages/h5py/_hl/files.py", line 585, in close
TypeError: bad operand type for unary ~: 'NoneType'

What worked is either

.encode("utf-8", 'ignore').decode()
.encode("utf-8", 'replace').decode()

The latter fills ??? for offending characters. Aside from this triggering some serious cravings for my favorite childhood audiobooks I think in the example above that this sanitized version (with (.., 'replace')):

Sanitized datasets attributes field references: 
 MPI-ESM: Mauritsen, T. et al. (2019), Developments in the MPI‐M Earth System Model version 1.2 (MPI‐ESM1.2) and Its Response to Increasing CO2, J. Adv. Model. Earth Syst.,11, 998-1038, doi:10.1029/2018MS001400,
Mueller, W.A. et al. (2018): A high‐resolution version of the Max Planck Institute Earth System Model MPI‐ESM1.2‐HR. J. Adv. Model. EarthSyst.,10,1383–1413, doi:10.1029/2017MS001217 
 ----> 
 MPI-ESM: Mauritsen, T. et al. (2019), Developments in the MPI???M Earth System Model version 1.2 (MPI???ESM1.2) and Its Response to Increasing CO2, J. Adv. Model. Earth Syst.,11, 998-1038, doi:10.1029/2018MS001400,
Mueller, W.A. et al. (2018): A high???resolution version of the Max Planck Institute Earth System Model MPI???ESM1.2???HR. J. Adv. Model. EarthSyst.,10,1383???1413, doi:10.1029/2017MS001217

more clearly indicates that things were replaced than this (with(..., 'ignore'):

Sanitized datasets attributes field references: 
 MPI-ESM: Mauritsen, T. et al. (2019), Developments in the MPI‐M Earth System Model version 1.2 (MPI‐ESM1.2) and Its Response to Increasing CO2, J. Adv. Model. Earth Syst.,11, 998-1038, doi:10.1029/2018MS001400,
Mueller, W.A. et al. (2018): A high‐resolution version of the Max Planck Institute Earth System Model MPI‐ESM1.2‐HR. J. Adv. Model. EarthSyst.,10,1383–1413, doi:10.1029/2017MS001217 
 ----> 
 MPI-ESM: Mauritsen, T. et al. (2019), Developments in the MPIM Earth System Model version 1.2 (MPIESM1.2) and Its Response to Increasing CO2, J. Adv. Model. Earth Syst.,11, 998-1038, doi:10.1029/2018MS001400,
Mueller, W.A. et al. (2018): A highresolution version of the Max Planck Institute Earth System Model MPIESM1.2HR. J. Adv. Model. EarthSyst.,10,13831413, doi:10.1029/2017MS001217
keewis commented 10 months ago

you've got two different versions of dash that make use of surrogates: b"\xe2\x80\x90" and b"\xe2\x80\x93". To replace both, you'd have to:

s = "..."
s.encode("utf-8").replace(b"\xe2\x80\x90", b"-").replace(b"\xe2\x80\x93", b"-").decode("utf-8")

This also works for me:

s.replace("‐", "-").replace("–", "-")

(use .encode("utf-8") to verify that any multi-byte characters are gone)

Edit: here's some more details of what those three bytes mean, in case you're interested (i.e. the encoding rules for UTF-8). If we look at the binary representation of the first byte (\xe2), we get 11100010, where the number of set bits at the beginning followed by a 0 tells us the number of total bytes (the only exception is 1 byte which starts with just 0 to be compatible with ascii – the 10 is the data byte prefix, see below). In other words: nbytes prefix data bits
1 0 7
2 110 5
3 1110 4
4 11110 3

In this case the prefix is 1110, so three bytes, leaving us with 0010 as data. The other bytes always start with 10, so after removing that from \x80 (10000000) we get 000000, and from \x90 we get 010000. That gives us 0010000000010000, or U+2010, which is the unicode code point for "hyphen". b"\xe2\x80\x93" is then U+2013, or "en dash".

cisaacstern commented 10 months ago

(Deleted last comment because I realized you were saying that there are two forms of the dash that use surrogates!)

jbusecke commented 10 months ago

I think for now I am ok with replacing all weird characters with ??? (I hope I am understanding my solution up top correctly) in my use-case, but this is super helpful @keewis. @cisaacstern I wonder if this is something for -recipes to take care of properly? Maybe I am just too tired to think about string en/decoding rn and will pick that up again tomorrow, haha.