stanfordmlgroup / chexpert-labeler

CheXpert NLP tool to extract observations from radiology reports.
MIT License

Running multiple chexpert labelers in parallel #34

Open PabloMessina opened 2 years ago

PabloMessina commented 2 years ago

The CheXpert labeler is a bit slow. On my machine it labels about 4-5 reports per second on average, which is too slow if you want to label tens of thousands of reports quickly. As a workaround, I thought I could take advantage of my machine's multiple cores by running several instances of the CheXpert labeler over disjoint splits of my report dataset. To this end, I tried the following:

import csv
import os
import subprocess
import time

import numpy as np
import pandas as pd

# TMP_FOLDER, CHEXPERT_FOLDER, CHEXPERT_PYTHON, CHEXPERT_LABELS and
# _get_custom_env() are defined elsewhere (not shown in this snippet).

def _invoke_chexpert_labeler_process(self, reports, tmp_suffix='', n_processes=10):
    """Label `reports` by running chexpert labeler subprocesses in parallel
    over disjoint chunks, then concatenating their outputs in order."""

    n = len(reports)
    if n < 100:
        n_processes = 1  # small batches are not worth splitting

    chunk_size = n // n_processes
    processes = []
    output_paths = []

    if self.verbose:
        print(f'Chexpert labeler: running {n_processes} processes in parallel')

    start = time.time()
    custom_env = _get_custom_env()
    os.makedirs(TMP_FOLDER, exist_ok=True)

    for i in range(n_processes):
        # Define chunk range; the last chunk absorbs the remainder
        b = i * chunk_size
        e = n if i + 1 == n_processes else b + chunk_size

        # Define input & output paths for i-th chunk
        input_path = os.path.join(TMP_FOLDER, f'labeler-input{tmp_suffix}_{i}.csv')
        output_path = os.path.join(TMP_FOLDER, f'labeler-output{tmp_suffix}_{i}.csv')
        output_paths.append(output_path)

        # Create input file: one quoted report per row, no header
        in_df = pd.DataFrame(reports[b:e])
        in_df.to_csv(input_path, header=False, index=False, quoting=csv.QUOTE_ALL)

        # Build command & launch chexpert labeler process (non-blocking)
        cmd_cd = f'cd {CHEXPERT_FOLDER}'
        cmd_call = f'{CHEXPERT_PYTHON} label.py --reports_path {input_path} --output_path {output_path}'
        cmd = f'{cmd_cd} && {cmd_call}'
        if self.verbose:
            print(f'({i}) Running chexpert labeler over {len(in_df)} reports ...')
        processes.append(subprocess.Popen(cmd, shell=True, env=custom_env))

    out_labels = np.empty((n, len(CHEXPERT_LABELS)), np.int8)

    offset = 0
    for i, p in enumerate(processes):
        # Wait for the i-th subprocess (returns at once if it already exited)
        p.wait()
        if self.verbose:
            print(f'process {i} finished, elapsed time = {time.time() - start}')
        if p.returncode != 0:
            raise RuntimeError(f'chexpert labeler process {i} exited with code {p.returncode}')
        # Read chexpert-labeler output; empty cells (observation not mentioned) become -2
        out_df = pd.read_csv(output_paths[i])
        out_df = out_df.fillna(-2)
        out_labels[offset : offset + len(out_df)] = out_df[CHEXPERT_LABELS].to_numpy().astype(np.int8)
        offset += len(out_df)

    assert offset == n

    return out_labels
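The chunking arithmetic in the loop can be pulled out into a small pure function, which makes it easy to check that the chunks are disjoint and cover every report. The chunk sizes in the log below (nine chunks of 29 plus one of 34) imply 295 reports in total: 295 // 10 = 29, and the last chunk absorbs the remaining 295 - 9 * 29 = 34. The name `chunk_bounds` is hypothetical; it just mirrors the `b`/`e` computation above.

```python
def chunk_bounds(n, n_processes):
    """Return (begin, end) index pairs splitting range(n) into n_processes
    contiguous, disjoint chunks; the last chunk absorbs the remainder."""
    chunk_size = n // n_processes
    bounds = []
    for i in range(n_processes):
        b = i * chunk_size
        e = n if i + 1 == n_processes else b + chunk_size
        bounds.append((b, e))
    return bounds


bounds = chunk_bounds(295, 10)
print([e - b for b, e in bounds])  # [29, 29, 29, 29, 29, 29, 29, 29, 29, 34]
```

`numpy.array_split(reports, 10)` would instead spread the remainder, giving five chunks of 30 and five of 29, so no single worker gets a noticeably larger share.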

Unfortunately, I'm getting this very strange behavior:

Chexpert labeler: running 10 processes in parallel
(0) Running chexpert labeler over 29 reports ...
(1) Running chexpert labeler over 29 reports ...
(2) Running chexpert labeler over 29 reports ...
(3) Running chexpert labeler over 29 reports ...
(4) Running chexpert labeler over 29 reports ...
(5) Running chexpert labeler over 29 reports ...
(6) Running chexpert labeler over 29 reports ...
(7) Running chexpert labeler over 29 reports ...
(8) Running chexpert labeler over 29 reports ...
(9) Running chexpert labeler over 34 reports ...
process 0 finished, elapsed time = 9.482320785522461
process 1 finished, elapsed time = 10.595801830291748
process 2 finished, elapsed time = 203.73371744155884
process 3 finished, elapsed time = 203.74254941940308
process 4 finished, elapsed time = 203.7504105567932
process 5 finished, elapsed time = 209.21588110923767
process 6 finished, elapsed time = 209.2250039577484
process 7 finished, elapsed time = 209.2326741218567
process 8 finished, elapsed time = 209.23797416687012
process 9 finished, elapsed time = 209.24284863471985

As you can see, the first two processes finish relatively quickly (in about 10 seconds, which is roughly what you'd expect for 29 reports at 4-5 reports per second), but for some unknown reason processes 2 through 9 only finish after about 200 seconds. I've run my code several times and always get the same result.
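One caveat when reading this log: the results loop waits on the processes in index order, so the time printed for process i is really the later of its own finish time and the time printed for process i-1. Strictly, the log only shows that process 2 ran until about 203 s and process 5 until about 209 s; processes 3, 4 and 6-9 may have finished much earlier. A minimal sketch that polls all children and records when each one actually exits is shown here; the function name `wait_all_with_finish_times` is hypothetical, and the demo uses stand-in children that just sleep instead of real labeler processes.

```python
import subprocess
import sys
import time


def wait_all_with_finish_times(processes, start, poll_interval=0.05):
    """Poll all processes until every one has exited.

    Returns a dict mapping process index -> seconds since `start` at which
    its exit was first observed (accurate to about `poll_interval`).
    """
    finish_times = {}
    while len(finish_times) < len(processes):
        for i, p in enumerate(processes):
            # poll() returns None while the child is still running.
            if i not in finish_times and p.poll() is not None:
                finish_times[i] = time.time() - start
        if len(finish_times) < len(processes):
            time.sleep(poll_interval)
    return finish_times


# Demo with stand-in children that sleep for different durations
# (in practice these would be the chexpert labeler subprocesses).
start = time.time()
durations = [1.0, 0.1, 0.4]
procs = [
    subprocess.Popen([sys.executable, "-c", f"import time; time.sleep({d})"])
    for d in durations
]
times = wait_all_with_finish_times(procs, start)
for i in sorted(times, key=times.get):
    print(f"process {i} finished, elapsed time = {times[i]:.2f}")
```

Printing in completion order rather than index order makes it clear whether only a few workers are slow or all of them are.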

I have two questions:

Thank you very much in advance.

BardiaKh commented 1 year ago

This is probably too late to help, but I didn't have much luck with Python's native multiprocessing. I ended up spawning multiple processes from bash instead. For future reference, the code is shared here:

https://github.com/BardiaKh/ParallelCheXpertLabeler
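The linked repository uses bash. As a swapped-in Python alternative (not the code from that repository), the same fan-out can be done with `concurrent.futures.ThreadPoolExecutor` and `subprocess.run`, passing each command as an argument list so no shell is involved. The helper name `run_commands_in_parallel` is hypothetical, and the demo uses stand-in commands rather than the labeler.

```python
import subprocess
import sys
from concurrent.futures import ThreadPoolExecutor


def run_commands_in_parallel(commands, max_workers, cwd=None):
    """Run each argv list in `commands` as a separate child process, with at
    most `max_workers` running at once; return exit codes in input order."""

    def run_one(argv):
        # Blocks only this worker thread; the children are independent OS
        # processes, so they run in parallel regardless of the GIL.
        return subprocess.run(argv, cwd=cwd).returncode

    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # pool.map yields results in the same order as `commands`.
        return list(pool.map(run_one, commands))


# Stand-in commands that exit with codes 0, 1 and 2.
demo = [[sys.executable, "-c", f"import sys; sys.exit({k})"] for k in range(3)]
print(run_commands_in_parallel(demo, max_workers=2))  # [0, 1, 2]
```

For the labeler, each entry would be something like `[CHEXPERT_PYTHON, 'label.py', '--reports_path', input_path, '--output_path', output_path]` with `cwd=CHEXPERT_FOLDER`; this sketch does not pass a custom environment, so `custom_env` would need to be forwarded through `subprocess.run`'s `env` argument if it matters.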

PabloMessina commented 1 year ago

@BardiaKh That's great. In my case, it turns out my implementation was fine as long as it invokes the Dockerized version of the CheXpert labeler: running multiple Docker containers of the labeler in parallel works like a charm. Code here: https://github.com/PabloMessina/MedVQA/blob/5d4835a390b6e5d697bd2f4f3dfef73af95101bb/medvqa/metrics/medical/chexpert.py#L198
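The linked MedVQA code is the author's actual implementation. As a rough, hypothetical sketch of the idea, each chunk can get its own `docker run --rm` invocation with the temporary folder bind-mounted via `-v`. The image name `chexpert-labeler`, the mount point `/data`, and the assumption that `python label.py` works from the image's default working directory are all placeholders, not taken from the linked code.

```python
import os
import shlex


def docker_label_command(image, host_dir, input_name, output_name,
                         container_dir="/data"):
    """Build an argv list that runs label.py inside a fresh container with
    `host_dir` bind-mounted at `container_dir` (image and paths are hypothetical)."""
    host_dir = os.path.abspath(host_dir)
    return [
        "docker", "run", "--rm",                 # remove the container on exit
        "-v", f"{host_dir}:{container_dir}",     # share input/output CSVs with the host
        image,
        "python", "label.py",
        "--reports_path", f"{container_dir}/{input_name}",
        "--output_path", f"{container_dir}/{output_name}",
    ]


cmds = [
    docker_label_command("chexpert-labeler", "/tmp/labeler",
                         f"labeler-input_{i}.csv", f"labeler-output_{i}.csv")
    for i in range(2)
]
print(shlex.join(cmds[0]))
```

Each argv list can be started with `subprocess.Popen(argv)` in place of the shell command in the original loop, so every chunk runs in its own isolated container.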