ray-project / kuberay

A toolkit to run Ray applications on Kubernetes
Apache License 2.0

[Bug/RayJob] k8sjob doesn't contain ray worker logs with ray.shutdown() at the end #1975

Open ByronHsu opened 7 months ago

ByronHsu commented 7 months ago

### Search before asking

### KubeRay Component

ray-operator

### What happened + What you expected to happen

I ran a simple Ray program that calls `ray.shutdown()` at the end, but the logs of the k8sjob (the Kubernetes Job that submits the RayJob) don't contain any worker logs.

Python file:

```python
import ray

@ray.remote
def f(x):
    print(f"ray task get {x}")
    return x * x

ray.init()

futures = [f.remote(i) for i in range(2)]
l = ray.get(futures)
print(l)

ray.shutdown()
```

k8sjob's pod logs:

```text
$ kubectl logs rayjob-sample-vrd7f
2024-03-08 15:59:21,180 INFO cli.py:36 -- Job submission server address: http://rayjob-sample-raycluster-g5t6q-head-svc.flytesnacks-development.svc.cluster.local:8265
2024-03-08 15:59:21,570 SUCC cli.py:60 -- ------------------------------------------------
2024-03-08 15:59:21,570 SUCC cli.py:61 -- Job 'rayjob-sample-b5bx5' submitted successfully
2024-03-08 15:59:21,570 SUCC cli.py:62 -- ------------------------------------------------
2024-03-08 15:59:21,570 INFO cli.py:285 -- Next steps
2024-03-08 15:59:21,570 INFO cli.py:286 -- Query the logs of the job:
2024-03-08 15:59:21,570 INFO cli.py:288 -- ray job logs rayjob-sample-b5bx5
2024-03-08 15:59:21,570 INFO cli.py:290 -- Query the status of the job:
2024-03-08 15:59:21,571 INFO cli.py:292 -- ray job status rayjob-sample-b5bx5
2024-03-08 15:59:21,571 INFO cli.py:294 -- Request the job to be stopped:
2024-03-08 15:59:21,571 INFO cli.py:296 -- ray job stop rayjob-sample-b5bx5
2024-03-08 15:59:21,584 INFO cli.py:303 -- Tailing logs until the job exits (disable with --no-wait):
2024-03-08 15:59:26,303 INFO worker.py:1405 -- Using address 10.42.0.237:6379 set in the environment variable RAY_ADDRESS
2024-03-08 15:59:26,303 INFO worker.py:1540 -- Connecting to existing Ray cluster at address: 10.42.0.237:6379...
2024-03-08 15:59:26,308 INFO worker.py:1715 -- Connected to Ray cluster. View the dashboard at http://10.42.0.237:8265
[0, 1]
2024-03-08 15:59:27,651 SUCC cli.py:60 -- -----------------------------------
2024-03-08 15:59:27,651 SUCC cli.py:61 -- Job 'rayjob-sample-b5bx5' succeeded
2024-03-08 15:59:27,651 SUCC cli.py:62 -- -----------------------------------
```

I expected the log to contain the `ray task get X` lines printed by the tasks.

If I remove `ray.shutdown()` and run the job again, the worker logs do appear:

```text
$ kubectl logs rayjob-sample-fmcv6
2024-03-08 16:31:25,132 INFO cli.py:36 -- Job submission server address: http://rayjob-sample-raycluster-n4pqr-head-svc.flytesnacks-development.svc.cluster.local:8265
2024-03-08 16:31:25,518 SUCC cli.py:60 -- ------------------------------------------------
2024-03-08 16:31:25,518 SUCC cli.py:61 -- Job 'rayjob-sample-8txpp' submitted successfully
2024-03-08 16:31:25,518 SUCC cli.py:62 -- ------------------------------------------------
2024-03-08 16:31:25,518 INFO cli.py:285 -- Next steps
2024-03-08 16:31:25,518 INFO cli.py:286 -- Query the logs of the job:
2024-03-08 16:31:25,518 INFO cli.py:288 -- ray job logs rayjob-sample-8txpp
2024-03-08 16:31:25,518 INFO cli.py:290 -- Query the status of the job:
2024-03-08 16:31:25,518 INFO cli.py:292 -- ray job status rayjob-sample-8txpp
2024-03-08 16:31:25,518 INFO cli.py:294 -- Request the job to be stopped:
2024-03-08 16:31:25,519 INFO cli.py:296 -- ray job stop rayjob-sample-8txpp
2024-03-08 16:31:25,522 INFO cli.py:303 -- Tailing logs until the job exits (disable with --no-wait):
2024-03-08 16:31:30,713 INFO worker.py:1405 -- Using address 10.42.0.243:6379 set in the environment variable RAY_ADDRESS
2024-03-08 16:31:30,714 INFO worker.py:1540 -- Connecting to existing Ray cluster at address: 10.42.0.243:6379...
2024-03-08 16:31:30,719 INFO worker.py:1715 -- Connected to Ray cluster. View the dashboard at http://10.42.0.243:8265
[0, 1]
(f pid=488) ray task get 0
(f pid=488) ray task get 1
2024-03-08 16:31:32,630 SUCC cli.py:60 -- -----------------------------------
2024-03-08 16:31:32,630 SUCC cli.py:61 -- Job 'rayjob-sample-8txpp' succeeded
```
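
To check a submitter log automatically (for example, in a CI reproduction of this issue), a small helper can look for the forwarded worker lines. This is a minimal sketch that assumes the `(<task> pid=<pid>) <message>` prefix seen in the log above; that format is inferred from this output rather than taken from Ray's documentation, and the helper names are hypothetical.

```python
import re
from typing import List, Tuple

# Matches worker output forwarded to the driver, e.g. "(f pid=488) ray task get 0".
# Assumption: the prefix format is inferred from the log above. Any extra
# fields between the pid and the closing parenthesis are tolerated.
WORKER_LINE = re.compile(r"^\((?P<task>[^\s)]+) pid=(?P<pid>\d+)[^)]*\) (?P<msg>.*)$")


def extract_worker_lines(log_text: str) -> List[Tuple[str, int, str]]:
    """Return (task name, pid, message) for every forwarded worker line."""
    found = []
    for line in log_text.splitlines():
        match = WORKER_LINE.match(line.strip())
        if match:
            found.append((match["task"], int(match["pid"]), match["msg"]))
    return found


def has_all_task_output(log_text: str, n_tasks: int) -> bool:
    """True if 'ray task get i' was forwarded for every i in range(n_tasks)."""
    messages = {msg for _, _, msg in extract_worker_lines(log_text)}
    return all(f"ray task get {i}" in messages for i in range(n_tasks))
```

Applied to the two logs in this issue, `has_all_task_output(log_text, 2)` is `False` for the run that ends with `ray.shutdown()` and `True` for the run without it.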

The issue is similar to https://github.com/ray-project/ray/issues/31931. I can also reproduce it by exec-ing into the k8sjob pod and running the script against a local Ray cluster:

```text
(base) ray@rayjob-sample-4hm45:~$ cat > a.py <<EOF
> import ray
> 
> @ray.remote
> def f(x):
>     print(f"ray task get {x}")
>     return x * x
> 
> ray.init()
> 
> futures = [f.remote(i) for i in range(2)]
> l = ray.get(futures)
> print(l)
> 
> ray.shutdown()
> EOF
(base) ray@rayjob-sample-4hm45:~$ ls
a.py  anaconda3  pip-freeze.txt  requirements_compiled.txt
(base) ray@rayjob-sample-4hm45:~$ python a.py
2024-03-08 16:59:37,773 WARNING services.py:1996 -- WARNING: The object store is using /tmp instead of /dev/shm because /dev/shm has only 67108864 bytes available. This will harm performance! You may be able to free up space by deleting files in /dev/shm. If you are inside a Docker container, you can increase /dev/shm size by passing '--shm-size=10.05gb' to 'docker run' (or add it to the run_options list in a Ray cluster config). Make sure to set this to more than 30% of available RAM.
2024-03-08 16:59:38,911 INFO worker.py:1715 -- Started a local Ray instance. View the dashboard at http://127.0.0.1:8265
[0, 1]
(base) ray@rayjob-sample-4hm45:~$ ls
```

I ran into this while integrating KubeRay 1.1.0 with the latest Flyte, which shuts down Ray after the task ends. Until the flushing issue is fixed, is removing `ray.shutdown()` from the code a good way to surface the logs, or could it introduce other problems?

### Reproduction script

1. Follow the KubeRay installation instructions to set up the KubeRay operator 1.1.0.
2. Create a RayJob:

    
```yaml
apiVersion: ray.io/v1
kind: RayJob
metadata:
  name: rayjob-sample
spec:
  entrypoint: python /home/ray/samples/sample_code.py
  # shutdownAfterJobFinishes specifies whether the RayCluster should be deleted after the RayJob finishes. Default is false.
  # shutdownAfterJobFinishes: false

  # ttlSecondsAfterFinished specifies the number of seconds after which the RayCluster will be deleted after the RayJob finishes.
  # ttlSecondsAfterFinished: 10

  # RuntimeEnvYAML represents the runtime environment configuration provided as a multi-line YAML string.
  # See https://docs.ray.io/en/latest/ray-core/handling-dependencies.html for details.
  # (New in KubeRay version 1.0.)
  runtimeEnvYAML: |
    pip:
      - requests==2.26.0
      - pendulum==2.1.2
    env_vars:
      counter_name: "test_counter"

  # Suspend specifies whether the RayJob controller should create a RayCluster instance.
  # If a job is applied with the suspend field set to true, the RayCluster will not be created and we will wait for the transition to false.
  # If the RayCluster is already created, it will be deleted. In the case of transition to false, a new RayCluster will be created.
  # suspend: false

  # rayClusterSpec specifies the RayCluster instance to be created by the RayJob controller.
  rayClusterSpec:
    rayVersion: '2.9.0' # should match the Ray version in the image of the containers
    # Ray head pod template
    headGroupSpec:
      # The `rayStartParams` are used to configure the `ray start` command.
      # See https://github.com/ray-project/kuberay/blob/master/docs/guidance/rayStartParams.md for the default settings of `rayStartParams` in KubeRay.
      # See https://docs.ray.io/en/latest/cluster/cli.html#ray-start for all available options in `rayStartParams`.
      rayStartParams:
        dashboard-host: '0.0.0.0'
      #pod template
      template:
        spec:
          containers:
            - name: ray-head
              image: rayproject/ray:2.9.0
              ports:
                - containerPort: 6379
                  name: gcs-server
                - containerPort: 8265 # Ray dashboard
                  name: dashboard
                - containerPort: 10001
                  name: client
              resources:
                limits:
                  cpu: "1"
                requests:
                  cpu: "200m"
              volumeMounts:
                - mountPath: /home/ray/samples
                  name: code-sample
          volumes:
            # You set volumes at the Pod level, then mount them into containers inside that Pod
            - name: code-sample
              configMap:
                # Provide the name of the ConfigMap you want to mount.
                name: ray-job-code-sample
                # An array of keys from the ConfigMap to create as files
                items:
                  - key: sample_code.py
                    path: sample_code.py
    workerGroupSpecs:
      # the pod replicas in this group typed worker
      - replicas: 1
        minReplicas: 1
        maxReplicas: 5
        # logical group name, for this called small-group, also can be functional
        groupName: small-group
        # The `rayStartParams` are used to configure the `ray start` command.
        # See https://github.com/ray-project/kuberay/blob/master/docs/guidance/rayStartParams.md for the default settings of `rayStartParams` in KubeRay.
        # See https://docs.ray.io/en/latest/cluster/cli.html#ray-start for all available options in `rayStartParams`.
        rayStartParams: {}
        #pod template
        template:
          spec:
            containers:
              - name: ray-worker # must consist of lower case alphanumeric characters or '-', and must start and end with an alphanumeric character (e.g. 'my-name' or '123-abc')
                image: rayproject/ray:2.9.0
                lifecycle:
                  preStop:
                    exec:
                      command: [ "/bin/sh","-c","ray stop" ]
                resources:
                  limits:
                    cpu: "1"
                  requests:
                    cpu: "200m"
  # SubmitterPodTemplate is the template for the pod that will run the `ray job submit` command against the RayCluster.
  # If SubmitterPodTemplate is specified, the first container is assumed to be the submitter container.
  submitterPodTemplate:
    spec:
      restartPolicy: Never
      containers:
        - name: my-custom-rayjob-submitter-pod
          image: rayproject/ray:2.9.0
          # If Command is not specified, the correct command will be supplied at runtime using the RayJob spec `entrypoint` field.
          # Specifying Command is not recommended.
          # command: ["sh", "-c", "ray job submit --address=http://$RAY_DASHBOARD_ADDRESS --submission-id=$RAY_JOB_SUBMISSION_ID -- echo hello world"]
          resources:
            limits:
              cpu: "1"
            requests:
              cpu: "200m"

######################Ray code sample#################################
# this sample is from https://docs.ray.io/en/latest/cluster/job-submission.html#quick-start-example
# it is mounted into the container and executed to show the Ray job at work
---
apiVersion: v1
kind: ConfigMap
metadata:
  name: ray-job-code-sample
data:
  sample_code.py: |
    import ray
    import time; time.sleep(3600)

    @ray.remote
    def f(x):
        print(f"ray task get {x}")
        return x * x

    ray.init()

    futures = [f.remote(i) for i in range(2)]
    l = ray.get(futures)
    print(l)

    ray.shutdown()
```

3. Use `kubectl logs` to view the logs of the k8sjob pod.

### Anything else

_No response_

### Are you willing to submit a PR?

- [ ] Yes I am willing to submit a PR!

rueian commented 5 months ago

Hi @ByronHsu,

Due to the remote streaming nature of those logs, I am afraid that the flushing issue is impossible to solve. All we can do is wait for the logs to be streamed from remote workers.
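
The race can be illustrated with a toy model in plain Python. This is not Ray's log-forwarding code; the function name, the single hard-coded line, and the delays are hypothetical. A worker's output travels to the driver through an asynchronous channel, and whatever has not arrived when the driver shuts down is simply lost.

```python
import queue
import threading
import time
from typing import List


def run_toy_driver(grace_period_s: float, log_delay_s: float = 0.2) -> List[str]:
    """Return the worker lines the toy driver prints before it shuts down."""
    channel: "queue.Queue[str]" = queue.Queue()

    def forward_worker_output() -> None:
        # The task result is already back, but its stdout is still in transit.
        time.sleep(log_delay_s)
        channel.put("(f pid=488) ray task get 0")

    threading.Thread(target=forward_worker_output, daemon=True).start()

    # Optional sleep before "shutdown", like time.sleep(...) before ray.shutdown().
    time.sleep(grace_period_s)

    # "Shutdown": print only what has already arrived; in-flight lines are dropped.
    printed: List[str] = []
    while True:
        try:
            printed.append(channel.get_nowait())
        except queue.Empty:
            return printed
```

With `grace_period_s=0.0` the toy driver returns an empty list, like the first log; with a grace period longer than `log_delay_s` (for example `1.0`) it returns the worker line. On a real cluster the transit time is not fixed, so no grace period is guaranteed to be enough.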

ray.shutdown() will clear any resources started by Ray, but normally there is no need to invoke it manually because it will be called automatically at the end of the script.
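
The automatic call at the end of a script is typically done with Python's `atexit` hooks. The sketch below uses a hypothetical `ToyRuntime` class (not Ray's code) to show why an explicit shutdown call is optional: cleanup is registered once, runs when the interpreter exits normally, and is idempotent, so calling it manually as well does no harm.

```python
import atexit


class ToyRuntime:
    """Hypothetical stand-in for a runtime started by an init() call."""

    def __init__(self) -> None:
        self.running = True
        self.shutdown_calls = 0
        # Register cleanup so it also runs when the interpreter exits normally.
        atexit.register(self.shutdown)

    def shutdown(self) -> None:
        # Idempotent: a manual call followed by the atexit call does no extra work.
        if not self.running:
            return
        self.running = False
        self.shutdown_calls += 1
        print("runtime shut down")
```

A script that creates `ToyRuntime()` and never calls `shutdown()` still prints `runtime shut down` when it exits normally.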

If the automatic shutdown doesn't work in your case, you can either add `time.sleep(...)` before `ray.shutdown()`, or let Ray do the sleeping for you by calling `ray.shutdown(_exiting_interpreter=True)`.
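
The first workaround can be wrapped in a small helper. This is a sketch under the assumption that a fixed delay is acceptable; the helper name and the 2-second default are hypothetical, and no delay is guaranteed to be long enough on every cluster.

```python
import time
from typing import Callable


def shutdown_after_grace_period(shutdown: Callable[[], None], grace_period_s: float = 2.0) -> None:
    """Give in-flight worker logs time to reach the driver, then shut down.

    The default delay is an arbitrary assumption, not a value from Ray.
    """
    if grace_period_s < 0:
        raise ValueError("grace_period_s must be non-negative")
    time.sleep(grace_period_s)
    shutdown()
```

In the reproduction script, the last line would become `shutdown_after_grace_period(ray.shutdown)`. The second option, `ray.shutdown(_exiting_interpreter=True)`, needs no helper but relies on an underscore-prefixed (private) parameter whose behavior may change between Ray versions.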