NVIDIA / DALI

A GPU-accelerated library containing highly optimized building blocks and an execution engine for data processing to accelerate deep learning training and inference applications.
https://docs.nvidia.com/deeplearning/dali/user-guide/docs/index.html
Apache License 2.0

Low reading performance of TFRecords via S3 #5551

Open fversaci opened 2 months ago

fversaci commented 2 months ago

Version

1.40.0.dev20240628

Describe the bug.

Hi,

I am testing the throughput of DALI (v1.40.0.dev20240628) when reading TFRecords via S3 and I am obtaining unexpectedly poor results.

I am scanning the ImageNet training dataset (140 GB, on a system tuned to have 30 GB of available RAM, to prevent automatic memory caching). The dataset is either in its original FILES format or in the form of TFRECORD files, each with a size of 64 MB. In both cases the dataset is read in batches of size 128.

When reading (with no decoding) from fast NVME disks, these are the results I am getting:

This difference seems reasonable, since TFRECORD entails less file I/O overhead.

However, when I run the same tests with the datasets stored on the same disks and the same node, but made available via a MinIO server, the results are:

In this second scenario, the speed of accessing FILES might be reasonable in comparison to the previous case, as accessing S3 incurs higher overhead than directly accessing the filesystem. However, I would have anticipated a much faster speed for TFRECORD, given that it is supposed to distribute access latencies across 64 MB blocks, which is significantly larger than the average 115 kB JPEG files. Surprisingly, TFRECORD is even slower than the FILES setup.
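A back-of-the-envelope sketch of the expectation above, assuming one S3 GET request per object and using only the approximate figures quoted in this report (140 GB dataset, 115 kB average JPEG, 64 MB TFRecord files). It counts requests only and says nothing about how DALI actually issues them:

```python
import math

# Approximate figures taken from the report above
DATASET_BYTES = 140e9    # ~140 GB ImageNet training set
AVG_JPEG_BYTES = 115e3   # ~115 kB average JPEG
TFRECORD_BYTES = 64e6    # 64 MB per TFRecord file


def request_count(total_bytes: float, object_bytes: float) -> int:
    """Number of GET requests if every object is fetched with a single request."""
    return math.ceil(total_bytes / object_bytes)


files_requests = request_count(DATASET_BYTES, AVG_JPEG_BYTES)
tfrecord_requests = request_count(DATASET_BYTES, TFRECORD_BYTES)

print(f"FILES:    {files_requests:,} requests")
print(f"TFRECORD: {tfrecord_requests:,} requests")
print(f"ratio:    about {files_requests / tfrecord_requests:.0f}x fewer requests")
```

Under that assumption the TFRECORD layout needs a few thousand requests where FILES needs over a million, which is why the per-request latency was expected to matter far less.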

We also conducted a test using your 12 MB tfrecord example with a batch size of 47, and we achieved performance consistent with the previous results.

Do you have any ideas about what could be causing the low reading performance of TFRecords via S3? Are we overlooking some optimization parameters in the TFRecord reader?

Thanks!

Minimum reproducible example

To evaluate the performance of reading the DALI_extra TFRecord example file, both from the filesystem and a pre-configured S3 server:

```bash
# read DALI_extra train from the filesystem
python3 tf_read.py --reader=tfrecord --tfrecord-file=/data/tmp/train \
  --tfrecord-index=/data/tmp/train.idx --bs 47 --epochs 20
# read DALI_extra train via S3
python3 tf_read.py --reader=tfrecord --tfrecord-file=s3://tmp/train \
  --tfrecord-index=s3://tmp/train.idx --bs 47 --epochs 20
```

Similarly, but utilizing the ImageNet training dataset:

```bash
# scan imagenet train images (original jpegs) from the filesystem
python3 tf_read.py --reader=file --file-root=/data/imagenet/train/
# scan imagenet train images (tfrecord) from the filesystem
python3 tf_read.py --reader=tfrecord --file-root=/data/imagenet/tfrecords/train/ \
  --index-root=/data/imagenet/tfrecords/train_idx/
# scan imagenet train images (original jpegs) via S3
python3 tf_read.py --reader=file --file-root=s3://imagenet/train/
# scan imagenet train images (tfrecord) via S3
python3 tf_read.py --reader=tfrecord --file-root=s3://imagenet/tfrecords/train/ \
  --index-root=s3://imagenet/tfrecords/train_idx/
```

```python
########################################################################
# tf_read.py
########################################################################
# dali
from nvidia.dali.pipeline import pipeline_def
from nvidia.dali.plugin.base_iterator import LastBatchPolicy
from nvidia.dali.plugin.pytorch import DALIGenericIterator
import nvidia.dali.fn as fn
import nvidia.dali.types as types
import nvidia.dali.tfrecord as tfrec

# varia
from clize import run
from tqdm import trange, tqdm
import math
import time
import boto3
import os

global_rank = int(os.getenv("RANK", default=0))
local_rank = int(os.getenv("LOCAL_RANK", default=0))
world_size = int(os.getenv("WORLD_SIZE", default=1))

def parse_s3_uri(s3_uri):
    if not s3_uri.startswith("s3://"):
        raise ValueError("Invalid S3 URI")

    # Remove the "s3://" prefix
    s3_uri = s3_uri[5:]

    # Split the remaining part into bucket and prefix
    parts = s3_uri.split("/", 1)
    bucket_name = parts[0]
    prefix = parts[1] if len(parts) > 1 else ""

    return bucket_name, prefix

def list_s3_files(s3_uri):
    bucket_name, prefix = parse_s3_uri(s3_uri)
    s3 = boto3.client("s3")
    paginator = s3.get_paginator("list_objects_v2")
    pages = paginator.paginate(Bucket=bucket_name, Prefix=prefix)

    paths = []
    for page in pages:
        if "Contents" in page:
            for obj in page["Contents"]:
                paths.append(f"s3://{bucket_name}/{obj['Key']}")

    return sorted(paths)

def read_data(
    *,
    reader="tfrecord",
    use_gpu=False,
    bs=128,
    epochs=10,
    file_root=None,
    index_root=None,
    tfrecord_file=None,
    tfrecord_index=None,
):
    """Read images from tfrecords or filesystem, in a tight loop

    :param reader: "file" or "tfrecord" (default: tfrecord)
    :param use_gpu: enable output to GPU (default: False)
    :param bs: batch size (default: 128)
    :param epochs: Number of epochs (default: 10)
    :param file_root: File root to be used when reading files or tfrecords
    :param index_root: Root path to index files (only when reading tfrecords)
    :param tfrecord_file: Single tfrecord
    :param tfrecord_index: Single index file
    """
    if use_gpu:
        device_id = local_rank
    else:
        device_id = types.CPU_ONLY_DEVICE_ID

    if reader == "file":
        file_reader = fn.readers.file(
            file_root=file_root,
            name="Reader",
            shard_id=global_rank,
            num_shards=world_size,
            pad_last_batch=True,
            # speed up reading
            prefetch_queue_depth=4,
            # dont_use_mmap=True,
            read_ahead=True,
        )
        chosen_reader = file_reader
    elif reader == "tfrecord":
        if tfrecord_file and tfrecord_index:
            path = [tfrecord_file]
            index_path = [tfrecord_index]
            # path *= 10
            # index_path *= 10
        # alternatively, read a list of tfrecords
        elif file_root.startswith("s3://"):
            path = list_s3_files(file_root)
            index_path = list_s3_files(index_root)
        else:
            path = sorted([f.path for f in os.scandir(file_root) if f.is_file()])
            index_path = sorted([f.path for f in os.scandir(index_root) if f.is_file()])

        tf_reader = fn.readers.tfrecord(
            path=path,
            index_path=index_path,
            features={
                "image/encoded": tfrec.FixedLenFeature([], tfrec.string, ""),
                "image/class/label": tfrec.FixedLenFeature([], tfrec.int64, -1),
            },
            name="Reader",
            shard_id=global_rank,
            num_shards=world_size,
            pad_last_batch=True,
            # speed up reading
            prefetch_queue_depth=4,
            # dont_use_mmap=True,
            read_ahead=True,
        )
        chosen_reader = tf_reader["image/encoded"], tf_reader["image/class/label"]
    else:
        # raising a bare string is a TypeError; raise a proper exception instead
        raise ValueError('--reader: expecting either "file" or "tfrecord"')

    # create dali pipeline
    @pipeline_def(
        batch_size=bs,
        num_threads=4,
        device_id=device_id,
        prefetch_queue_depth=2,
    )
    def get_dali_pipeline():
        images, labels = chosen_reader
        if device_id != types.CPU_ONLY_DEVICE_ID:
            images = images.gpu()
            labels = labels.gpu()
        return images, labels

    pl = get_dali_pipeline()
    pl.build()

    ########################################################################
    # DALI iterator
    ########################################################################
    # produce images
    shard_size = math.ceil(pl.epoch_size()["Reader"] / world_size)
    steps = math.ceil(shard_size / bs)
    # run the pipeline for the requested number of epochs
    for _ in range(epochs):
        # read data for current epoch
        for _ in trange(steps):
            pl.run()
        pl.reset()

# parse arguments
if __name__ == "__main__":
    run(read_data)
```

### Relevant log output

_No response_

### Other/Misc.

_No response_

### Check for duplicates

- [X] I have searched the [open bugs/issues](https://github.com/NVIDIA/DALI/issues) and have found no duplicates for this bug report

jantonguirao commented 1 month ago

We identified a few inefficiencies in the way we access S3 storage in TFRecord reader vs File reader. https://github.com/NVIDIA/DALI/pull/5554 optimizes that.

fversaci commented 1 month ago

Great! I'll test it as soon as it gets into a nightly build. Thanks!

fversaci commented 1 month ago

Hi @jantonguirao

I reran the S3 tests using DALI 1.41.0.dev20240718, configuring num_threads=32 to optimize throughput. To summarize the setup, the dataset is stored on fast disks within the same node as the client and is accessible via a MinIO server. Here are the results:

While the performance has certainly improved, it's still surprising that accessing fewer, larger TFRecords is slower than accessing multiple smaller files. Can you replicate this behaviour as well?

Thanks!

jantonguirao commented 1 month ago

Even if there are fewer, larger files, the requests to S3 are independent. I don't know whether reading parts of large files from AWS S3 is less efficient than reading smaller individual files. Are you able to get similar performance with other S3 reading libraries?

fversaci commented 1 month ago

If I understand correctly, you’re accessing the parts/blobs within the TFRecords independently through S3. I believe it would be much faster, whenever possible, to read an entire TFRecord with a single query into memory (or the filesystem) and then access the individual parts locally from there. If you think that individual TFRecords might be too large for this approach (since they can be generated arbitrarily large, though that is not recommended), you could still read larger chunks (such as 64 MB) into local memory and unpack the contents of each chunk locally.
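The chunked approach described above could look roughly like the following. This is a minimal sketch, not DALI's implementation: `iter_tfrecord_payloads`, `make_s3_fetcher` and `RangeFetcher` are hypothetical names introduced here for illustration, and the record CRCs are skipped rather than verified. It issues one ranged read per chunk and splits the chunk into records locally, relying on the standard TFRecord framing (little-endian uint64 length, uint32 length CRC, payload, uint32 payload CRC).

```python
import struct
from typing import Callable, Iterator

CHUNK_SIZE = 64 * 1024 * 1024  # 64 MB, the chunk size suggested above

# fetch(offset, length) -> bytes; hypothetical interface for one ranged read
RangeFetcher = Callable[[int, int], bytes]


def iter_tfrecord_payloads(fetch: RangeFetcher, total_size: int,
                           chunk_size: int = CHUNK_SIZE) -> Iterator[bytes]:
    """Yield raw record payloads, issuing one ranged read per chunk."""
    buf = b""
    offset = 0
    while True:
        # Unpack every complete record currently held in the local buffer.
        pos = 0
        while len(buf) - pos >= 12:  # 8-byte length + 4-byte length CRC
            (length,) = struct.unpack_from("<Q", buf, pos)
            end = pos + 12 + length + 4  # header + payload + payload CRC
            if end > len(buf):
                break  # record continues in the next chunk
            yield buf[pos + 12:pos + 12 + length]
            pos = end
        buf = buf[pos:]  # keep the partial record, if any
        if offset >= total_size:
            break
        data = fetch(offset, min(chunk_size, total_size - offset))
        if not data:
            raise ValueError("unexpected end of object")
        buf += data
        offset += len(data)
    if buf:
        raise ValueError("truncated TFRecord file")


def make_s3_fetcher(bucket: str, key: str) -> RangeFetcher:
    """Build a fetcher backed by S3 ranged GETs (needs boto3 and credentials)."""
    import boto3  # imported lazily so the parser can be tried without boto3

    s3 = boto3.client("s3")

    def fetch(offset: int, length: int) -> bytes:
        byte_range = f"bytes={offset}-{offset + length - 1}"
        return s3.get_object(Bucket=bucket, Key=key, Range=byte_range)["Body"].read()

    return fetch
```

With boto3, the object size needed for `total_size` can be obtained from `head_object(...)["ContentLength"]`. Any callable that returns a byte range works as `fetch`, so the parser can also be exercised against an in-memory buffer or a local file.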

fversaci commented 2 days ago

Hi,

I wanted to ask if there are any updates on this issue.

We are currently doing some benchmarking and would like to achieve the best possible performance with DALI. Do you have any plans to work on this in the near future, or is it currently a low priority?

Thank you!

JanuszL commented 2 days ago

Hi @fversaci,

We have a couple of ideas on how to approach this but this requires a noticeable effort and we cannot commit to any particular date at the moment. I'm sorry as this is probably not the answer you are looking for.

fversaci commented 2 days ago

Hi @JanuszL,

thanks anyway for the update. For now, we'll proceed with testing as it is and will plan to retest everything once the issue is resolved.

JanuszL commented 2 days ago

Will keep you posted.