sgkit-dev / sgkit

Scalable genetics toolkit
https://sgkit-dev.github.io/sgkit
Apache License 2.0

path 'variant_contig' contains an array #1043

Closed: barneyhill closed this issue 1 year ago

barneyhill commented 1 year ago

Hi everyone - really excited to try out this package. I'm trying to import the imputed UK Biobank bgens into sgkit:

ds = read_bgen(path="/well/lindgren/UKBIOBANK/DATA/IMPUTATION/ukb_imp_chr22_v3.bgen", metafile_path="/users/lindgren/hjo721/barney/gxe/meta/meta", sample_path="/well/lindgren/UKBIOBANK/DATA/SAMPLE_FAM/ukb11867_imp_chr1_v3_s487395.sample").set_index({"samples": "sample_id"})
ds = sg.io.bgen.rechunk_bgen(ds, output="/users/lindgren/hjo721/barney/gxe/zarr/22.zarr")

And getting the following exception:

File /well/lindgren/users/hjo721/conda/skylake/envs/sgkit/lib/python3.10/site-packages/sgkit/io/bgen/bgen_reader.py:510, in rechunk_bgen(ds, output, chunk_length, chunk_width, compressor, probability_dtype, max_mem, pack, tempdir)
    503 target_options = {
    504     var: {k: v for k, v in encoding[var].items() if k != "chunks"}
    505     for var in encoding
    506 }
    507 with tempfile.TemporaryDirectory(
    508     prefix="bgen_to_zarr_", suffix=".zarr", dir=tempdir
    509 ) as tmpdir:
--> 510     rechunked = rechunker_api.rechunk(
    511         ds,
    512         max_mem=max_mem,
    513         target_chunks=target_chunks,
    514         target_store=output,
    515         target_options=target_options,
    516         temp_store=tmpdir,
    517         executor="dask",
    518     )
    519     rechunked.execute()
    521 zarr.consolidate_metadata(output)

File /well/lindgren/users/hjo721/conda/skylake/envs/sgkit/lib/python3.10/site-packages/rechunker/api.py:302, in rechunk(source, target_chunks, max_mem, target_store, target_options, temp_store, temp_options, executor)
    299 if isinstance(executor, str):
    300     executor = _get_executor(executor)
--> 302 copy_spec, intermediate, target = _setup_rechunk(
    303     source=source,
    304     target_chunks=target_chunks,
    305     max_mem=max_mem,
    306     target_store=target_store,
    307     target_options=target_options,
    308     temp_store=temp_store,
    309     temp_options=temp_options,
    310 )
    311 plan = executor.prepare_plan(copy_spec)
    312 return Rechunked(executor, plan, source, intermediate, target)

File /well/lindgren/users/hjo721/conda/skylake/envs/sgkit/lib/python3.10/site-packages/rechunker/api.py:426, in _setup_rechunk(source, target_chunks, max_mem, target_store, target_options, temp_store, temp_options)
    423 variable_attrs = _encode_zarr_attributes(variable.attrs)
    424 variable_attrs[DIMENSION_KEY] = encode_zarr_attr_value(variable.dims)
--> 426 copy_spec = _setup_array_rechunk(
    427     dask.array.asarray(variable),
    428     variable_chunks,
    429     max_mem,
    430     target_group,
    431     target_options=options,
    432     temp_store_or_group=temp_group,
    433     temp_options=options,
    434     name=name,
    435 )
    436 copy_spec.write.array.attrs.update(variable_attrs)  # type: ignore
    437 copy_specs.append(copy_spec)

File /well/lindgren/users/hjo721/conda/skylake/envs/sgkit/lib/python3.10/site-packages/rechunker/api.py:546, in _setup_array_rechunk(source_array, target_chunks, max_mem, target_store_or_group, target_options, temp_store_or_group, temp_options, name)
    543 int_chunks = tuple(int(x) for x in int_chunks)
    544 write_chunks = tuple(int(x) for x in write_chunks)
--> 546 target_array = _zarr_empty(
    547     shape,
    548     target_store_or_group,
    549     target_chunks,
    550     dtype,
    551     name=name,
    552     **(target_options or {}),
    553 )
    554 try:
    555     target_array.attrs.update(source_array.attrs)

File /well/lindgren/users/hjo721/conda/skylake/envs/sgkit/lib/python3.10/site-packages/rechunker/api.py:153, in _zarr_empty(shape, store_or_group, chunks, dtype, name, **kwargs)
    151 if name is not None:
    152     assert isinstance(store_or_group, zarr.hierarchy.Group)
--> 153     return store_or_group.empty(
    154         name, shape=shape, chunks=chunks, dtype=dtype, **kwargs
    155     )
    156 else:
    157     return zarr.empty(
    158         shape, chunks=chunks, dtype=dtype, store=store_or_group, **kwargs
    159     )

File /well/lindgren/users/hjo721/conda/skylake/envs/sgkit/lib/python3.10/site-packages/zarr/hierarchy.py:1140, in Group.empty(self, name, **kwargs)
   1137 def empty(self, name, **kwargs):
   1138     """Create an array. Keyword arguments as per
   1139     :func:`zarr.creation.empty`."""
-> 1140     return self._write_op(self._empty_nosync, name, **kwargs)

File /well/lindgren/users/hjo721/conda/skylake/envs/sgkit/lib/python3.10/site-packages/zarr/hierarchy.py:895, in Group._write_op(self, f, *args, **kwargs)
    892     lock = self._synchronizer[group_meta_key]
    894 with lock:
...
--> 495         raise ContainsArrayError(path)
    496     elif contains_group(store, path, explicit_only=False):
    497         raise ContainsGroupError(path)

ContainsArrayError: path 'variant_contig' contains an array

Has anyone ever seen anything like this before? The error is a bit opaque to me.

Many Thanks, Barney

tomwhite commented 1 year ago

Hi Barney,

The ContainsArrayError indicates that there are already files in that directory. Do you get this when running from scratch (i.e. with no /users/lindgren/hjo721/barney/gxe/zarr/22.zarr directory)?
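
A minimal sketch of the workaround (assuming the same output path as above, ds from the earlier read_bgen call, and sgkit imported as sg): delete any stale or partially written store before rerunning rechunk_bgen.

import shutil
from pathlib import Path

import sgkit as sg  # assumed import alias, matching the sg.io.bgen usage above

output = Path("/users/lindgren/hjo721/barney/gxe/zarr/22.zarr")
if output.exists():
    shutil.rmtree(output)  # remove the stale/partial zarr store left by the failed run

ds = sg.io.bgen.rechunk_bgen(ds, output=str(output))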

Tom

barneyhill commented 1 year ago

Ah of course! Thank you very much. I'm now running rechunk_bgen, but it seems to be taking a very long time: after 2 hours I'm at 2% progress. Do you know if this is expected (1255683 variants, 487409 samples), and if so, is there anything I can do to speed up the conversion?

tomwhite commented 1 year ago

What are you running it on? A cluster or large multi-core machine is usually needed for that size of dataset.

barneyhill commented 1 year ago

Running with 24 cores and 350 GB of RAM.

tomwhite commented 1 year ago

One thing you might try is using larger chunks. The default for rechunk_bgen is chunk_length=10000 and chunk_width=1000. The chunk_width (samples dimension) can be rather small when you have this many samples, so it might be worth increasing.
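
For example (a hedged sketch; the parameter names come from the rechunk_bgen signature in the traceback above, and the chunk_width value of 10000 is just an illustration):

ds_zarr = sg.io.bgen.rechunk_bgen(
    ds,
    output="/users/lindgren/hjo721/barney/gxe/zarr/22.zarr",
    chunk_length=10000,  # variants per chunk (the default)
    chunk_width=10000,   # samples per chunk, increased from the default of 1000
)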

@benjeffery might have some thoughts here as he is working with similar sized data.

jeromekelleher commented 1 year ago

Maybe you could show us some pictures of your Dask dashboard @barneyhill? That would help us get some context.

barneyhill commented 1 year ago

Ah! Sorry, I was just following along with the GWAS example and the docs, but I realise I probably have to start client = dask.distributed.Client(n_workers=24, threads_per_worker=1) to use all my cores. Now running the following:

client = dask.distributed.Client(n_workers=24, threads_per_worker=1)
with ProgressBar():
    ds = read_bgen(path="/well/lindgren/UKBIOBANK/DATA/IMPUTATION/ukb_imp_chr22_v3.bgen",
                metafile_path="/users/lindgren/hjo721/barney/gxe/meta/meta",
                sample_path="/well/lindgren/UKBIOBANK/DATA/SAMPLE_FAM/ukb11867_imp_chr1_v3_s487395.sample").set_index({"samples": "sample_id"})

My client looks like it has been initialised properly (dashboard screenshot omitted), but I now get the following error:

2023-03-03 13:26:20,713 - distributed.protocol.pickle - ERROR - Failed to serialize subgraph_callable-8c6d285a-e7e9-4dce-b3a5-25fd5e950cb6.
Traceback (most recent call last):
  File "/well/lindgren/users/hjo721/conda/skylake/envs/sgkit/lib/python3.10/site-packages/distributed/worker.py", line 2986, in dumps_function
    result = cache_dumps[func]
  File "/well/lindgren/users/hjo721/conda/skylake/envs/sgkit/lib/python3.10/site-packages/distributed/collections.py", line 24, in __getitem__
    value = super().__getitem__(key)
  File "/well/lindgren/users/hjo721/conda/skylake/envs/sgkit/lib/python3.10/collections/__init__.py", line 1106, in __getitem__
    raise KeyError(key)
KeyError: subgraph_callable-8c6d285a-e7e9-4dce-b3a5-25fd5e950cb6

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/well/lindgren/users/hjo721/conda/skylake/envs/sgkit/lib/python3.10/site-packages/distributed/protocol/pickle.py", line 63, in dumps
    result = pickle.dumps(x, **dump_kwargs)
_pickle.PicklingError: Can't pickle <ufunc 'len (vectorized)'>: attribute lookup len (vectorized) on __main__ failed

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/well/lindgren/users/hjo721/conda/skylake/envs/sgkit/lib/python3.10/site-packages/distributed/protocol/pickle.py", line 68, in dumps
    pickler.dump(x)
_pickle.PicklingError: Can't pickle <ufunc 'len (vectorized)'>: attribute lookup len (vectorized) on __main__ failed

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/well/lindgren/users/hjo721/conda/skylake/envs/sgkit/lib/python3.10/site-packages/distributed/protocol/pickle.py", line 81, in dumps
    result = cloudpickle.dumps(x, **dump_kwargs)
  File "/well/lindgren/users/hjo721/conda/skylake/envs/sgkit/lib/python3.10/site-packages/cloudpickle/cloudpickle_fast.py", line 73, in dumps
    cp.dump(obj)
  File "/well/lindgren/users/hjo721/conda/skylake/envs/sgkit/lib/python3.10/site-packages/cloudpickle/cloudpickle_fast.py", line 632, in dump
    return Pickler.dump(self, obj)
_pickle.PicklingError: Can't pickle <ufunc 'len (vectorized)'>: attribute lookup len (vectorized) on __main__ failed
(Output truncated.)
---------------------------------------------------------------------------
KeyError                                  Traceback (most recent call last)
File /well/lindgren/users/hjo721/conda/skylake/envs/sgkit/lib/python3.10/site-packages/distributed/worker.py:2986, in dumps_function(func)
   2985     with _cache_lock:
-> 2986         result = cache_dumps[func]
   2987 except KeyError:

File /well/lindgren/users/hjo721/conda/skylake/envs/sgkit/lib/python3.10/site-packages/distributed/collections.py:24, in LRU.__getitem__(self, key)
     23 def __getitem__(self, key):
---> 24     value = super().__getitem__(key)
     25     cast(OrderedDict, self.data).move_to_end(key)

File /well/lindgren/users/hjo721/conda/skylake/envs/sgkit/lib/python3.10/collections/__init__.py:1106, in UserDict.__getitem__(self, key)
   1105     return self.__class__.__missing__(self, key)
-> 1106 raise KeyError(key)

KeyError: subgraph_callable-8c6d285a-e7e9-4dce-b3a5-25fd5e950cb6

During handling of the above exception, another exception occurred:

PicklingError                             Traceback (most recent call last)
File /well/lindgren/users/hjo721/conda/skylake/envs/sgkit/lib/python3.10/site-packages/distributed/protocol/pickle.py:63, in dumps(x, buffer_callback, protocol)
     62 try:
---> 63     result = pickle.dumps(x, **dump_kwargs)
     64 except Exception:

PicklingError: Can't pickle <ufunc 'len (vectorized)'>: attribute lookup len (vectorized) on __main__ failed

During handling of the above exception, another exception occurred:

PicklingError                             Traceback (most recent call last)
File /well/lindgren/users/hjo721/conda/skylake/envs/sgkit/lib/python3.10/site-packages/distributed/protocol/pickle.py:68, in dumps(x, buffer_callback, protocol)
     67 buffers.clear()
---> 68 pickler.dump(x)
     69 result = f.getvalue()

PicklingError: Can't pickle <ufunc 'len (vectorized)'>: attribute lookup len (vectorized) on __main__ failed

During handling of the above exception, another exception occurred:

PicklingError                             Traceback (most recent call last)
Cell In[5], line 2
      1 with ProgressBar():
----> 2     ds = read_bgen(path="/well/lindgren/UKBIOBANK/DATA/IMPUTATION/ukb_imp_chr22_v3.bgen",
      3                 metafile_path="/users/lindgren/hjo721/barney/gxe/meta/meta",
      4                 sample_path="/well/lindgren/UKBIOBANK/DATA/SAMPLE_FAM/ukb11867_imp_chr1_v3_s487395.sample").set_index({"samples": "sample_id"})
      5     ds = sg.io.bgen.rechunk_bgen(ds, output="/users/lindgren/hjo721/barney/gxe/zarr/22.zarr")
      6     prs = pd.read_csv("~/barney/gxe/data/BMI_imp_int_pgs_chrom.txt.gz", sep="\t", index_col="sid")

File /well/lindgren/users/hjo721/conda/skylake/envs/sgkit/lib/python3.10/site-packages/sgkit/io/bgen/bgen_reader.py:290, in read_bgen(path, metafile_path, sample_path, chunks, lock, persist, contig_dtype, gp_dtype)
    288 if persist:
    289     df = df.persist()
--> 290 arrs = dataframe_to_dict(df, METAFILE_DTYPE)
    292 variant_id = arrs["id"]
    293 variant_contig: ArrayLike = arrs["chrom"].astype(contig_dtype)

File /well/lindgren/users/hjo721/conda/skylake/envs/sgkit/lib/python3.10/site-packages/sgkit/io/utils.py:38, in dataframe_to_dict(df, dtype)
     35 kind = np.dtype(dt).kind
     36 if kind in ["U", "S"]:
     37     # Compute fixed-length string dtype for array
---> 38     max_len = int(max_str_len(a))
     39     dt = f"{kind}{max_len}"
     40 arrs[c] = a.astype(dt)

File /well/lindgren/users/hjo721/conda/skylake/envs/sgkit/lib/python3.10/site-packages/dask/array/core.py:1878, in Array.__int__(self)
   1877 def __int__(self):
-> 1878     return self._scalarfunc(int)

File /well/lindgren/users/hjo721/conda/skylake/envs/sgkit/lib/python3.10/site-packages/dask/array/core.py:1875, in Array._scalarfunc(self, cast_type)
   1873     raise TypeError("Only length-1 arrays can be converted to Python scalars")
   1874 else:
-> 1875     return cast_type(self.compute())

File /well/lindgren/users/hjo721/conda/skylake/envs/sgkit/lib/python3.10/site-packages/dask/base.py:314, in DaskMethodsMixin.compute(self, **kwargs)
    290 def compute(self, **kwargs):
    291     """Compute this dask collection
    292 
    293     This turns a lazy Dask collection into its in-memory equivalent.
   (...)
    312     dask.base.compute
    313     """
--> 314     (result,) = compute(self, traverse=False, **kwargs)
    315     return result

File /well/lindgren/users/hjo721/conda/skylake/envs/sgkit/lib/python3.10/site-packages/dask/base.py:599, in compute(traverse, optimize_graph, scheduler, get, *args, **kwargs)
    596     keys.append(x.__dask_keys__())
    597     postcomputes.append(x.__dask_postcompute__())
--> 599 results = schedule(dsk, keys, **kwargs)
    600 return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])

File /well/lindgren/users/hjo721/conda/skylake/envs/sgkit/lib/python3.10/site-packages/distributed/client.py:3116, in Client.get(self, dsk, keys, workers, allow_other_workers, resources, sync, asynchronous, direct, retries, priority, fifo_timeout, actors, **kwargs)
   3043 def get(
   3044     self,
   3045     dsk,
   (...)
...
--> 632         return Pickler.dump(self, obj)
    633     except RuntimeError as e:
    634         if "recursion" in e.args[0]:

PicklingError: Can't pickle <ufunc 'len (vectorized)'>: attribute lookup len (vectorized) on __main__ failed

Has anyone seen this kind of error before, or have any BDI people had success getting sgkit working on BMRC? Many thanks.

jeromekelleher commented 1 year ago

@benjeffery perhaps you could have a chat with @barneyhill ?

benjeffery commented 1 year ago

From a quick look at the stack trace, it seems the bgen reader is trying to find the max string length in the metafile, but something can't be pickled. My guess here is that the metafile isn't chunked; the chunks that sgkit uses are the "partitions" in the original metafile. How many partitions does the metafile have? You can tell by checking the npartitions attribute on a file opened with cbgen.bgen_metafile.

barneyhill commented 1 year ago

Ah that would make sense, I'm getting:

>>> mf = cbgen.bgen_metafile("meta")
>>> mf.npartitions
1121
benjeffery commented 1 year ago

Well, that certainly seems like enough partitions. Can you recreate the error with:

from sgkit.io.bgen import bgen_reader
from sgkit.io import utils
df = bgen_reader.read_metafile("meta")  # assuming "meta" is the filepath of the metafile.
arrs = utils.dataframe_to_dict(df, bgen_reader.METAFILE_DTYPE)

?

barneyhill commented 1 year ago

Hmm...

>>> from sgkit.io.bgen import bgen_reader
>>> from sgkit.io import utils
>>> df = bgen_reader.read_metafile("meta")  # assuming "meta" is the filepath of the metafile.
>>> arrs = utils.dataframe_to_dict(df, bgen_reader.METAFILE_DTYPE)
>>> arrs
{'id': dask.array<astype, shape=(1255683,), dtype=|S428, chunksize=(1121,), chunktype=numpy.ndarray>, 'rsid': dask.array<astype, shape=(1255683,), dtype=|S304, chunksize=(1121,), chunktype=numpy.ndarray>, 'chrom': dask.array<astype, shape=(1255683,), dtype=|S2, chunksize=(1121,), chunktype=numpy.ndarray>, 'pos': dask.array<values, shape=(1255683,), dtype=int32, chunksize=(1121,), chunktype=numpy.ndarray>, 'a1': dask.array<astype, shape=(1255683,), dtype=|S97, chunksize=(1121,), chunktype=numpy.ndarray>, 'a2': dask.array<astype, shape=(1255683,), dtype=|S414, chunksize=(1121,), chunktype=numpy.ndarray>, 'offset': dask.array<values, shape=(1255683,), dtype=int64, chunksize=(1121,), chunktype=numpy.ndarray>}
benjeffery commented 1 year ago

Great, I assume that is with the default dask cluster? How about with client = dask.distributed.Client(n_workers=24, threads_per_worker=1) before you run the snippet above?

barneyhill commented 1 year ago

Ah yeah of course:

>>> arrs = utils.dataframe_to_dict(df, bgen_reader.METAFILE_DTYPE)

2023-03-08 11:51:19,402 - distributed.protocol.pickle - ERROR - Failed to serialize subgraph_callable-d978e014-1704-4593-9cdf-81300063e6af.
Traceback (most recent call last):
  File "/well/lindgren/users/hjo721/conda/skylake/envs/sgkit/lib/python3.10/site-packages/distributed/protocol/pickle.py", line 63, in dumps
    result = pickle.dumps(x, **dump_kwargs)
_pickle.PicklingError: Can't pickle <ufunc 'len (vectorized)'>: attribute lookup len (vectorized) on __main__ failed

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/well/lindgren/users/hjo721/conda/skylake/envs/sgkit/lib/python3.10/site-packages/distributed/protocol/pickle.py", line 68, in dumps
    pickler.dump(x)
_pickle.PicklingError: Can't pickle <ufunc 'len (vectorized)'>: attribute lookup len (vectorized) on __main__ failed

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/well/lindgren/users/hjo721/conda/skylake/envs/sgkit/lib/python3.10/site-packages/distributed/protocol/pickle.py", line 81, in dumps
    result = cloudpickle.dumps(x, **dump_kwargs)
  File "/well/lindgren/users/hjo721/conda/skylake/envs/sgkit/lib/python3.10/site-packages/cloudpickle/cloudpickle_fast.py", line 73, in dumps
    cp.dump(obj)
  File "/well/lindgren/users/hjo721/conda/skylake/envs/sgkit/lib/python3.10/site-packages/cloudpickle/cloudpickle_fast.py", line 632, in dump
    return Pickler.dump(self, obj)
_pickle.PicklingError: Can't pickle <ufunc 'len (vectorized)'>: attribute lookup len (vectorized) on __main__ failed
2023-03-08 11:51:19,403 - distributed.protocol.core - CRITICAL - Failed to Serialize
Traceback (most recent call last):
  File "/well/lindgren/users/hjo721/conda/skylake/envs/sgkit/lib/python3.10/site-packages/distributed/protocol/core.py", line 109, in dumps
    frames[0] = msgpack.dumps(msg, default=_encode_default, use_bin_type=True)
  File "/well/lindgren/users/hjo721/conda/skylake/envs/sgkit/lib/python3.10/site-packages/msgpack/__init__.py", line 38, in packb
    return Packer(**kwargs).pack(o)
  File "msgpack/_packer.pyx", line 294, in msgpack._cmsgpack.Packer.pack
  File "msgpack/_packer.pyx", line 300, in msgpack._cmsgpack.Packer.pack
  File "msgpack/_packer.pyx", line 297, in msgpack._cmsgpack.Packer.pack
  File "msgpack/_packer.pyx", line 264, in msgpack._cmsgpack.Packer._pack
  File "msgpack/_packer.pyx", line 231, in msgpack._cmsgpack.Packer._pack
  File "msgpack/_packer.pyx", line 231, in msgpack._cmsgpack.Packer._pack
  File "msgpack/_packer.pyx", line 264, in msgpack._cmsgpack.Packer._pack
  File "msgpack/_packer.pyx", line 231, in msgpack._cmsgpack.Packer._pack
  File "msgpack/_packer.pyx", line 231, in msgpack._cmsgpack.Packer._pack
  File "msgpack/_packer.pyx", line 231, in msgpack._cmsgpack.Packer._pack
  File "msgpack/_packer.pyx", line 285, in msgpack._cmsgpack.Packer._pack
  File "/well/lindgren/users/hjo721/conda/skylake/envs/sgkit/lib/python3.10/site-packages/distributed/protocol/core.py", line 100, in _encode_default
    frames.extend(create_serialized_sub_frames(obj))
  File "/well/lindgren/users/hjo721/conda/skylake/envs/sgkit/lib/python3.10/site-packages/distributed/protocol/core.py", line 60, in create_serialized_sub_frames
    sub_header, sub_frames = serialize_and_split(
  File "/well/lindgren/users/hjo721/conda/skylake/envs/sgkit/lib/python3.10/site-packages/distributed/protocol/serialize.py", line 444, in serialize_and_split
    header, frames = serialize(x, serializers, on_error, context)
  File "/well/lindgren/users/hjo721/conda/skylake/envs/sgkit/lib/python3.10/site-packages/distributed/protocol/serialize.py", line 266, in serialize
    return serialize(
  File "/well/lindgren/users/hjo721/conda/skylake/envs/sgkit/lib/python3.10/site-packages/distributed/protocol/serialize.py", line 316, in serialize
    headers_frames = [
  File "/well/lindgren/users/hjo721/conda/skylake/envs/sgkit/lib/python3.10/site-packages/distributed/protocol/serialize.py", line 317, in <listcomp>
    serialize(
  File "/well/lindgren/users/hjo721/conda/skylake/envs/sgkit/lib/python3.10/site-packages/distributed/protocol/serialize.py", line 366, in serialize
    raise TypeError(msg, str(x)[:10000])
TypeError: ('Could not serialize object of type SubgraphCallable', 'subgraph_callable-d978e014-1704-4593-9cdf-81300063e6af')
2023-03-08 11:51:19,403 - distributed.comm.utils - ERROR - ('Could not serialize object of type SubgraphCallable', 'subgraph_callable-d978e014-1704-4593-9cdf-81300063e6af')
Traceback (most recent call last):
  File "/well/lindgren/users/hjo721/conda/skylake/envs/sgkit/lib/python3.10/site-packages/distributed/comm/utils.py", line 55, in _to_frames
    return list(protocol.dumps(msg, **kwargs))
  File "/well/lindgren/users/hjo721/conda/skylake/envs/sgkit/lib/python3.10/site-packages/distributed/protocol/core.py", line 109, in dumps
    frames[0] = msgpack.dumps(msg, default=_encode_default, use_bin_type=True)
  File "/well/lindgren/users/hjo721/conda/skylake/envs/sgkit/lib/python3.10/site-packages/msgpack/__init__.py", line 38, in packb
    return Packer(**kwargs).pack(o)
  File "msgpack/_packer.pyx", line 294, in msgpack._cmsgpack.Packer.pack
  File "msgpack/_packer.pyx", line 300, in msgpack._cmsgpack.Packer.pack
  File "msgpack/_packer.pyx", line 297, in msgpack._cmsgpack.Packer.pack
  File "msgpack/_packer.pyx", line 264, in msgpack._cmsgpack.Packer._pack
  File "msgpack/_packer.pyx", line 231, in msgpack._cmsgpack.Packer._pack
  File "msgpack/_packer.pyx", line 231, in msgpack._cmsgpack.Packer._pack
  File "msgpack/_packer.pyx", line 264, in msgpack._cmsgpack.Packer._pack
  File "msgpack/_packer.pyx", line 231, in msgpack._cmsgpack.Packer._pack
  File "msgpack/_packer.pyx", line 231, in msgpack._cmsgpack.Packer._pack
  File "msgpack/_packer.pyx", line 231, in msgpack._cmsgpack.Packer._pack
  File "msgpack/_packer.pyx", line 285, in msgpack._cmsgpack.Packer._pack
  File "/well/lindgren/users/hjo721/conda/skylake/envs/sgkit/lib/python3.10/site-packages/distributed/protocol/core.py", line 100, in _encode_default
    frames.extend(create_serialized_sub_frames(obj))
  File "/well/lindgren/users/hjo721/conda/skylake/envs/sgkit/lib/python3.10/site-packages/distributed/protocol/core.py", line 60, in create_serialized_sub_frames
    sub_header, sub_frames = serialize_and_split(
  File "/well/lindgren/users/hjo721/conda/skylake/envs/sgkit/lib/python3.10/site-packages/distributed/protocol/serialize.py", line 444, in serialize_and_split
    header, frames = serialize(x, serializers, on_error, context)
  File "/well/lindgren/users/hjo721/conda/skylake/envs/sgkit/lib/python3.10/site-packages/distributed/protocol/serialize.py", line 266, in serialize
    return serialize(
  File "/well/lindgren/users/hjo721/conda/skylake/envs/sgkit/lib/python3.10/site-packages/distributed/protocol/serialize.py", line 316, in serialize
    headers_frames = [
  File "/well/lindgren/users/hjo721/conda/skylake/envs/sgkit/lib/python3.10/site-packages/distributed/protocol/serialize.py", line 317, in <listcomp>
    serialize(
  File "/well/lindgren/users/hjo721/conda/skylake/envs/sgkit/lib/python3.10/site-packages/distributed/protocol/serialize.py", line 366, in serialize
    raise TypeError(msg, str(x)[:10000])
TypeError: ('Could not serialize object of type SubgraphCallable', 'subgraph_callable-d978e014-1704-4593-9cdf-81300063e6af')
2023-03-08 11:51:19,405 - distributed.batched - ERROR - Error in batched write
Traceback (most recent call last):
  File "/well/lindgren/users/hjo721/conda/skylake/envs/sgkit/lib/python3.10/site-packages/distributed/batched.py", line 115, in _background_send
    nbytes = yield coro
  File "/well/lindgren/users/hjo721/conda/skylake/envs/sgkit/lib/python3.10/site-packages/tornado/gen.py", line 769, in run
    value = future.result()
  File "/well/lindgren/users/hjo721/conda/skylake/envs/sgkit/lib/python3.10/site-packages/distributed/comm/tcp.py", line 271, in write
    frames = await to_frames(
  File "/well/lindgren/users/hjo721/conda/skylake/envs/sgkit/lib/python3.10/site-packages/distributed/comm/utils.py", line 72, in to_frames
    return _to_frames()
  File "/well/lindgren/users/hjo721/conda/skylake/envs/sgkit/lib/python3.10/site-packages/distributed/comm/utils.py", line 55, in _to_frames
    return list(protocol.dumps(msg, **kwargs))
  File "/well/lindgren/users/hjo721/conda/skylake/envs/sgkit/lib/python3.10/site-packages/distributed/protocol/core.py", line 109, in dumps
    frames[0] = msgpack.dumps(msg, default=_encode_default, use_bin_type=True)
  File "/well/lindgren/users/hjo721/conda/skylake/envs/sgkit/lib/python3.10/site-packages/msgpack/__init__.py", line 38, in packb
    return Packer(**kwargs).pack(o)
  File "msgpack/_packer.pyx", line 294, in msgpack._cmsgpack.Packer.pack
  File "msgpack/_packer.pyx", line 300, in msgpack._cmsgpack.Packer.pack
  File "msgpack/_packer.pyx", line 297, in msgpack._cmsgpack.Packer.pack
  File "msgpack/_packer.pyx", line 264, in msgpack._cmsgpack.Packer._pack
  File "msgpack/_packer.pyx", line 231, in msgpack._cmsgpack.Packer._pack
  File "msgpack/_packer.pyx", line 231, in msgpack._cmsgpack.Packer._pack
  File "msgpack/_packer.pyx", line 264, in msgpack._cmsgpack.Packer._pack
  File "msgpack/_packer.pyx", line 231, in msgpack._cmsgpack.Packer._pack
  File "msgpack/_packer.pyx", line 231, in msgpack._cmsgpack.Packer._pack
  File "msgpack/_packer.pyx", line 231, in msgpack._cmsgpack.Packer._pack
  File "msgpack/_packer.pyx", line 285, in msgpack._cmsgpack.Packer._pack
  File "/well/lindgren/users/hjo721/conda/skylake/envs/sgkit/lib/python3.10/site-packages/distributed/protocol/core.py", line 100, in _encode_default
    frames.extend(create_serialized_sub_frames(obj))
  File "/well/lindgren/users/hjo721/conda/skylake/envs/sgkit/lib/python3.10/site-packages/distributed/protocol/core.py", line 60, in create_serialized_sub_frames
    sub_header, sub_frames = serialize_and_split(
  File "/well/lindgren/users/hjo721/conda/skylake/envs/sgkit/lib/python3.10/site-packages/distributed/protocol/serialize.py", line 444, in serialize_and_split
    header, frames = serialize(x, serializers, on_error, context)
  File "/well/lindgren/users/hjo721/conda/skylake/envs/sgkit/lib/python3.10/site-packages/distributed/protocol/serialize.py", line 266, in serialize
    return serialize(
  File "/well/lindgren/users/hjo721/conda/skylake/envs/sgkit/lib/python3.10/site-packages/distributed/protocol/serialize.py", line 316, in serialize
    headers_frames = [
  File "/well/lindgren/users/hjo721/conda/skylake/envs/sgkit/lib/python3.10/site-packages/distributed/protocol/serialize.py", line 317, in <listcomp>
    serialize(
  File "/well/lindgren/users/hjo721/conda/skylake/envs/sgkit/lib/python3.10/site-packages/distributed/protocol/serialize.py", line 366, in serialize
    raise TypeError(msg, str(x)[:10000])
TypeError: ('Could not serialize object of type SubgraphCallable', 'subgraph_callable-d978e014-1704-4593-9cdf-81300063e6af')
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/well/lindgren/users/hjo721/conda/skylake/envs/sgkit/lib/python3.10/site-packages/sgkit/io/utils.py", line 38, in dataframe_to_dict
    max_len = int(max_str_len(a))
  File "/well/lindgren/users/hjo721/conda/skylake/envs/sgkit/lib/python3.10/site-packages/dask/array/core.py", line 1878, in __int__
    return self._scalarfunc(int)
  File "/well/lindgren/users/hjo721/conda/skylake/envs/sgkit/lib/python3.10/site-packages/dask/array/core.py", line 1875, in _scalarfunc
    return cast_type(self.compute())
  File "/well/lindgren/users/hjo721/conda/skylake/envs/sgkit/lib/python3.10/site-packages/dask/base.py", line 314, in compute
    (result,) = compute(self, traverse=False, **kwargs)
  File "/well/lindgren/users/hjo721/conda/skylake/envs/sgkit/lib/python3.10/site-packages/dask/base.py", line 599, in compute
    results = schedule(dsk, keys, **kwargs)
  File "/well/lindgren/users/hjo721/conda/skylake/envs/sgkit/lib/python3.10/site-packages/distributed/client.py", line 3136, in get
    results = self.gather(packed, asynchronous=asynchronous, direct=direct)
  File "/well/lindgren/users/hjo721/conda/skylake/envs/sgkit/lib/python3.10/site-packages/distributed/client.py", line 2305, in gather
    return self.sync(
  File "/well/lindgren/users/hjo721/conda/skylake/envs/sgkit/lib/python3.10/site-packages/distributed/utils.py", line 338, in sync
    return sync(
  File "/well/lindgren/users/hjo721/conda/skylake/envs/sgkit/lib/python3.10/site-packages/distributed/utils.py", line 405, in sync
    raise exc.with_traceback(tb)
  File "/well/lindgren/users/hjo721/conda/skylake/envs/sgkit/lib/python3.10/site-packages/distributed/utils.py", line 378, in f
    result = yield future
  File "/well/lindgren/users/hjo721/conda/skylake/envs/sgkit/lib/python3.10/site-packages/tornado/gen.py", line 769, in run
    value = future.result()
  File "/well/lindgren/users/hjo721/conda/skylake/envs/sgkit/lib/python3.10/site-packages/distributed/client.py", line 2169, in _gather
    raise exc
concurrent.futures._base.CancelledError: ('amax-aggregate-874c03a6b7d900048ffce9b03ebd7be5',)
benjeffery commented 1 year ago

Great, that's given me enough info to recreate locally in the sgkit test suite! Looks like we have dask operations that are fine when you are running in a single process, but not happy when they need to be sent over the network.

We should be running the test suite with a multi-process dask cluster - will raise a PR for this.
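
For reference, a minimal sketch (assuming pytest and dask.distributed; this is not sgkit's actual test configuration) of a fixture that runs tests against a multi-process cluster, the setting that exposes serialization failures like the one above:

import pytest
from dask.distributed import Client, LocalCluster

@pytest.fixture(scope="session")
def distributed_client():
    # processes=True means task graphs are pickled and shipped between worker
    # processes, which is what surfaces errors like
    # "Can't pickle <ufunc 'len (vectorized)'>" above.
    cluster = LocalCluster(n_workers=2, threads_per_worker=1, processes=True)
    client = Client(cluster)
    yield client
    client.close()
    cluster.close()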

barneyhill commented 1 year ago

Not to be a bother, but I've updated to the latest commit and am now getting the following error:

Cell In[4], line 5
      1 with ProgressBar():
      2     ds = read_bgen(path="/well/lindgren/UKBIOBANK/DATA/IMPUTATION/ukb_imp_chr22_v3.bgen",
      3                 metafile_path="/users/lindgren/hjo721/barney/gxe/meta/meta",
      4                 sample_path="/well/lindgren/UKBIOBANK/DATA/SAMPLE_FAM/ukb11867_imp_chr1_v3_s487395.sample").set_index({"samples": "sample_id"})
----> 5     ds = sg.io.bgen.rechunk_bgen(ds, output="/users/lindgren/hjo721/barney/gxe/zarr/22.zarr")
      6     prs = pd.read_csv("~/barney/gxe/data/BMI_imp_int_pgs_chrom.txt.gz", sep="\t", index_col="sid")

File /well/lindgren/users/hjo721/conda/skylake/envs/sgkit/lib/python3.10/site-packages/sgkit/io/bgen/bgen_reader.py:518, in rechunk_bgen(ds, output, chunk_length, chunk_width, compressor, probability_dtype, max_mem, pack, tempdir)
    506 with tempfile.TemporaryDirectory(
    507     prefix="bgen_to_zarr_", suffix=".zarr", dir=tempdir
    508 ) as tmpdir:
    509     rechunked = rechunker_api.rechunk(
    510         ds,
    511         max_mem=max_mem,
   (...)
    516         executor="dask",
    517     )
--> 518     rechunked.execute()
    520 zarr.consolidate_metadata(output)
    522 ds = xr.open_zarr(output, concat_characters=False)  # type: ignore[no-untyped-call]

File /well/lindgren/users/hjo721/conda/skylake/envs/sgkit/lib/python3.10/site-packages/rechunker/api.py:78, in Rechunked.execute(self, **kwargs)
     63 def execute(self, **kwargs):
     64     """
     65     Execute the rechunking.
     66 
   (...)
     76     :func:`rechunker.rechunk`.
     77     """
---> 78     self._executor.execute_plan(self._plan, **kwargs)
     79     return self._target

File /well/lindgren/users/hjo721/conda/skylake/envs/sgkit/lib/python3.10/site-packages/rechunker/executors/dask.py:48, in DaskPipelineExecutor.execute_plan(self, plan, **kwargs)
     47 def execute_plan(self, plan: Delayed, **kwargs):
---> 48     return dask.compute(*plan, **kwargs)

File /well/lindgren/users/hjo721/conda/skylake/envs/sgkit/lib/python3.10/site-packages/dask/base.py:599, in compute(traverse, optimize_graph, scheduler, get, *args, **kwargs)
    596     keys.append(x.__dask_keys__())
    597     postcomputes.append(x.__dask_postcompute__())
--> 599 results = schedule(dsk, keys, **kwargs)
    600 return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])

File /well/lindgren/users/hjo721/conda/skylake/envs/sgkit/lib/python3.10/site-packages/distributed/client.py:3168, in Client.get(self, dsk, keys, workers, allow_other_workers, resources, sync, asynchronous, direct, retries, priority, fifo_timeout, actors, **kwargs)
   3166         should_rejoin = False
   3167 try:
-> 3168     results = self.gather(packed, asynchronous=asynchronous, direct=direct)
   3169 finally:
   3170     for f in futures.values():

File /well/lindgren/users/hjo721/conda/skylake/envs/sgkit/lib/python3.10/site-packages/distributed/client.py:2328, in Client.gather(self, futures, errors, direct, asynchronous)
   2326 else:
   2327     local_worker = None
-> 2328 return self.sync(
   2329     self._gather,
   2330     futures,
   2331     errors=errors,
   2332     direct=direct,
   2333     local_worker=local_worker,
   2334     asynchronous=asynchronous,
   2335 )

File /well/lindgren/users/hjo721/conda/skylake/envs/sgkit/lib/python3.10/site-packages/distributed/utils.py:345, in SyncMethodMixin.sync(self, func, asynchronous, callback_timeout, *args, **kwargs)
    343     return future
    344 else:
--> 345     return sync(
    346         self.loop, func, *args, callback_timeout=callback_timeout, **kwargs
    347     )

File /well/lindgren/users/hjo721/conda/skylake/envs/sgkit/lib/python3.10/site-packages/distributed/utils.py:412, in sync(loop, func, callback_timeout, *args, **kwargs)
    410 if error:
    411     typ, exc, tb = error
--> 412     raise exc.with_traceback(tb)
    413 else:
    414     return result

File /well/lindgren/users/hjo721/conda/skylake/envs/sgkit/lib/python3.10/site-packages/distributed/utils.py:385, in sync.<locals>.f()
    383         future = wait_for(future, callback_timeout)
    384     future = asyncio.ensure_future(future)
--> 385     result = yield future
    386 except Exception:
    387     error = sys.exc_info()

File /well/lindgren/users/hjo721/conda/skylake/envs/sgkit/lib/python3.10/site-packages/tornado/gen.py:769, in Runner.run(self)
    766 exc_info = None
    768 try:
--> 769     value = future.result()
    770 except Exception:
    771     exc_info = sys.exc_info()

File /well/lindgren/users/hjo721/conda/skylake/envs/sgkit/lib/python3.10/site-packages/distributed/client.py:2191, in Client._gather(self, futures, errors, direct, local_worker)
   2189         exc = CancelledError(key)
   2190     else:
...
--> 119     return func(*(_execute_task(a, cache) for a in args))
    120 elif not ishashable(arg):
    121     return arg

TypeError: 'Future' object is not subscriptable

I removed the sgkit conda package and ran pip install git+https://github.com/pystatgen/sgkit.git cbgen rechunker, which I know might not be best practice...

benjeffery commented 1 year ago

This looks like a separate issue; don't be worried about opening new issues unless things look very related.

A quick Google shows that a similar error was reported at https://github.com/dask/dask/issues/4361, where it only appeared under certain combinations of package versions. Is there a reason you're using conda here? Try a fresh Python 3.10 pip env with just pip install git+https://github.com/pystatgen/sgkit.git cbgen rechunker.