geopandas / dask-geopandas

Parallel GeoPandas with Dask
https://dask-geopandas.readthedocs.io/
BSD 3-Clause "New" or "Revised" License
488 stars 45 forks source link

OSError: [Errno 22] Invalid argument #222

Open FlorisCalkoen opened 1 year ago

FlorisCalkoen commented 1 year ago

Lately I've been running into this error a few times: OSError: [Errno 22] Invalid argument.

It happens when processing a dask_geopandas.GeoDataFrame after a dask.distributed.Client has been created. It doesn't happen when no client is defined, i.e., when calling dask_geopandas.read_file(fp, npartitions=30) straight away without creating a client. The error also doesn't seem to occur when the client is created with Client(processes=False).

So it seems to be the combination of a dask.distributed.Client with dask_geopandas computations on dataframes that hold geometry data.

It's a bit tricky to provide a reproducible example, as this typically happens with larger datasets. But I hope that by describing the problem someone will be able to point me in the right direction. If you want to reproduce this error, the data can be downloaded with the commands below.

Commands to download the data (~2 GB):

# Work inside $HOME/tmp; the Python snippet below reads the GeoPackage from there.
mkdir -p "$HOME/tmp"
cd "$HOME/tmp"

# Fetch the USGS/Esri global coastal segments map package (~2 GB).
wget --output-document=sayre_coastal_segments.mpk \
    https://rmgsc.cr.usgs.gov/outgoing/ecosystems/Global/USGSEsriGlobalCoastalSegmentsv1.mpk

# Unpack the .mpk archive.
unar sayre_coastal_segments.mpk

# Convert the extracted .gdb dataset into a GeoPackage.
ogr2ogr -f gpkg \
    sayre_coastal_segments.gpkg \
    sayre_coastal_segments/v108/ecujillgeographic.gdb
# Reproduction: read the GeoPackage lazily with dask_geopandas, then
# materialize it while a dask.distributed.Client is active.
# Per the report above, the error does not occur without a Client, or
# with Client(processes=False).
import pathlib
import dask_geopandas
from dask.distributed import Client

# The printed repr below shows 10 worker processes with 1 thread each.
client = Client(threads_per_worker=1, local_directory="/tmp")
print(client)  # <Client: 'tcp://127.0.0.1:54111' processes=10 threads=10, memory=64.00 GiB>

# GeoPackage produced by the ogr2ogr command above.
fp = pathlib.Path.home().joinpath("tmp", "sayre_coastal_segments.gpkg")
ddf = dask_geopandas.read_file(fp, npartitions=30)
# compute() gathers all partitions back to the client (traceback:
# Client.gather -> scheduler.gather -> TCP.read -> socket.recv_into).
# NOTE(review): the OSError is raised by socket.recv_into while reading the
# scheduler's gather reply; whether it is tied to the reply's size is
# unconfirmed.
df = ddf.compute()  # OSError: [Errno 22] Invalid argument
---------------------------------------------------------------------------
OSError                                   Traceback (most recent call last)
Cell In [5], line 1
----> 1 df = ddf.compute()

File ~/miniconda3/envs/geo/lib/python3.10/site-packages/dask/base.py:315, in DaskMethodsMixin.compute(self, **kwargs)
    291 def compute(self, **kwargs):
    292     """Compute this dask collection
    293 
    294     This turns a lazy Dask collection into its in-memory equivalent.
   (...)
    313     dask.base.compute
    314     """
--> 315     (result,) = compute(self, traverse=False, **kwargs)
    316     return result

File ~/miniconda3/envs/geo/lib/python3.10/site-packages/dask/base.py:600, in compute(traverse, optimize_graph, scheduler, get, *args, **kwargs)
    597     keys.append(x.__dask_keys__())
    598     postcomputes.append(x.__dask_postcompute__())
--> 600 results = schedule(dsk, keys, **kwargs)
    601 return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])

File ~/miniconda3/envs/geo/lib/python3.10/site-packages/distributed/client.py:3052, in Client.get(self, dsk, keys, workers, allow_other_workers, resources, sync, asynchronous, direct, retries, priority, fifo_timeout, actors, **kwargs)
   3050         should_rejoin = False
   3051 try:
-> 3052     results = self.gather(packed, asynchronous=asynchronous, direct=direct)
   3053 finally:
   3054     for f in futures.values():

File ~/miniconda3/envs/geo/lib/python3.10/site-packages/distributed/client.py:2226, in Client.gather(self, futures, errors, direct, asynchronous)
   2224 else:
   2225     local_worker = None
-> 2226 return self.sync(
   2227     self._gather,
   2228     futures,
   2229     errors=errors,
   2230     direct=direct,
   2231     local_worker=local_worker,
   2232     asynchronous=asynchronous,
   2233 )

File ~/miniconda3/envs/geo/lib/python3.10/site-packages/distributed/utils.py:338, in SyncMethodMixin.sync(self, func, asynchronous, callback_timeout, *args, **kwargs)
    336     return future
    337 else:
--> 338     return sync(
    339         self.loop, func, *args, callback_timeout=callback_timeout, **kwargs
    340     )

File ~/miniconda3/envs/geo/lib/python3.10/site-packages/distributed/utils.py:405, in sync(loop, func, callback_timeout, *args, **kwargs)
    403 if error:
    404     typ, exc, tb = error
--> 405     raise exc.with_traceback(tb)
    406 else:
    407     return result

File ~/miniconda3/envs/geo/lib/python3.10/site-packages/distributed/utils.py:378, in sync.<locals>.f()
    376         future = asyncio.wait_for(future, callback_timeout)
    377     future = asyncio.ensure_future(future)
--> 378     result = yield future
    379 except Exception:
    380     error = sys.exc_info()

File ~/miniconda3/envs/geo/lib/python3.10/site-packages/tornado/gen.py:762, in Runner.run(self)
    759 exc_info = None
    761 try:
--> 762     value = future.result()
    763 except Exception:
    764     exc_info = sys.exc_info()

File ~/miniconda3/envs/geo/lib/python3.10/site-packages/distributed/client.py:2118, in Client._gather(self, futures, errors, direct, local_worker)
   2116     else:
   2117         self._gather_future = future
-> 2118     response = await future
   2120 if response["status"] == "error":
   2121     log = logger.warning if errors == "raise" else logger.debug

File ~/miniconda3/envs/geo/lib/python3.10/site-packages/distributed/client.py:2169, in Client._gather_remote(self, direct, local_worker)
   2166                 response["data"].update(data2)
   2168     else:  # ask scheduler to gather data for us
-> 2169         response = await retry_operation(self.scheduler.gather, keys=keys)
   2171 return response

File ~/miniconda3/envs/geo/lib/python3.10/site-packages/distributed/utils_comm.py:383, in retry_operation(coro, operation, *args, **kwargs)
    377 retry_delay_min = parse_timedelta(
    378     dask.config.get("distributed.comm.retry.delay.min"), default="s"
    379 )
    380 retry_delay_max = parse_timedelta(
    381     dask.config.get("distributed.comm.retry.delay.max"), default="s"
    382 )
--> 383 return await retry(
    384     partial(coro, *args, **kwargs),
    385     count=retry_count,
    386     delay_min=retry_delay_min,
    387     delay_max=retry_delay_max,
    388     operation=operation,
    389 )

File ~/miniconda3/envs/geo/lib/python3.10/site-packages/distributed/utils_comm.py:368, in retry(coro, count, delay_min, delay_max, jitter_fraction, retry_on_exceptions, operation)
    366             delay *= 1 + random.random() * jitter_fraction
    367         await asyncio.sleep(delay)
--> 368 return await coro()

File ~/miniconda3/envs/geo/lib/python3.10/site-packages/distributed/core.py:1154, in PooledRPCCall.__getattr__.<locals>.send_recv_from_rpc(**kwargs)
   1152 prev_name, comm.name = comm.name, "ConnectionPool." + key
   1153 try:
-> 1154     return await send_recv(comm=comm, op=key, **kwargs)
   1155 finally:
   1156     self.pool.reuse(self.addr, comm)

File ~/miniconda3/envs/geo/lib/python3.10/site-packages/distributed/core.py:919, in send_recv(comm, reply, serializers, deserializers, **kwargs)
    917 await comm.write(msg, serializers=serializers, on_error="raise")
    918 if reply:
--> 919     response = await comm.read(deserializers=deserializers)
    920 else:
    921     response = None

File ~/miniconda3/envs/geo/lib/python3.10/site-packages/distributed/comm/tcp.py:235, in TCP.read(self, deserializers)
    233         chunk = frames[i:j]
    234         chunk_nbytes = chunk.nbytes
--> 235         n = await stream.read_into(chunk)
    236         assert n == chunk_nbytes, (n, chunk_nbytes)
    237 except StreamClosedError as e:

File ~/miniconda3/envs/geo/lib/python3.10/site-packages/tornado/iostream.py:475, in BaseIOStream.read_into(self, buf, partial)
    472 self._read_partial = partial
    474 try:
--> 475     self._try_inline_read()
    476 except:
    477     future.add_done_callback(lambda f: f.exception())

File ~/miniconda3/envs/geo/lib/python3.10/site-packages/tornado/iostream.py:842, in BaseIOStream._try_inline_read(self)
    840     return
    841 self._check_closed()
--> 842 pos = self._read_to_buffer_loop()
    843 if pos is not None:
    844     self._read_from_buffer(pos)

File ~/miniconda3/envs/geo/lib/python3.10/site-packages/tornado/iostream.py:755, in BaseIOStream._read_to_buffer_loop(self)
    748 next_find_pos = 0
    749 while not self.closed():
    750     # Read from the socket until we get EWOULDBLOCK or equivalent.
    751     # SSL sockets do some internal buffering, and if the data is
    752     # sitting in the SSL object's buffer select() and friends
    753     # can't see it; the only way to find out if it's there is to
    754     # try to read it.
--> 755     if self._read_to_buffer() == 0:
    756         break
    758     # If we've read all the bytes we can use, break out of
    759     # this loop.
    760 
    761     # If we've reached target_bytes, we know we're done.

File ~/miniconda3/envs/geo/lib/python3.10/site-packages/tornado/iostream.py:867, in BaseIOStream._read_to_buffer(self)
    865     else:
    866         buf = bytearray(self.read_chunk_size)
--> 867     bytes_read = self.read_from_fd(buf)
    868 except (socket.error, IOError, OSError) as e:
    869     # ssl.SSLError is a subclass of socket.error
    870     if self._is_connreset(e):
    871         # Treat ECONNRESET as a connection close rather than
    872         # an error to minimize log spam  (the exception will
    873         # be available on self.error for apps that care).

File ~/miniconda3/envs/geo/lib/python3.10/site-packages/tornado/iostream.py:1140, in IOStream.read_from_fd(***failed resolving arguments***)
   1138 def read_from_fd(self, buf: Union[bytearray, memoryview]) -> Optional[int]:
   1139     try:
-> 1140         return self.socket.recv_into(buf, len(buf))
   1141     except BlockingIOError:
   1142         return None

OSError: [Errno 22] Invalid argument