G-Research / spark-extension

A library that provides useful extensions to Apache Spark and PySpark.
Apache License 2.0

On AWS - after Diff, Insert columns are all null #64

Closed leewalter78 closed 2 years ago

leewalter78 commented 3 years ago

I found this project when trying to compare dataframes using pyspark, and it appears to work great. I am seeing an issue when running this as part of an AWS Glue job with this jar - spark-extension_2.11-1.3.3-2.4.jar. Locally, it works fine and I cannot reproduce this issue.

If I create the dataframes using spark.createDataFrame() and do a diff, I get the results I expect. When I read the dataframes from a Glue context, the Insert columns are all null after the diff. I tried with and without unique identifiers, and I also reduced the number of rows and columns. I thought sparse mode might be causing this, so I tried it set to both true and false.

I created a simple case where the left dataframe has 1 row and the right dataframe has 2. One is the same row, and the diff identifies that. It identifies the Insert as well, but it loses all the values.

The Deletes and No Changes rows are as expected, and currently I only see the issue with Inserts. I have not tested Changes yet - I have been stuck on Inserts.

Any insights on what might be causing this?

Sample output:

+----+---------+-----+----+----+----+--------------+----+----+----+----+
|diff|     col1| col2|col2|col4|col4|          col5|col6|col7|col8|col9|
+----+---------+-----+----+----+----+--------------+----+----+----+----+
|   N|123456789|ABCDE|Test| 670|   4|Description...|77.0|  FL|null|null|
|   I|     null| null|null|null|null|          null|null|null|null|null|
+----+---------+-----+----+----+----+--------------+----+----+----+----+

Note: I also tried 1.3.0 and the same thing is happening (spark-extension_2.11-1.3.0-2.4)

leewalter78 commented 3 years ago

I know that not being able to reproduce it locally makes it nearly impossible to analyze, but I was wondering if anyone has seen similar behavior?

Since the C, D, and N types work, there is a workaround: do a left anti join on the id columns between the right DataFrame and the diff results where diff != I. Basically, remove the C, D, and N rows from the right DataFrame and you will be left with the inserts.

kasunnirmala commented 2 years ago

@leewalter78 Were you able to fix this?

EnricoMi commented 2 years ago

@leewalter78 do you see the same issue? Can you provide steps for me to reproduce this? "On AWS" does not really help.

leewalter78 commented 2 years ago

No, I was not able to resolve it - I ended up using a custom solution with set differences and then joining on the keys.
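
The custom solution itself is not shown in this thread. As a rough illustration of the idea only, here is a plain-Python sketch (not Spark code) that takes set differences of the rows and then matches the leftovers on the key. The name `classify_rows` is hypothetical, and the sketch assumes rows are hashable tuples with a unique key at `key_index`.

```python
def classify_rows(left, right, key_index=0):
    """Classify keys as changed (C), deleted (D), or inserted (I)."""
    # Set differences: rows present on one side only (identical rows drop out).
    left_only = set(left) - set(right)
    right_only = set(right) - set(left)

    # "Join" the two leftovers on the key column.
    left_by_key = {row[key_index]: row for row in left_only}
    right_by_key = {row[key_index]: row for row in right_only}

    result = {}
    for key in left_by_key.keys() | right_by_key.keys():
        if key in left_by_key and key in right_by_key:
            result[key] = "C"  # same key, different values
        elif key in left_by_key:
            result[key] = "D"  # only in left
        else:
            result[key] = "I"  # only in right
    return result


left = [(1, "one"), (2, "two"), (3, "three")]
right = [(1, "one"), (2, "Two"), (4, "four")]
print(classify_rows(left, right))
```

Unchanged rows do not appear in the result because they are removed by both set differences.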

At the time, I uploaded the jar (spark-extension_2.11-1.3.3-2.4.jar) to an S3 bucket, and referenced it from the Glue job under the Python lib path.

Then, I believe this was the sample snippet I had tried. (I forget all the details, but there were cases where this did not work. From my post, it was inserts where all the values would be NULL.)

from gresearch.spark.diff import *

left = spark.createDataFrame([(1, "one"), (2, "two"), (3, "three")], ["id", "value"])
right = spark.createDataFrame([(1, "one"), (2, "Two"), (4, "four")], ["id", "value"])
left.diff(right, "id").show()

leewalter78 commented 2 years ago

I believe the sample code above worked, but when I used my actual dataframes, the values for the Inserts were all null.

EnricoMi commented 2 years ago

I just ran that sample snippet on Glue 3.0 Python with spark-extension_2.12-2.1.0-3.1.jar and it worked:

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
job.commit()

left = spark.createDataFrame([(1, "one"), (2, "two"), (3, "three")], ["id", "value"])
right = spark.createDataFrame([(1, "one"), (2, "Two"), (4, "four")], ["id", "value"])

from gresearch.spark.diff import *

left.diff(right, "id").show()
+----+---+----------+-----------+
|diff| id|left_value|right_value|
+----+---+----------+-----------+
|   D|  3|     three|       null|
|   N|  1|       one|        one|
|   C|  2|       two|        Two|
|   I|  4|      null|       four|
+----+---+----------+-----------+

kasunnirmala commented 2 years ago

Can you please specify the exact Job details section, both Basic and Advanced Properties?

EnricoMi commented 2 years ago

Here are the job details:

bobhaffner commented 2 years ago

For what it's worth, I was able to get the same result as Enrico.

EnricoMi commented 2 years ago

Looks like spark-extension works on AWS Glue and the reported issue is not reproducible. Closing for now.