ray-project / ray

Ray is an AI compute engine. Ray consists of a core distributed runtime and a set of AI Libraries for accelerating ML workloads.
https://ray.io
Apache License 2.0

Failed to execute `sudo -n $(which py-spy) dump -p 10821`. #35197

Open vishrantgupta opened 1 year ago

vishrantgupta commented 1 year ago

What happened + What you expected to happen

Failed to execute `sudo -n $(which py-spy) dump -p 10821`.

=== stderr ===
Error: Failed to get process executable name. Check that the process is running.
Reason: No such file or directory (os error 2)
Reason: No such file or directory (os error 2)

=== stdout ===
Failed to execute `sudo -n $(which py-spy) record -o /tmp/ray/session_2023-05-09_08-04-34_572807_8/logs/flamegraph_11233_cpu_profiling.svg -p 11233 -d 5 -f flamegraph`.

=== stderr ===
Error: Failed to get process executable name. Check that the process is running.
Reason: No such file or directory (os error 2)
Reason: No such file or directory (os error 2)

=== stdout ===

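For context, these look like the commands the dashboard's "Stack Trace" / "CPU Flame Graph" actions run via py-spy. A minimal way to check whether py-spy can attach at all inside the head pod (a rough sketch; the pod name, label selector, and PID below are placeholders, not values from this cluster):

# find the head pod (ray.io/node-type=head is the usual KubeRay label)
kubectl get pods -l ray.io/node-type=head

# open a shell in it
kubectl exec -it <head-pod-name> -- bash

# inside the pod: pick a PID that is definitely alive, then run the same command the dashboard uses
ps aux | grep -E 'raylet|ray::'
sudo -n $(which py-spy) dump -p <live-pid>

If this works against a live PID, py-spy and sudo are fine and the failing call is most likely targeting a PID that has already exited (or belongs to a different pod); if it fails the same way, the problem is inside the container image itself.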
Versions / Dependencies

ray-cluster-0.5.0.

~
 👉 | k get raycluster
NAME                 DESIRED WORKERS   AVAILABLE WORKERS   STATUS   AGE
raycluster-kuberay   4                 4                   ready    19h
~
 👉 | k get raycluster -o yaml
apiVersion: v1
items:
- apiVersion: ray.io/v1alpha1
  kind: RayCluster
  metadata:
    annotations:
      meta.helm.sh/release-name: raycluster
      meta.helm.sh/release-namespace: default
    creationTimestamp: "2023-05-09T04:59:34Z"
    generation: 7
    labels:
      app.kubernetes.io/instance: raycluster
      app.kubernetes.io/managed-by: Helm
      app.kubernetes.io/name: kuberay
      helm.sh/chart: ray-cluster-0.5.0
    name: raycluster-kuberay
    namespace: default
    resourceVersion: "3081784"
    uid: 4bf23c3d-5f19-4865-a7cf-2e7389f2619d
  spec:
    headGroupSpec:
      rayStartParams:
        block: "true"
        dashboard-host: 0.0.0.0
      serviceType: ClusterIP
      template:
        metadata:
          annotations: {}
          labels:
            app.kubernetes.io/instance: raycluster
            app.kubernetes.io/managed-by: Helm
            app.kubernetes.io/name: kuberay
            helm.sh/chart: ray-cluster-0.5.0
        spec:
          affinity: {}
          containers:
          - env: []
            image: 10.225.67.35:30001/tcx/rayproject/ray:2.3.0
            imagePullPolicy: IfNotPresent
            name: ray-head
            resources:
              limits:
                cpu: "1"
                memory: 2G
              requests:
                cpu: "1"
                memory: 2G
            securityContext: {}
            volumeMounts:
            - mountPath: /tmp/ray
              name: log-volume
          imagePullSecrets:
          - name: kubelet-pull-secret
          nodeSelector: {}
          tolerations: []
          volumes:
          - emptyDir: {}
            name: log-volume
    workerGroupSpecs:
    - groupName: workergroup
      maxReplicas: 2147483647
      minReplicas: 4
      rayStartParams:
        block: "true"
      replicas: 4
      template:
        metadata:
          annotations: {}
          labels:
            app.kubernetes.io/instance: raycluster
            app.kubernetes.io/managed-by: Helm
            app.kubernetes.io/name: kuberay
            helm.sh/chart: ray-cluster-0.5.0
        spec:
          affinity: {}
          containers:
          - env: []
            image: 10.225.67.35:30001/tcx/rayproject/ray:2.3.0
            imagePullPolicy: IfNotPresent
            name: ray-worker
            resources:
              limits:
                cpu: "4"
                memory: 1G
              requests:
                cpu: "4"
                memory: 1G
            securityContext: {}
            volumeMounts:
            - mountPath: /tmp/ray
              name: log-volume
          imagePullSecrets:
          - name: kubelet-pull-secret
          initContainers:
          - command:
            - sh
            - -c
            - until nslookup $FQ_RAY_IP; do echo waiting for K8s Service $FQ_RAY_IP;
              sleep 2; done
            image: 10.225.67.35:30001/tcx/busybox:1.28
            name: init
            securityContext: {}
          nodeSelector: {}
          tolerations: []
          volumes:
          - emptyDir: {}
            name: log-volume
  status:
    availableWorkerReplicas: 4
    desiredWorkerReplicas: 4
    endpoints:
      client: "10001"
      dashboard: "8265"
      metrics: "8080"
      redis: "6379"
      serve: "8000"
    head:
      podIP: 10.233.97.162
      serviceIP: 10.233.54.172
    lastUpdateTime: "2023-05-10T00:03:40Z"
    maxWorkerReplicas: 2147483647
    minWorkerReplicas: 4
    observedGeneration: 7
    state: ready
kind: List
metadata:
  resourceVersion: ""
  selfLink: ""

Reproduction script

# run_load.py

import logging
from datetime import datetime

import ray
from numpy import random

LOGGER = logging.getLogger(__name__)

logging.basicConfig(level=logging.DEBUG)

def get_record():
    return {
        "type": "some value"
    }

def generate_records(num_records: int):
    yield from [get_record() for _ in range(num_records)]

def write_to_kafka(partition):
    import json
    from confluent_kafka import Producer as KafkaProducer

    producer = KafkaProducer({'bootstrap.servers': 'internal-kafka-brokers:9092',
                              'queue.buffering.max.ms': 1000,
                              'batch.size': 1000000,
                              'acks': 0,
                              'compression.type': 'gzip'
                              })

    print("writing to kafka")

    for r in generate_records(1000):
        # for r in records:
        producer.produce('vsa_metrics', value=bytes(json.dumps(r), 'utf-8'), partition=partition)

    producer.flush()

@ray.remote
def process_records():
    print("initializing records")

    import time
    from multiprocessing import Process
    # from multiprocessing import Pool

    # Get the current time
    start_time = time.time()
    num_partitions = 4

    proc = []

    for i in range(num_partitions):
        # pass the callable and its argument separately so each write actually runs in its own process
        p = Process(target=write_to_kafka, kwargs={"partition": i})
        p.start()
        proc.append(p)

    for p in proc:
        p.join()

    end_time = time.time()
    time_required = end_time - start_time

    print("Time required: ", time_required)

    return 10000

tasks = [process_records.remote() for _ in range(4)]
print(tasks)
# block until the remote tasks finish; otherwise the driver can exit and cancel them
print(ray.get(tasks))

# raysubmit.py

import time

from ray.job_submission import JobSubmissionClient, JobStatus

# If using a remote cluster, replace 127.0.0.1 with the head node's IP address.
client = JobSubmissionClient("http://127.0.0.1:8265")
job_id = client.submit_job(
  # Entrypoint shell command to execute
  entrypoint="python run_load.py",
  # Path to the local directory that contains the run_load.py file
  runtime_env={"working_dir": "./", "pip": ["confluent-kafka==2.1.1"]}
)
print(job_id)

def wait_until_status(job_id, status_to_wait_for, timeout_seconds=5):
    start = time.time()
    while time.time() - start <= timeout_seconds:
        status = client.get_job_status(job_id)

        print(f"status: {status}")
        if status in status_to_wait_for:
            break
        time.sleep(1)

wait_until_status(job_id, {JobStatus.SUCCEEDED, JobStatus.STOPPED, JobStatus.FAILED}, 360)
logs = client.get_job_logs(job_id)
print(logs)

Issue Severity

High: It blocks me from completing my task.

rkooo567 commented 1 year ago

I think your pid 10821 maybe doesn't exist? How did you find the pid?
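One way to check which PIDs actually exist is to list the Ray processes inside the pod the dashboard is targeting (pod name below is a placeholder):

kubectl exec -it <head-pod-name> -- ps -eo pid,comm,args | grep -E 'raylet|ray::|gcs_server'

Only a PID from that list (and from that same pod) can be profiled; a PID left over from an earlier session typically fails with exactly the "No such file or directory (os error 2)" error above, because py-spy can no longer find the process.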

stale[bot] commented 1 year ago

Hi, I'm a bot from the Ray team :)

To help human contributors focus on more relevant issues, I will automatically add the stale label to issues that have had no activity for more than 4 months.

If there is no further activity in the next 14 days, the issue will be closed!

You can always ask for help on our discussion forum or Ray's public Slack channel.