scikit-hep / uproot5

ROOT I/O in pure Python and NumPy.
https://uproot.readthedocs.io
BSD 3-Clause "New" or "Revised" License

With dask cluster uproot.dask breaks #1102

Closed: ivukotic closed this issue 10 months ago

ivukotic commented 10 months ago

I can read a few branches using uproot.dask with no problem, as long as I don't have a dask cluster. With a distributed dask cluster, it breaks. Here is the simplest reproducible example. The file is already cached, so no authentication is needed.

from dask.distributed import Client
client = Client()

import dask_awkward as dak
import uproot

def my_name_filter(name):
    return name in [
        "AnalysisElectronsAuxDyn.pt",
        "AnalysisElectronsAuxDyn.eta",
        "AnalysisElectronsAuxDyn.phi",
        "AnalysisElectronsAuxDyn.m",
    ]

tree = uproot.dask(
    [{'root://xcache.af.uchicago.edu:1094//root://fax.mwt2.org:1094//pnfs/uchicago.edu/atlaslocalgroupdisk/rucio/mc20_13TeV/c3/e9/DAOD_PHYSLITE.34869306._000001.pool.root.1': 'CollectionTree'}],
    filter_name=my_name_filter,
)

ak_arr = tree.compute()
ak_arr.show()

Here is the error:

2024-01-25 20:16:57,382 - distributed.protocol.pickle - ERROR - Failed to serialize <ToPickle: HighLevelGraph with 1 layers.
<dask.highlevelgraph.HighLevelGraph object at 0x7fc1c118b550>
 0. from-uproot-8587817a131b75a5ffe8abc755185aa3
>.
Traceback (most recent call last):
  File "/venv/lib/python3.9/site-packages/distributed/protocol/pickle.py", line 63, in dumps
    result = pickle.dumps(x, **dump_kwargs)
TypeError: cannot pickle '_thread.lock' object

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/venv/lib/python3.9/site-packages/distributed/protocol/pickle.py", line 68, in dumps
    pickler.dump(x)
TypeError: cannot pickle '_thread.lock' object

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/venv/lib/python3.9/site-packages/distributed/protocol/pickle.py", line 81, in dumps
    result = cloudpickle.dumps(x, **dump_kwargs)
  File "/venv/lib/python3.9/site-packages/cloudpickle/cloudpickle.py", line 1479, in dumps
    cp.dump(obj)
  File "/venv/lib/python3.9/site-packages/cloudpickle/cloudpickle.py", line 1245, in dump
    return super().dump(obj)
TypeError: cannot pickle '_thread.lock' object
---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
File /venv/lib/python3.9/site-packages/distributed/protocol/pickle.py:63, in dumps(x, buffer_callback, protocol)
     62 try:
---> 63     result = pickle.dumps(x, **dump_kwargs)
     64 except Exception:

TypeError: cannot pickle '_thread.lock' object

During handling of the above exception, another exception occurred:

TypeError                                 Traceback (most recent call last)
File /venv/lib/python3.9/site-packages/distributed/protocol/pickle.py:68, in dumps(x, buffer_callback, protocol)
     67 buffers.clear()
---> 68 pickler.dump(x)
     69 result = f.getvalue()

TypeError: cannot pickle '_thread.lock' object

During handling of the above exception, another exception occurred:

TypeError                                 Traceback (most recent call last)
File /venv/lib/python3.9/site-packages/distributed/protocol/serialize.py:353, in serialize(x, serializers, on_error, context, iterate_collection)
    352 try:
--> 353     header, frames = dumps(x, context=context) if wants_context else dumps(x)
    354     header["serializer"] = name

File /venv/lib/python3.9/site-packages/distributed/protocol/serialize.py:76, in pickle_dumps(x, context)
     74     writeable.append(not f.readonly)
---> 76 frames[0] = pickle.dumps(
     77     x,
     78     buffer_callback=buffer_callback,
     79     protocol=context.get("pickle-protocol", None) if context else None,
     80 )
     81 header = {
     82     "serializer": "pickle",
     83     "writeable": tuple(writeable),
     84 }

File /venv/lib/python3.9/site-packages/distributed/protocol/pickle.py:81, in dumps(x, buffer_callback, protocol)
     80     buffers.clear()
---> 81     result = cloudpickle.dumps(x, **dump_kwargs)
     82 except Exception:

File /venv/lib/python3.9/site-packages/cloudpickle/cloudpickle.py:1479, in dumps(obj, protocol, buffer_callback)
   1478 cp = Pickler(file, protocol=protocol, buffer_callback=buffer_callback)
-> 1479 cp.dump(obj)
   1480 return file.getvalue()

File /venv/lib/python3.9/site-packages/cloudpickle/cloudpickle.py:1245, in Pickler.dump(self, obj)
   1244 try:
-> 1245     return super().dump(obj)
   1246 except RuntimeError as e:

TypeError: cannot pickle '_thread.lock' object

The above exception was the direct cause of the following exception:

TypeError                                 Traceback (most recent call last)
Cell In[3], line 16
      3     return name in [
      4         "AnalysisElectronsAuxDyn.pt",
      5         "AnalysisElectronsAuxDyn.eta",
      6         "AnalysisElectronsAuxDyn.phi",
      7         "AnalysisElectronsAuxDyn.m",
      8     ]
     10 tree = uproot.dask(
     11     [{'root://xcache.af.uchicago.edu:1094//root://fax.mwt2.org:1094//pnfs/uchicago.edu/atlaslocalgroupdisk/rucio/mc20_13TeV/c3/e9/DAOD_PHYSLITE.34869306._000001.pool.root.1': 'CollectionTree'}],
     12     filter_name=my_name_filter,
     13     # library='ak'
     14 )
---> 16 ak_arr = tree.compute()
     17 ak_arr.show()

File /venv/lib/python3.9/site-packages/dask/base.py:342, in DaskMethodsMixin.compute(self, **kwargs)
    318 def compute(self, **kwargs):
    319     """Compute this dask collection
    320 
    321     This turns a lazy Dask collection into its in-memory equivalent.
   (...)
    340     dask.compute
    341     """
--> 342     (result,) = compute(self, traverse=False, **kwargs)
    343     return result

File /venv/lib/python3.9/site-packages/dask/base.py:628, in compute(traverse, optimize_graph, scheduler, get, *args, **kwargs)
    625     postcomputes.append(x.__dask_postcompute__())
    627 with shorten_traceback():
--> 628     results = schedule(dsk, keys, **kwargs)
    630 return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])

File /venv/lib/python3.9/site-packages/distributed/protocol/serialize.py:379, in serialize(x, serializers, on_error, context, iterate_collection)
    377     except Exception:
    378         raise TypeError(msg) from exc
--> 379     raise TypeError(msg, str_x) from exc
    380 else:  # pragma: nocover
    381     raise ValueError(f"{on_error=}; expected 'message' or 'raise'")

TypeError: ('Could not serialize object of type HighLevelGraph', '<ToPickle: HighLevelGraph with 1 layers.\n<dask.highlevelgraph.HighLevelGraph object at 0x7fc1c118b550>\n 0. from-uproot-8587817a131b75a5ffe8abc755185aa3\n>')
jpivarski commented 10 months ago

@martindurant (and @douglasdavis), should this be a dask-awkward issue? (I don't have permissions to transfer it.)

martindurant commented 10 months ago

It says that the graph contains a lock object. That must have been introduced in the from_map call that I think uproot uses - perhaps proxying an open file? Printing out the details within the graph or translating it to a low-level-graph might help.
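One way to act on that suggestion can be sketched roughly as follows. This is a hypothetical debugging snippet, not part of dask or uproot: it builds a toy graph with a deliberately embedded lock (to mimic the symptom, not uproot's real graph), materializes the HighLevelGraph into a plain dict, and tries to pickle each task individually:

```python
import pickle
import threading

import dask

# Toy stand-in for the failing case: a task whose arguments contain a
# thread lock, which the pickler rejects.
lock = threading.Lock()
collection = dask.delayed(len)([lock])

# Materialize the HighLevelGraph into a low-level {key: task} mapping and
# try to pickle each task on its own to locate the offender.
dsk = dict(collection.__dask_graph__())
bad = []
for key, task in dsk.items():
    try:
        pickle.dumps(task)
    except TypeError as err:
        bad.append((key, str(err)))

for key, msg in bad:
    print(key, "->", msg)  # "cannot pickle '_thread.lock' object"
```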

jpivarski commented 10 months ago

> It says that the graph contains a lock object.

Uh oh, I missed that. (I was looking at the bottom of the stack trace.) I'll figure out what that lock is for.

martindurant commented 10 months ago

I wish we had a better tool to tell you where in the object the unpicklable thing lives
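Something like this might serve as a crude version of that tool (purely hypothetical, not part of dask or distributed): recursively attempt to pickle sub-objects and print the paths of the leaves that fail:

```python
import pickle

def find_unpicklable(obj, path="obj", seen=None):
    """Hypothetical helper: walk an object and print the paths of leaves
    that cannot be pickled (e.g. a '_thread.lock' buried in a graph)."""
    seen = set() if seen is None else seen
    if id(obj) in seen:
        return
    seen.add(id(obj))
    try:
        pickle.dumps(obj)
        return  # this subtree pickles fine
    except Exception:
        pass
    # descend into attributes / items to narrow down the culprit
    if hasattr(obj, "__dict__"):
        children = [(f"{path}.{k}", v) for k, v in vars(obj).items()]
    elif isinstance(obj, dict):
        children = [(f"{path}[{k!r}]", v) for k, v in obj.items()]
    elif isinstance(obj, (list, tuple, set)):
        children = [(f"{path}[{i}]", v) for i, v in enumerate(obj)]
    else:
        children = []
    if not children:
        print(f"unpicklable leaf: {path} ({type(obj).__name__})")
    for child_path, child in children:
        find_unpicklable(child, child_path, seen)
```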

jpivarski commented 10 months ago

There's no issue in a random ROOT file,

>>> import uproot, skhep_testdata
>>> from dask.distributed import Client
>>> client = Client()
>>> 
>>> a = uproot.dask({skhep_testdata.data_path("uproot-HZZ.root"): "events"})
>>> a.compute()
<Array [{NJet: 0, Jet_Px: [], ...}, ..., {...}] type='2421 * {NJet: int32, ...'>

so it must be something special in DAOD_PHYSLITE. Yes, it is.

jpivarski commented 10 months ago

It's this one:

https://github.com/scikit-hep/uproot5/blob/c128e4bebdd5aad2bfaa5bc5f4ce896fe7bd9e72/src/uproot/behaviors/TBranch.py#L2473

The thing that's special about this file (these branches, to be specific) is that it has incompletely written ("embedded") TBaskets, which have to be read single-threaded because reading them changes the TBranch object.

I'll adapt the __setstate__/__getstate__ to drop and recreate the lock on pickling.
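The usual pattern for that looks roughly like this. A minimal sketch with an invented class and attribute name, not uproot's actual TBranch code:

```python
import pickle
import threading

class LockedReader:
    """Invented illustration: a class that holds a thread lock but stays
    picklable by dropping the lock in __getstate__ and recreating it in
    __setstate__."""

    def __init__(self, name):
        self.name = name
        self._lock = threading.Lock()  # cannot be pickled directly

    def __getstate__(self):
        state = self.__dict__.copy()
        del state["_lock"]  # drop the unpicklable lock before pickling
        return state

    def __setstate__(self, state):
        self.__dict__.update(state)
        self._lock = threading.Lock()  # fresh lock after unpickling

restored = pickle.loads(pickle.dumps(LockedReader("CollectionTree")))
print(restored.name)  # -> CollectionTree
```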

jpivarski commented 10 months ago

PR #1103 fixes this.