ray-project / ray

Ray is an AI compute engine. Ray consists of a core distributed runtime and a set of AI Libraries for accelerating ML workloads.
https://ray.io
Apache License 2.0

Ray tasks in batch operation not completing - 144 out of 145 tasks complete but not all #46782

Open agopinath1205 opened 4 months ago

agopinath1205 commented 4 months ago

What happened + What you expected to happen

I have a batch Ray job where most of the tasks complete, but the last task in an actor gets stuck and never finishes.


I am not sure whether this is a problem on my end, something wrong in Ray, or a configuration that needs to be added. Any help is appreciated.

Versions / Dependencies

I am using Ray within Kubernetes (Ray image 2.24) and running a TensorFlow model.

Reproduction script

import ray
import pyarrow as pa

# GenerateEmbeddings, PrepareBatch, catalog, create_table, local_path,
# process_pending_files, and table_output_identifier are defined elsewhere.

def process_batch(file_batch, create=False):
    ds_ray_data = ray.data.read_parquet(file_batch)

    # Step 1: GPU stage that computes embeddings.
    exploded_df_source_programs = ds_ray_data.map_batches(
        GenerateEmbeddings,
        fn_constructor_kwargs={
            # "model": local_path,  # Local testing
            # "model_config": local_path + "/model_config.json",  # Local testing
            "model": "/tmp/ray/embedding_use_multilingual_shared_20230919",
            "model_config": "/tmp/ray/embedding_use_multilingual_shared_20230919/model_config.json",
        },
        batch_format='pandas',
        concurrency=18,
        num_gpus=0.5,
        # num_cpus=0.2,  # Local testing
        batch_size=500
    ).materialize()

    # Step 2: CPU stage; this is the stage where the last task gets stuck.
    final_data = exploded_df_source_programs.map_batches(
        PrepareBatch,
        batch_format='pandas',
        # concurrency=20,  # Local testing
        concurrency=220,
        num_cpus=0.25,
        # num_cpus=0.2,  # Local testing
        batch_size=100  # same batch_size used for local testing
    ).materialize()

    df_final = final_data.to_pandas()
    pa_table = pa.Table.from_pandas(df_final)

    if create and f'{process_pending_files}' == 'False':
        create_table(pa_table)
        table = catalog.load_table(f'{table_output_identifier}')
    else:
        table = catalog.load_table(f'{table_output_identifier}')

    table.append(pa_table)
    print(f"Data appended to table {table_output_identifier}.")

Issue Severity

High: It blocks me from completing my task.

agopinath1205 commented 4 months ago

I have two steps in the Ray job: one runs on GPU to compute embeddings, and the other runs on CPU; the CPU step is the one getting stuck.

jjyao commented 4 months ago

Hi @agopinath1205,

Can you try the latest Ray (2.33)? We have fixed a few issues related to this. You can also use the Ray state API (ray list tasks --detail) to see which task is not completing and which stage it is in.
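If it is easier to inspect this programmatically, here is a minimal sketch using the Python state API (available in recent Ray releases; the filter below, which excludes finished tasks, is just illustrative):

    from ray.util.state import list_tasks

    # List tasks that have not finished yet, with extra per-task detail.
    pending = list_tasks(filters=[("state", "!=", "FINISHED")], detail=True)
    for t in pending:
        print(t.task_id, t.name, t.state, t.node_id)

This prints the same information as the CLI and makes it easy to spot which stage the stuck task belongs to and which node it is sitting on.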

agopinath1205 commented 4 months ago

I am using KubeRay; is there a way I can debug this using the KubeRay operator?

jjyao commented 4 months ago

Same thing: you can run ray list tasks --detail inside the head pod.
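For a KubeRay deployment, something like kubectl exec -it <head-pod> -n <namespace> -- ray list tasks --detail (pod and namespace names are placeholders for your cluster) should run the same command from outside the pod.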

agopinath1205 commented 4 months ago

Sounds good! I am trying to understand why the last task gets stuck; what could be the root cause?

jjyao commented 4 months ago

I cannot tell until I have more information from ray list tasks --detail