ray-project / ray

Ray is an AI compute engine. Ray consists of a core distributed runtime and a set of AI Libraries for accelerating ML workloads.
https://ray.io
Apache License 2.0

[Ray Data] Allow operating in `pyarrow` format in `iter_torch_batches` #44615

Open dev-goyal opened 6 months ago

dev-goyal commented 6 months ago

Description

Currently, iter_torch_batches itself calls iter_batches: https://github.com/ray-project/ray/blob/1d6983380b8adacd33f92588110a820e6587467c/python/ray/data/iterator.py#L393

without setting a value for batch_format, which means it falls back to the default numpy format: https://github.com/ray-project/ray/blob/1d6983380b8adacd33f92588110a820e6587467c/python/ray/data/iterator.py#L138

As a result, the incoming batch in iter_torch_batches is first converted to numpy before any processing is done on it. Especially when a collate_fn is provided, this conversion can be somewhat wasteful. Consider the following example:

import numpy as np
import pyarrow as pa
import torch

table = pa.Table.from_pydict({
    "col_a": [[0, 1], [1, 2, 3], [1]],
    "col_b": [0, 2, 3],
    "col_c": [0.1, 0.2, 0.3],
})

print(table["col_a"].type)
# list<item: int64>

table["col_a"].to_numpy()
# array([array([0, 1]), array([1, 2, 3]), array([1])], dtype=object)

# Now, I'd like to convert col_a to a ragged tensor to later generate bags of embeddings.
# See: https://pytorch.org/docs/stable/generated/torch.nn.EmbeddingBag.html
# This means creating two flat arrays - offsets and values.
# This is coincidentally also how pa.ListArray is already laid out internally.

# In pyarrow: generate the offsets and values
array = table["col_a"].combine_chunks()
offsets = torch.as_tensor(array.offsets[:-1].to_numpy())
values = torch.as_tensor(array.values.to_numpy())
print(offsets)
# tensor([0, 2, 5], dtype=torch.int32)
print(values)
# tensor([0, 1, 1, 2, 3, 1])

# In numpy: generate the offsets and values
array = table["col_a"].to_numpy()  # collate_fn would get this as a numpy array
offsets = torch.as_tensor(
    np.array([0] + [len(val) for val in array]).cumsum()[:-1]
)
# Flatten the inhomogeneous array of arrays into a single dimensional vector
values = torch.as_tensor(np.hstack(array))
print(offsets)
# tensor([0, 2, 5])
print(values)
# tensor([0, 1, 1, 2, 3, 1])
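
For completeness, here is how the two flat tensors from either path feed torch.nn.EmbeddingBag; a small usage sketch, where num_embeddings=4 and embedding_dim=8 are arbitrary illustrative values:

emb = torch.nn.EmbeddingBag(num_embeddings=4, embedding_dim=8, mode="mean")

# EmbeddingBag requires int64 indices and offsets, so cast the int32
# offsets produced by the pyarrow path.
bags = emb(values.long(), offsets.long())
print(bags.shape)
# torch.Size([3, 8])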

Thus, if collate_fn could accept pyarrow batches directly, this would be much easier; a sketch of what that could look like follows.
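
To make the proposal concrete, here is a rough sketch of the user-side API this would enable, assuming ds is a Dataset holding the table above. Note that this is hypothetical: iter_torch_batches does not currently expose a batch_format argument, and a collate_fn receiving a pa.Table is exactly what is being proposed here.

def collate_fn(batch: pa.Table) -> dict:
    # Operate directly on the Arrow list layout, avoiding the
    # object-dtype numpy array of arrays entirely.
    col_a = batch["col_a"].combine_chunks()
    return {
        "offsets": torch.as_tensor(col_a.offsets[:-1].to_numpy()),
        "values": torch.as_tensor(col_a.values.to_numpy()),
    }

# Hypothetical batch_format argument, per this proposal:
for batch in ds.iter_torch_batches(collate_fn=collate_fn, batch_format="pyarrow"):
    ...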

Moreover, given that collate_fn runs on the GPU worker, whereas something like map_batches runs on CPU workers, it would make sense to offload as much computation as possible to earlier stages of the pipeline.
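
For instance, per-row preprocessing that doesn't need the flat offsets/values representation can already run on CPU workers via map_batches, which does accept batch_format="pyarrow" today; a minimal sketch, where cpu_preprocess is an illustrative stand-in:

import pyarrow.compute as pc
import ray

def cpu_preprocess(batch: pa.Table) -> pa.Table:
    # Runs on CPU workers, keeping the trainer-side collate_fn cheap.
    return batch.filter(pc.greater(batch["col_b"], 0))

ds = ray.data.from_arrow(table).map_batches(cpu_preprocess, batch_format="pyarrow")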

Use case

See the bag of embeddings example above.

dev-goyal commented 6 months ago

If this sounds reasonable and I haven't missed anything crucial, I'd be quite happy to make the PR. Perhaps we only want to make this configurable when collate_fn is provided?

scottjlee commented 6 months ago

@dev-goyal thanks for the suggestion! We would be happy to help shepherd the PR, let us know if we can help answer any questions.

I think it makes sense to override the current default batch format only when collate_fn is provided.
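
For reference, a minimal sketch of the kind of override being discussed inside iter_torch_batches (hypothetical, not the actual Ray implementation; the parameter names are illustrative):

# Preserve today's numpy behavior unless a collate_fn is supplied.
batch_format = "pyarrow" if collate_fn is not None else "numpy"
batch_iter = self.iter_batches(
    batch_size=batch_size,
    batch_format=batch_format,
    drop_last=drop_last,
)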