Closed gumartinm closed 3 years ago
Thanks @gumartinm for providing context and creating this review. Would it be possible to add an integration test for the same? Or have any benchmark test results added in the description with and without the change?
Hmmm, I do not have benchmarks, but I do have a production query that processes 25 million CSV rows. With Spark 2.4.x this query finished in 2 minutes. After upgrading to Spark 3.0.1, the query did not finish even after 1 hour. We could see that it was scanning CSV rows much more slowly than before (so slowly that it never completed).
Calling `createSerializer` only once per Spark task, instead of once for every CSV row, did the trick.
I will try to upload some screenshots with the DAG and the details of the query execution before and after this change.
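To make the fix concrete, here is a minimal, hypothetical sketch of the pattern (none of these names come from the real spark-redshift code): the expensive serializer is built once when the task's iterator is set up, rather than inside the per-row loop.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

public class SerializerHoisting {
    // Counts how many times the "expensive" serializer is constructed.
    static final AtomicInteger creations = new AtomicInteger();

    // Stand-in for something like RowEncoder(schema).createSerializer():
    // cheap to call afterwards, expensive to build.
    static Function<String, String> createSerializer() {
        creations.incrementAndGet();
        return row -> "serialized(" + row + ")";
    }

    // Slow variant: builds a brand-new serializer for every CSV row.
    static void perRow(List<String> rows) {
        for (String row : rows) {
            createSerializer().apply(row);
        }
    }

    // Fixed variant: builds the serializer once per task, then reuses it.
    static void perTask(List<String> rows) {
        Function<String, String> serializer = createSerializer();
        for (String row : rows) {
            serializer.apply(row);
        }
    }

    public static void main(String[] args) {
        List<String> rows = List.of("a", "b", "c", "d");

        creations.set(0);
        perRow(rows);
        System.out.println("per-row creations:  " + creations.get());

        creations.set(0);
        perTask(rows);
        System.out.println("per-task creations: " + creations.get());
    }
}
```

With 25 million rows, the per-row variant pays the construction cost 25 million times, which matches the observed slowdown.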
In this query, we are performing a `LEFT JOIN` between a Parquet table located in AWS S3 and a Redshift table (that is why there is an `Exchange` after the `WholeStageCodegen` operation).
When using the current version of the spark-redshift driver, we can see that after 21 minutes Spark is still reading CSV rows (it would take more than one hour for this query to finish):
If we use the patched version, the same query finishes in less than 2 minutes while reading 25 million CSV rows:
Ship-it. I will start working on merging and releasing this fix after doing some local tests. Do you know if this would impact .json data as well? Thanks again for debugging and working on this fix.
As far as I know, this fix only impacts CSV files, because the `UNLOAD` operation from Redshift only generates CSV.
Instead of creating a new serializer per row, it is better to create only one per schema. This is thread-safe because the serializer never escapes the thread associated with a given Spark task.
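The "one serializer per schema" idea can be sketched as follows (all names hypothetical, not the actual spark-redshift code): serializers are cached in a plain `HashMap` keyed by schema. Because each task owns its own cache and the map never leaves the task's thread, no synchronization is required.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

public class SerializerCache {
    // Thread-confined: each Spark task would own its own SerializerCache,
    // so an unsynchronized HashMap is safe here.
    private final Map<String, Function<String, String>> bySchema = new HashMap<>();

    // Counts serializer constructions, to show the cache is working.
    static int creations = 0;

    // Stand-in for the expensive RowEncoder(schema).createSerializer() call.
    private static Function<String, String> build(String schema) {
        creations++;
        return row -> schema + ":" + row;
    }

    // Returns the cached serializer for this schema, building it at most once.
    Function<String, String> serializerFor(String schema) {
        return bySchema.computeIfAbsent(schema, SerializerCache::build);
    }

    public static void main(String[] args) {
        SerializerCache cache = new SerializerCache();
        cache.serializerFor("s1").apply("row1");
        cache.serializerFor("s1").apply("row2"); // reuses the cached serializer
        cache.serializerFor("s2").apply("row3"); // new schema, new serializer
        System.out.println("creations: " + creations);
    }
}
```

Since a task usually processes rows of a single schema, this degenerates to exactly one serializer per task in practice.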