mjakubowski84 / parquet4s

Read and write Parquet in Scala. Use Scala classes as schema. No need to start a cluster.
https://mjakubowski84.github.io/parquet4s/

failed to read parquet generated by pandas #312

Closed conderls closed 1 year ago

conderls commented 1 year ago

I created some test data with pandas=2.0.3 (python=3.7):

pd.DataFrame([[1, "a", ["a", "b"]]], columns=["d", "s", "seq"]).to_parquet("/tmp/data.parquet")

and parquet4s failed to read it:

    import com.github.mjakubowski84.parquet4s.{ParquetReader, Path}

    case class TestItem(
      d: Long,
      s: String,
      seq: Array[String]
    )

    val path = Path("/tmp/data.parquet")

    // reading
    val parquetIterable = ParquetReader.as[TestItem].read(path)
    try {
      parquetIterable.foreach { item =>
        println(item)
      }
    } finally {
      parquetIterable.close()
    }

A decoding error is raised; it apparently fails to decode the list data:

// scala 2.12.12, parquet4s 2.12.0
Failed to decode field seq of record: RowParquetRecord(d=LongValue(1),s=BinaryValue(Binary{1 constant bytes, [97]}),seq=ListParquetRecord(RowParquetRecord(item=BinaryValue(Binary{1 constant bytes, [97]})),RowParquetRecord(item=BinaryValue(Binary{1 constant bytes, [98]}))))
com.github.mjakubowski84.parquet4s.ParquetRecordDecoder$DecodingException: Failed to decode field seq of record: RowParquetRecord(d=LongValue(1),s=BinaryValue(Binary{1 constant bytes, [97]}),seq=ListParquetRecord(RowParquetRecord(item=BinaryValue(Binary{1 constant bytes, [97]})),RowParquetRecord(item=BinaryValue(Binary{1 constant bytes, [98]}))))
    at com.github.mjakubowski84.parquet4s.ParquetRecordDecoder$DecodingException$.apply(ParquetRecordDecoder.scala:34)
    at com.github.mjakubowski84.parquet4s.ParquetRecordDecoder$.$anonfun$headValueDecoder$1(ParquetRecordDecoder.scala:61)
    at com.github.mjakubowski84.parquet4s.ParquetRecordDecoder$.$anonfun$headValueDecoder$1(ParquetRecordDecoder.scala:65)
    at com.github.mjakubowski84.parquet4s.ParquetRecordDecoder$.$anonfun$headValueDecoder$1(ParquetRecordDecoder.scala:65)
    at com.github.mjakubowski84.parquet4s.ParquetRecordDecoder$.$anonfun$genericDecoder$1(ParquetRecordDecoder.scala:76)
    at com.github.mjakubowski84.parquet4s.ParquetRecordDecoder$.decode(ParquetRecordDecoder.scala:46)
    at com.github.mjakubowski84.parquet4s.ParquetIterable$.$anonfun$apply$1(ParquetIterable.scala:19)
    at scala.collection.immutable.List.map(List.scala:293)
    at com.github.mjakubowski84.parquet4s.ParquetIterableImpl.$anonfun$iterator$1(ParquetIterable.scala:219)
    at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492)
    at scala.collection.Iterator$ConcatIterator.advance(Iterator.scala:199)
    at scala.collection.Iterator$ConcatIterator.hasNext(Iterator.scala:227)
    at scala.collection.Iterator.foreach(Iterator.scala:943)
    at scala.collection.Iterator.foreach$(Iterator.scala:943)
    at scala.collection.Iterator$ConcatIterator.foreach(Iterator.scala:179)
    at scala.collection.IterableLike.foreach(IterableLike.scala:74)
    at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
    at com.github.mjakubowski84.parquet4s.etl.CompoundParquetIterable.foreach(CompoundParquetIterable.scala:6)

If the test data is written with com.github.mjakubowski84.parquet4s.ParquetWriter, it works perfectly.
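
For reference, a minimal sketch of that working round trip, assuming the parquet4s 2.x builder API:

    import com.github.mjakubowski84.parquet4s.{ParquetReader, ParquetWriter, Path}

    val outPath = Path("/tmp/data_parquet4s.parquet")

    // writing the same record with parquet4s itself...
    ParquetWriter.of[TestItem].writeAndClose(outPath, Seq(TestItem(1L, "a", Array("a", "b"))))

    // ...reads back without any decoding error
    val readBack = ParquetReader.as[TestItem].read(outPath)
    try readBack.foreach(println)
    finally readBack.close()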

mjakubowski84 commented 1 year ago

Well, in your pandas data, column d is not a Long. Check what the type of d is and use the proper Scala type.

conderls commented 1 year ago
  1. The error message shows that it failed to decode field seq, not field d. Also, I have successfully decoded the following data:

    pd.DataFrame([[1, "a"]], columns=["d", "s"]).to_parquet("/tmp/data.parquet")

    // successfully prints the parsed result:
    TestItem(1,a)

    It fails with this data:

    // data generated with pandas
    pd.DataFrame([[["a", "b"]]], columns=["s"]).to_parquet("/tmp/data.parquet")

    // scala
    case class TestItem(s: Seq[String])

  2. I have also tried to create data with spark 3.2.1:

    Seq((1L, "a", Seq("a", "b"))).toDF("d", "s", "seq").write.parquet("file:///tmp/data_spark")

    parquet4s can successfully read that data.

So there may be some mismatch between how pandas and Scala/Spark write Parquet.

mjakubowski84 commented 1 year ago

Ah, yes. I misread the logs. Pandas encapsulates the elements of "seq" within yet another object with a field "item". If you are using the latest version of Parquet4s and the issue persists, then it means that Parquet4s doesn't recognise that format of lists. So, either propose a fix to Parquet4s or add an encapsulating case class around the items of "seq".
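
For example, a minimal sketch of that workaround (the wrapper class name is arbitrary, but its field must be named item to match the record in the error message):

    import com.github.mjakubowski84.parquet4s.{ParquetReader, Path}

    // pandas wraps each list element in a group with a single field "item",
    // so mirror that structure with a wrapper case class
    case class Item(item: String)

    case class TestItem(
      d: Long,
      s: String,
      seq: Seq[Item]
    )

    val records = ParquetReader.as[TestItem].read(Path("/tmp/data.parquet"))
    try records.foreach(println)
    finally records.close()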

conderls commented 1 year ago

  1. The parquet4s version I used is 2.12.0, which is already the latest;
  2. I see what you mean. I checked the schema of the list, which differs between pandas and Scala/Spark:

    # schema of data generated by pandas (pyarrow < 13.0.0; upgrade to >= 13.0.0 to keep the schema consistent with spark)
    schema: org.apache.parquet.schema.MessageType =
    message schema {
      optional int64 d;
      optional binary s (UTF8);
      optional group seq (LIST) {
        repeated group list {
          optional binary item (UTF8);
        }
      }
    }

schema of data generated by parquet4s or spark:

    schema: org.apache.parquet.schema.MessageType =
    message spark_schema {
      required int64 d;
      optional binary s (UTF8);
      optional group seq (LIST) {
        repeated group list {
          optional binary element (UTF8);   // <--- element vs item
        }
      }
    }
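
For reference, a schema dump like the above can be obtained with the underlying parquet-hadoop API (a sketch, assuming parquet-hadoop and hadoop-common are on the classpath):

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.{Path => HadoopPath}
    import org.apache.parquet.hadoop.ParquetFileReader
    import org.apache.parquet.hadoop.util.HadoopInputFile

    // open the file footer and print the Parquet message type
    val inputFile = HadoopInputFile.fromPath(new HadoopPath("/tmp/data.parquet"), new Configuration())
    val reader = ParquetFileReader.open(inputFile)
    try println(reader.getFooter.getFileMetaData.getSchema)
    finally reader.close()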



I finally found the solution in the [pyarrow docs](https://arrow.apache.org/docs/python/generated/pyarrow.parquet.write_table.html#pyarrow.parquet.write_table): pass the option `use_compliant_nested_type=True` when saving data to parquet with pandas:

`df.to_parquet("/tmp/data.parquet", use_compliant_nested_type=True)`

`use_compliant_nested_type` defaults to True since pyarrow 13.0.0, so one should either set the option to True explicitly or upgrade to pyarrow >= 13.0.0.

mjakubowski84 commented 1 year ago

Legacy pyarrow nested types will also be supported in Parquet4s 2.14.0.