IN-CORE / pyincore

pyIncore is a component of IN-CORE. It is a Python package consisting of two primary components: 1) a set of service classes to interact with the IN-CORE web services, and 2) IN-CORE analyses. pyIncore allows users to apply various hazards to infrastructure in selected areas, propagating the effects of physical infrastructure damage and loss of functionality to social and economic impacts.

Rasterio doesn't work with multiprocessing #512

Open longshuicy opened 7 months ago

longshuicy commented 7 months ago
File <stringsource>:2, in rasterio._io.DatasetReaderBase.__reduce_cython__()
TypeError: self._hds cannot be converted to a Python object for pickling

See the comments below for the complete error.

To reproduce the error, try the notebook at this Box link: https://uofi.app.box.com/folder/252253923688


This might be related to how rasterio datasets are opened under multiprocessing:
https://github.com/IN-CORE/pyincore/blob/3c2eca45a99aab0e446e19061aa86cbf249e2ed9/pyincore/dataset.py#L221

Similar issues: https://github.com/rasterio/rasterio/issues/1731 https://github.com/dymaxionlabs/dask-rasterio/issues/3
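
The error happens because rasterio's DatasetReader wraps a C-level GDAL handle (the self._hds in the traceback) that cannot be pickled, and multiprocessing pickles every argument it sends to a worker process. Below is a minimal sketch, not from the issue, of the failure and the usual workaround: pass file paths to workers instead of open datasets (the file name is borrowed from the snippet further down; any raster would do):

import concurrent.futures
import pickle

import rasterio


def read_stats(path):
    # Open the raster inside the worker process; only the path (a plain
    # string) crosses the process boundary, so nothing unpicklable is sent.
    with rasterio.open(path) as src:
        band = src.read(1)
        return band.min(), band.max()


if __name__ == "__main__":
    # Pickling an open reader fails with the exact error from this issue.
    src = rasterio.open("max_wave_height.tif")
    try:
        pickle.dumps(src)
    except TypeError as err:
        print(err)  # self._hds cannot be converted to a Python object for pickling
    src.close()

    # Workaround: ship paths, not open datasets, to the pool.
    with concurrent.futures.ProcessPoolExecutor(max_workers=2) as executor:
        for stats in executor.map(read_stats, ["max_wave_height.tif"]):
            print(stats)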

longshuicy commented 7 months ago
---------------------------------------------------------------------------
_RemoteTraceback                          Traceback (most recent call last)
_RemoteTraceback: 
"""
Traceback (most recent call last):
  File "/Users/cwang138/opt/miniconda3/envs/incore1.17.0rc1/lib/python3.12/multiprocessing/queues.py", line 264, in _feed
    obj = _ForkingPickler.dumps(obj)
          ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/cwang138/opt/miniconda3/envs/incore1.17.0rc1/lib/python3.12/multiprocessing/reduction.py", line 51, in dumps
    cls(buf, protocol).dump(obj)
  File "<stringsource>", line 2, in rasterio._io.DatasetReaderBase.__reduce_cython__
TypeError: self._hds cannot be converted to a Python object for pickling
"""

The above exception was the direct cause of the following exception:

TypeError                                 Traceback (most recent call last)
Cell In[6], line 10
      8 w_bldg_dmg.set_input_hazard("hazard", hurricane)
      9 w_bldg_dmg.set_parameter("num_cpu", 1)
---> 10 w_bldg_dmg.run_analysis()

File ~/opt/miniconda3/envs/incore1.17.0rc1/lib/python3.12/site-packages/pyincore/baseanalysis.py:400, in BaseAnalysis.run_analysis(self)
    397         print("Error reading parameter: " + result[1])
    398         return result
--> 400 return self.run()

File ~/opt/miniconda3/envs/incore1.17.0rc1/lib/python3.12/site-packages/pyincore/analyses/buildingdamage/buildingdamage.py:87, in BuildingDamage.run(self)
     84     inventory_args.append(inventory_list[count:count + avg_bulk_input_size])
     85     count += avg_bulk_input_size
---> 87 (ds_results, damage_results) = self.building_damage_concurrent_future(self.building_damage_analysis_bulk_input,
     88                                                                       num_workers,
     89                                                                       inventory_args)
     91 self.set_result_csv_data("ds_result", ds_results, name=self.get_parameter("result_name"))
     92 self.set_result_json_data("damage_result",
     93                           damage_results,
     94                           name=self.get_parameter("result_name") + "_additional_info")

File ~/opt/miniconda3/envs/incore1.17.0rc1/lib/python3.12/site-packages/pyincore/analyses/buildingdamage/buildingdamage.py:117, in BuildingDamage.building_damage_concurrent_future(self, function_name, parallelism, *args)
    115 output_dmg = []
    116 with concurrent.futures.ProcessPoolExecutor(max_workers=parallelism) as executor:
--> 117     for ret1, ret2 in executor.map(function_name, *args):
    118         output_ds.extend(ret1)
    119         output_dmg.extend(ret2)

File ~/opt/miniconda3/envs/incore1.17.0rc1/lib/python3.12/concurrent/futures/process.py:642, in _chain_from_iterable_of_lists(iterable)
    636 def _chain_from_iterable_of_lists(iterable):
    637     """
    638     Specialized implementation of itertools.chain.from_iterable.
    639     Each item in *iterable* should be a list.  This function is
    640     careful not to keep references to yielded objects.
    641     """
--> 642     for element in iterable:
    643         element.reverse()
    644         while element:

File ~/opt/miniconda3/envs/incore1.17.0rc1/lib/python3.12/concurrent/futures/_base.py:619, in Executor.map.<locals>.result_iterator()
    616 while fs:
    617     # Careful not to keep a reference to the popped future
    618     if timeout is None:
--> 619         yield _result_or_cancel(fs.pop())
    620     else:
    621         yield _result_or_cancel(fs.pop(), end_time - time.monotonic())

File ~/opt/miniconda3/envs/incore1.17.0rc1/lib/python3.12/concurrent/futures/_base.py:317, in _result_or_cancel(***failed resolving arguments***)
    315 try:
    316     try:
--> 317         return fut.result(timeout)
    318     finally:
    319         fut.cancel()

File ~/opt/miniconda3/envs/incore1.17.0rc1/lib/python3.12/concurrent/futures/_base.py:456, in Future.result(self, timeout)
    454     raise CancelledError()
    455 elif self._state == FINISHED:
--> 456     return self.__get_result()
    457 else:
    458     raise TimeoutError()

File ~/opt/miniconda3/envs/incore1.17.0rc1/lib/python3.12/concurrent/futures/_base.py:401, in Future.__get_result(self)
    399 if self._exception:
    400     try:
--> 401         raise self._exception
    402     finally:
    403         # Break a reference cycle with the exception in self._exception
    404         self = None

File ~/opt/miniconda3/envs/incore1.17.0rc1/lib/python3.12/multiprocessing/queues.py:264, in Queue._feed(buffer, notempty, send_bytes, writelock, reader_close, writer_close, ignore_epipe, onerror, queue_sem)
    261     return
    263 # serialize the data before acquiring the lock
--> 264 obj = _ForkingPickler.dumps(obj)
    265 if wacquire is None:
    266     send_bytes(obj)

File ~/opt/miniconda3/envs/incore1.17.0rc1/lib/python3.12/multiprocessing/reduction.py:51, in ForkingPickler.dumps(cls, obj, protocol)
     48 @classmethod
     49 def dumps(cls, obj, protocol=None):
     50     buf = io.BytesIO()
---> 51     cls(buf, protocol).dump(obj)
     52     return buf.getbuffer()

File <stringsource>:2, in rasterio._io.DatasetReaderBase.__reduce_cython__()

TypeError: self._hds cannot be converted to a Python object for pickling
longshuicy commented 7 months ago
# Create a local hurricane from the JSON file (destination_file_path) and
# attach the raster hazard layers
hurricane = Hurricane.from_json_file(destination_file_path)
hurricane.hazardDatasets[0].from_file((os.path.join(raster_path, "max_wave_height.tif")),
                                      data_type="ncsa:deterministicHurricaneRaster")
hurricane.hazardDatasets[1].from_file(os.path.join(raster_path, "inundation_duration.tif"),
                                      data_type="ncsa:deterministicHurricaneRaster")
hurricane.hazardDatasets[2].from_file(os.path.join(raster_path, "max_surge_dept.tif"),
                                      data_type="ncsa:deterministicHurricaneRaster")
hurricane.hazardDatasets[3].from_file(os.path.join(raster_path, "wave_direction.tif"),
                                      data_type="ncsa:deterministicHurricaneRaster")
hurricane.hazardDatasets[4].from_file(os.path.join(raster_path, "max_wave_velocity.tif"),
                                      data_type="ncsa:deterministicHurricaneRaster")
hurricane.hazardDatasets[5].from_file(os.path.join(raster_path, "max_wind_velocity.tif"),
                                      data_type="ncsa:deterministicHurricaneRaster")

bldg_dataset_id = "63ff6b135c35c0353d5ed3ac"  # island

bldg_dmg = BuildingDamage(client)
bldg_dmg.load_remote_input_dataset("buildings", bldg_dataset_id)

# Hurricane building mapping (with equation)
mapping_id = "62fef3a6cef2881193f2261d"
fragility_service = FragilityService(client)
mapping_set = MappingSet(fragility_service.get_mapping(mapping_id))
bldg_dmg.set_input_dataset('dfr3_mapping_set', mapping_set)
bldg_dmg.set_input_hazard("hazard", hurricane)
bldg_dmg.set_parameter("result_name", "galveston_local_hurr_dmg_result")
bldg_dmg.set_parameter("num_cpu", 4)
bldg_dmg.run_analysis()

This actually works... not sure what was wrong.
longshuicy commented 4 months ago

I found "why" it is not running... There is a problem with the function earthquake.read_hazard_values(); I assume it occurs specifically in earthquake.read_local_raster_hazard_values() (I have not checked in detail why). If you create a local hazard and avoid calling this function, you'll be able to run your damage analysis. However, if you call earthquake.read_hazard_values() at any point and then try to run the analysis, you receive the error self._hds cannot be converted to a Python object for pickling from rasterio._io.DatasetReaderBase.__reduce_cython__(). (I am glad the problem is not with how rasterio reads the raster, or it would be more difficult to solve, as you commented in the other thread!) Hope this helps with issue https://github.com/IN-CORE/pyincore/issues/512!
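
That diagnosis is consistent with a lazy-caching pattern: the first read_hazard_values() call opens and caches a rasterio reader on the hazard object, and the now-unpicklable object is later shipped to worker processes by run_analysis(). It would also explain why the snippet above ran, since it never reads hazard values before the analysis. A hypothetical sketch of the pattern (class and attribute names invented here, not pyincore's actual API):

import pickle

import rasterio


class LocalRasterHazard:
    """Toy stand-in for a local hazard dataset backed by a raster file."""

    def __init__(self, path):
        self.path = path
        self._reader = None  # cached rasterio handle, filled on first read

    def read_value(self, row, col):
        # First call opens and caches the DatasetReader; from then on, any
        # attempt to pickle this object (e.g. for a worker process) fails.
        if self._reader is None:
            self._reader = rasterio.open(self.path)
        return self._reader.read(1)[row, col]


hazard = LocalRasterHazard("max_wind_velocity.tif")
pickle.dumps(hazard)     # fine: no open handle cached yet
hazard.read_value(0, 0)  # populates the cached reader
pickle.dumps(hazard)     # TypeError: self._hds cannot be converted ... for pickling

One fix along these lines would be to drop the cached reader before pickling (e.g. via __getstate__) and reopen from the stored path inside each worker.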