tensorflow / io

Dataset, streaming, and file system extensions maintained by TensorFlow SIG-IO
Apache License 2.0

Streaming interleaved parquet files #1688

Open angusl-gr opened 2 years ago

angusl-gr commented 2 years ago

Hi,

I'm trying to stream multiple Parquet files into a Keras model in an interleaved fashion: e.g. stream rows from ["file_1.parquet", ..., "file_4.parquet"] and interleave them, and when one file is exhausted, open file_5.parquet in its place, and so on. The best I can do is effectively the final comment in this similar issue, but the performance is extremely slow compared to just writing my own generator, which reads a whole file in and produces batches. I have also tried the tensorflow_io.arrow integration, but I get a lot of warnings, the performance seems poor, and I can't find a straightforward way to do interleaving. Am I missing something, or is this the best I can do?

Thanks!

My code for reference:

import numpy as np
import pandas as pd
import tensorflow as tf
import tensorflow_io as tfio

def convert_ordered_dict_to_tensor(d):
    vals = list(d.values())
    return tf.transpose(tf.convert_to_tensor(vals))

if __name__ == "__main__":
    for i in range(10):
        data = np.random.rand(10**6, 100).astype(np.float32)
        df = pd.DataFrame(data)
        df.columns = [str(col_name) for col_name in df.columns]
        df.to_parquet(f"file_{i}.parquet")

    batch_size = 1024
    num_workers = 2
    columns = {f"{i}": tf.TensorSpec(tf.TensorShape([]), tf.float32) for i in range(100)}
    ds = tf.data.Dataset.list_files("file_*.parquet")
    ds = (
        ds.interleave(
            lambda f: tfio.IODataset.from_parquet(f, columns=columns),
            cycle_length=2,
            block_length=2,
            num_parallel_calls=num_workers,
        )
        .batch(batch_size)
        .map(convert_ordered_dict_to_tensor)
        .prefetch(2)
    )
    for batch in ds:
        pass
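For reference, the hand-written generator baseline mentioned above ("open a few files, round-robin their rows, replace each exhausted file with the next one") can be sketched in plain Python. This is a hypothetical helper, not part of tf.data or tensorflow_io; the name `interleave_exhausting` is made up here, and it uses generic iterables so the replacement-on-exhaustion logic is explicit:

```python
from typing import Iterable, Iterator, List, TypeVar

T = TypeVar("T")

def interleave_exhausting(
    sources: Iterable[Iterable[T]],
    cycle_length: int,
) -> Iterator[T]:
    """Round-robin over up to `cycle_length` open sources; when one
    is exhausted, replace it with the next unopened source."""
    pending = iter(sources)
    active: List[Iterator[T]] = []
    # Open the first `cycle_length` sources.
    for src in pending:
        active.append(iter(src))
        if len(active) == cycle_length:
            break
    i = 0
    while active:
        try:
            yield next(active[i])
            i = (i + 1) % len(active)
        except StopIteration:
            # Replace the exhausted source with the next one, if any.
            nxt = next(pending, None)
            if nxt is not None:
                active[i] = iter(nxt)
            else:
                del active[i]
                if active:
                    i %= len(active)
```

With Parquet, each source would be something like a per-file row (or row-group) iterator; the same structure applies.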
angusl-gr commented 2 years ago

Just bumping, as I still haven't managed to find a resolution to this.

dsiegel commented 2 years ago

I have the same issue. Using tfio.IODataset.from_parquet is extremely slow when used with interleave, but without interleave seems to work fine.

dsiegel commented 2 years ago

I found that moving the `batch` inside the `interleave` speeds things up a lot. Setting `deterministic=False` might also help. `cycle_length` should be >= `num_workers`.

ds = (
    ds.interleave(
        lambda f: tfio.IODataset.from_parquet(f, columns=columns).batch(2000),
        cycle_length=num_workers,
        block_length=1,
        num_parallel_calls=num_workers,
        deterministic=False,
    )
    .map(convert_ordered_dict_to_tensor)
    .prefetch(2)
)
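One tradeoff worth noting: with `.batch()` inside `interleave`, each element that `interleave` cycles over is already a whole batch, so every batch contains rows from only one file; with `.batch()` after `interleave`, rows from different files mix within a batch. A plain-Python sketch of the difference (toy lists standing in for files, not tf.data itself):

```python
from itertools import islice

def batched(rows, n):
    """Yield successive lists of up to n rows."""
    it = iter(rows)
    while True:
        chunk = list(islice(it, n))
        if not chunk:
            return
        yield chunk

# Two toy "files" of row ids.
file_a = ["a0", "a1", "a2", "a3"]
file_b = ["b0", "b1", "b2", "b3"]

# Batch *after* interleaving (block_length=1): rows from both files mix.
interleaved_rows = [r for pair in zip(file_a, file_b) for r in pair]
batch_after = list(batched(interleaved_rows, 2))
# → [['a0', 'b0'], ['a1', 'b1'], ['a2', 'b2'], ['a3', 'b3']]

# Batch *inside* interleaving: the cycle now runs over whole batches,
# so every batch holds rows from only one file.
batch_inside = [b for pair in zip(batched(file_a, 2), batched(file_b, 2)) for b in pair]
# → [['a0', 'a1'], ['b0', 'b1'], ['a2', 'a3'], ['b2', 'b3']]
```

If the interleaving is only there to keep workers busy, per-file batches are fine (and much cheaper, since tf.data dispatches per element); if you rely on interleaving to mix rows across files, you would need an extra shuffle or a rebatch afterwards.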

Maybe related: #1709