ray-project / ray

Ray is a unified framework for scaling AI and Python applications. Ray consists of a core distributed runtime and a set of AI Libraries for accelerating ML workloads.
https://ray.io
Apache License 2.0

Ray multiprocessing.Pool: core_worker_process.cc:278: The core worker has already been shutdown. #46144

Open jfecunha opened 2 weeks ago

jfecunha commented 2 weeks ago

What happened + What you expected to happen

I am using ray multiprocessing.Pool to run data processing tasks within a docker container in GCP Vertex AI (Single machine).

The task being processed consists of the following steps (simplified here; we apply further transformations to the JSON file before generating the NumPy matrix):

Everything works as expected until the end of the script, when the pool is shut down and some statistics are computed on the number of files that were processed.

The error that I see is the following:

[.....] core_worker_process.cc:278: The core worker has already been shutdown. This happens when the language frontend accesses the Ray's worker after it is shutdown. The process will exit",

Looking at the folder where the outputs are expected, I see that all the files were processed, apart from a few that were skipped due to data quality issues.

I am trying to understand why this error is happening and causing the job to fail in Vertex AI.

I ran other jobs with less data and did not have this issue. As a benchmark, runs with up to 400k files work as expected; above that, I see the reported error. I don't know if this helps.

Versions / Dependencies

I built a Docker image to run on an AMD-based machine in GCP with the following dependencies:

I also tried to build the docker image with ray-supported images (FROM rayproject/ray:nightly-py39-cpu) but the outcome was the same. Also tried Python 3.10.

Reproduction script

"""Dummy script to demonstrate the error."""

import logging
import json
import argparse

from typing import List
from pathlib import Path

import numpy as np
import ray
import tqdm

from ray.util.multiprocessing import Pool as RayPool

def parse_args():
    parser = argparse.ArgumentParser(
        'Parser.',
        formatter_class=argparse.ArgumentDefaultsHelpFormatter,
    )
    parser.add_argument(
        '--input', help='Input directory for raw data.'
    )
    parser.add_argument(
        '--output', help='Output directory for processed data.'
    )
    return parser.parse_args()

def transform_data(file: str):
    try:
        with open(file, 'r') as f:
            data = json.load(f)

        matrix = np.array(data['matrix'])
        np.savez_compressed(f'{Path(file).stem}.npz', matrix=matrix)

    except Exception as e:
        logging.error('Error processing file %s: %s', file, e)
        return {'num_rows': 0}

    return {'num_rows': matrix.shape[0]}

def run_workers(workers_pool: RayPool, files: List, progress_bar: tqdm.tqdm):
    num_rows = 0
    try:
        # imap_unordered yields one transform_data result (a dict) per file.
        for result in workers_pool.imap_unordered(transform_data, files):
            num_rows += result['num_rows']
            progress_bar.update()
    except KeyboardInterrupt:
        logging.info('Stopped by keyboard interrupt')
        workers_pool.terminate()
    finally:
        workers_pool.close()
        workers_pool.join()
        if ray.is_initialized():
            logging.info('Shutting down Ray')
            ray.shutdown()
    return {'total_rows': num_rows}

def main():

    args = parse_args()
    logging.info('Arguments parsed: %s', args)

    output_dir = Path(args.output)
    input_dir = Path(args.input)

    output_dir.mkdir(exist_ok=True, parents=True)
    # Materialize the generator so that len(files) can be computed.
    files = list(input_dir.glob('**/*.json'))

    # Running in a machine with 60 cpus in GCP
    workers_pool = RayPool(processes=60, ray_address='local')
    progress_bar = tqdm.tqdm(files, desc='Converting files', total=len(files))

    results = run_workers(workers_pool, files, progress_bar)

    # NOTE: Error is raised before it reaches this point
    print('Statistics:')
    print(results)

if __name__ == '__main__':
    main()

This script is executed in the docker entrypoint.
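For reference, here is a minimal sketch of the aggregation pattern the script is aiming for. It uses the standard library's ThreadPool as a stand-in, on the assumption that ray.util.multiprocessing.Pool mirrors the same imap_unordered interface; count_rows and the sample inputs are hypothetical. It only illustrates that each item yielded by imap_unordered is a single worker result, and it is not claimed to reproduce or fix the shutdown error.

```python
from multiprocessing.pool import ThreadPool  # stdlib stand-in for Ray's Pool (assumption)


def count_rows(matrix):
    """Hypothetical stand-in for transform_data: report the rows of one input."""
    return {'num_rows': len(matrix)}


def total_rows(pool, inputs):
    """Sum per-item results as they arrive, in completion order."""
    total = 0
    for result in pool.imap_unordered(count_rows, inputs):
        # Each `result` is one dict returned by count_rows, not a list of dicts.
        total += result['num_rows']
    return total


def demo_total():
    # Three toy "matrices" with 1, 3 and 0 rows respectively.
    inputs = [[[1, 2]], [[1], [2], [3]], []]
    with ThreadPool(processes=2) as pool:
        return total_rows(pool, inputs)
```

Calling demo_total() sums the row counts across all inputs regardless of the order in which the workers finish.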

Issue Severity

High: It blocks me from completing my task.

jfecunha commented 1 week ago

I got this working by changing workers_pool.imap_unordered to workers_pool.map. So I guess this is related to the iterator that imap_unordered returns. Does Ray have built-in progress bars to get the status of the jobs?

That was the main reason for using imap_unordered instead of map.
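One possible way to keep coarse progress reporting while staying on map is to submit the work in batches and report after each batch. The sketch below is illustrative only: it uses the standard library's ThreadPool as a stand-in (assuming Ray's Pool.map behaves the same way), and map_with_progress, square and the on_progress callback are hypothetical names. Whether this avoids the shutdown error under Ray has not been verified here.

```python
from multiprocessing.pool import ThreadPool  # stdlib stand-in for Ray's Pool (assumption)


def map_with_progress(pool, func, items, batch_size, on_progress):
    """Run pool.map batch by batch, reporting how many items are done so far."""
    results = []
    for start in range(0, len(items), batch_size):
        batch = items[start:start + batch_size]
        # pool.map blocks until the whole batch is finished and keeps input order.
        results.extend(pool.map(func, batch))
        on_progress(len(results), len(items))
    return results


def square(x):
    """Toy task standing in for the real per-file work."""
    return x * x


def demo_progress():
    seen = []  # (done, total) pairs recorded by the callback
    with ThreadPool(processes=2) as pool:
        out = map_with_progress(
            pool, square, list(range(5)), 2,
            lambda done, total: seen.append((done, total)),
        )
    return out, seen
```

With tqdm, on_progress could advance a bar by the size of each finished batch. The trade-off is that every batch acts as a synchronization point, so a batch size well above the number of workers keeps them busier.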