ray-project / ray

Ray is an AI compute engine. Ray consists of a core distributed runtime and a set of AI Libraries for accelerating ML workloads.
https://ray.io
Apache License 2.0

[CORE] Unable to run celery task containing ray tasks #31157

Open NumberChiffre opened 1 year ago

NumberChiffre commented 1 year ago

What happened + What you expected to happen

Hi everyone,

I have a pipeline that runs a bunch of ray remote functions. This pipeline is sent as a celery task. Everything works if the celery worker uses --pool=threads, but as soon as I switch back to --pool=prefork, I get the following error:

celery-worker    | 2022-12-16T16:38:50.011824477Z 2022-12-16 16:38:50,011       ERROR worker.py:813 -- print_logs: <_MultiThreadedRendezvous of RPC that terminated with:
celery-worker    | 2022-12-16T16:38:50.011854063Z       status = StatusCode.INTERNAL
celery-worker    | 2022-12-16T16:38:50.011859633Z       details = "Received RST_STREAM with error code 2"
celery-worker    | 2022-12-16T16:38:50.011863721Z       debug_error_string = "UNKNOWN:Error received from peer ipv4:172.18.0.3:47345 {grpc_message:"Received RST_STREAM with error code 2", grpc_status:13, created_time:"2022-12-16T16:38:50.011364109+00:00"}"
celery-worker    | 2022-12-16T16:38:50.011877767Z >

The working version:

celery -A app worker --loglevel=INFO -P threads --concurrency=${CELERY_CONCURRENCY}

The failing version (which falls back to celery's default prefork pool):

celery -A app worker --loglevel=INFO

The reason I'd like to switch back to prefork is that celery's threads/TaskPool does not implement task termination. I would appreciate feedback/alternatives on this.

Thanks in advance!

Versions / Dependencies

Latest versions of:

Reproduction script

None

Issue Severity

High: It blocks me from completing my task.

rkooo567 commented 1 year ago

Ray does not work very well with fork because Ray is built on top of gRPC, which is not fork friendly (you need to do some special config: https://github.com/grpc/grpc/blob/master/doc/fork_support.md). Even after fixing this, I am not 100% sure this will work with fork.
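
For reference, the fork-support config from that doc boils down to a couple of environment variables that have to be set before grpc (and therefore ray) is imported. A rough sketch, not verified against ray:

import os

# Per https://github.com/grpc/grpc/blob/master/doc/fork_support.md, both variables
# must be set before grpc is first imported, e.g. at the very top of the celery
# app module or in the worker's environment.
os.environ["GRPC_ENABLE_FORK_SUPPORT"] = "true"
os.environ["GRPC_POLL_STRATEGY"] = "poll"

import ray  # imported only after the env vars are in place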

In this case, you need to make sure ray is imported and initialized after the fork happens. Can you share the script where you send the ray pipeline to celery?
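
One way to arrange that with prefork, sketched here and not verified against this setup, is to defer the ray import/init into each forked child via celery's worker_process_init signal rather than doing it at setup time:

from celery import Celery
from celery.signals import worker_process_init

celery_app = Celery(__name__)


@worker_process_init.connect
def init_ray_in_child(**kwargs):
    # Runs inside each prefork child after the fork, so ray (and its grpc
    # channels) is imported and initialized in the child rather than
    # inherited from the parent worker process.
    import ray

    if not ray.is_initialized():
        ray.init(ignore_reinit_error=True)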

NumberChiffre commented 1 year ago

@rkooo567 Thanks for the timely reply; my celery task looks roughly like this:

import os
from typing import Dict

import ray
from celery import Celery, signals

# REDIS_URL and start_pipeline are defined elsewhere in the app.
celery_app = Celery(__name__)
celery_app.conf.broker_url = os.getenv("CELERY_BROKER_URL", REDIS_URL)
celery_app.conf.result_backend = os.getenv("CELERY_RESULT_BACKEND", REDIS_URL)


@signals.setup_logging.connect
def setup_celery_logging(**kwargs):
    # ray is initialized once when celery sets up logging, i.e. in the parent worker.
    ray.init()


@celery_app.task(name="test")
def run_task(config: Dict) -> bool:
    result = start_pipeline(config=config)
    return True

Where start_pipeline loops over a set of ray tasks.
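
For illustration only (the names below are made up, not the real code), the shape is roughly:

from typing import Dict, List

import ray


@ray.remote
def process_item(item, config: Dict):
    # stand-in for one unit of work in the pipeline
    return item


def start_pipeline(config: Dict) -> List:
    # Submit one ray task per item and block on the results.
    refs = [process_item.remote(item, config) for item in config["items"]]
    return ray.get(refs)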

I just tried setting those two env variables, GRPC_ENABLE_FORK_SUPPORT=true and GRPC_POLL_STRATEGY=poll, and got this:

celery-worker    | 2022-12-16T17:52:52.248886305Z   File "/usr/local/lib/python3.10/site-packages/ray/remote_function.py", line 122, in _remote_proxy
celery-worker    | 2022-12-16T17:52:52.248890093Z     return self._remote(args=args, kwargs=kwargs, **self._default_options)
celery-worker    | 2022-12-16T17:52:52.248902336Z   File "/usr/local/lib/python3.10/site-packages/ray/util/tracing/tracing_helper.py", line 307, in _invocation_remote_span
celery-worker    | 2022-12-16T17:52:52.248906664Z     return method(self, args, kwargs, *_args, **_kwargs)
celery-worker    | 2022-12-16T17:52:52.248910280Z   File "/usr/local/lib/python3.10/site-packages/ray/remote_function.py", line 272, in _remote
celery-worker    | 2022-12-16T17:52:52.248914218Z     worker.function_actor_manager.export(self)
celery-worker    | 2022-12-16T17:52:52.248917775Z   File "/usr/local/lib/python3.10/site-packages/ray/_private/function_manager.py", line 213, in export
celery-worker    | 2022-12-16T17:52:52.248921682Z     if self._worker.gcs_client.internal_kv_exists(key, KV_NAMESPACE_FUNCTION_TABLE):
celery-worker    | 2022-12-16T17:52:52.248925399Z   File "/usr/local/lib/python3.10/site-packages/ray/_private/gcs_utils.py", line 177, in wrapper
celery-worker    | 2022-12-16T17:52:52.248929106Z     return f(self, *args, **kwargs)
celery-worker    | 2022-12-16T17:52:52.248932853Z   File "/usr/local/lib/python3.10/site-packages/ray/_private/gcs_utils.py", line 331, in internal_kv_exists
celery-worker    | 2022-12-16T17:52:52.248943102Z     reply = self._kv_stub.InternalKVExists(req, timeout=timeout)
celery-worker    | 2022-12-16T17:52:52.248946889Z   File "/usr/local/lib/python3.10/site-packages/grpc/_channel.py", line 944, in __call__
celery-worker    | 2022-12-16T17:52:52.248950746Z     state, call, = self._blocking(request, timeout, metadata, credentials,
celery-worker    | 2022-12-16T17:52:52.248954283Z   File "/usr/local/lib/python3.10/site-packages/grpc/_channel.py", line 926, in _blocking
celery-worker    | 2022-12-16T17:52:52.248958020Z     call = self._channel.segregated_call(
celery-worker    | 2022-12-16T17:52:52.248961697Z   File "src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi", line 496, in grpc._cython.cygrpc.Channel.segregated_call
celery-worker    | 2022-12-16T17:52:52.248965715Z   File "src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi", line 353, in grpc._cython.cygrpc._segregated_call
celery-worker    | 2022-12-16T17:52:52.248969462Z   File "src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi", line 357, in grpc._cython.cygrpc._segregated_call
celery-worker    | 2022-12-16T17:52:52.248973179Z ValueError: Cannot invoke RPC on closed channel!

Not sure if this looks like a potential feature on the ray side. I think for the time being, I'll switch back to using threads.