Closed: fbriol closed this issue 4 years ago
Do you have any more information on "sometimes?"
If I access 5 or 10 of the files, the processing works. If I try to read all the data, one of the workers always crashes.
I didn't understand: what are the two cases there? How are you authenticating in this case? Is this happening in the client process or in the workers?
The difference between the two cases is the number of parquet files handled by Dask. The files to be processed are filtered by pyarrow according to date, one file per month. If I process a small number of files (a dozen, covering one year), the processing works. If I process several years (in this case, more than a hundred files), the processing starts and, after a while, reading one of the files raises an error.
@martindurant do you think some of the work you've done on thread safety recently may resolve this?
@fbriol any info you can provide to make this easier to isolate would be welcome (though understood that this is difficult). Are you using this from within Dask or xarray, or are you using gcsfs directly?
do you think some of the work you've done
Hope so! Specifically, the issue was around reconnections at the time of deserialisation.
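The pattern behind that kind of fix can be sketched as follows: strip the live connection state before pickling, and rebuild it when the object is deserialised on a worker. This is a minimal illustration of the idea only; the class name, attributes, and the fake session below are invented for the sketch, not gcsfs's actual code:

```python
import pickle
import threading

class ReconnectingFS:
    """Invented sketch of reconnect-on-deserialisation (not real gcsfs code)."""

    def __init__(self, project):
        self.project = project
        self._lock = threading.Lock()  # unpicklable runtime state
        self._connect()

    def _connect(self):
        # Stand-in for creating an authorized HTTP session.
        self.session = {"authorized": True}

    def __getstate__(self):
        # Drop live connection state before pickling.
        state = self.__dict__.copy()
        state.pop("_lock", None)
        state.pop("session", None)
        return state

    def __setstate__(self, state):
        self.__dict__.update(state)
        # Rebuild the connection on the receiving process.
        self._lock = threading.Lock()
        self._connect()

fs = ReconnectingFS("my-project")
fs2 = pickle.loads(pickle.dumps(fs))  # fs2.session is freshly rebuilt
```

Without the reconnect step in `__setstate__`, the unpickled copy would silently lack its session, which is the kind of failure the rest of this thread describes.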
Just to add some info: I have a similar problem (Dask 2.6.0) where the job crashes with this exact same error when I try to access AVRO files directly from a gcs:// path. I don't have this problem with local data. Could it be related to credentials or a token expiring during a long load?
Is there a quick fix for this?
@mlaprise , please try using gcsfs and fsspec from master (on your client and workers too)
OK, I just checked, and I didn't have the latest versions of gcsfs & fsspec. So I installed the latest releases (0.3.2, 0.5.2)... same problem. I'll try installing the master versions directly.
Same problem with the master version. The weird thing is that it only happens with multiple files (gs://foo/*.avro). With a single file as input (gs://foo/00000.avro) it works fine. Also, IIRC, the last version I used (2.3.0) didn't have this issue.
which scheduler are you using?
I'm using the distributed scheduler on a single big instance. I don't pass any token to the client; instead, I let the client pick up the credentials from the JSON file defined by the GOOGLE_APPLICATION_CREDENTIALS env var. Here's a typical traceback:
Traceback (most recent call last):
File "scripts/preprocess_graph_dataset.py", line 138, in <module>
edgelist_df.to_parquet(os.path.join(output_path, 'edgelist.parquet'))
File "/opt/conda/lib/python3.7/site-packages/dask/dataframe/core.py", line 3629, in to_parquet
return to_parquet(self, path, *args, **kwargs)
File "/opt/conda/lib/python3.7/site-packages/dask/dataframe/io/parquet/core.py", line 527, in to_parquet
out = out.compute()
File "/opt/conda/lib/python3.7/site-packages/dask/base.py", line 165, in compute
(result,) = compute(self, traverse=False, **kwargs)
File "/opt/conda/lib/python3.7/site-packages/dask/base.py", line 436, in compute
results = schedule(dsk, keys, **kwargs)
File "/opt/conda/lib/python3.7/site-packages/distributed/client.py", line 2545, in get
results = self.gather(packed, asynchronous=asynchronous, direct=direct)
File "/opt/conda/lib/python3.7/site-packages/distributed/client.py", line 1845, in gather
asynchronous=asynchronous,
File "/opt/conda/lib/python3.7/site-packages/distributed/client.py", line 762, in sync
self.loop, func, *args, callback_timeout=callback_timeout, **kwargs
File "/opt/conda/lib/python3.7/site-packages/distributed/utils.py", line 333, in sync
raise exc.with_traceback(tb)
File "/opt/conda/lib/python3.7/site-packages/distributed/utils.py", line 317, in f
result[0] = yield future
File "/opt/conda/lib/python3.7/site-packages/tornado/gen.py", line 735, in run
value = future.result()
File "/opt/conda/lib/python3.7/site-packages/distributed/client.py", line 1701, in _gather
raise exception.with_traceback(traceback)
File "/opt/conda/lib/python3.7/site-packages/dask/utils.py", line 29, in apply
return func(*args, **kwargs)
File "/opt/conda/lib/python3.7/site-packages/dask/dataframe/io/parquet/fastparquet.py", line 480, in write_partition
fil, df, fmd.schema, compression=compression, fmd=fmd
File "/opt/conda/lib/python3.7/site-packages/fastparquet/writer.py", line 645, in make_part_file
f.write(MARKER)
File "/opt/conda/lib/python3.7/site-packages/fsspec/spec.py", line 1161, in __exit__
self.close()
File "/opt/conda/lib/python3.7/site-packages/fsspec/spec.py", line 1129, in close
self.flush(force=True)
File "/opt/conda/lib/python3.7/site-packages/fsspec/spec.py", line 1004, in flush
self._initiate_upload()
File "</opt/conda/lib/python3.7/site-packages/decorator.py:decorator-gen-19>", line 2, in _initiate_upload
File "/opt/conda/lib/python3.7/site-packages/gcsfs/core.py", line 54, in _tracemethod
return f(self, *args, **kwargs)
File "/opt/conda/lib/python3.7/site-packages/gcsfs/core.py", line 1009, in _initiate_upload
json={'name': self.key, 'metadata': self.metadata})
File "</opt/conda/lib/python3.7/site-packages/decorator.py:decorator-gen-2>", line 2, in _call
File "/opt/conda/lib/python3.7/site-packages/gcsfs/core.py", line 54, in _tracemethod
return f(self, *args, **kwargs)
File "/opt/conda/lib/python3.7/site-packages/gcsfs/core.py", line 463, in _call
params=kwargs, json=json, headers=headers, data=data, timeout=self.requests_timeout)
File "/opt/conda/lib/python3.7/site-packages/google/auth/transport/requests.py", line 204, in request
self.credentials.before_request(
AttributeError: 'AuthorizedSession' object has no attribute 'credentials'
OK, so I just did a test with the local processes scheduler instead, and it didn't encounter the problem. Another thing I noticed is that pickling fs.session.credentials is not possible:
import gcsfs
import pickle
fs = gcsfs.GCSFileSystem(project=settings.GCP_PROJECT_ID)
pickle.dumps(fs.session.credentials)
---------------------------------------------------------------------------
TypeError Traceback (most recent call last)
<ipython-input-13-cf0a51346f35> in <module>
----> 1 pickle.dumps(fs.session.credentials)
TypeError: can't pickle CompiledFFI objects
I don't know, but if the session credentials are serialized and sent to the workers, it might be related.
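That would be consistent with the traceback above: if an unpicklable attribute is dropped during serialisation and never restored, the first use on the worker raises exactly this kind of AttributeError. A toy illustration (ToySession is invented here, not the real google-auth AuthorizedSession):

```python
import pickle

class ToySession:
    """Invented stand-in for an authorized HTTP session."""

    def __init__(self, credentials):
        self.credentials = credentials

    def __getstate__(self):
        # Drop the unpicklable credentials when serialising...
        state = self.__dict__.copy()
        state.pop("credentials", None)
        return state
    # ...but provide no __setstate__ to restore them: a worker that
    # unpickles this object loses the attribute entirely.

restored = pickle.loads(pickle.dumps(ToySession(credentials=object())))
hasattr(restored, "credentials")  # False: touching it raises AttributeError
```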
Would you mind trying for one file, and then for multiple files in the same session (using threading)? It would be interesting to see if that works, and what the contents of GCSFileSystem._cache are at each stage.
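For context, fsspec filesystems are memoized: constructing a filesystem with the same arguments returns a cached instance, which is why the contents of GCSFileSystem._cache matter here. A toy model of that caching follows (a sketch of the mechanism only, not fsspec's actual implementation):

```python
class CachedFS:
    """Sketch of instance caching keyed by constructor arguments
    (illustrative only; not fsspec's real code)."""

    _cache = {}

    def __new__(cls, project):
        key = (cls.__name__, project)
        if key not in cls._cache:
            inst = super().__new__(cls)
            inst.project = project  # "connection" setup happens only once
            cls._cache[key] = inst
        return cls._cache[key]

a = CachedFS("my-project")
b = CachedFS("my-project")  # same args: the cached instance is reused (a is b)
```

If a second thread obtains a cached instance before the first thread has finished initializing it, attributes such as the session can appear to be missing, which matches the race-condition intuition voiced later in this thread.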
You have a Binder available here to read the data. To reproduce the problem, increase the time period.
Sorry, there are two notebooks and quite a lot of code there - what should I run to reproduce? Perhaps you could make a pared-down example?
fwiw I was seeing this on gcsfs master and fsspec==0.5.2, but I no longer see it after upgrading fsspec to master... but if it might be random then that's probably not very informative 🤷‍♂️
To reproduce the problem, run the notebook "analysis.ipynb" on Binder; you have to execute the 11 cells of the notebook.
The first cell is used to work around the problem encountered. You pushed this modification to the master branch, so this code will be useless for future versions.
Then execute all the cells up to cell 8. In cell 9, change the start year from 2018 to 2005:
ddf = get_dask_dataframe( path, datetime.date(2005, 1, 1), datetime.date(2018, 2, 1))
Then you execute cells 9, 10 and 11.
I am currently AFK, so it is impossible to do better at the moment. If that is not enough, I will make a more compact example next week.
I ran to cell 11 without your modified code and got no error :|
Increase the size of the interval (cell 9):
ddf = get_dask_dataframe( path, datetime.date(1995, 1, 1), datetime.date(2018, 2, 1))
That runs successfully too! Let's reconnect next week.
Interesting, I had the intuition that it might be some race-condition issue. I tried master with the new fix and I'm still having the same issue. The traceback is different, however, which might be useful:
Exception: AttributeError("'AuthorizedSession' object has no attribute 'credentials'")
/src/gcsfs/gcsfs/core.py:266: DeprecationWarning: invalid escape sequence \A
"""
Traceback (most recent call last):
File "scripts/preprocess_graph_dataset.py", line 136, in <module>
pvs_bag = bag.read_avro(pvs_dataset_path).filter(lambda r: len(r['uid']) > 20)
File "/opt/conda/lib/python3.7/site-packages/dask/bag/avro.py", line 105, in read_avro
out = compute(*[dhead(fs, path, compression) for path in paths])
File "/opt/conda/lib/python3.7/site-packages/dask/base.py", line 436, in compute
results = schedule(dsk, keys, **kwargs)
File "/opt/conda/lib/python3.7/site-packages/distributed/client.py", line 2545, in get
results = self.gather(packed, asynchronous=asynchronous, direct=direct)
File "/opt/conda/lib/python3.7/site-packages/distributed/client.py", line 1845, in gather
asynchronous=asynchronous,
File "/opt/conda/lib/python3.7/site-packages/distributed/client.py", line 762, in sync
self.loop, func, *args, callback_timeout=callback_timeout, **kwargs
File "/opt/conda/lib/python3.7/site-packages/distributed/utils.py", line 333, in sync
raise exc.with_traceback(tb)
File "/opt/conda/lib/python3.7/site-packages/distributed/utils.py", line 317, in f
result[0] = yield future
File "/opt/conda/lib/python3.7/site-packages/tornado/gen.py", line 735, in run
value = future.result()
File "/opt/conda/lib/python3.7/site-packages/distributed/client.py", line 1701, in _gather
raise exception.with_traceback(traceback)
File "/opt/conda/lib/python3.7/site-packages/dask/bag/avro.py", line 63, in open_head
with OpenFile(fs, path, compression=compression) as f:
File "/opt/conda/lib/python3.7/site-packages/fsspec/core.py", line 88, in __enter__
f = self.fs.open(self.path, mode=mode)
File "/opt/conda/lib/python3.7/site-packages/fsspec/spec.py", line 708, in open
path, mode=mode, block_size=block_size, autocommit=ac, **kwargs
File "</opt/conda/lib/python3.7/site-packages/decorator.py:decorator-gen-17>", line 2, in _open
File "/src/gcsfs/gcsfs/core.py", line 54, in _tracemethod
return f(self, *args, **kwargs)
File "/src/gcsfs/gcsfs/core.py", line 871, in _open
**kwargs)
File "/src/gcsfs/gcsfs/core.py", line 907, in __init__
**kwargs)
File "/opt/conda/lib/python3.7/site-packages/fsspec/spec.py", line 875, in __init__
self.details = fs.info(path)
File "/opt/conda/lib/python3.7/site-packages/fsspec/spec.py", line 504, in info
out = self.ls(self._parent(path), detail=True, **kwargs)
File "</opt/conda/lib/python3.7/site-packages/decorator.py:decorator-gen-11>", line 2, in ls
File "/src/gcsfs/gcsfs/core.py", line 54, in _tracemethod
return f(self, *args, **kwargs)
File "/src/gcsfs/gcsfs/core.py", line 700, in ls
combined_listing = self._ls(path, detail) + self._ls(path + "/",
File "</opt/conda/lib/python3.7/site-packages/decorator.py:decorator-gen-12>", line 2, in _ls
File "/src/gcsfs/gcsfs/core.py", line 54, in _tracemethod
return f(self, *args, **kwargs)
File "/src/gcsfs/gcsfs/core.py", line 712, in _ls
listing = self._list_objects(path)
File "</opt/conda/lib/python3.7/site-packages/decorator.py:decorator-gen-5>", line 2, in _list_objects
File "/src/gcsfs/gcsfs/core.py", line 54, in _tracemethod
return f(self, *args, **kwargs)
File "/src/gcsfs/gcsfs/core.py", line 553, in _list_objects
listing = self._do_list_objects(path)
File "</opt/conda/lib/python3.7/site-packages/decorator.py:decorator-gen-6>", line 2, in _do_list_objects
File "/src/gcsfs/gcsfs/core.py", line 54, in _tracemethod
return f(self, *args, **kwargs)
File "/src/gcsfs/gcsfs/core.py", line 569, in _do_list_objects
delimiter="/", prefix=prefix, maxResults=max_results
File "</opt/conda/lib/python3.7/site-packages/decorator.py:decorator-gen-2>", line 2, in _call
File "/src/gcsfs/gcsfs/core.py", line 54, in _tracemethod
return f(self, *args, **kwargs)
File "/src/gcsfs/gcsfs/core.py", line 463, in _call
params=kwargs, json=json, headers=headers, data=data, timeout=self.requests_timeout)
File "/opt/conda/lib/python3.7/site-packages/google/auth/transport/requests.py", line 211, in request
self.credentials.before_request(
AttributeError: 'AuthorizedSession' object has no attribute 'credentials'
Task exception was never retrieved
future: <Task finished coro=<BaseTCPConnector.connect() done, defined at /opt/conda/lib/python3.7/site-packages/distributed/comm/tcp.py:341> exception=CommClosedError('in <distributed.comm.tcp.TCPConnector object at 0x7fc6a7d0a6a0>: ConnectionRefusedError: [Errno 111] Connection refused')>
Traceback (most recent call last):
File "/opt/conda/lib/python3.7/site-packages/distributed/comm/tcp.py", line 348, in connect
ip, port, max_buffer_size=MAX_BUFFER_SIZE, **kwargs
File "/opt/conda/lib/python3.7/site-packages/tornado/tcpclient.py", line 280, in connect
af, addr, stream = await connector.start(connect_timeout=timeout)
File "/opt/conda/lib/python3.7/site-packages/tornado/tcpclient.py", line 143, in on_connect_done
stream = future.result()
tornado.iostream.StreamClosedError: Stream is closed
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/opt/conda/lib/python3.7/site-packages/distributed/comm/tcp.py", line 360, in connect
convert_stream_closed_error(self, e)
File "/opt/conda/lib/python3.7/site-packages/distributed/comm/tcp.py", line 130, in convert_stream_closed_error
raise CommClosedError("in %s: %s: %s" % (obj, exc.__class__.__name__, exc))
distributed.comm.core.CommClosedError: in <distributed.comm.tcp.TCPConnector object at 0x7fc6a7d0a6a0>: ConnectionRefusedError: [Errno 111] Connection refused
@mlaprise you tried with gcsfs & fsspec master?
Note, I think that this will also be solved by https://github.com/intake/filesystem_spec/pull/181, but I need to verify that.
I still get this error (AttributeError: 'AuthorizedSession' object has no attribute 'credentials') with the latest releases, i.e. dask 2.7.0 and gcsfs 0.3.1.
Maybe this issue could be reopened, if a few users are still experiencing it? (Was it closed unintentionally by @martindurant's "Maybe Fixes #180" comment in the merged PR #192, or because it couldn't be reproduced?)
I'm reasonably confident that this was fixed by https://github.com/intake/filesystem_spec/pull/181. Trying out @fbriol's binder to try to confirm.
https://github.com/intake/filesystem_spec/pull/181 will be in a release later today or tomorrow.
Ok, I was able to confirm that things are fixed on master, at least for @fbriol's example reading from gcs with pyarrow.
I was able to reproduce the ArrowIOError: Arrow error: IOError: 'AuthorizedSession' object has no attribute 'credentials'
with the versions in the image (I think gcsfs 0.3.0 and fsspec 0.5.2).
Upgraded to master on the client and workers, and I can no longer reproduce.
I'll do some releases later today.
Might be good to also ask the S3 crowd (e.g., @birdsarah ) if all is now well for them too.
I still have the same problem. I updated my repo; just run it to reproduce the error. I hope I have picked up the correct versions of the fixed software.
@fbriol do you mean in https://github.com/fbriol/pangeo-argo/commit/0c9b4521bda5c9e23f7bf9aa80c61d5ac6373175? That's not getting gcsfs and fsspec master AFAICT.
In my experiment, I manually updated the versions on the client and workers with
import subprocess

def update():
    subprocess.call(['pip', 'install', '-U', ...])
but that's fragile. We'll have releases later today, and I'll look into updating pangeo-stacks as well.
I hope I have taken the correct version
Since this is just merged, you will require a pip install of gcsfs and fsspec from github.
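Concretely, that means something like the following, run on the client and on every worker (the repository locations below are assumed from the URLs mentioned in this thread; adjust them if the projects have moved):

```shell
# Install the development versions straight from GitHub
pip install -U git+https://github.com/intake/filesystem_spec
pip install -U git+https://github.com/dask/gcsfs
```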
pangeo-stacks doesn't pin fsspec or gcsfs, so this will be available via pangeo stacks in the next update.
Okay, thank you very much.
I no longer get the error with fsspec 0.6.0. Many thanks!
Sometimes when reading parquet files I get this error: