aertslab / pycisTopic

pycisTopic is a Python module to simultaneously identify cell states and cis-regulatory topics from single cell epigenomics data.

export_pseudobulk [BUG] #55

Open alexlenail opened 1 year ago

alexlenail commented 1 year ago

Thanks for making this package available. I submitted a Slurm job calling this function:

    bw_paths, bed_paths = export_pseudobulk(
        input_data=metadata,
        variable='subclass_label_2',
        sample_id_col='sample_id',
        chromsizes=chromsizes,
        bed_path=base_path+f'ATAC/pseudobulks/{region}/bed',
        bigwig_path=base_path+f'ATAC/pseudobulks/{region}/bigwig',
        path_to_fragments=fragments_dict,
        n_cpu=16,
        normalize_bigwig=True,
        remove_duplicates=True,
        _temp_dir=base_path+f'ATAC/pseudobulks/{region}/tmp',
        split_pattern='-')

I've been waiting for this step for about 96h. The log file reads:

2023-01-20 19:40:23,007 cisTopic     INFO     Reading fragments from ...sample1/outs/atac_fragments.tsv.gz
2023-01-20 19:46:34,790 cisTopic     INFO     Reading fragments from ...sample2/outs/atac_fragments.tsv.gz
2023-01-20 19:52:13,172 cisTopic     INFO     Reading fragments from ...sample3/outs/atac_fragments.tsv.gz
2023-01-20 19:58:06,604 cisTopic     INFO     Reading fragments from ...sample4/outs/atac_fragments.tsv.gz
2023-01-20 20:05:07,534 cisTopic     INFO     Reading fragments from ...sample5/outs/atac_fragments.tsv.gz
2023-01-20 20:10:28,990 cisTopic     INFO     Reading fragments from ...sample6/outs/atac_fragments.tsv.gz
2023-01-20 20:16:22,397 cisTopic     INFO     Reading fragments from ...sample7/outs/atac_fragments.tsv.gz
2023-01-20 20:23:02,828 cisTopic     INFO     Reading fragments from ...sample8/outs/atac_fragments.tsv.gz

And nothing else -- the log hasn't changed in 96h. Inspecting the job, it's using 2% of just 1 of the 48 cores, and 60GB of RAM (of the 260GB provided).

The tutorial suggests it should launch Ray jobs, with output like

(export_pseudobulk_ray pid=12108) 2022-08-08 18:30:07,800 cisTopic INFO Creating pseudobulk for AST

but I'm not seeing that in my output. Does anything stick out to you as a mistake here?

cbravo93 commented 1 year ago

Hi @alexlenail !

Do you get any Ray messages at all? This one, for instance:

2022-08-05 16:50:45,105 INFO services.py:1470 -- View the Ray dashboard at http://127.0.0.1:8265

If not, could you try:

* ray.init()

* ray.init(n_cpu=2)

* ray.init(_temp_dir=XXX)

Can you also show what metadata looks like?

Thanks!

C

alexlenail commented 1 year ago

Thanks for the help debugging, @cbravo93 !

[screenshot]

Metadata looks like:

[screenshot]
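
Since the screenshot isn't preserved, here is a minimal hedged sketch of the shape export_pseudobulk expects when input_data is a DataFrame: cell barcodes as the index, plus the variable and sample_id_col columns used in the call above (the barcodes and values below are invented):

    import pandas as pd

    # Invented cell-level metadata: one row per cell barcode.
    metadata = pd.DataFrame(
        {
            'subclass_label_2': ['AST', 'AST', 'OLG'],       # grouping variable: one pseudobulk per value
            'sample_id': ['sample1', 'sample1', 'sample2'],  # should match the keys of fragments_dict
        },
        index=['AAACGAAAGACTCGGA-1', 'AAACGACCACAGTGCT-1', 'AAACGAATCAGTGCTT-1'],
    )
    print(metadata.head())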

When I use n_cpu=1 though, I get this error message:

[screenshot]

Which I can probably fix by changing the chromsizes inputs. But that leads me to wonder: maybe these errors weren't properly bubbling up when using Ray / n_cpu > 1?

alexlenail commented 1 year ago

Traceback:

---------------------------------------------------------------------------
KeyboardInterrupt                         Traceback (most recent call last)
Cell In[42], line 1
----> 1 narrow_peaks_dict = peak_calling(macs_path,
      2                                  bed_paths,
      3                                  output_dir,
      4                                  genome_size='mm',
      5                                  n_cpu=8,
      6                                  input_format='BEDPE',
      7                                  shift=73,
      8                                  ext_size=146,
      9                                  keep_dup='all',
     10                                  q_value=0.05,
     11                                  _temp_dir='/home/gridsan/lenail/tmp/ray_spill')

File /data1/groups/neuroTF/tools/pycisTopic/pycisTopic/pseudobulk_peak_calling.py:413, in peak_calling(macs_path, bed_paths, outdir, genome_size, n_cpu, input_format, shift, ext_size, keep_dup, q_value, nolambda, **kwargs)
    410     os.makedirs(outdir)
    412 if n_cpu > 1:
--> 413     ray.init(num_cpus=n_cpu, **kwargs)
    414     narrow_peaks = ray.get(
    415         [
    416             macs_call_peak_ray.remote(
   (...)
    430         ]
    431     )
    432     ray.shutdown()

File ~/.conda/envs/py39/lib/python3.9/site-packages/ray/_private/client_mode_hook.py:105, in client_mode_hook.<locals>.wrapper(*args, **kwargs)
    103     if func.__name__ != "init" or is_client_mode_enabled_by_default:
    104         return getattr(ray, func.__name__)(*args, **kwargs)
--> 105 return func(*args, **kwargs)

File ~/.conda/envs/py39/lib/python3.9/site-packages/ray/_private/worker.py:1439, in init(address, num_cpus, num_gpus, resources, object_store_memory, local_mode, ignore_reinit_error, include_dashboard, dashboard_host, dashboard_port, job_config, configure_logging, logging_level, logging_format, log_to_driver, namespace, runtime_env, storage, **kwargs)
   1397     ray_params = ray._private.parameter.RayParams(
   1398         node_ip_address=node_ip_address,
   1399         raylet_ip_address=raylet_ip_address,
   (...)
   1433         node_name=_node_name,
   1434     )
   1435     # Start the Ray processes. We set shutdown_at_exit=False because we
   1436     # shutdown the node in the ray.shutdown call that happens in the atexit
   1437     # handler. We still spawn a reaper process in case the atexit handler
   1438     # isn't called.
-> 1439     _global_node = ray._private.node.Node(
   1440         head=True, shutdown_at_exit=False, spawn_reaper=True, ray_params=ray_params
   1441     )
   1442 else:
   1443     # In this case, we are connecting to an existing cluster.
   1444     if num_cpus is not None or num_gpus is not None:

File ~/.conda/envs/py39/lib/python3.9/site-packages/ray/_private/node.py:242, in Node.__init__(self, ray_params, head, shutdown_at_exit, spawn_reaper, connect_only)
    235     self._plasma_store_socket_name = self._prepare_socket_file(
    236         self._ray_params.plasma_store_socket_name, default_prefix="plasma_store"
    237     )
    238     self._raylet_socket_name = self._prepare_socket_file(
    239         self._ray_params.raylet_socket_name, default_prefix="raylet"
    240     )
--> 242 self.metrics_agent_port = self._get_cached_port(
    243     "metrics_agent_port", default_port=ray_params.metrics_agent_port
    244 )
    245 self._metrics_export_port = self._get_cached_port(
    246     "metrics_export_port", default_port=ray_params.metrics_export_port
    247 )
    249 ray_params.update_if_absent(
    250     metrics_agent_port=self.metrics_agent_port,
    251     metrics_export_port=self._metrics_export_port,
    252 )

File ~/.conda/envs/py39/lib/python3.9/site-packages/ray/_private/node.py:801, in Node._get_cached_port(self, port_name, default_port)
    798 # Maps a Node.unique_id to a dict that maps port names to port numbers.
    799 ports_by_node: Dict[str, Dict[str, int]] = defaultdict(dict)
--> 801 with FileLock(file_path + ".lock"):
    802     if not os.path.exists(file_path):
    803         with open(file_path, "w") as f:

File ~/.conda/envs/py39/lib/python3.9/site-packages/filelock/_api.py:220, in BaseFileLock.__enter__(self)
    214 def __enter__(self) -> BaseFileLock:
    215     """
    216     Acquire the lock.
    217 
    218     :return: the lock object
    219     """
--> 220     self.acquire()
    221     return self

File ~/.conda/envs/py39/lib/python3.9/site-packages/filelock/_api.py:187, in BaseFileLock.acquire(self, timeout, poll_interval, poll_intervall, blocking)
    185             msg = "Lock %s not acquired on %s, waiting %s seconds ..."
    186             _LOGGER.debug(msg, lock_id, lock_filename, poll_interval)
--> 187             time.sleep(poll_interval)
    188 except BaseException:  # Something did go wrong, so decrement the counter.
    189     with self._thread_lock:

KeyboardInterrupt: 

alexlenail commented 1 year ago

The filesystem I'm using doesn't support file locking on the home directory. Am I reading correctly that Ray is trying to lock a file to write metrics to? Is there a way to disable that? Maybe via one of the **kwargs passed to ray.init()?

https://docs.ray.io/en/latest/ray-core/package-ref.html
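
As the reply below suggests, pointing Ray's root temp directory away from the home filesystem avoids the FileLock hang. The traceback above shows export_pseudobulk forwarding **kwargs straight to ray.init(), so a minimal standalone test might look like this (the scratch path is hypothetical):

    import ray

    # Hypothetical scratch path on a filesystem that supports POSIX file locks;
    # _temp_dir is Ray's (unstable) override for its root temp directory.
    ray.init(num_cpus=2, _temp_dir='/scratch/lenail/ray_tmp')
    print(ray.cluster_resources())   # should report 2 CPUs if startup succeeded
    ray.shutdown()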

cbravo93 commented 1 year ago

Hi Alex!

Can you put your _temp_dir path somewhere that is not home? That should fix the problem :)

C

dburkhardt commented 1 year ago

@cbravo93 just a note, I'm also having difficulty using Ray. The issue looks very similar to what @alexlenail is posting, and it took me a while to troubleshoot.

However, I do get a different error that suggests nodes aren't starting up.

---------------------------------------------------------------------------
TimeoutError                              Traceback (most recent call last)
File /srv/conda/envs/saturn/lib/python3.9/site-packages/ray/_private/node.py:310, in Node.__init__(self, ray_params, head, shutdown_at_exit, spawn_reaper, connect_only)
    309 try:
--> 310     ray._private.services.wait_for_node(
    311         self.gcs_address,
    312         self._plasma_store_socket_name,
    313     )
    314 except TimeoutError:

File /srv/conda/envs/saturn/lib/python3.9/site-packages/ray/_private/services.py:434, in wait_for_node(gcs_address, node_plasma_store_socket_name, timeout)
    433         time.sleep(0.1)
--> 434 raise TimeoutError("Timed out while waiting for node to startup.")

TimeoutError: Timed out while waiting for node to startup.

During handling of the above exception, another exception occurred:

Exception                                 Traceback (most recent call last)
Cell In[13], line 5
      2 BW_PATHS_PICKLE = os.path.join(work_dir, 'scATAC/consensus_peak_calling/pseudobulk_bed_files/bw_paths.pkl')
      4 if True:#not os.path.exists(BED_PATHS_PICKLE):
----> 5     bw_paths, bed_paths = export_pseudobulk(input_data = rna_obs_small,
      6                  variable = 'leiden',                                                                     # variable by which to generate pseudobulk profiles, in this case we want pseudobulks per celltype
      7                  sample_id_col = 'sample_id',
      8                  chromsizes = small_chromsizes,
      9                  bed_path = os.path.join(work_dir, 'scATAC/consensus_peak_calling/pseudobulk_bed_files/'),  # specify where pseudobulk_bed_files should be stored
     10                  bigwig_path = os.path.join(work_dir, 'scATAC/consensus_peak_calling/pseudobulk_bw_files/'),# specify where pseudobulk_bw_files should be stored
     11                  path_to_fragments = {"cd34_multiome": "./data/smaller_fragments.tsv.gz"},                                                        # location of fragment files
     12                  n_cpu = 16,                                                                                 # specify the number of cores to use, we use ray for multi processing
     13                  normalize_bigwig = True,
     14                  remove_duplicates = True,
     15                  _temp_dir = os.path.join(tmp_dir, 'ray_spill'),
     16 
     17     )
     19     pickle.dump(bed_paths, 
     20             open(os.path.join(work_dir, 'scATAC/consensus_peak_calling/pseudobulk_bed_files/bed_paths.pkl'), 'wb'))
     21     pickle.dump(bw_paths,
     22            open(os.path.join(work_dir, 'scATAC/consensus_peak_calling/pseudobulk_bed_files/bw_paths.pkl'), 'wb'))

File /srv/conda/envs/saturn/lib/python3.9/site-packages/pycisTopic/pseudobulk_peak_calling.py:165, in export_pseudobulk(input_data, variable, chromsizes, bed_path, bigwig_path, path_to_fragments, sample_id_col, n_cpu, normalize_bigwig, remove_duplicates, split_pattern, use_polars, **kwargs)
    163 # Create pseudobulks
    164 if n_cpu > 1:
--> 165     ray.init(num_cpus=n_cpu, **kwargs)
    166     ray_handle = ray.wait(
    167         [
    168             export_pseudobulk_ray.remote(
   (...)
    182         num_returns=len(groups),
    183     )
    184     ray.shutdown()

File /srv/conda/envs/saturn/lib/python3.9/site-packages/ray/_private/client_mode_hook.py:105, in client_mode_hook.<locals>.wrapper(*args, **kwargs)
    103     if func.__name__ != "init" or is_client_mode_enabled_by_default:
    104         return getattr(ray, func.__name__)(*args, **kwargs)
--> 105 return func(*args, **kwargs)

File /srv/conda/envs/saturn/lib/python3.9/site-packages/ray/_private/worker.py:1439, in init(address, num_cpus, num_gpus, resources, object_store_memory, local_mode, ignore_reinit_error, include_dashboard, dashboard_host, dashboard_port, job_config, configure_logging, logging_level, logging_format, log_to_driver, namespace, runtime_env, storage, **kwargs)
   1397     ray_params = ray._private.parameter.RayParams(
   1398         node_ip_address=node_ip_address,
   1399         raylet_ip_address=raylet_ip_address,
   (...)
   1433         node_name=_node_name,
   1434     )
   1435     # Start the Ray processes. We set shutdown_at_exit=False because we
   1436     # shutdown the node in the ray.shutdown call that happens in the atexit
   1437     # handler. We still spawn a reaper process in case the atexit handler
   1438     # isn't called.
-> 1439     _global_node = ray._private.node.Node(
   1440         head=True, shutdown_at_exit=False, spawn_reaper=True, ray_params=ray_params
   1441     )
   1442 else:
   1443     # In this case, we are connecting to an existing cluster.
   1444     if num_cpus is not None or num_gpus is not None:

File /srv/conda/envs/saturn/lib/python3.9/site-packages/ray/_private/node.py:315, in Node.__init__(self, ray_params, head, shutdown_at_exit, spawn_reaper, connect_only)
    310     ray._private.services.wait_for_node(
    311         self.gcs_address,
    312         self._plasma_store_socket_name,
    313     )
    314 except TimeoutError:
--> 315     raise Exception(
    316         "The current node has not been updated within 30 "
    317         "seconds, this could happen because of some of "
    318         "the Ray processes failed to startup."
    319     )
    320 node_info = ray._private.services.get_node_to_connect_for_driver(
    321     self.gcs_address,
    322     self._raylet_ip_address,
    323 )
    324 if self._ray_params.node_manager_port == 0:

Exception: The current node has not been updated within 30 seconds, this could happen because of some of the Ray processes failed to startup.

cbravo93 commented 1 year ago

Hi @dburkhardt !

Can you paste your full command? Can you try:

* ray.init()

* ray.init(n_cpu=2)

* ray.init(_temp_dir=XXX)

We normally do not work in /tmp on our system because you need some space for files to be written.

C

cbravo93 commented 1 year ago

@dburkhardt can you check https://github.com/aertslab/scenicplus/issues/31 https://github.com/aertslab/scenicplus/issues/24 ?

dburkhardt commented 1 year ago

> Hi @dburkhardt !
>
> Can you paste your full command? Can you try:
>
> * ray.init()
>
> * ray.init(n_cpu=2)
>
> * ray.init(_temp_dir=XXX)
>
> We normally do not work in /tmp on our system because you need some space for files to be written.
>
> C

[screenshot]

These work.

cbravo93 commented 1 year ago

@dburkhardt can you check with n_cpu=1 and see if any error pops up? Can you check that the chromsizes match the fragments file?

dburkhardt commented 1 year ago

This does work with n_cpu=1 with correct output, and I can proceed with the rest of the analysis. The issue only comes up when trying to run with n_cpu>1.

cbravo93 commented 1 year ago

Does os.path.join(tmp_dir, 'ray_spill') exist? I just managed to reproduce the error with

ray.init(_temp_dir='blah')

dburkhardt commented 1 year ago

Interesting, if I just omit _temp_dir from the export_pseudobulk command, then it works fine. It seems like there are multiple issues with manually specifying this parameter, rather than using the default.

Looking at the ray documentation (https://docs.ray.io/en/latest/ray-core/configure.html):

(There is not currently a stable way to change the root temporary directory when calling ray.init(), but if you need to, you can provide the _temp_dir argument to ray.init().)

So maybe best just to omit this?
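
For reference, a sketch of the working configuration with _temp_dir simply omitted, so Ray falls back to its default root (/tmp/ray on Linux); all names reuse the snippet from the traceback above:

    # Same call as in the traceback above, minus _temp_dir: Ray then manages
    # its own default temp root under /tmp.
    bw_paths, bed_paths = export_pseudobulk(
        input_data=rna_obs_small,
        variable='leiden',
        sample_id_col='sample_id',
        chromsizes=small_chromsizes,
        bed_path=os.path.join(work_dir, 'scATAC/consensus_peak_calling/pseudobulk_bed_files/'),
        bigwig_path=os.path.join(work_dir, 'scATAC/consensus_peak_calling/pseudobulk_bw_files/'),
        path_to_fragments={'cd34_multiome': './data/smaller_fragments.tsv.gz'},
        n_cpu=16,
        normalize_bigwig=True,
        remove_duplicates=True,
    )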

cbravo93 commented 1 year ago

If you don't define _temp_dir it will use /tmp. If you have limited space in /tmp I would recommend specifying _temp_dir, especially downstream. What is your value for os.path.join(tmp_dir, 'ray_spill')?

dburkhardt commented 1 year ago

No, the temporary directory didn't exist before calling export_pseudobulk. I wouldn't expect it to need to exist beforehand, though, because it's temporary. Don't you check for that before passing _temp_dir to ray?

cbravo93 commented 1 year ago

It is checked internally by ray. What is your value for os.path.join(tmp_dir, 'ray_spill')?

alexlenail commented 1 year ago

@cbravo93 did you run into this issue at all? https://github.com/ray-project/ray/issues/7724

When I set _temp_dir = '/state/partition1/user/lenail' I get:

OSError: AF_UNIX path length cannot exceed 107 bytes: '/state/partition1/slurm_tmp/21322766.4294967291.0/ray/session_2023-02-02_13-15-02_941811_78876/sockets/plasma_store'

What do you set as _temp_dir?
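
A rough sketch of why that path trips the limit: Ray appends a session directory and socket names under the temp root (here Slurm also redirected it under slurm_tmp), and the full socket path must stay below the OS's roughly 107-byte AF_UNIX limit. The suffix below mirrors the one in the error message; session names vary per run:

    # Length check: the socket path Ray builds under the temp root must stay
    # below ~107 bytes. base and suffix are copied from the error above.
    base = '/state/partition1/slurm_tmp/21322766.4294967291.0'
    suffix = '/ray/session_2023-02-02_13-15-02_941811_78876/sockets/plasma_store'
    print(len(base + suffix))   # 115 here, over the limit, so pick a shorter base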

jflucier commented 1 year ago

Hi,

I essentially made the same observations that were posted in this issue. To sum up:

  1. When I remove _temp_dir and specify n_cpu>1, it successfully creates the bed and bw files.
  2. _temp_dir must exist prior to calling export_pseudobulk with n_cpu>1, or else it fails to create the bed and bw files.
  3. If _temp_dir is a "long path", an error is thrown: OSError: AF_UNIX path length cannot exceed 107 bytes:. This is a known Ray bug that doesn't seem to be resolved in version 2.4: https://github.com/ray-project/ray/issues/7724

Thanks for all the comments in this issue. I agree with @dburkhardt: I think this directory should be created automatically when calling export_pseudobulk (see the sketch below for a manual workaround).
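
A hedged workaround sketch for points 2 and 3 above: pre-create the spill directory with a deliberately short absolute path before calling export_pseudobulk (the scratch path is hypothetical):

    import os

    # Hypothetical short scratch path: created up front because Ray expects
    # _temp_dir to exist, and kept short to stay under the AF_UNIX limit.
    ray_tmp = '/scratch/u1/ray_spill'
    os.makedirs(ray_tmp, exist_ok=True)
    # then: export_pseudobulk(..., n_cpu=16, _temp_dir=ray_tmp)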

massonix commented 1 year ago

Hi @cbravo93, thanks for developing this amazing tool!

I had a similar problem with ray and the export_pseudobulk function. In my case, I was not even able to initialize ray following the code you provided. Running:

import ray
ray.init()

Raised the following error:

2023-08-22 11:22:18,012 ERROR services.py:1207 -- Failed to start the dashboard , return code 1
2023-08-22 11:22:18,013 ERROR services.py:1232 -- Error should be written to 'dashboard.log' or 'dashboard.err'. We are printing the last 20 lines for you. See 'https://docs.ray.io/en/master/ray-observability/ray-logging.html#logging-directory-structure' to find where the log file is.
2023-08-22 11:22:18,014 ERROR services.py:1276 -- 
The last 20 lines of /scratch_tmp/10407352/ray/session_2023-08-22_11-22-13_701675_119199/logs/dashboard.log (it contains the error message from the dashboard): 
  File "/home/groups/singlecell/rmassoni/anaconda3/envs/scenicplus/lib/python3.8/importlib/__init__.py", line 127, in import_module
    return _bootstrap._gcd_import(name[level:], package, level)
  File "<frozen importlib._bootstrap>", line 1014, in _gcd_import
  File "<frozen importlib._bootstrap>", line 991, in _find_and_load
  File "<frozen importlib._bootstrap>", line 975, in _find_and_load_unlocked
  File "<frozen importlib._bootstrap>", line 671, in _load_unlocked
  File "<frozen importlib._bootstrap_external>", line 843, in exec_module
  File "<frozen importlib._bootstrap>", line 219, in _call_with_frames_removed
  File "/home/groups/singlecell/rmassoni/anaconda3/envs/scenicplus/lib/python3.8/site-packages/ray/dashboard/modules/log/log_manager.py", line 8, in <module>
    from ray.util.state.common import (
  File "/home/groups/singlecell/rmassoni/anaconda3/envs/scenicplus/lib/python3.8/site-packages/ray/util/state/__init__.py", line 1, in <module>
    from ray.util.state.api import (
  File "/home/groups/singlecell/rmassoni/anaconda3/envs/scenicplus/lib/python3.8/site-packages/ray/util/state/api.py", line 17, in <module>
    from ray.util.state.common import (
  File "/home/groups/singlecell/rmassoni/anaconda3/envs/scenicplus/lib/python3.8/site-packages/ray/util/state/common.py", line 120, in <module>
    @dataclass(init=True)
  File "/home/groups/singlecell/rmassoni/anaconda3/envs/scenicplus/lib/python3.8/site-packages/pydantic/dataclasses.py", line 139, in dataclass
    assert init is False, 'pydantic.dataclasses.dataclass only supports init=False'
AssertionError: pydantic.dataclasses.dataclass only supports init=False
2023-08-22 11:22:18,142 INFO worker.py:1636 -- Started a local Ray instance.

I checked this issue on the Ray GitHub, and the problem seems to be with version 2 of pydantic. I downgraded pydantic as follows:

conda activate scenicplus
which python
~/anaconda3/envs/scenicplus/bin/python
~/anaconda3/envs/scenicplus/bin/pip install "pydantic<2"

And then everything worked. This should be fixed with version 2.6 of pydantic.
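
A quick check that the downgrade took effect in the running environment before retrying ray.init():

    # Confirm the interpreter sees the downgraded pydantic before retrying Ray.
    import pydantic
    print(pydantic.VERSION)   # expect a 1.x release, e.g. '1.10.12'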