mjakubowski84 / parquet4s

Read and write Parquet in Scala. Use Scala classes as schema. No need to start a cluster.
https://mjakubowski84.github.io/parquet4s/
MIT License

missing tail records of large (~193M) parquet files #323

Closed: conderls closed this issue 1 year ago

conderls commented 1 year ago
  1. I have a Parquet file (~193 MB, 22666 records) generated by Spark 3.1.2, and some records go missing when reading the file with parquet4s (only 14447 of the 22666 records are loaded).

    // record schema
    case class OriginInfo(
      hash: Long = -1,
      id: Long = -1,
      dtype: String = "Unknown",
      content: String = ""
    )


  2. If I just read and rewrite the file with pandas, it works.
  3. If I split the file into smaller files, e.g. repartition it into 2 files, it works.
  4. If I repeat a data frame (which parquet4s can load properly) until I get a Parquet file of around 200 MB, it fails.

It is kind of weird; does it mean that the data in one block buffer is exhausted?

gaohongkui commented 1 year ago

same problem

conderls commented 1 year ago

I found out that the issue is caused by ParquetIterable[T].toSeq not loading the full data before I close the data stream.

Using ParquetIterable[T].toArray solves this issue.
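
For context, a minimal sketch contrasting the two patterns, assuming the OriginInfo case class from the issue description and a hypothetical multi-row-group file large.parquet:

import com.github.mjakubowski84.parquet4s.{ParquetReader, Path}

// Failing pattern: toSeq may return a lazily evaluated Seq that still
// pulls records from the underlying stream after close()
val data = ParquetReader.as[OriginInfo].read(Path("large.parquet"))
val records = data.toSeq
data.close()
// records.size // may be short, or throw java.io.IOException: Stream is closed!

// Working pattern: toArray materializes every record before close()
val data2 = ParquetReader.as[OriginInfo].read(Path("large.parquet"))
val all = data2.toArray
data2.close()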

mjakubowski84 commented 1 year ago

Oh! That is something interesting. I wonder why Scala's toSeq doesn't iterate over the whole iterable, but toArray does it...

conderls commented 1 year ago

> Oh! That is something interesting. I wonder why Scala's toSeq doesn't iterate over the whole iterable, but toArray does it...

The code snippet I used is:

import com.github.mjakubowski84.parquet4s.{ParquetReader, Path}

val data = ParquetReader.as[T].read(Path(_path))
try {
  // `.toSeq` fails if there are multiple row groups in the Parquet file:
  //   java.io.IOException: Stream is closed!
  data.toArray
} finally {
  data.close()
}
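
A safer variant of the same idea is to materialize eagerly inside a resource bracket, so the reader is closed even if reading throws. A minimal sketch, assuming Scala 2.13's scala.util.Using and the OriginInfo case class from above:

import com.github.mjakubowski84.parquet4s.{ParquetReader, Path}
import scala.util.Using

// toVector, like toArray, is strict, so every row group is consumed
// before Using closes the reader
val records: Vector[OriginInfo] =
  Using.resource(ParquetReader.as[OriginInfo].read(Path("test.parquet"))) { data =>
    data.toVector
  }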

We can create a Parquet file with multiple row groups with pandas:

In [1]: import pandas as pd
   ...: df = pd.DataFrame([[1, "a"], [2, "b"]], columns=["foo", "bar"])
   ...: df.to_parquet("test.parquet", row_group_size=1)

In [2]: import pyarrow as pa

In [3]: pa.parquet.read_metadata("test.parquet")
Out[3]:
<pyarrow._parquet.FileMetaData object at 0x7fc8c43bf040>
  created_by: parquet-cpp-arrow version 13.0.0
  num_columns: 2
  num_rows: 2
  num_row_groups: 2  # <<<<<<<<<< 2 row groups
  format_version: 2.6
  serialized_size: 2007
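
The same check is possible from the JVM side with parquet-hadoop, which parquet4s builds on. A sketch, assuming the test.parquet file written above:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.parquet.hadoop.ParquetFileReader
import org.apache.parquet.hadoop.util.HadoopInputFile

// Read only the footer and count the row groups (blocks)
val inputFile = HadoopInputFile.fromPath(new Path("test.parquet"), new Configuration())
val reader = ParquetFileReader.open(inputFile)
try println(s"row groups: ${reader.getFooter.getBlocks.size}") // expect 2
finally reader.close()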

The main reason .toSeq fails here is the lazy implementation under the hood. The scaladoc for TraversableOnce.toSeq says:

> As with toIterable, it's lazy in this default implementation, as this TraversableOnce may be lazy and unevaluated.
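
A quick way to observe this laziness in isolation (assuming Scala 2.12, where the default toSeq delegates to a lazily built Stream):

val it = Iterator(1, 2, 3)
val s = it.toSeq    // returns a Stream: only the head is evaluated
println(it.hasNext) // true: the iterator is only partially consumed
val a = Iterator(1, 2, 3).toArray // strict: drains the whole iterator at once

Since ParquetIterable appears to rely on this default, toSeq hands back a Stream that still reads from the open Parquet stream, and closing the reader before the Stream is fully forced truncates the result.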