aertslab / scenicplus

SCENIC+ is a python package to build gene regulatory networks (GRNs) using combined or separate single-cell gene expression (scRNA-seq) and single-cell chromatin accessibility (scATAC-seq) data.
Other
165 stars 27 forks source link

A Parallel calculation error with joblib when execute the export_pseudobulk function #282

Open JohnWang1997 opened 5 months ago

JohnWang1997 commented 5 months ago

Dear authors and developers, Thanks for all your work.

Describe the bug When I was reproducing the export_pseudobulk step of the tutorial (10x multiome pbmc), I encountered the problem that the .bed.gz file and .bw file could not be generated normally. This issue was resolved by replacing pseudobulk_peak_calling.py with pycisTopic_improve/pycisTopic/python/pycisTopic/pseudobulk_peak_calling.py, as suggested in issue #277.

However, a new CPU parallel computing error appeared, which seems to be related to the joblib package. Specifically, when the parameter n_cpu of the export_pseudobulk function is greater than 1, an error will occur. When n_cpu=1, the function can run normally and output .bed.gz files and .bw files.

To Reproduce Commands relevant to reproduce the error.

from pycisTopic.pseudobulk_peak_calling import export_pseudobulk
bw_paths, bed_paths = export_pseudobulk(input_data = cell_data,
                 variable = 'celltype',                                                                     # variable by which to generate pseubulk profiles, in this case we want pseudobulks per celltype
                 sample_id_col = 'sample_id',
                 chromsizes = chromsizes,
                 bed_path = os.path.join(work_dir, 'test/consensus_peak_calling/pseudobulk_bed_files/'),  # specify where pseudobulk_bed_files should be stored
                 bigwig_path = os.path.join(work_dir, 'test/consensus_peak_calling/pseudobulk_bw_files/'),# specify where pseudobulk_bw_files should be stored
                 path_to_fragments = fragments_dict,
                 n_cpu = 8,                                                                                 # specify the number of cores to use, we use ray for multi processing
                 normalize_bigwig = True,
                 remove_duplicates = True,
                 #_temp_dir = tmp_dir,
                 split_pattern = '-')

Error output Paste the entire output of the command, including log information prior to the error.

Reading fragments for 10x_pbmc.
from: pbmc_tutorial/data/pbmc_granulocyte_sorted_3k_atac_fragments.tsv.gz
---------------------------------------------------------------------------
_RemoteTraceback                          Traceback (most recent call last)
_RemoteTraceback: 
"""
Traceback (most recent call last):
  File "/home/wangyue/software/miniconda3/envs/scenicplus/lib/python3.8/site-packages/joblib/externals/loky/process_executor.py", line 426, in _process_worker
    call_item = call_queue.get(block=True, timeout=timeout)
  File "/home/wangyue/software/miniconda3/envs/scenicplus/lib/python3.8/multiprocessing/queues.py", line 116, in get
    return _ForkingPickler.loads(res)
  File "/home/wangyue/software/miniconda3/envs/scenicplus/lib/python3.8/site-packages/pycisTopic/pseudobulk_peak_calling.py", line 10, in <module>
    import pandas as pd
  File "/home/wangyue/software/miniconda3/envs/scenicplus/lib/python3.8/site-packages/pandas/__init__.py", line 48, in <module>
    from pandas.core.api import (
  File "/home/wangyue/software/miniconda3/envs/scenicplus/lib/python3.8/site-packages/pandas/core/api.py", line 47, in <module>
    from pandas.core.groupby import (
  File "/home/wangyue/software/miniconda3/envs/scenicplus/lib/python3.8/site-packages/pandas/core/groupby/__init__.py", line 1, in <module>
    from pandas.core.groupby.generic import (
  File "/home/wangyue/software/miniconda3/envs/scenicplus/lib/python3.8/site-packages/pandas/core/groupby/generic.py", line 77, in <module>
    from pandas.core.frame import DataFrame
  File "/home/wangyue/software/miniconda3/envs/scenicplus/lib/python3.8/site-packages/pandas/core/frame.py", line 182, in <module>
    from pandas.core.generic import NDFrame
  File "/home/wangyue/software/miniconda3/envs/scenicplus/lib/python3.8/site-packages/pandas/core/generic.py", line 179, in <module>
    from pandas.core.window import (
  File "/home/wangyue/software/miniconda3/envs/scenicplus/lib/python3.8/site-packages/pandas/core/window/__init__.py", line 1, in <module>
    from pandas.core.window.ewm import (
  File "/home/wangyue/software/miniconda3/envs/scenicplus/lib/python3.8/site-packages/pandas/core/window/ewm.py", line 11, in <module>
    import pandas._libs.window.aggregations as window_aggregations
ImportError: /lib/x86_64-linux-gnu/libstdc++.so.6: version `GLIBCXX_3.4.29' not found (required by /home/wangyue/software/miniconda3/envs/scenicplus/lib/python3.8/site-packages/pandas/_libs/window/aggregations.cpython-38-x86_64-linux-gnu.so)
"""

The above exception was the direct cause of the following exception:

BrokenProcessPool                         Traceback (most recent call last)
Cell In[4], line 2
      1 from pycisTopic.pseudobulk_peak_calling import export_pseudobulk
----> 2 bw_paths, bed_paths = export_pseudobulk(input_data = cell_data,
      3                  variable = 'celltype',                                                                     # variable by which to generate pseubulk profiles, in this case we want pseudobulks per celltype
      4                  sample_id_col = 'sample_id',
      5                  chromsizes = chromsizes,
      6                  bed_path = os.path.join(work_dir, 'test/consensus_peak_calling/pseudobulk_bed_files/'),  # specify where pseudobulk_bed_files should be stored
      7                  bigwig_path = os.path.join(work_dir, 'test/consensus_peak_calling/pseudobulk_bw_files/'),# specify where pseudobulk_bw_files should be stored
      8                  path_to_fragments = fragments_dict,
      9                  n_cpu = 8,                                                                                 # specify the number of cores to use, we use ray for multi processing
     10                  normalize_bigwig = True,
     11                  remove_duplicates = True,
     12                  #_temp_dir = tmp_dir,
     13                  split_pattern = '-')

File ~/software/miniconda3/envs/scenicplus/lib/python3.8/site-packages/pycisTopic/pseudobulk_peak_calling.py:210, in export_pseudobulk(input_data, variable, chromsizes, bed_path, bigwig_path, path_to_fragments, sample_id_col, n_cpu, normalize_bigwig, remove_duplicates, split_pattern)
    203     _sample_cell_data["barcode"] = prepare_tag_cells(
    204         _sample_cell_data.index.tolist(), split_pattern
    205     )
    206 _cell_type_to_cell_barcodes = _sample_cell_data \
    207     .groupby(variable, group_keys=False)["barcode"] \
    208     .apply(set) \
    209     .to_dict()
--> 210 _cell_type_to_fragments = _get_fragments_for_cell_barcodes(
    211     path_to_fragments[sample], _cell_type_to_cell_barcodes, n_cores=n_cpu
    212 )
    213 for cell_type in _cell_type_to_fragments.keys():
    214     cell_type_to_fragments_all_samples[cell_type].extend(
    215         _cell_type_to_fragments[cell_type]
    216     )

File ~/software/miniconda3/envs/scenicplus/lib/python3.8/site-packages/pycisTopic/pseudobulk_peak_calling.py:68, in _get_fragments_for_cell_barcodes(path_to_fragments, cell_type_to_cell_barcodes, n_cores)
     66 contigs = tbx.contigs
     67 tbx.close()
---> 68 cell_type_to_fragments_per_contig = joblib.Parallel(n_jobs=n_cores)(
     69     joblib.delayed(_get_fragments_for_cell_barcodes_single_contig)(
     70         path_to_fragments, contig, cell_type_to_cell_barcodes
     71     )
     72     for contig in contigs
     73 )
     74 cell_type_to_fragments = {
     75     cell_type: [] for cell_type in cell_type_to_cell_barcodes.keys()
     76 }
     77 for i in range(len(cell_type_to_fragments_per_contig)):

File ~/software/miniconda3/envs/scenicplus/lib/python3.8/site-packages/joblib/parallel.py:1952, in Parallel.__call__(self, iterable)
   1946 # The first item from the output is blank, but it makes the interpreter
   1947 # progress until it enters the Try/Except block of the generator and
   1948 # reach the first `yield` statement. This starts the aynchronous
   1949 # dispatch of the tasks to the workers.
   1950 next(output)
-> 1952 return output if self.return_generator else list(output)

File ~/software/miniconda3/envs/scenicplus/lib/python3.8/site-packages/joblib/parallel.py:1595, in Parallel._get_outputs(self, iterator, pre_dispatch)
   1592     yield
   1594     with self._backend.retrieval_context():
-> 1595         yield from self._retrieve()
   1597 except GeneratorExit:
   1598     # The generator has been garbage collected before being fully
   1599     # consumed. This aborts the remaining tasks if possible and warn
   1600     # the user if necessary.
   1601     self._exception = True

File ~/software/miniconda3/envs/scenicplus/lib/python3.8/site-packages/joblib/parallel.py:1699, in Parallel._retrieve(self)
   1692 while self._wait_retrieval():
   1693 
   1694     # If the callback thread of a worker has signaled that its task
   1695     # triggered an exception, or if the retrieval loop has raised an
   1696     # exception (e.g. `GeneratorExit`), exit the loop and surface the
   1697     # worker traceback.
   1698     if self._aborting:
-> 1699         self._raise_error_fast()
   1700         break
   1702     # If the next job is not ready for retrieval yet, we just wait for
   1703     # async callbacks to progress.

File ~/software/miniconda3/envs/scenicplus/lib/python3.8/site-packages/joblib/parallel.py:1734, in Parallel._raise_error_fast(self)
   1730 # If this error job exists, immediatly raise the error by
   1731 # calling get_result. This job might not exists if abort has been
   1732 # called directly or if the generator is gc'ed.
   1733 if error_job is not None:
-> 1734     error_job.get_result(self.timeout)

File ~/software/miniconda3/envs/scenicplus/lib/python3.8/site-packages/joblib/parallel.py:736, in BatchCompletionCallBack.get_result(self, timeout)
    730 backend = self.parallel._backend
    732 if backend.supports_retrieve_callback:
    733     # We assume that the result has already been retrieved by the
    734     # callback thread, and is stored internally. It's just waiting to
    735     # be returned.
--> 736     return self._return_or_raise()
    738 # For other backends, the main thread needs to run the retrieval step.
    739 try:

File ~/software/miniconda3/envs/scenicplus/lib/python3.8/site-packages/joblib/parallel.py:754, in BatchCompletionCallBack._return_or_raise(self)
    752 try:
    753     if self.status == TASK_ERROR:
--> 754         raise self._result
    755     return self._result
    756 finally:

BrokenProcessPool: A task has failed to un-serialize. Please ensure that the arguments of the function are all picklable.

Version (please complete the following information):