tensorflow / tfx

TFX is an end-to-end platform for deploying production ML pipelines
https://tensorflow.org/tfx
Apache License 2.0

Slow parquet to TFRecord using parquet_executor.Executor #6389

Open tgrunzweig-cpacket opened 11 months ago

tgrunzweig-cpacket commented 11 months ago

If the bug is related to a specific library below, please raise an issue in the respective repo directly:

TensorFlow Data Validation Repo

TensorFlow Model Analysis Repo

TensorFlow Transform Repo

TensorFlow Serving Repo

System information

Describe the current behavior
Reading a single 20MB file using

example_gen = FileBasedExampleGen(input_base=_data_root, custom_executor_spec=custom_executor_spec)
context.run(example_gen)

takes 37 s on this machine.

Describe the expected behavior
Reading the same file using pd.read_parquet() takes 37 ms; that's about 1000x faster.

Standalone code to reproduce the issue

in jupyter notebook

import pandas as pd
import numpy as np
import string

import tensorflow as tf
from tfx import v1 as tfx
from tfx.orchestration.experimental.interactive.interactive_context import InteractiveContext
from google.protobuf.json_format import MessageToDict

from tfx.components import FileBasedExampleGen
from tfx.components.example_gen.custom_executors import parquet_executor
from tfx.dsl.components.base import executor_spec

arr_random = np.random.randint(low=2, high=10, size=(100000, 26))
columns = list(string.ascii_uppercase)
df = pd.DataFrame(arr_random, columns=columns)
df.to_parquet('./gen_data/test.parquet')

_pipeline_root = './pipeline/'
_data_root = './gen_data/'

context = InteractiveContext(pipeline_root=_pipeline_root)
custom_executor_spec = executor_spec.BeamExecutorSpec(parquet_executor.Executor)
example_gen = FileBasedExampleGen(input_base=_data_root, custom_executor_spec=custom_executor_spec)

context.run(example_gen)

Providing a bare minimum test case or step(s) to reproduce the problem will greatly help us to debug the issue. If possible, please share a link to Colab/Jupyter/any notebook.

Name of your Organization (Optional)
cpacket networks

Other info / logs

In Jupyter Lab cells:

%%time
context.run(example_gen)

CPU times: user 37.3 s, sys: 176 ms, total: 37.5 s
Wall time: 37.8 s

%%time
df = pd.read_parquet('./gen_data/test.parquet')

CPU times: user 37.1 ms, sys: 32.3 ms, total: 69.4 ms
Wall time: 37.7 ms

Maybe related to issue #4561?

Include any logs or source code that would be helpful to diagnose the problem. If including tracebacks, please include the full traceback. Large logs and files should be attached.

singhniraj08 commented 10 months ago

@tgrunzweig-cpacket,

The difference in execution time between the FileBasedExampleGen component and pandas.read_parquet comes from the fact that read_parquet just reads the Parquet file and returns a pandas DataFrame, whereas FileBasedExampleGen reads the Parquet files and transforms the data into TF Examples.

Internally it performs the data type conversion by running dict_to_example() and splits the data into train and eval sets, which leads to the longer execution time.
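To make the extra work concrete, here is a hedged sketch of roughly what converting one Parquet row (a dict of column name to value) into a tf.train.Example involves. The helper name row_to_example and its dtype handling are illustrative; the actual dict_to_example() in TFX covers more dtypes and edge cases.

```python
import numpy as np
import tensorflow as tf

# Illustrative sketch, not the TFX implementation: map each column
# value to the matching tf.train.Feature kind, then wrap all the
# features in a tf.train.Example proto.
def row_to_example(row):
    features = {}
    for name, value in row.items():
        if isinstance(value, (int, np.integer)):
            feature = tf.train.Feature(int64_list=tf.train.Int64List(value=[int(value)]))
        elif isinstance(value, (float, np.floating)):
            feature = tf.train.Feature(float_list=tf.train.FloatList(value=[float(value)]))
        else:
            feature = tf.train.Feature(bytes_list=tf.train.BytesList(value=[str(value).encode()]))
        features[name] = feature
    return tf.train.Example(features=tf.train.Features(feature=features))

example = row_to_example({'A': 3, 'B': 2.5, 'C': 'x'})
print(example.features.feature['A'].int64_list.value)
```

This per-row proto construction (plus serialization and the train/eval split) is the overhead that read_parquet simply does not pay.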

Thank you!

tgrunzweig-cpacket commented 10 months ago

Hi Niraj, thanks for the quick reply.

Sure, converting a file to TFRecords involves more than loading the file into memory, but I would think those other steps should not take that long either. For example, if I trivially implement something like those other steps in plain Python, it looks like this:

from sklearn.model_selection import train_test_split

df = pd.read_parquet('./gen_data/test.parquet')
df_train, df_test = train_test_split(df)
d_train = dict(zip(df_train.index, df_train.values))
d_test = dict(zip(df_test.index, df_test.values))

For the 20MB file I generated above, the times to execute (using %%time in my notebook) are:

loading from disk to df: 37 ms
splitting into train and test: 30 ms
converting to dicts: 52 ms

I'm not sure what dict format dict_to_example() expects (it's not described in the function's docstring). Are you suggesting that this function would take 37 s for this 100,000-entry dictionary? Is that truly what is expected?
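To put a rough number on the proto-construction cost itself, here is a hedged micro-benchmark (illustrative only, not the actual TFX code path) that builds and serializes one tf.train.Example per row of data shaped like the generated file:

```python
import time
import numpy as np
import tensorflow as tf

# Illustrative micro-benchmark, not the TFX implementation: build and
# serialize one tf.train.Example per row. 10,000 rows of 26 int
# columns here; scale the timing by ~10x to estimate the 100,000-row case.
rows = np.random.randint(2, 10, size=(10_000, 26))
cols = [chr(ord('A') + i) for i in range(26)]

start = time.perf_counter()
serialized = []
for row in rows:
    feats = {c: tf.train.Feature(int64_list=tf.train.Int64List(value=[int(v)]))
             for c, v in zip(cols, row)}
    serialized.append(
        tf.train.Example(features=tf.train.Features(feature=feats)).SerializeToString())
print(f"{time.perf_counter() - start:.2f} s for {len(serialized)} examples")
```

If this loop finishes in a few seconds at most, the proto conversion alone would not explain a 37 s run, pointing instead at pipeline overhead.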

singhniraj08 commented 10 months ago

@tgrunzweig-cpacket,

Going through the parquet_executor.py code, the Executor class runs a Beam pipeline which reads Parquet files using beam.io.ReadFromParquet and then converts each record to a TF Example using dict_to_example.

The slower execution may be because each input split is transformed by this function separately.

@briron, TFX ExampleGen with the Parquet executor takes around 45 s to execute, whereas pandas handles the same amount of data in 44 ms. Is this expected? Thanks!