ray-project / ray

Ray is an AI compute engine. Ray consists of a core distributed runtime and a set of AI Libraries for accelerating ML workloads.
https://ray.io
Apache License 2.0

Sparse object reads - read part of an object, without downloading the entire object #20500

Open ericl opened 2 years ago

ericl commented 2 years ago

Overview

Currently, accessing an object requires ray.get(), which downloads the entire object to a node. This means that applications must pre-split their objects to the granularity desired by read tasks. For example, in distributed shuffle, this means each map task must produce O(nreducers) objects instead of a single sorted object.

There are two problems here:

  1. If we don't know the object splits ahead of time, this strategy can't work.
  2. Even if we know the object splits, this adds significant metadata overhead from tracking many tiny objects that have effectively identical metadata and reference-counting lifetimes. This makes the metadata plane the bottleneck for very large shuffle workloads (e.g., 10M+ objects). Right now the metadata overhead per object is significant (~1KB), so tracking millions of objects can consume gigabytes of worker memory.
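To make the workaround concrete, here is a minimal local sketch of the pre-splitting pattern (plain Python functions stand in for Ray tasks; in real Ray each partition returned by a map task would become a separate object, e.g. via `num_returns`):

```python
# Local model of the pre-split shuffle pattern described above.
# Plain functions stand in for Ray tasks.

NREDUCERS = 4

def map_task(records):
    # Without sparse reads, each reducer can only fetch whole objects,
    # so the map task must split its output at write time: one
    # partition (object) per reducer, i.e. O(nreducers) objects.
    parts = [[] for _ in range(NREDUCERS)]
    for rec in records:
        parts[rec % NREDUCERS].append(rec)  # hash partition (illustrative)
    return parts

def reduce_task(parts):
    # Each reducer merges its partition from every map task.
    return sorted(x for part in parts for x in part)

m0 = map_task([7, 2, 9, 4])
m1 = map_task([1, 8, 3, 6])
out = [reduce_task([m0[r], m1[r]]) for r in range(NREDUCERS)]
# Two map tasks produce 2 * NREDUCERS = 8 objects in total.
```

With sparse reads, each map task could instead emit a single sorted object, and each reducer would read only its slice of it.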

Possible solution

There are several potential APIs:

  1. mmap-based (get-only change)

    array = ray.get(x, map_sparse=True)
    print(array[5])  # fetches the pages needed to return this

    limitation: only works with zero-copy data types (Arrow tables, numpy arrays)

  2. index-based (user has to structure get/put data into an array)

    x = ray.put(ray.ObjectArray([a, b, c, d, e, f, g]))
    print(ray.get(x)[5])

    limitation: not transparent to reader / writer

  3. code-based (imagine an actor attached to a read-only object)

    ref = ray.readonly_actor(list).remote(x)
    print(ray.get(ref.get_element(5)))

    limitation: object must be pinned in server memory
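To illustrate Option 3's semantics (and its pinning limitation), here is a local sketch where a plain class stands in for the hypothetical read-only actor; in real Ray the wrapped value would stay pinned in the actor's memory for the actor's lifetime:

```python
# Local model of Option 3: a reader wraps a pinned value and serves
# element reads on request. A plain class stands in for a Ray actor.

class ReadonlyReader:
    def __init__(self, value):
        self._value = value  # stays pinned while the reader exists

    def get_element(self, i):
        # Only the requested element is returned to the caller,
        # even though the whole object is held server-side.
        return self._value[i]

reader = ReadonlyReader(list(range(100)))
element = reader.get_element(5)
```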

Proposed solution

The current thought is to go with Option 2, which can be implemented as a new experimental API: ray.experimental.ObjectArray. The rationale is that it is the simplest and most general solution.
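Since ray.experimental.ObjectArray does not exist yet, here is a local sketch of the intended semantics, with a plain dict standing in for the object store (all names hypothetical):

```python
# Local model of the proposed Option 2 API: each element is stored as
# its own object, so indexing fetches only that element.

_store = {}

def _put(value):
    ref = object()  # opaque stand-in for an ObjectRef
    _store[ref] = value
    return ref

class ObjectArray:
    """An indexable collection whose elements live as separate objects,
    so reading one element does not download the others."""

    def __init__(self, items):
        self._refs = [_put(item) for item in items]

    def __getitem__(self, i):
        # Fetch only the element asked for, not the whole array.
        return _store[self._refs[i]]

    def __len__(self):
        return len(self._refs)

arr = ObjectArray(["a", "b", "c", "d", "e", "f", "g"])
sixth = arr[5]  # only element 5 is fetched
```

In real Ray, the element refs (rather than the values) would be what gets tracked, which is what keeps the metadata cost per logical collection down to one entry plus a compact ref list.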

kfstorm commented 2 years ago

An interesting topic. 👍

jon-chuang commented 2 years ago

I definitely like this idea; it's a pain point I've come across myself.

Some thoughts:

  1. On-demand paging is only really useful for large objects (or large collections of small objects)
    • since for small or medium objects (or collections), we should prefer to fetch the entire object.
  2. It needs to exploit decomposability of the language's data structure (e.g. slice, column, row, pointers + alignment)
    • rather than focusing on arrays, one could have an abstract interface for the language front-end to determine how the language's type is mapped to pages, so it could be extended beyond array-like types.
    • This requires an extra "page table" which may live in the object's metadata.
    • zero-copy serialization formats typically already provide some relative pointer functionality, which could be exploited to mimic the indexing into an array
  3. If we take approach 2 from the OP rather than 1, could collections more generic than arrays (and partial retrievals thereof) fit into the cross-language RayDatatype and RaySerializer?
    • Is this desired - what use cases are there?
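The "page table" idea from point 2 can be sketched concretely: metadata mapping each logical element to a byte range in the serialized object, so a reader can fetch just the bytes covering that element. This is a simplified local model with fixed-width rows; all names are hypothetical:

```python
# Sketch of a per-object "page table": element index -> byte range.
# A reader uses it to perform a sparse read of one element.

import struct

def serialize_with_page_table(rows):
    # Serialize fixed-width rows and record each row's (offset, length).
    buf = bytearray()
    page_table = []
    for row in rows:
        data = struct.pack("<2i", *row)  # two int32s per row
        page_table.append((len(buf), len(data)))
        buf.extend(data)
    return bytes(buf), page_table

def read_element(buf, page_table, i):
    # Touch only the byte range for element i (the "sparse read").
    off, length = page_table[i]
    return struct.unpack("<2i", buf[off:off + length])

blob, table = serialize_with_page_table([(1, 2), (3, 4), (5, 6)])
middle = read_element(blob, table, 1)
```

As the comment notes, zero-copy formats like Arrow already carry similar relative-offset metadata, so a real implementation could derive the page table from the format rather than storing a second copy.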

Additional thoughts:

  1. If we only transmit part of an object, are we still transmitting the metadata along with it?
  2. For partially resolved objects, do they get stored in any way in the owner's object table?
    • Is there some way of expressing object pages, rather than whole objects, as pinned (e.g., when we pass a slice as a task argument: my_func.remote(obj_ref[5000:10000]))?
    • Is expressing this even desirable?
  3. Not a big fan of the mmap-based approach - do protocols even exist to trigger a remote fetch on an OS page fault...?

Notes to self: