NVIDIA / spark-rapids

Spark RAPIDS plugin - accelerate Apache Spark with GPUs
https://nvidia.github.io/spark-rapids
Apache License 2.0

[QST] Recommended approach for a reference avro writer/reader #927

Closed galipremsagar closed 3 years ago

galipremsagar commented 3 years ago

What is your question?

This is a question for the RAPIDS Spark team. As part of the cuIO refactor, we (the RAPIDS cuDF team) are currently adding fuzz-testing coverage for our Avro reader (we currently only have reader support, no writer support). To compare and evaluate our Avro reader, we need a reference writer/reader to write and read data so we can compare the resulting dataframes.

Since there is no Avro Python API support yet in pandas or pyarrow, we explored [pandavro](https://github.com/ynqa/pandavro), but that library lacks support for nullable values such as pd.NA and pd.NaT, which leaves us with no way to test nullable columns.

To work around that, we tried the following pipeline:

Pandas nullable dtypes written to a parquet file -> read parquet in pyspark -> write to avro in pyspark

The final Avro file is then read back with the cuDF Python API and compared against the original dataframe.

The pipeline from pandas to PySpark to cuDF looks roughly like this:

>>> import pandas as pd
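>>> # A minimal sketch (not from the original post) of how a frame like the
>>> # one shown below could be built with pandas nullable dtypes; the exact
>>> # dtypes ("string", "Int64", "Int8", "boolean", "UInt64", "category") are assumed.
>>> df = pd.DataFrame({
...     "str": pd.array(["a", pd.NA, "v", "a"], dtype="string"),
...     "float": [0.32, 0.32, 0.00, 0.23],
...     "int": pd.array([1, 2, 3, pd.NA], dtype="Int64"),
...     "int8": pd.array([1, 2, 3, pd.NA], dtype="Int8"),
...     "bool": pd.array([pd.NA, True, False, True], dtype="boolean"),
...     "unsigned": pd.array([1, 2, 3, pd.NA], dtype="UInt64"),
...     "cat": pd.Series(["a", "v", "z", "a"], dtype="category"),
... })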
>>> df
    str  float   int  int8   bool  unsigned cat
0     a   0.32     1     1   <NA>         1   a
1  <NA>   0.32     2     2   True         2   v
2     v   0.00     3     3  False         3   z
3     a   0.23  <NA>  <NA>   True      <NA>   a
>>> df.to_parquet('a.parquet')

>>> from pyspark.sql import SparkSession
>>> # initialise sparkContext
>>> spark = SparkSession.builder \
...     .master('local') \
...     .appName('myAppName') \
...     .config('spark.executor.memory', '5gb') \
...     .config("spark.cores.max", "6") \
...     .getOrCreate()

>>> df = spark.read.parquet('a.parquet')
>>> df
DataFrame[str: string, float: double, int: bigint, int8: tinyint, time: timestamp, bool: boolean, unsigned: bigint, cat: string]
>>> df.show()
+----+-----+----+----+-----+--------+---+
| str|float| int|int8| bool|unsigned|cat|
+----+-----+----+----+-----+--------+---+
|   a| 0.32|   1|   1| null|       1|  a|
|null| 0.32|   2|   2| true|       2|  v|
|   v|  0.0|   3|   3|false|       3|  z|
|   a| 0.23|null|null| true|    null|  a|
+----+-----+----+----+-----+--------+---+
>>> df.write.format("avro").save("file_avro")

>>> import cudf
>>> cudf.read_avro("file_avro")
    str  float   int  int8   bool  unsigned cat
0     a   0.32     1     1   <NA>         1   a
1  <NA>   0.32     2     2   True         2   v
2     v   0.00     3     3  False         3   z
3     a   0.23  <NA>  <NA>   True      <NA>   a

One limitation of choosing this approach is that duration types cannot be written to Parquet.

Given that drawback, would you suggest going this way, or do you recommend a better way to get a reference Avro writer?

galipremsagar commented 3 years ago

cc: @kkraus14

revans2 commented 3 years ago

Pandas nullable dtypes written to a parquet file -> read parquet in pyspark -> write to orc in pyspark

I think you mean write in avro instead of orc, but I get the general idea.

My main concern with this is around date and time types. Parquet has had issues with date/time types related to the Gregorian vs. Julian calendars. In fact, Spark 3.0+ now has two separate modes that are controlled by a combination of Spark-specific metadata in the file and Spark configs when reading/writing the data. The same issue/feature exists in the Spark Avro code too. So if you want to test timestamps before 1900 you are likely going to run into these issues.
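For reference, a minimal sketch of pinning those modes so no Julian/Gregorian rebase happens on either side; the config names below are the ones used by Spark 3.0/3.1 (later releases also expose them without the legacy prefix), so treat them as an assumption to verify against your Spark version:

from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .master("local")
    # Force the proleptic Gregorian ("CORRECTED") behaviour for both Parquet
    # and Avro so old dates/timestamps are not silently rebased.
    .config("spark.sql.legacy.parquet.datetimeRebaseModeInRead", "CORRECTED")
    .config("spark.sql.legacy.parquet.datetimeRebaseModeInWrite", "CORRECTED")
    .config("spark.sql.legacy.avro.datetimeRebaseModeInRead", "CORRECTED")
    .config("spark.sql.legacy.avro.datetimeRebaseModeInWrite", "CORRECTED")
    .getOrCreate()
)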

You also need to be aware that Spark only supports microsecond precision for timestamps. So if you try to store a timestamp with any other precision, it will be converted into microseconds when it is read by Spark. Because of this, the Spark Avro writer also only supports writing out timestamps in microseconds, despite Avro supporting more options than that.

You should also be aware that Spark converts all timestamps to the local timezone for processing. When writing the data out it will convert them back to UTC, though, except for CSV. This is not likely to be much of a problem, except possibly in some corner cases where a timestamp might overflow before it is converted back. Normally I wouldn't worry about it, but because you are doing fuzz testing you need to be careful.
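One sketch of a way to take that conversion out of the picture is to pin the session timezone to UTC (via the standard spark.sql.session.timeZone config) before doing any reads or writes:

# Keep Spark's "local" timezone equal to UTC so timestamps are not shifted
# back and forth during processing.
spark.conf.set("spark.sql.session.timeZone", "UTC")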

You also need to make sure that you configure Spark to only have a single task, or you might end up with your output split across multiple files.
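A simple sketch of one way to do that, reusing the df and output path from the example above, is to collapse the data to a single partition right before the write:

# One partition -> one output part file under the "file_avro" directory.
df.coalesce(1).write.mode("overwrite").format("avro").save("file_avro")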

That is all I can think of off the top of my head.

revans2 commented 3 years ago

Sorry, I see the unsigned types in there too. Spark, because it is written in Java, only supports signed values, so you are likely to have the schema modified when going to/from unsigned values.

revans2 commented 3 years ago

Given that drawback, would you suggest going this way, or do you recommend a better way to get a reference Avro writer?

Despite that set of drawbacks I think it would work, and we can help you get the correct configs to at least get as many of the Spark-specific issues out of the way as possible.

That said, doing a quick search for Python Avro implementations:

https://duckduckgo.com/?q=avro+python+implementation&t=canonical&ia=web

I see both spavro and fastavro. I'm not sure if you could build something with them that might give you what you want, but then again you might be trading one set of issues/incompatibilities for another.
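As a rough illustration of what building on fastavro could look like, here is a minimal sketch that handles nullable columns by declaring each field as a union with null; the schema, record values, and file name are made up for the example:

from fastavro import parse_schema, writer

# Each nullable column becomes a ["null", <type>] union in the Avro schema.
schema = parse_schema({
    "name": "fuzz_test_record",
    "type": "record",
    "fields": [
        {"name": "str", "type": ["null", "string"]},
        {"name": "int", "type": ["null", "long"]},
        {"name": "bool", "type": ["null", "boolean"]},
    ],
})

# None stands in for the missing (pd.NA) values.
records = [
    {"str": "a", "int": 1, "bool": None},
    {"str": None, "int": 2, "bool": True},
]

with open("reference.avro", "wb") as fo:
    writer(fo, schema, records)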

kkraus14 commented 3 years ago

Given the limitations in Spark outlined by @revans2 (thanks!), I don't think it makes sense to use Spark in this scenario if we want to get sufficient coverage of the IO formats.

galipremsagar commented 3 years ago

I think you mean write in avro instead of orc, but I get the general idea.

+1 Yes, I actually meant to write avro

Thanks @revans2 for giving us a detailed overview of the limitations. I have connected offline with @kkraus14 and we'll be exploring using fastavro.