uber / petastorm

The Petastorm library enables single-machine or distributed training and evaluation of deep learning models directly from datasets in Apache Parquet format. It supports ML frameworks such as TensorFlow, PyTorch, and PySpark and can be used from pure Python code.
Apache License 2.0

Error reading parquet files made by AWS Athena #447

Closed: RoelantStegmann closed this issue 4 years ago

RoelantStegmann commented 4 years ago

I made a bunch of parquet files using an amazon athena CTAS query. I downloaded these files to first test locally (the end goal is to access the data from S3).

If I run the code below:

import s3fs  # imported with the eventual S3 access in mind
from petastorm.reader import make_batch_reader
from petastorm.tf_utils import make_petastorm_dataset

dataset_url = "file:///Data/test-parquet"

with make_batch_reader(dataset_url) as reader:
    dataset = make_petastorm_dataset(reader)
    for batch in dataset:
        break
batch.correct  # inspect one field of the first batch

I receive a lot of warnings and then an error at for batch in dataset:

pyarrow.lib.ArrowIOError: The file only has 1 row groups, requested metadata for row group: 1

If I look at dataset.take(1) or something similar, I do see the correct schema of the table. However, I don't seem to be able to access the data.

selitvin commented 4 years ago

Did you try looking at the parquet file structure using the parquet-tools CLI? It can give you more information about what's going on. For example, you would be able to inspect the row-group structure of your file. I'm not sure why pyarrow would complain about 1 row group in a file.
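
For reference, a similar inspection can be done with pyarrow itself; a minimal sketch, assuming a hypothetical file name inside your dataset directory:

import pyarrow.parquet as pq

# the file name is hypothetical; point this at one of the downloaded Athena files
md = pq.ParquetFile('C:/Data/test-parquet/part-00000.parquet').metadata
print(md.num_row_groups)
for i in range(md.num_row_groups):
    print(md.row_group(i).num_rows)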

RoelantStegmann commented 4 years ago

I continued testing and I can get it to work like this, so I'm a bit confused about why the petastorm implementation does not work...

import pyarrow.parquet as pq

# both the dataset and the full table read without errors
dataset = pq.ParquetDataset('C:/Data/test-parquet')
table = pq.read_table('C:/Data/test-parquet')

selitvin commented 4 years ago

The difference might stem from the fact that we read not the full table but just one row group at a time. See how we use a piece here: https://github.com/uber/petastorm/blob/efa7a3ce0ac9e2050c227052580ffcd72fdd144d/petastorm/arrow_reader_worker.py#L247

Pieces are constructed here: https://github.com/uber/petastorm/blob/efa7a3ce0ac9e2050c227052580ffcd72fdd144d/petastorm/etl/dataset_metadata.py#L265 (dataset.pieces)

Maybe there is something different in the list of pieces in your dataset object that violates some assumptions in petastorm code?
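
One quick way to check that assumption is to print the pieces directly; a minimal sketch using the path from your example:

import pyarrow.parquet as pq

dataset = pq.ParquetDataset('C:/Data/test-parquet')
for piece in dataset.pieces:
    # row_group=None means the piece covers the whole file rather than a single row group
    print(piece.path, piece.row_group, piece.partition_keys)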

RoelantStegmann commented 4 years ago

Thanks for the pointers. Obtaining a single piece indeed shows that the row_group is not there.

ParquetDatasetPiece('C:/Data/test-parquet\\20191119_133052_00001_sft4j_01695ac9-ae84-4e6a-8988-4da1211b8a25', row_group=None, partition_keys=[])

I'm quite a new user of Parquet, so I'm not sure I understand why that would be happening. I thought setting the split_row_groups parameter might help, but it is not implemented.

selitvin commented 4 years ago

Is it possible that this parquet file simply has no records?
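
A quick way to check; a minimal sketch with a hypothetical file name:

import pyarrow.parquet as pq

# an empty file would report num_rows == 0 here; the file name is hypothetical
print(pq.ParquetFile('C:/Data/test-parquet/part-00000.parquet').metadata.num_rows)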

RoelantStegmann commented 4 years ago

Actually, I'm not able to reproduce the error I reported 4 days ago.

import pyarrow.parquet as pq
from petastorm.compat import compat_piece_read

dataset = pq.ParquetDataset('C:/Data/test-parquet')
sorted_pieces = sorted(dataset.pieces, key=lambda x: x.path)

# I needed the column names, so read the last piece once
piece = sorted_pieces[-1]
table = piece.read()
table.shape

column_names = table.column_names
for piece in sorted_pieces:
    # pq_file is a pq.ParquetFile opened beforehand for the current piece
    table = compat_piece_read(piece, lambda _: pq_file, columns=column_names, partitions=dataset.partitions)
    table.shape

This all works fine - still in contrast to the snippet of code in my opening post. I'll try to go through the flow in petastorm later today.

selitvin commented 4 years ago

Is there anything else to do in this issue? Can we close it?

jeisinge commented 4 years ago

We are getting the same error message for Parquet files written by Spark. We can confirm this happens for single-row-group files as well as multi-row-group files. PyArrow reports the correct number of row groups, but Petastorm attempts to read past the maximum row-group index.

We have attempted adjusting parquet.block.size to create multiple row groups. This works, but Petastorm fails to read some of these files as well: we are able to read some partitions, while others fail.

Multi row group error:

pyarrow.lib.ArrowIOError: The file only has 6 row groups, requested metadata for row group: 10

This can be replicated by:

pq.ParquetFile("six_row_group.parquet").read_row_group(10)

Metadata appears to be correct:

<pyarrow._parquet.FileMetaData object at 0x7fa4829d2f48>
  created_by: parquet-mr version 1.10.1.2-databricks4 (build 0fde0614ce406457c34b127500b30addc60d3f0a)
  num_columns: 139
  num_rows: 370494
  num_row_groups: 6
  format_version: 1.0
  serialized_size: 102173

We fixed it by decreasing the row-group size to 32 MB when we wrote the parquet files:

spark.conf.set("parquet.block.size", 1024 * 1024 * 32)

Any help would be much appreciated!

selitvin commented 4 years ago

Let me try to help. Some questions that may help clarify the picture:

- Can you share a small sample dataset that reproduces the error?
- Which pyarrow version are you using?
- How is the dataset being written?
- Are you reading it with make_reader or make_batch_reader?
- Does the petastorm HelloWorld example dataset work for you?

jeisinge commented 4 years ago

Possible Cause?

I found something interesting in these parquet files. I noticed that at https://github.com/uber/petastorm/blob/master/petastorm/etl/dataset_metadata.py#L291, the code attempts to build a dictionary mapping file_path to num_row_groups. When I inspect the individual files, the path comes up blank, '':

parquet_file = pq.ParquetFile("/foo/data=2020-01-24/part-00087-bd4cd140-58ec-4cb2-b74f-201fa5fc2092.c000.snappy.parquet")
row_group = parquet_file.metadata.row_group(0)
column = row_group.column(0)
print("'{}'".format(column.file_path))
# prints: ''

The result is that the dictionary ends up with a key of ''. I wonder if another read ends up overwriting this file_path key?
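
A minimal sketch of the suspected collision (the file list is hypothetical; the blank file_path is what I actually observe):

rowgroups_per_file = {}
for parquet_file in parquet_files:  # hypothetical list of pq.ParquetFile objects
    key = parquet_file.metadata.row_group(0).column(0).file_path  # '' for these files
    rowgroups_per_file[key] = parquet_file.metadata.num_row_groups
# every iteration writes to the same '' key, leaving a single entry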

It is not entirely clear to me whether file_path is reliable; see https://issues.apache.org/jira/browse/ARROW-5349. (Also, it is not clear to me whether this is the cause!) Any thoughts here would be much appreciated!

Answers to questions:

I have been unsuccessful in producing sample data that reproduces the issue; however, I am continuing my reproduction attempts. I can say that every time I write out the data with a different parquet.block.size, a different partition/file fails.

I have attempted Petastorm reading with PyArrow 0.14.1 and 0.15.1.

The dataset is being written by Spark 2.4.4 as a Delta Lake partitioned table:

dataframe
  .repartition('date)
  .write
  .format("delta")
  .partitionBy("date")
  .mode("overwrite")
  .option("path", path)
  .option("mergeSchema", true)
  .saveAsTable(tableName)

This data is being read in via make_batch_reader.

The data generated by the HelloWorld example worked.

jeisinge commented 4 years ago

Hmmm... the above-referenced code might not be the code path taken. Looking at another path, taken when metadata and common_metadata are None, I see https://github.com/uber/petastorm/blob/master/petastorm/etl/dataset_metadata.py#L326:

    def split_piece(piece):
        metadata = compat_get_metadata(dataset.pieces[0], dataset.fs.open)
        return [compat_make_parquet_piece(piece.path, dataset.fs.open,
                                          row_group=row_group,
                                          partition_keys=piece.partition_keys)
                for row_group in range(metadata.num_row_groups)]

Why are we looking up the metadata for dataset.pieces[0] instead of the passed-in piece?

When I run:

for piece in dataset.pieces:
  print(piece.get_metadata().num_row_groups)

I get:

11
17
17
16
14
10
8
9
...

selitvin commented 4 years ago

I think you are right - this looks like a bug! Does changing dataset.pieces[0] --> piece make your code work correctly? I am modifying the tests now to see if I can reproduce and verify the fix.
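
For clarity, the change under discussion would make split_piece look like this (a sketch of the proposed fix, not yet merged):

    def split_piece(piece):
        metadata = compat_get_metadata(piece, dataset.fs.open)  # was: dataset.pieces[0]
        return [compat_make_parquet_piece(piece.path, dataset.fs.open,
                                          row_group=row_group,
                                          partition_keys=piece.partition_keys)
                for row_group in range(metadata.num_row_groups)]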

jeisinge commented 4 years ago

I've run a couple of tests and it appears to be working. I'm continuing to test with a full training run and will report back by the end of the week once it has finished.

selitvin commented 4 years ago

Thank you. This is a serious bug. I really appreciate you looking into this. I am in the middle of 0.8.1 release and will add this fix to the release. If you don't mind, I'll add you to the reviewers of the fix.