bokulich-lab / q2-fondue

Functions for reproducibly Obtaining and Normalizing Data re-Used from Elsewhere
BSD 3-Clause "New" or "Revised" License
20 stars 6 forks source link

Process runs out of threads when fetching large amounts of metadata #132

Closed misialq closed 2 years ago

misialq commented 2 years ago

When I try to fetch metadata for ~40k run IDs, at some point during the fetch I get the following error:

RuntimeError                              Traceback (most recent call last)
Input In [21], in <cell line: 1>()
      1 if not os.path.isfile(sra_metadata_path):
----> 2     sra_meta, failed_ids, = fondue.methods.get_metadata(
      3         accession_ids=q2.Artifact.import_data(
      4             'NCBIAccessionIDs', pd.Series(ids, name='ID')
      5         ),
      6         email=email,
      7         n_jobs=n_jobs,
      8         log_level='INFO'
      9     )
     10     sra_meta.save(sra_metadata_path)
     11     failed_ids.save(os.path.join(data_loc, f'sra_failed_ids_batch.qza'))

File <decorator-gen-23>:2, in get_metadata(accession_ids, email, n_jobs, log_level)

File ~/miniconda3/envs/fondue-pub/lib/python3.8/site-packages/qiime2/sdk/action.py:245, in Action._bind.<locals>.bound_callable(*args, **kwargs)
    241         warn(self._build_deprecation_message(),
    242              FutureWarning)
    244 # Execute
--> 245 outputs = self._callable_executor_(scope, callable_args,
    246                                    output_types, provenance)
    248 if len(outputs) != len(self.signature.outputs):
    249     raise ValueError(
    250         "Number of callable outputs must match number of "
    251         "outputs defined in signature: %d != %d" %
    252         (len(outputs), len(self.signature.outputs)))

File ~/miniconda3/envs/fondue-pub/lib/python3.8/site-packages/qiime2/sdk/action.py:391, in Method._callable_executor_(self, scope, view_args, output_types, provenance)
    390 def _callable_executor_(self, scope, view_args, output_types, provenance):
--> 391     output_views = self._callable(**view_args)
    392     output_views = tuplize(output_views)
    394     # TODO this won't work if the user has annotated their "view API" to
    395     # return a `typing.Tuple` with some number of components. Python will
    396     # return a tuple when there are multiple return values, and this length
   (...)
    399     # due to how Python handles multiple returns, and can be worked around
    400     # by using something like `typing.List` instead.

File ~/miniconda3/envs/fondue-pub/lib/python3.8/site-packages/q2_fondue/metadata.py:166, in get_metadata(accession_ids, email, n_jobs, log_level)
    163 id_type = _determine_id_type(accession_ids)
    165 if id_type == 'run':
--> 166     meta, missing_ids = _get_run_meta(
    167         email, n_jobs, accession_ids, log_level, logger
    168     )
    169     # if available, join DOI to meta by run ID:
    170     if id2doi is not None:

File ~/miniconda3/envs/fondue-pub/lib/python3.8/site-packages/q2_fondue/metadata.py:101, in _get_run_meta(email, n_jobs, run_ids, log_level, logger)
     94     logger.warning(
     95         f'The following provided IDs are invalid: '
     96         f'{",".join(invalid_ids.keys())}. Please correct them and '
     97         f'try fetching those independently.'
     98     )
    100 # fetch metadata
--> 101 meta_df, missing_ids = _execute_efetcher(
    102     email, n_jobs, valid_ids, log_level, logger
    103 )
    105 if missing_ids:
    106     logger.warning(
    107         'Metadata for the following run IDs could not be fetched: '
    108         f'{",".join(missing_ids.keys())}. '
    109         f'Please try fetching those independently.'
    110     )

File ~/miniconda3/envs/fondue-pub/lib/python3.8/site-packages/q2_fondue/metadata.py:66, in _execute_efetcher(email, n_jobs, run_ids, log_level, logger)
     62 missing_ids = {}
     63 for num, batch in enumerate(_chunker(run_ids, BATCH_SIZE), 1):
     64     # one efetcher object per loop because threads of one
     65     # efetcher object can only be started once:
---> 66     efetcher = ef.Efetcher(
     67         'efetcher', email, apikey=None,
     68         apikey_var=None, threads=n_jobs, qid=None
     69     )
     70     set_up_entrezpy_logging(efetcher, log_level)
     72     logger.info(
     73         f'Fetching metadata of run IDs from batch number {num}...'
     74     )

File ~/miniconda3/envs/fondue-pub/lib/python3.8/site-packages/entrezpy/efetch/efetcher.py:46, in Efetcher.__init__(self, tool, email, apikey, apikey_var, threads, qid)
     44 def __init__(self, tool, email, apikey=None, apikey_var=None, threads=None, qid=None):
     45   """:ivar result: :class:`entrezpy.base.result.EutilsResult`"""
---> 46   super().__init__('efetch.fcgi', tool, email, apikey=apikey, threads=threads, qid=qid)
     47   self.logger = entrezpy.log.logger.get_class_logger(Efetcher)
     48   self.logger.debug(json.dumps({'init':self.dump()}))

File ~/miniconda3/envs/fondue-pub/lib/python3.8/site-packages/entrezpy/base/query.py:108, in EutilsQuery.__init__(self, eutil, tool, email, apikey, apikey_var, threads, qid)
    106 self.request_counter = 0
    107 self.query_monitor = entrezpy.requester.monitor.QueryMonitor(self.id)
--> 108 self.request_pool = entrezpy.requester.requestpool.RequestPool(self.num_threads,
    109                                                                self.failed_requests,
    110                                                                self.query_monitor,
    111                                                                entrezpy.requester.requester.Requester(1/self.requests_per_sec))
    112 self.logger = entrezpy.log.logger.get_class_logger(EutilsQuery)
    113 self.logger.debug(json.dumps({'init':self.dump()}))

File ~/miniconda3/envs/fondue-pub/lib/python3.8/site-packages/entrezpy/requester/requestpool.py:52, in RequestPool.__init__(self, num_threads, failed_requests, monitor, requester)
     50 atexit.register(self.destructor)
     51 if self.useThreads():
---> 52   self.dispatch_workers()
     53 signal.signal(signal.SIGINT, self.sigint_handler)

File ~/miniconda3/envs/fondue-pub/lib/python3.8/site-packages/entrezpy/requester/requestpool.py:64, in RequestPool.dispatch_workers(self)
     61 for _ in range(self.threads):
     62   w = entrezpy.requester.threadedrequest.ThreadedRequester(
     63     self.requests, self.failed_requests, self.monitor, self.requester, self.stop_event, self.lock)
---> 64   w.start()
     65   self.logger.debug(json.dumps({'thread':w.name, 'status':'started'}))
     66 self.logger.debug(json.dumps({'threading workers':'dispatched'}))

File ~/miniconda3/envs/fondue-pub/lib/python3.8/threading.py:852, in Thread.start(self)
    850     _limbo[self] = self
    851 try:
--> 852     _start_new_thread(self._bootstrap, ())
    853 except Exception:
    854     with _active_limbo_lock:

RuntimeError: can't start new thread

Because I ran this in a Jupyter notebook, the same cell was probably executed twice (there was a network issue the first time). Reproducing this in a single run would therefore likely require a much larger number of IDs. For the same reason, I cannot provide exact instructions for reproducing this behaviour.

It seems, though, that on every iteration (through `retstart` in EFetch) new threads are spawned and never cleaned up properly. The traceback above shows that each batch creates a new `Efetcher`, whose `RequestPool` starts `n_jobs` worker threads. With enough batches, the process reaches the maximum allowed number of threads (4096 here) and exits with the error above.

adamovanja commented 2 years ago

I encountered the same error while running two large get-metadata fetches. They exited with the RuntimeError at the 375th and 280th batches, respectively. So at around 90k run IDs, no new threads could be started.

adamovanja commented 2 years ago

I tried a quick fix by simply deleting the efetcher object that launches the threads in each batch, but it did not reduce the number of threads that are launched.