ray-project / ray

Ray is an AI compute engine. Ray consists of a core distributed runtime and a set of AI Libraries for accelerating ML workloads.
https://ray.io
Apache License 2.0

[core] Sending large binary blobs over object store scales superlinearly #25217

Open krfricke opened 2 years ago

krfricke commented 2 years ago

What happened + What you expected to happen

See https://github.com/ray-project/ray/pull/25177

Sending large binary strings over the Ray object store scales superlinearly. Benchmarks:

Checkpoint ser/de for 0.50 GB took -2.34 seconds.
Checkpoint ser/de for 1.00 GB took -5.18 seconds.
Checkpoint ser/de for 2.00 GB took -28.52 seconds.
Checkpoint ser/de for 4.00 GB took -90.82 seconds.

We would expect this to scale linearly (i.e. 2GB should take 10 seconds, 4GB should take 20 seconds).
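
The linear expectation can be checked with a little arithmetic. The sketch below takes the magnitudes of the timings reported above, uses the 1 GB run as a per-GB baseline, and compares each observed time against the linear extrapolation. The names `observed` and `slowdown_vs_linear` are illustrative only.

```python
# Magnitudes of the reported timings (seconds), keyed by checkpoint size in GB.
observed = {0.5: 2.34, 1.0: 5.18, 2.0: 28.52, 4.0: 90.82}

# Linear extrapolation baseline: seconds per GB taken from the 1 GB run.
seconds_per_gb = observed[1.0] / 1.0


def slowdown_vs_linear(size_gb: float) -> float:
    """Ratio of the observed time to the linearly extrapolated time."""
    expected = seconds_per_gb * size_gb
    return observed[size_gb] / expected


for size_gb in sorted(observed):
    expected = seconds_per_gb * size_gb
    print(
        f"{size_gb:.2f} GB: expected {expected:.2f} s, "
        f"observed {observed[size_gb]:.2f} s, "
        f"ratio {slowdown_vs_linear(size_gb):.2f}x"
    )
```

A ratio near 1 means the run matches linear scaling. For the 2 GB and 4 GB runs the ratio comes out well above 1.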

These benchmarks were on a Macbook 2019 16 inch. This might be related to object spilling (?) - we should run this on beefier instances to confirm (I can do that in a bit).

In Ray Tune, we avoid this currently by chunking the data and streaming it via an actor: https://github.com/ray-project/ray/blob/master/python/ray/tune/utils/file_transfer.py#L280
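
As a rough illustration of the chunking idea (not the code in `file_transfer.py`), the sketch below splits a byte string into fixed-size pieces and reassembles them. `iter_chunks`, `reassemble`, and the 64 KiB chunk size are assumptions made for this example. In a Ray setting each chunk would be sent as its own small object, for instance from an actor method, which is not shown here.

```python
from typing import Iterable, Iterator


def iter_chunks(data: bytes, chunk_size: int) -> Iterator[bytes]:
    """Yield consecutive slices of `data`, each at most `chunk_size` bytes."""
    if chunk_size <= 0:
        raise ValueError("chunk_size must be positive")
    # Slicing a memoryview does not copy; the copy happens in bytes().
    view = memoryview(data)
    for start in range(0, len(view), chunk_size):
        yield bytes(view[start:start + chunk_size])


def reassemble(chunks: Iterable[bytes]) -> bytes:
    """Concatenate the chunks back into a single byte string, in order."""
    return b"".join(chunks)


payload = bytes(range(256)) * 1000  # 256,000 bytes of sample data
chunks = list(iter_chunks(payload, chunk_size=64 * 1024))
print(len(chunks), "chunks; round trip ok:", reassemble(chunks) == payload)
```

The receiver only ever holds one chunk in flight, so no single transferred object grows with the total payload size.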

We can do such chunking in Ray AIR as well, however, I think it would be good to resolve this on the core side, rather than having to do this on the library level.

~Thus, a few ideas:~

~1. Ray core support for generators (see @stephanie-wang's proposal) and maybe a utility for chunking data~
~2. Streaming support out of the box (automatic chunking/resolving of generators)~

~For 2, we could automatically adjust to available object store size, so that potentially we could transfer objects that are larger than the object store.~

(this doesn't really work as the data remains in the object store)

Versions / Dependencies

Latest master

Reproduction script

import os
import shutil
import time

import ray

from ray.ml import Checkpoint

def create_large_checkpoint(size_in_gb: float) -> str:
    path = f"/tmp/checkpoint_large_{size_in_gb:.2f}_gb"
    if os.path.exists(path):
        print(f"Return existing checkpoint in {path}")
        return path

    print(f"Creating checkpoint in {path}")
    os.makedirs(path)
    with open(os.path.join(path, "checkpoint.bin"), "wb") as fp:
        fp.write(os.urandom(int(size_in_gb * 1024 * 1024 * 1024)))

    return path

checkpoint_size = 1.0

checkpoint_path = create_large_checkpoint(size_in_gb=checkpoint_size)

ray.init()
shutil.rmtree("/tmp/checkpoint_benchmark_output", ignore_errors=True)
start_time = time.monotonic()
Checkpoint.from_object_ref(
    Checkpoint.from_directory(checkpoint_path).to_object_ref()
).to_directory("/tmp/checkpoint_benchmark_output")
# Elapsed time is end minus start (the reverse order yields negative values).
time_taken = time.monotonic() - start_time

print(f"Checkpoint ser/de for {checkpoint_size:.2f} GB took {time_taken:.2f} seconds.")

Issue Severity

Medium: It is a significant difficulty but I can work around it.

rkooo567 commented 2 years ago

> These benchmarks were on a Macbook 2019 16 inch. This might be related to object spilling (?) - we should run this on beefier instances to confirm (I can do that in a bit).

Is the same issue reproducible on Linux?

Also cc @scv119 @stephanie-wang (can you help find an owner for this work?)

krfricke commented 2 years ago

Benchmarks on an m5.8xlarge instance (128 GB RAM, Linux):

Mean (SD) in seconds over 10 runs

0.25 GB: 1.19 (0.12)
0.50 GB: 2.19 (0.01)
1.00 GB: 4.46 (0.12)
1.50 GB: 6.61 (0.13)
2.00 GB: 8.97 (0.32)
3.00 GB: 13.20 (0.24)
4.00 GB: 14.49 (0.30)
8.00 GB: 31.76 (2.74)
16.00 GB: 114.93 (27.13)
24.00 GB: 230.28 (33.94)

Usage:
 0.0/32.0 CPU
 0.00/77.348 GiB memory
 0.00/37.140 GiB object_store_memory

So this seems to be related to memory/object store size (might be spilling).
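
One way to see where the scaling breaks down is to compute a local scaling exponent between successive doublings of the checkpoint size: log(t2/t1) / log(s2/s1). The sketch below applies this to the means from the table above. The helper name `scaling_exponent` is an assumption made for this example.

```python
import math

# Mean timings (seconds) from the m5.8xlarge table, keyed by size in GB.
mean_seconds = {
    0.25: 1.19,
    0.5: 2.19,
    1.0: 4.46,
    2.0: 8.97,
    4.0: 14.49,
    8.0: 31.76,
    16.0: 114.93,
}


def scaling_exponent(small_gb: float, large_gb: float) -> float:
    """Exponent k such that time grows like size**k between the two sizes."""
    time_ratio = mean_seconds[large_gb] / mean_seconds[small_gb]
    size_ratio = large_gb / small_gb
    return math.log(time_ratio) / math.log(size_ratio)


sizes = sorted(mean_seconds)
for small, large in zip(sizes, sizes[1:]):
    print(f"{small:>5.2f} -> {large:>5.2f} GB: exponent {scaling_exponent(small, large):.2f}")
```

An exponent near 1 indicates linear scaling. On these numbers the exponent stays roughly at or below 1 through 8 GB and rises clearly above 1 for the step from 8 GB to 16 GB.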

Adjusted benchmark script for multiple runs:

import os
import shutil
import time

from collections import defaultdict

import numpy as np

import ray

from ray.ml import Checkpoint

def create_large_checkpoint(size_in_gb: float) -> str:
    path = f"/tmp/checkpoint_large_{size_in_gb:.2f}_gb"
    if os.path.exists(path):
        # print(f"Return existing checkpoint in {path}")
        return path

    # print(f"Creating checkpoint in {path}")
    os.makedirs(path)
    with open(os.path.join(path, "checkpoint.bin"), "wb") as fp:
        fp.write(os.urandom(int(size_in_gb * 1024 * 1024 * 1024)))

    return path

ray.init()

repeats = 5

results = defaultdict(list)
for checkpoint_size in [32.0]:
    for i in range(repeats):
        checkpoint_path = create_large_checkpoint(size_in_gb=checkpoint_size)

        shutil.rmtree("/tmp/checkpoint_benchmark_output", ignore_errors=True)
        start_time = time.monotonic()
        Checkpoint.from_object_ref(
            Checkpoint.from_directory(checkpoint_path).to_object_ref()
        ).to_directory("/tmp/checkpoint_benchmark_output")
        time_taken = time.monotonic() - start_time

        results[checkpoint_size].append(time_taken)
        print(f"Checkpoint ser/de for {checkpoint_size:.2f} GB took {time_taken:.2f} seconds.")

    shutil.rmtree(f"/tmp/checkpoint_large_{checkpoint_size:.2f}_gb")
    print(f"{checkpoint_size:.2f} GB: {list(results[checkpoint_size])}")

print("Raw")
for size, res in results.items():
    print(f"{size:.2f} GB: {list(res)}")

print("Summary")

for size, res in results.items():
    arr = np.array(res)
    print(f"{size:.2f} GB: {np.mean(arr):.2f} ({np.std(arr):.2f})")

scv119 commented 2 years ago

We should investigate where it slows down and then decide what to do next.

fishbone commented 2 years ago

This seems to happen not only on macOS but also on Linux.

0.25 GB: 1.19 (0.12)
0.50 GB: 2.19 (0.01)
1.00 GB: 4.46 (0.12)
1.50 GB: 6.61 (0.13)
2.00 GB: 8.97 (0.32)
3.00 GB: 13.20 (0.24)
4.00 GB: 14.49 (0.30)
8.00 GB: 31.76 (2.74)
16.00 GB: 114.93 (27.13)
24.00 GB: 230.28 (33.94)

Basically, it's not scaling up linearly.

fishbone commented 2 years ago

I benchmarked the parallel copy and got the following numbers:

1 GB 111 ms
2 GB 222 ms
3 GB 997 ms
4 GB 5432 ms
5 GB 5748 ms
6 GB 6989 ms

I think this is closely related to the parallel copy. Let me see how to fix it.
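
For readers unfamiliar with the term, a parallel copy splits one large buffer into ranges and copies each range on its own thread. The sketch below illustrates that general technique in plain Python. It is not Ray's implementation, and `parallel_copy` and `num_threads` are hypothetical names chosen for this example.

```python
from concurrent.futures import ThreadPoolExecutor


def parallel_copy(src: bytes, num_threads: int = 4) -> bytearray:
    """Copy `src` into a new buffer, one contiguous range per worker task."""
    if num_threads <= 0:
        raise ValueError("num_threads must be positive")
    dst = bytearray(len(src))
    if not src:
        return dst

    # Ceiling division so the ranges cover the whole buffer.
    chunk = (len(src) + num_threads - 1) // num_threads
    src_view = memoryview(src)
    dst_view = memoryview(dst)

    def copy_range(start: int) -> None:
        end = min(start + chunk, len(src))
        dst_view[start:end] = src_view[start:end]

    try:
        with ThreadPoolExecutor(max_workers=num_threads) as pool:
            # list() forces completion and re-raises any worker exception.
            list(pool.map(copy_range, range(0, len(src), chunk)))
    finally:
        src_view.release()
        dst_view.release()
    return dst


data = bytes(range(256)) * 100
print("copy matches source:", parallel_copy(data, num_threads=4) == data)
```

This shows the partitioning logic only. It is not meant as a benchmark, since Python threads will not necessarily copy the ranges concurrently.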

suquark commented 2 years ago

If it is related to page faults and swapping to disk, maybe this could help: https://man7.org/linux/man-pages/man2/memfd_create.2.html There is no macOS equivalent, though.
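
As a small illustration of what `memfd_create` provides, the sketch below creates an anonymous in-memory file, maps it, and round-trips a payload through it. It uses Python's `os.memfd_create`, which is available only on Linux (Python 3.8+), so the function returns `None` elsewhere. `roundtrip_via_memfd` and the name `demo_buffer` are assumptions made for this example. Whether this approach would help with the slowdown discussed here is an open question.

```python
import mmap
import os
from typing import Optional


def roundtrip_via_memfd(payload: bytes) -> Optional[bytes]:
    """Write `payload` into an anonymous memory-backed file and read it back.

    Returns None where memfd_create is unavailable (for example on macOS).
    """
    if not payload:
        raise ValueError("payload must be non-empty (mmap cannot map 0 bytes)")
    if not hasattr(os, "memfd_create"):
        return None
    try:
        fd = os.memfd_create("demo_buffer")
    except OSError:
        return None  # the kernel or sandbox does not support the call
    try:
        os.ftruncate(fd, len(payload))  # size the anonymous file
        with mmap.mmap(fd, len(payload)) as buf:
            buf[:] = payload  # write through the shared mapping
            return bytes(buf[:])
    finally:
        os.close(fd)


result = roundtrip_via_memfd(b"x" * 4096)
print("memfd unavailable" if result is None else f"round-tripped {len(result)} bytes")
```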

hora-anyscale commented 2 years ago

Per Triage Sync: @scv119 can you please clarify the current status on this?

zen-xu commented 2 weeks ago

Just wrap the code in a ray.remote task.

from __future__ import annotations

import os
import shutil
import time

from collections import defaultdict

import numpy as np
import ray

from ray.train import Checkpoint

ray.init()

def create_large_checkpoint(size_in_gb: float) -> str:
    path = f"/tmp/checkpoint_large_{size_in_gb:.2f}_gb"
    if os.path.exists(path):
        return path

    os.makedirs(path)
    with open(os.path.join(path, "checkpoint.bin"), "wb") as fp:
        fp.write(os.urandom(int(size_in_gb * 1024 * 1024 * 1024)))

    return path

@ray.remote
def benchmark(checkpoint_size: float, repeats: int) -> list[float]:
    checkpoint_path = create_large_checkpoint(checkpoint_size)
    times_taken: list[float] = []
    for _ in range(repeats):
        shutil.rmtree("/tmp/checkpoint_benchmark_output", ignore_errors=True)
        start_time = time.monotonic()
        checkpoint = ray.get(ray.put(Checkpoint.from_directory(checkpoint_path)))
        checkpoint.to_directory("/tmp/checkpoint_benchmark_output")
        times_taken.append(time.monotonic() - start_time)
    shutil.rmtree(checkpoint_path)
    return times_taken

repeats = 5
results = defaultdict(list)
for checkpoint_size in [0.25, 0.5, 1, 1.5, 2, 3, 4, 8]:
    results[checkpoint_size] = ray.get(benchmark.remote(checkpoint_size, repeats))
    print(f"{checkpoint_size:.2f} GB: {results[checkpoint_size]}")

print("Raw")
for size, res in results.items():
    print(f"{size:.2f} GB: {list(res)}")

print("Summary")
for size, res in results.items():
    arr = np.array(res)
    print(f"{size:.2f} GB: {np.mean(arr):.2f} ({np.std(arr):.2f})")