aertslab / pySCENIC

pySCENIC is a lightning-fast python implementation of the SCENIC pipeline (Single-Cell rEgulatory Network Inference and Clustering) which enables biologists to infer transcription factors, gene regulatory networks and cell types from single-cell RNA-seq data.
http://scenic.aertslab.org
GNU General Public License v3.0

prune2df fails with TypeError: object of type 'generator' has no len() [BUG] #551

Open dmalzl opened 3 months ago

dmalzl commented 3 months ago

Describe the bug
I am following the full interactive pipeline as detailed in this notebook and am having trouble with the pruning stage. Using pySCENIC v0.12.1 (installed from source, since the PyPI package is broken), I get a dask-related error when running prune2df, and I could not find any related issue in this repo's issue tracker.


Steps to reproduce the behavior

  1. Command run when the error occurred:

```python
import anndata as ad
from distributed import Client, LocalCluster

from arboreto.utils import load_tf_names
from arboreto.algo import grnboost2

from ctxcore.rnkdb import FeatherRankingDatabase as RankingDatabase
from pyscenic.utils import modules_from_adjacencies
from pyscenic.prune import prune2df, df2regulons
from pyscenic.aucell import aucell

adata = ad.read_h5ad()  # path omitted in the original report

with open('../scenic_resource/hs_hgnc_tfs.txt', 'r') as tf_file:
    tf_names = [line.rstrip() for line in tf_file]

cistarget_db = RankingDatabase(
    '../scenic_resource/hg38refseq-r8010kb_up_and_down_tss.mc9nr.genes_vs_motifs.rankings.feather',
    'hg38refseq-r8010kb_up_and_down_tss.mc9nr'
)

# manually restrict number of workers used
client = Client(
    LocalCluster(
        name='grn_call',
        n_workers=8,
        threads_per_worker=1
    )
)

adjacencies = grnboost2(
    expression_data=adata.to_df('counts'),  # convert AnnData to pandas.DataFrame
    tf_names=tf_names,
    client_or_address=client,
    verbose=True
)

inferred_modules = list(
    modules_from_adjacencies(adjacencies, adata.to_df('counts'))
)

# this is actually executed as part of a dict comprehension,
# because I am computing GRNs for multiple datasets,
# but the error also occurs when running it like this,
# so I kept the code like this for brevity
prune2df(
    [cistarget_db],
    inferred_modules,
    '../scenic_resource/motifs-v9-nr.hgnc-m0.001-o0.0.tbl',
    client_or_address=client
)
```


2. Error encountered:
```pytb
---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
Cell In[35], line 12
      2 client = Client(
      3     LocalCluster(
      4         name='grn_call',
   (...)
      7     )
      8 )
     10 dbs = [cistarget_db]
     11 prunded_modules = {
---> 12     k: prune2df(
     13         dbs, 
     14         inferred_modules, 
     15         '../scenic_resource/motifs-v9-nr.hgnc-m0.001-o0.0.tbl',
     16         client_or_address = client
     17     )
     18     for k, inferred_modules
     19     in modules.items()
     20 }

File ~/.conda/envs/scenic/lib/python3.12/site-packages/pyscenic/prune.py:424, in prune2df(rnkdbs, modules, motif_annotations_fname, rank_threshold, auc_threshold, nes_threshold, motif_similarity_fdr, orthologuous_identity_threshold, weighted_recovery, client_or_address, num_workers, module_chunksize, filter_for_annotation)
    418 # Create a distributed dataframe from individual delayed objects to avoid out of memory problems.
    419 aggregation_func = (
    420     partial(from_delayed, meta=DF_META_DATA)
    421     if client_or_address != "custom_multiprocessing"
    422     else pd.concat
    423 )
--> 424 return _distributed_calc(
    425     rnkdbs,
    426     modules,
    427     motif_annotations_fname,
    428     transformation_func,
    429     aggregation_func,
    430     motif_similarity_fdr,
    431     orthologuous_identity_threshold,
    432     client_or_address,
    433     num_workers,
    434     module_chunksize,
    435 )

File ~/.conda/envs/scenic/lib/python3.12/site-packages/pyscenic/prune.py:362, in _distributed_calc(rnkdbs, modules, motif_annotations_fname, transform_func, aggregate_func, motif_similarity_fdr, orthologuous_identity_threshold, client_or_address, num_workers, module_chunksize)
    357 client, shutdown_callback = _prepare_client(
    358     client_or_address,
    359     num_workers=num_workers if num_workers else cpu_count(),
    360 )
    361 try:
--> 362     return client.compute(create_graph(client), sync=True)
    363 finally:
    364     shutdown_callback(False)

File ~/.conda/envs/scenic/lib/python3.12/site-packages/pyscenic/prune.py:340, in _distributed_calc.<locals>.create_graph(client)
    300 delayed_or_future_dbs = list(map(wrap, rnkdbs))
    301 # 3. The gene signatures: these signatures become large when chunking them, therefore chunking is overruled
    302 # when using dask.distributed.
    303 # See earlier.
   (...)
    337 # again be unavoidable. TBI + See following stackoverflow question:
    338 # https://stackoverflow.com/questions/47776936/why-is-a-computation-much-slower-within-a-dask-distributed-worker
--> 340 return aggregate_func(
    341     (
    342         delayed(transform_func)(db, gs_chunk, delayed_or_future_annotations)
    343         for db in delayed_or_future_dbs
    344         for gs_chunk in chunked_iter(modules, module_chunksize)
    345     )
    346 )

File ~/.conda/envs/scenic/lib/python3.12/site-packages/dask_expr/io/_delayed.py:100, in from_delayed(dfs, meta, divisions, prefix, verify_meta)
     97 if isinstance(dfs, Delayed) or hasattr(dfs, "key"):
     98     dfs = [dfs]
--> 100 if len(dfs) == 0:
    101     raise TypeError("Must supply at least one delayed object")
    103 if meta is None:

TypeError: object of type 'generator' has no len()
```

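For context on where this comes from: the traceback ends in dask_expr's from_delayed, which calls len() on its input, while pySCENIC's create_graph hands it a generator of delayed objects. A minimal sketch, assuming a recent dask installation backed by the dask-expr backend, that reproduces the failure mode outside pySCENIC:

```python
import pandas as pd
from dask import delayed
from dask.dataframe import from_delayed

# A generator of Delayed objects, analogous to what pySCENIC's
# create_graph() feeds into the aggregation function.
parts = (delayed(pd.DataFrame)({'x': [i]}) for i in range(3))

# from_delayed(parts)           # TypeError: object of type 'generator' has no len()
df = from_delayed(list(parts))  # materializing the generator first avoids the error
print(df.compute())
```

This suggests a fix on pySCENIC's side would presumably be to materialize the generator before aggregation; the workarounds in the comments below instead avoid the dask code path altogether.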
Expected behavior
The call should simply run without error, since all of the passed arguments conform to the types inferred from the above-mentioned notebook.


dmalzl commented 3 months ago

Looking through the code, I found that this can be fixed by simply running the function in local mode, i.e. passing client_or_address = 'custom_multiprocessing', which prompts prune2df to use pd.concat for aggregating results instead of dask's from_delayed and thus bypasses the underlying problem (see the statement here). However, make sure you also pass the number of CPUs you want to use via num_workers, or your machine might be swamped with concurrent processes. A sketch of the workaround is shown below.
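A minimal sketch of that workaround, reusing the cistarget_db and inferred_modules objects from the snippet in the report (num_workers=8 is just an example value):

```python
from pyscenic.prune import prune2df

# Local "custom_multiprocessing" mode: prune2df aggregates the per-chunk
# results with pd.concat instead of dask's from_delayed, sidestepping the
# generator/len() problem shown in the traceback.
pruned_df = prune2df(
    [cistarget_db],
    inferred_modules,
    '../scenic_resource/motifs-v9-nr.hgnc-m0.001-o0.0.tbl',
    client_or_address='custom_multiprocessing',
    num_workers=8,  # cap the number of worker processes explicitly
)
```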

lpalvin commented 3 months ago

I encountered the same error when running pyscenic ctx on the command line. After a series of runs, I found that the problem was likely with dask, so I removed the --mode "dask_multiprocessing" parameter and it worked fine, as sketched below. I guess this is probably a dask environment configuration issue.
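For reference, a sketch of that CLI workaround (all file names here are placeholders, not the ones from this thread):

```sh
# Drop --mode "dask_multiprocessing" so the dask code path is not used;
# --mode custom_multiprocessing selects the non-dask path explicitly.
pyscenic ctx adjacencies.tsv \
    rankings.feather \
    --annotations_fname motifs.tbl \
    --expression_mtx_fname expression.loom \
    --output regulons.csv \
    --mode custom_multiprocessing \
    --num_workers 8
```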