gks-anvil / vrs_anvil_toolkit

Extract clinical variant interpretations from VCF using GA4GH VRS IDs
MIT License

feature/better-threaded-translator #48

Closed: bwalsh closed this 7 months ago

bwalsh commented 8 months ago

Performance on 3.10 is not optimal, and a single-threaded model leaves N cores unused.

Reimplement ThreadedTranslator with these constraints:

bwalsh commented 8 months ago

Ideation: pseudocode (untested). Replace the sleep with a call to CachedTranslator.

vrs_anvil/translator.py

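import itertools
import queue
import threading
import time
from typing import Dict, Generator, Iterable, NamedTuple

DEFAULT_PRIORITY = 1
# Sentinels sort after normal work (PriorityQueue pops the smallest value first),
# so every queued item is drained before a worker sees its stop signal.
SENTINEL_PRIORITY = 2


class VCFItem(NamedTuple):
    gnomad_id: str
    file_name: str
    line_number: int
    identifier: str


class WorkerThread(threading.Thread):
    def __init__(self, task_queue, result_queue):
        super().__init__(daemon=True)
        self.task_queue = task_queue
        self.result_queue = result_queue
        # cached_translator_factory() is assumed to exist elsewhere in vrs_anvil;
        # it is not defined in this sketch. Each worker gets its own translator.
        self.translator = cached_translator_factory()

    def run(self):
        while True:
            # Entries are (priority, sequence_number, item); the sequence number
            # breaks ties so the queue never has to compare items (or None) directly.
            priority, _, item = self.task_queue.get()
            if item is None:
                self.task_queue.task_done()
                self.result_queue.put(None)  # tell the consumer this worker has finished
                break
            result_dict = {"gnomad_id": item.gnomad_id, "file_name": item.file_name,
                           "line_number": item.line_number, "identifier": item.identifier,
                           "priority": priority}
            time.sleep(1)  # placeholder: replace with a call to self.translator
            # result_dict['result'] = allele ...
            self.result_queue.put(result_dict)
            self.task_queue.task_done()


def threaded_translator(generator: Iterable[VCFItem],
                        num_worker_threads: int) -> Generator[Dict[str, object], None, None]:
    task_queue = queue.PriorityQueue(maxsize=num_worker_threads * 2)
    result_queue = queue.Queue(maxsize=num_worker_threads * 2)
    sequence = itertools.count()  # only the reader thread draws from this counter

    # Start worker threads
    worker_threads = [WorkerThread(task_queue, result_queue) for _ in range(num_worker_threads)]
    for worker_thread in worker_threads:
        worker_thread.start()

    # Reader thread: enqueue every item, then exactly one sentinel per worker
    def reader_thread(items):
        for item in items:
            task_queue.put((DEFAULT_PRIORITY, next(sequence), item))
        for _ in range(num_worker_threads):
            task_queue.put((SENTINEL_PRIORITY, next(sequence), None))

    reader = threading.Thread(target=reader_thread, args=(generator,), daemon=True)
    reader.start()

    # Main thread yields results until every worker has signalled completion;
    # without this exit condition the loop (and list(result_gen)) never returns.
    finished_workers = 0
    while finished_workers < num_worker_threads:
        result = result_queue.get()
        if result is None:
            finished_workers += 1
            continue
        yield result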
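
For comparison, a minimal sketch of the same idea built on the standard library's concurrent.futures.ThreadPoolExecutor instead of hand-managed queues and sentinels. `translate_item` is a hypothetical stand-in for a CachedTranslator call. Unlike the queue version, this yields results in input order, and it caps the number of in-flight tasks at 2 × num_workers (mirroring the maxsize above) so a large VCF is not submitted to the pool all at once.

```python
from collections import deque
from concurrent.futures import Future, ThreadPoolExecutor
from typing import Callable, Deque, Dict, Iterable, Iterator, NamedTuple


class VCFItem(NamedTuple):
    gnomad_id: str
    file_name: str
    line_number: int
    identifier: str


def translate_item(item: VCFItem) -> Dict[str, object]:
    # Hypothetical placeholder: a real version would call the translator here.
    return {"gnomad_id": item.gnomad_id, "file_name": item.file_name,
            "line_number": item.line_number, "identifier": item.identifier}


def pooled_translator(items: Iterable[VCFItem], num_workers: int,
                      fn: Callable[[VCFItem], Dict[str, object]] = translate_item,
                      ) -> Iterator[Dict[str, object]]:
    """Yield fn(item) for each item, in input order, with a bounded in-flight window."""
    max_in_flight = num_workers * 2
    with ThreadPoolExecutor(max_workers=num_workers) as pool:
        pending: Deque[Future] = deque()
        for item in items:
            pending.append(pool.submit(fn, item))
            # Block on the oldest task once the window is full, keeping memory bounded.
            if len(pending) >= max_in_flight:
                yield pending.popleft().result()
        # Input exhausted: drain the remaining tasks in submission order.
        while pending:
            yield pending.popleft().result()
```

Usage mirrors the unit test: `list(pooled_translator(items, num_workers=3))`. Exceptions raised inside `fn` propagate to the consumer through `Future.result()`, which the sentinel-based version would need extra handling for.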
bwalsh commented 8 months ago

tests/unit/test_translator.py

Replace vcf_item_example with a gnomad_ids_generator that reads from the test fixture(s): https://github.com/ohsu-comp-bio/vrs-python-testing/blob/development/tests/fixtures/gnomAD_v4.0.0_ENSG00000012048_2024_03_04_18_33_26.csv

import pytest
from vrs_anvil.translator import threaded_translator, VCFItem

def test_threaded_translator():
    vcf_item_example = VCFItem(gnomad_id="123", file_name="example.txt", line_number=42, identifier="abc")
    generator_example = (vcf_item_example for _ in range(5))
    num_worker_threads_example = 3
    result_gen = threaded_translator(generator_example, num_worker_threads_example)

    # Example usage of the yielded results
    results = list(result_gen)

    # Add your assertions based on the expected results
    assert len(results) == 5
    assert all(isinstance(result, dict) for result in results)
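
A minimal sketch of the fixture-reading gnomad_ids_generator suggested above, assuming the CSV has a header row with a `gnomAD ID` column (an assumption; check the fixture's actual header and adjust `id_column`). Setting `identifier` to the gnomAD ID is a placeholder choice, not something the issue specifies. `csv.DictReader.line_num` counts physical lines read, so the first data row reports line 2.

```python
import csv
from typing import Iterator, NamedTuple


class VCFItem(NamedTuple):
    gnomad_id: str
    file_name: str
    line_number: int
    identifier: str


def gnomad_ids_generator(path: str, id_column: str = "gnomAD ID") -> Iterator[VCFItem]:
    """Yield one VCFItem per data row of a gnomAD CSV export (column name assumed)."""
    with open(path, newline="") as fh:
        reader = csv.DictReader(fh)
        for row in reader:
            gnomad_id = row[id_column]
            # line_num includes the header line, so it matches the row's line in the file.
            yield VCFItem(gnomad_id=gnomad_id, file_name=path,
                          line_number=reader.line_num, identifier=gnomad_id)
```

In the test, `generator_example` would then become `gnomad_ids_generator(fixture_path)`, where `fixture_path` (hypothetical) points at a local copy of the fixture.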
quinnwai commented 8 months ago

Adding some info about threading from the commit associated with #30: with 1 thread the iteration count is maximized, but the runtime is long (long teardown?), whereas as the thread count increases the iteration count drops, yet the elapsed time is shorter than with 1 thread.
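
A minimal sketch for reproducing this kind of comparison: time the same batch of work at several thread counts, with the timer wrapped around the whole `with` block so pool shutdown (the suspected teardown cost) is included. `fake_translate` is a hypothetical I/O-bound stand-in (a 10 ms sleep), not the real translator. Sleeping releases CPython's GIL, so threads overlap here; pure-Python CPU-bound work would not speed up the same way.

```python
import time
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, Dict, Iterable


def time_thread_counts(work: Callable[[int], object], n_items: int,
                       thread_counts: Iterable[int]) -> Dict[int, float]:
    """Return wall-clock seconds to run work() over n_items for each thread count."""
    timings: Dict[int, float] = {}
    for n in thread_counts:
        start = time.perf_counter()
        with ThreadPoolExecutor(max_workers=n) as pool:
            list(pool.map(work, range(n_items)))  # consume every result
        # Measured after the with-block exits, so shutdown time is included.
        timings[n] = time.perf_counter() - start
    return timings


def fake_translate(i: int) -> int:
    time.sleep(0.01)  # hypothetical stand-in for an I/O-bound translator call
    return i
```

For example, `time_thread_counts(fake_translate, 200, [1, 2, 4, 8])` returns a dict mapping each thread count to its elapsed seconds.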