konrad-roze opened 7 years ago
I'm not able to reproduce with this simple example:
In [1]: import dask.dataframe as dd
In [2]: from distributed import Client
In [4]: with open("file.csv", "w") as f: f.write("A,B\n1,2\n3,4\n5,6")
In [5]: client = Client()
In [6]: df = dd.read_csv("s3://dask-data/airline-data/1987.csv", storage_options={'anon': True})
In [7]: client.publish_dataset(ds_name=df)
In [9]: ds = client.get_dataset('ds_name')
In [10]: ds.compute()
Out[10]:
A B
0 1 2
1 3 4
2 5 6
Could you try adapting that example until you reproduce the failure? For example, swap out file.csv for your real file.
You can reproduce it on a local cluster, but you need to load CSV data from S3.
So the minimal difference in your case would be
df = dd.read_csv(_some_s3_url_)
Thanks, that's valuable information. I've updated the example.
Same issue here. Any update?
It would be useful to see a full traceback from a minimal example.
A simple example that reads a file from S3 and persists it on the cluster. We use server-side encryption for the S3 bucket.
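import dask.dataframe as dd
from distributed import Client
# get_dask_scheduler_file, session, configure_session and credential are defined elsewhere in our setup (not shown)
client = Client(scheduler_file=get_dask_scheduler_file())
boto_session = session.Session()
boto_session = configure_session(boto_session, credential)
df = dd.read_csv('s3://data.csv', storage_options={'botocore_session': boto_session})
df = client.persist(df)
Traceback: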
File "/home/renjie/workplace/renjie-mac-topline-distributed/env/ToplineDistributedComputing-1.0/runtime/lib/python2.7/site-packages/distributed/client.py", line 2168, in persist
loose_restrictions, resources=resources)
File "/home/renjie/workplace/renjie-mac-topline-distributed/env/ToplineDistributedComputing-1.0/runtime/lib/python2.7/site-packages/distributed/client.py", line 1906, in _graph_to_futures
'tasks': valmap(dumps_task, dsk3),
File "/home/renjie/workplace/renjie-mac-topline-distributed/env/ToplineDistributedComputing-1.0/runtime/lib/python2.7/site-packages/toolz-0.8.2-py2.7.egg/toolz/dicttoolz.py", line 84, in valmap
rv.update(zip(iterkeys(d), map(func, itervalues(d))))
File "/home/renjie/workplace/renjie-mac-topline-distributed/env/ToplineDistributedComputing-1.0/runtime/lib/python2.7/site-packages/distributed/worker.py", line 731, in dumps_task
'args': pickle.dumps(task[1:])}
File "/home/renjie/workplace/renjie-mac-topline-distributed/env/ToplineDistributedComputing-1.0/runtime/lib/python2.7/site-packages/distributed/protocol/pickle.py", line 51, in dumps
return cloudpickle.dumps(x, protocol=pickle.HIGHEST_PROTOCOL)
File "/home/renjie/workplace/renjie-mac-topline-distributed/env/ToplineDistributedComputing-1.0/runtime/lib/python2.7/site-packages/cloudpickle/cloudpickle.py", line 829, in dumps
cp.dump(obj)
File "/home/renjie/workplace/renjie-mac-topline-distributed/env/ToplineDistributedComputing-1.0/runtime/lib/python2.7/site-packages/cloudpickle/cloudpickle.py", line 233, in dump
return Pickler.dump(self, obj)
File "/home/renjie/workplace/renjie-mac-topline-distributed/env/ToplineDistributedComputing-1.0/runtime/python2.7/lib/python2.7/pickle.py", line 224, in dump
self.save(obj)
File "/home/renjie/workplace/renjie-mac-topline-distributed/env/ToplineDistributedComputing-1.0/runtime/python2.7/lib/python2.7/pickle.py", line 286, in save
f(self, obj) # Call unbound method with explicit self
File "/home/renjie/workplace/renjie-mac-topline-distributed/env/ToplineDistributedComputing-1.0/runtime/python2.7/lib/python2.7/pickle.py", line 568, in save_tuple
save(element)
File "/home/renjie/workplace/renjie-mac-topline-distributed/env/ToplineDistributedComputing-1.0/runtime/python2.7/lib/python2.7/pickle.py", line 331, in save
self.save_reduce(obj=obj, *rv)
File "/home/renjie/workplace/renjie-mac-topline-distributed/env/ToplineDistributedComputing-1.0/runtime/lib/python2.7/site-packages/cloudpickle/cloudpickle.py", line 727, in save_reduce
save(state)
File "/home/renjie/workplace/renjie-mac-topline-distributed/env/ToplineDistributedComputing-1.0/runtime/python2.7/lib/python2.7/pickle.py", line 286, in save
f(self, obj) # Call unbound method with explicit self
File "/home/renjie/workplace/renjie-mac-topline-distributed/env/ToplineDistributedComputing-1.0/runtime/python2.7/lib/python2.7/pickle.py", line 655, in save_dict
self._batch_setitems(obj.iteritems())
File "/home/renjie/workplace/renjie-mac-topline-distributed/env/ToplineDistributedComputing-1.0/runtime/python2.7/lib/python2.7/pickle.py", line 687, in _batch_setitems
save(v)
File "/home/renjie/workplace/renjie-mac-topline-distributed/env/ToplineDistributedComputing-1.0/runtime/python2.7/lib/python2.7/pickle.py", line 286, in save
f(self, obj) # Call unbound method with explicit self
File "/home/renjie/workplace/renjie-mac-topline-distributed/env/ToplineDistributedComputing-1.0/runtime/lib/python2.7/site-packages/cloudpickle/cloudpickle.py", line 564, in save_instancemethod
obj=obj)
File "/home/renjie/workplace/renjie-mac-topline-distributed/env/ToplineDistributedComputing-1.0/runtime/lib/python2.7/site-packages/cloudpickle/cloudpickle.py", line 709, in save_reduce
save(args)
File "/home/renjie/workplace/renjie-mac-topline-distributed/env/ToplineDistributedComputing-1.0/runtime/python2.7/lib/python2.7/pickle.py", line 286, in save
f(self, obj) # Call unbound method with explicit self
File "/home/renjie/workplace/renjie-mac-topline-distributed/env/ToplineDistributedComputing-1.0/runtime/python2.7/lib/python2.7/pickle.py", line 554, in save_tuple
save(element)
File "/home/renjie/workplace/renjie-mac-topline-distributed/env/ToplineDistributedComputing-1.0/runtime/python2.7/lib/python2.7/pickle.py", line 331, in save
self.save_reduce(obj=obj, *rv)
File "/home/renjie/workplace/renjie-mac-topline-distributed/env/ToplineDistributedComputing-1.0/runtime/lib/python2.7/site-packages/cloudpickle/cloudpickle.py", line 727, in save_reduce
save(state)
File "/home/renjie/workplace/renjie-mac-topline-distributed/env/ToplineDistributedComputing-1.0/runtime/python2.7/lib/python2.7/pickle.py", line 286, in save
f(self, obj) # Call unbound method with explicit self
File "/home/renjie/workplace/renjie-mac-topline-distributed/env/ToplineDistributedComputing-1.0/runtime/python2.7/lib/python2.7/pickle.py", line 655, in save_dict
self._batch_setitems(obj.iteritems())
File "/home/renjie/workplace/renjie-mac-topline-distributed/env/ToplineDistributedComputing-1.0/runtime/python2.7/lib/python2.7/pickle.py", line 687, in _batch_setitems
save(v)
File "/home/renjie/workplace/renjie-mac-topline-distributed/env/ToplineDistributedComputing-1.0/runtime/python2.7/lib/python2.7/pickle.py", line 286, in save
f(self, obj) # Call unbound method with explicit self
File "/home/renjie/workplace/renjie-mac-topline-distributed/env/ToplineDistributedComputing-1.0/runtime/python2.7/lib/python2.7/pickle.py", line 655, in save_dict
self._batch_setitems(obj.iteritems())
File "/home/renjie/workplace/renjie-mac-topline-distributed/env/ToplineDistributedComputing-1.0/runtime/python2.7/lib/python2.7/pickle.py", line 692, in _batch_setitems
save(v)
File "/home/renjie/workplace/renjie-mac-topline-distributed/env/ToplineDistributedComputing-1.0/runtime/python2.7/lib/python2.7/pickle.py", line 331, in save
self.save_reduce(obj=obj, *rv)
File "/home/renjie/workplace/renjie-mac-topline-distributed/env/ToplineDistributedComputing-1.0/runtime/lib/python2.7/site-packages/cloudpickle/cloudpickle.py", line 727, in save_reduce
save(state)
File "/home/renjie/workplace/renjie-mac-topline-distributed/env/ToplineDistributedComputing-1.0/runtime/python2.7/lib/python2.7/pickle.py", line 286, in save
f(self, obj) # Call unbound method with explicit self
File "/home/renjie/workplace/renjie-mac-topline-distributed/env/ToplineDistributedComputing-1.0/runtime/python2.7/lib/python2.7/pickle.py", line 655, in save_dict
self._batch_setitems(obj.iteritems())
File "/home/renjie/workplace/renjie-mac-topline-distributed/env/ToplineDistributedComputing-1.0/runtime/python2.7/lib/python2.7/pickle.py", line 687, in _batch_setitems
save(v)
File "/home/renjie/workplace/renjie-mac-topline-distributed/env/ToplineDistributedComputing-1.0/runtime/python2.7/lib/python2.7/pickle.py", line 331, in save
self.save_reduce(obj=obj, *rv)
File "/home/renjie/workplace/renjie-mac-topline-distributed/env/ToplineDistributedComputing-1.0/runtime/lib/python2.7/site-packages/cloudpickle/cloudpickle.py", line 727, in save_reduce
save(state)
File "/home/renjie/workplace/renjie-mac-topline-distributed/env/ToplineDistributedComputing-1.0/runtime/python2.7/lib/python2.7/pickle.py", line 286, in save
f(self, obj) # Call unbound method with explicit self
File "/home/renjie/workplace/renjie-mac-topline-distributed/env/ToplineDistributedComputing-1.0/runtime/python2.7/lib/python2.7/pickle.py", line 655, in save_dict
self._batch_setitems(obj.iteritems())
File "/home/renjie/workplace/renjie-mac-topline-distributed/env/ToplineDistributedComputing-1.0/runtime/python2.7/lib/python2.7/pickle.py", line 687, in _batch_setitems
save(v)
File "/home/renjie/workplace/renjie-mac-topline-distributed/env/ToplineDistributedComputing-1.0/runtime/python2.7/lib/python2.7/pickle.py", line 286, in save
f(self, obj) # Call unbound method with explicit self
File "/home/renjie/workplace/renjie-mac-topline-distributed/env/ToplineDistributedComputing-1.0/runtime/lib/python2.7/site-packages/cloudpickle/cloudpickle.py", line 564, in save_instancemethod
obj=obj)
File "/home/renjie/workplace/renjie-mac-topline-distributed/env/ToplineDistributedComputing-1.0/runtime/lib/python2.7/site-packages/cloudpickle/cloudpickle.py", line 709, in save_reduce
save(args)
File "/home/renjie/workplace/renjie-mac-topline-distributed/env/ToplineDistributedComputing-1.0/runtime/python2.7/lib/python2.7/pickle.py", line 286, in save
f(self, obj) # Call unbound method with explicit self
File "/home/renjie/workplace/renjie-mac-topline-distributed/env/ToplineDistributedComputing-1.0/runtime/python2.7/lib/python2.7/pickle.py", line 554, in save_tuple
save(element)
File "/home/renjie/workplace/renjie-mac-topline-distributed/env/ToplineDistributedComputing-1.0/runtime/python2.7/lib/python2.7/pickle.py", line 286, in save
f(self, obj) # Call unbound method with explicit self
File "/home/renjie/workplace/renjie-mac-topline-distributed/env/ToplineDistributedComputing-1.0/runtime/lib/python2.7/site-packages/cloudpickle/cloudpickle.py", line 360, in save_function
self.save_function_tuple(obj)
File "/home/renjie/workplace/renjie-mac-topline-distributed/env/ToplineDistributedComputing-1.0/runtime/lib/python2.7/site-packages/cloudpickle/cloudpickle.py", line 436, in save_function_tuple
save(f_globals)
File "/home/renjie/workplace/renjie-mac-topline-distributed/env/ToplineDistributedComputing-1.0/runtime/python2.7/lib/python2.7/pickle.py", line 286, in save
f(self, obj) # Call unbound method with explicit self
File "/home/renjie/workplace/renjie-mac-topline-distributed/env/ToplineDistributedComputing-1.0/runtime/python2.7/lib/python2.7/pickle.py", line 655, in save_dict
self._batch_setitems(obj.iteritems())
File "/home/renjie/workplace/renjie-mac-topline-distributed/env/ToplineDistributedComputing-1.0/runtime/python2.7/lib/python2.7/pickle.py", line 687, in _batch_setitems
save(v)
File "/home/renjie/workplace/renjie-mac-topline-distributed/env/ToplineDistributedComputing-1.0/runtime/python2.7/lib/python2.7/pickle.py", line 331, in save
self.save_reduce(obj=obj, *rv)
File "/home/renjie/workplace/renjie-mac-topline-distributed/env/ToplineDistributedComputing-1.0/runtime/lib/python2.7/site-packages/cloudpickle/cloudpickle.py", line 727, in save_reduce
save(state)
File "/home/renjie/workplace/renjie-mac-topline-distributed/env/ToplineDistributedComputing-1.0/runtime/python2.7/lib/python2.7/pickle.py", line 286, in save
f(self, obj) # Call unbound method with explicit self
File "/home/renjie/workplace/renjie-mac-topline-distributed/env/ToplineDistributedComputing-1.0/runtime/python2.7/lib/python2.7/pickle.py", line 655, in save_dict
self._batch_setitems(obj.iteritems())
File "/home/renjie/workplace/renjie-mac-topline-distributed/env/ToplineDistributedComputing-1.0/runtime/python2.7/lib/python2.7/pickle.py", line 687, in _batch_setitems
save(v)
File "/home/renjie/workplace/renjie-mac-topline-distributed/env/ToplineDistributedComputing-1.0/runtime/python2.7/lib/python2.7/pickle.py", line 331, in save
self.save_reduce(obj=obj, *rv)
File "/home/renjie/workplace/renjie-mac-topline-distributed/env/ToplineDistributedComputing-1.0/runtime/lib/python2.7/site-packages/cloudpickle/cloudpickle.py", line 727, in save_reduce
save(state)
File "/home/renjie/workplace/renjie-mac-topline-distributed/env/ToplineDistributedComputing-1.0/runtime/python2.7/lib/python2.7/pickle.py", line 286, in save
f(self, obj) # Call unbound method with explicit self
File "/home/renjie/workplace/renjie-mac-topline-distributed/env/ToplineDistributedComputing-1.0/runtime/python2.7/lib/python2.7/pickle.py", line 655, in save_dict
self._batch_setitems(obj.iteritems())
File "/home/renjie/workplace/renjie-mac-topline-distributed/env/ToplineDistributedComputing-1.0/runtime/python2.7/lib/python2.7/pickle.py", line 687, in _batch_setitems
save(v)
File "/home/renjie/workplace/renjie-mac-topline-distributed/env/ToplineDistributedComputing-1.0/runtime/python2.7/lib/python2.7/pickle.py", line 286, in save
f(self, obj) # Call unbound method with explicit self
File "/home/renjie/workplace/renjie-mac-topline-distributed/env/ToplineDistributedComputing-1.0/runtime/python2.7/lib/python2.7/pickle.py", line 606, in save_list
self._batch_appends(iter(obj))
File "/home/renjie/workplace/renjie-mac-topline-distributed/env/ToplineDistributedComputing-1.0/runtime/python2.7/lib/python2.7/pickle.py", line 642, in _batch_appends
save(tmp[0])
File "/home/renjie/workplace/renjie-mac-topline-distributed/env/ToplineDistributedComputing-1.0/runtime/python2.7/lib/python2.7/pickle.py", line 331, in save
self.save_reduce(obj=obj, *rv)
File "/home/renjie/workplace/renjie-mac-topline-distributed/env/ToplineDistributedComputing-1.0/runtime/lib/python2.7/site-packages/cloudpickle/cloudpickle.py", line 727, in save_reduce
save(state)
File "/home/renjie/workplace/renjie-mac-topline-distributed/env/ToplineDistributedComputing-1.0/runtime/python2.7/lib/python2.7/pickle.py", line 286, in save
f(self, obj) # Call unbound method with explicit self
File "/home/renjie/workplace/renjie-mac-topline-distributed/env/ToplineDistributedComputing-1.0/runtime/python2.7/lib/python2.7/pickle.py", line 655, in save_dict
self._batch_setitems(obj.iteritems())
File "/home/renjie/workplace/renjie-mac-topline-distributed/env/ToplineDistributedComputing-1.0/runtime/python2.7/lib/python2.7/pickle.py", line 687, in _batch_setitems
save(v)
File "/home/renjie/workplace/renjie-mac-topline-distributed/env/ToplineDistributedComputing-1.0/runtime/python2.7/lib/python2.7/pickle.py", line 331, in save
self.save_reduce(obj=obj, *rv)
File "/home/renjie/workplace/renjie-mac-topline-distributed/env/ToplineDistributedComputing-1.0/runtime/lib/python2.7/site-packages/cloudpickle/cloudpickle.py", line 727, in save_reduce
save(state)
File "/home/renjie/workplace/renjie-mac-topline-distributed/env/ToplineDistributedComputing-1.0/runtime/python2.7/lib/python2.7/pickle.py", line 286, in save
f(self, obj) # Call unbound method with explicit self
File "/home/renjie/workplace/renjie-mac-topline-distributed/env/ToplineDistributedComputing-1.0/runtime/python2.7/lib/python2.7/pickle.py", line 655, in save_dict
self._batch_setitems(obj.iteritems())
File "/home/renjie/workplace/renjie-mac-topline-distributed/env/ToplineDistributedComputing-1.0/runtime/python2.7/lib/python2.7/pickle.py", line 687, in _batch_setitems
save(v)
File "/home/renjie/workplace/renjie-mac-topline-distributed/env/ToplineDistributedComputing-1.0/runtime/python2.7/lib/python2.7/pickle.py", line 306, in save
rv = reduce(self.proto)
TypeError: can't pickle thread.lock objects
My first guess is that the boto_session object is finding its way into the task, and that object isn't serializable.
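A minimal stdlib sketch of that suspected failure mode, assuming the session object holds a threading lock internally. FakeSession is a hypothetical stand-in, not boto's actual class, and the S3 paths are placeholders. Plain pickle rejects lock objects just as the cloudpickle call in the traceback does, so any task argument that embeds such an object fails to serialize, while the same arguments carrying only strings go through:

```python
import pickle
import threading


class FakeSession:
    """Hypothetical stand-in for a client session that guards its shared
    state with a lock, as many client libraries do internally."""

    def __init__(self, region):
        self.region = region
        self._lock = threading.Lock()  # lock objects cannot be pickled


def try_pickle(obj):
    """Return None if obj pickles cleanly, otherwise the error message."""
    try:
        pickle.dumps(obj)
    except TypeError as exc:
        return str(exc)
    return None


# Task arguments that embed the session, loosely mirroring a session passed
# through storage_options into every read task.
with_session = ("s3://bucket/data.csv", {"session": FakeSession("us-east-1")})
# The same arguments carrying only plain strings.
plain = ("s3://bucket/data.csv", {"region": "us-east-1"})

print(try_pickle(with_session))  # a TypeError message mentioning the lock; wording varies by Python version
print(try_pickle(plain))         # → None
```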
I can try other ways to pass the credentials, but this is the recommended way inside our company. Do you think dask's SerializableLock could help? I see you have solved similar issues for read_hdf().
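For reference, the general technique behind a serializable lock is to pickle only a token and, after unpickling, look up (or create) the real lock for that token in the current process. Below is a minimal stdlib sketch of that idea; TokenLock is a hypothetical name and this is not dask's actual implementation. It only helps when the code that creates the lock can be changed to use such a wrapper, which is not the case for a lock created inside a third-party session object.

```python
import pickle
import threading
import uuid


class TokenLock:
    """Sketch of a lock that can be pickled: only a token is serialized,
    and unpickling maps that token to a real lock in the current process.
    Illustrative only; not dask's SerializableLock source."""

    # token -> lock for this process; entries are never removed in this sketch
    _registry = {}
    _registry_guard = threading.Lock()

    def __init__(self, token=None):
        self.token = token or uuid.uuid4().hex
        with TokenLock._registry_guard:
            self.lock = TokenLock._registry.setdefault(self.token, threading.Lock())

    def __enter__(self):
        self.lock.acquire()
        return self

    def __exit__(self, *exc_info):
        self.lock.release()
        return False

    def __getstate__(self):
        # Ship the token only; the lock itself cannot be pickled.
        return {"token": self.token}

    def __setstate__(self, state):
        # Re-attach to this process's lock for the same token.
        self.__init__(state["token"])


original = TokenLock()
clone = pickle.loads(pickle.dumps(original))
print(clone.token == original.token)  # → True
print(clone.lock is original.lock)    # → True
with clone:
    print(original.lock.locked())     # → True
```

Within one process the clone shares the original lock. In a different process the same token maps to a fresh lock created there, which is what makes such an object safe to ship to workers.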
My guess is that the boto library has a lock in it somewhere that we're not going to be able to touch. You could ask upstream, but they'll probably say, "Why would you want to pass around session objects? This may be unsafe."
Alternatively, what I often see in production is that some other mechanism manages security, so that when workers look up the default credentials they already have them. For example, environment variables or .boto files may be pre-populated on each worker. Moving credentials around within a computational framework (like Dask, Hadoop, Spark, ...) is sometimes considered unsafe.
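A minimal sketch of that approach, assuming workers receive their credentials through the standard AWS environment variables (AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, AWS_SESSION_TOKEN). Only plain, picklable data travels with the task, and credentials are looked up where the task runs. resolve_credentials and read_partition are hypothetical helpers, and the S3 path is a placeholder. In practice boto's default credential lookup already checks the environment on each worker, so the change would typically be to leave the session object out of storage_options entirely.

```python
import os
import pickle


def resolve_credentials(env=None):
    """Read AWS credentials from the environment of the current process.

    The variable names are the standard AWS ones; how they get populated
    on each worker is assumed to be handled by your deployment.
    """
    env = os.environ if env is None else env
    return {
        "key": env.get("AWS_ACCESS_KEY_ID"),
        "secret": env.get("AWS_SECRET_ACCESS_KEY"),
        "token": env.get("AWS_SESSION_TOKEN"),
    }


def read_partition(path, options, env=None):
    """Hypothetical task function. Credentials are resolved here, on the
    worker, instead of being shipped inside the task. `env` exists only
    so the sketch can be exercised without touching os.environ."""
    creds = resolve_credentials(env)
    # A real task would open `path` with `creds` and `options`; this
    # sketch only reports whether an access key was found.
    return path, creds["key"] is not None


# Client side: the task holds a function reference, a path and options
# without secrets, all of which pickle without trouble.
task = (read_partition, "s3://bucket/data.csv", {"region": "us-east-1"})
payload = pickle.dumps(task)

# Worker side: unpickle the task and run it against the local environment.
func, path, options = pickle.loads(payload)
print(func(path, options))
```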
Thanks for the suggestions! We're still at the early proof-of-concept stage, so we'll think about other ways to handle the credentials.
I seem to encounter the same problem with a dataset created by read_parquet() on S3 files and then published, whether I configure credentials through environment variables or through the key/secret parameters in storage_options. It works before publishing.
We're using the dask distributed scheduler with multiprocessing workers on an EC2 cluster (dask 0.15.4, distributed 1.19.3).
I'm trying to publish a named dataset (a dataframe), then retrieve it and continue working on it. Basically:
This results in a 'TypeError: can't pickle thread.lock objects' error. I suppose this might be related to https://github.com/dask/distributed/issues/780, https://github.com/dask/dask/issues/1683 and https://github.com/dask/distributed/issues/539.
I don't know how to work around this issue, because read_csv() doesn't seem to accept a lock argument.
Full traceback: traceback.txt
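To pin down which object in a task graph holds the lock, here is a small debugging sketch: it recursively tries to pickle the pieces of a nested structure and returns the path to the innermost piece that fails. find_unpicklable and Holder are hypothetical names, and the walker only descends into dicts, lists, tuples and instance attributes. It uses the stdlib pickle; swapping in cloudpickle.dumps (which distributed calls, per the traceback earlier in this thread) would match distributed's behavior more closely, since cloudpickle also handles lambdas and closures that plain pickle rejects.

```python
import pickle
import threading


def find_unpicklable(obj, path="obj", seen=None):
    """Return the path to the innermost object under `obj` that fails to
    pickle, or None if everything pickles.

    Only recurses into dicts, lists, tuples and instance __dict__s; this
    is a debugging sketch, not an exhaustive walk of pickle's protocol.
    """
    if seen is None:
        seen = set()
    if id(obj) in seen:  # guard against reference cycles
        return None
    seen.add(id(obj))
    try:
        pickle.dumps(obj)
        return None
    except Exception:
        pass
    # obj fails to pickle; look for a smaller failing piece inside it.
    if isinstance(obj, dict):
        children = ((f"{path}[{k!r}]", v) for k, v in obj.items())
    elif isinstance(obj, (list, tuple)):
        children = ((f"{path}[{i}]", v) for i, v in enumerate(obj))
    elif hasattr(obj, "__dict__"):
        children = ((f"{path}.{k}", v) for k, v in vars(obj).items())
    else:
        children = ()
    for child_path, child in children:
        found = find_unpicklable(child, child_path, seen)
        if found is not None:
            return found
    # No child fails on its own, so obj itself is the culprit.
    return path


class Holder:
    """Hypothetical object that smuggles a lock into a task."""

    def __init__(self):
        self.name = "client"
        self._lock = threading.Lock()


graph = {"read-0": ("s3://bucket/part.0.parquet", {"fs": Holder()})}
print(find_unpicklable(graph))  # → obj['read-0'][1]['fs']._lock
```

With a real collection, the graph could be passed in as a plain dict (for example dict(df.dask) in the dask versions discussed here; check this against your dask version).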