apache / datafusion-ray

Apache DataFusion Ray
https://datafusion.apache.org/ray
Apache License 2.0
117 stars 10 forks source link

Cannot run benchmarks in k8s due to excessive spilling & OOM #44

Open andygrove opened 1 week ago

andygrove commented 1 week ago

I cannot get benchmarks running in k8s. I suspect that too many tasks are being scheduled in parallel.

I added resource constraints in the code:

@ray.remote(num_cpus=1)
def execute_query_stage(

...

@ray.remote(num_cpus=1)
def execute_query_partition(

I am running the benchmark with

RAY_ADDRESS='http://localhost:8265' ray job submit --working-dir `pwd` -- python3 tpcbench.py --benchmark tpch --queries /home/ray/datafusion-benchmarks/tpch/queries/ --data /mnt/bigdata/tpch/sf100  --concurrency 4

My cluster definition is:

apiVersion: ray.io/v1alpha1
kind: RayCluster
metadata:
  name: datafusion-ray-cluster
spec:
  headGroupSpec:
    rayStartParams:
      num-cpus: "0"
    template:
      spec:
        containers:
          - name: ray-head
            image: andygrove/datafusion-ray-tpch:latest
            imagePullPolicy: Always
            resources:
              limits:
                cpu: 2
                memory: 8Gi
              requests:
                cpu: 2
                memory: 8Gi
            volumeMounts:
              - mountPath: /mnt/bigdata  # Mount path inside the container
                name: ray-storage
        volumes:
          - name: ray-storage
            persistentVolumeClaim:
              claimName: ray-pvc  # Reference the PVC name here
  workerGroupSpecs:
    - replicas: 2
      groupName: "datafusion-ray"
      rayStartParams:
        num-cpus: "4"
      template:
        spec:
          containers:
            - name: ray-worker
              image: andygrove/datafusion-ray-tpch:latest
              imagePullPolicy: Always
              resources:
                limits:
                  cpu: 5
                  memory: 64Gi
                requests:
                  cpu: 5
                  memory: 64Gi
              volumeMounts:
                - mountPath: /mnt/bigdata
                  name: ray-storage
          volumes:
            - name: ray-storage
              persistentVolumeClaim:
                claimName: ray-pvc

I build my image with this Dockerfie, which extends the datafusion-ray image built from the repo.

FROM andygrove/datafusion-ray

RUN sudo apt update && \
    sudo apt install -y git

RUN git clone https://github.com/apache/datafusion-benchmarks.git
andygrove commented 5 days ago

I tried running locally rather than in k8s using ray.init() to create the cluster. The issue is that we are using too much object store memory. For TPC-H q2 @ 100GB, it consumes all the memory on my workstation (128 GB) and then crashed. I tried limiting object store memory with ray.init(num_cpus=concurrency, object_store_memory=512 * 1024 * 1024) and it ran longer, but is spilling huge amounts of data to disk and is taking an unreasonable amount of time.

Here is an example where it is spilling a huge amount of data.

(raylet) Spilled 35419 MiB, 1062 objects, write throughput 1534 MiB/s.
andygrove commented 5 days ago

Root cause is https://github.com/apache/datafusion-ray/issues/46