capitalone / datacompy

Pandas, Polars, and Spark DataFrame comparison for humans and more!
https://capitalone.github.io/datacompy/
Apache License 2.0
482 stars 125 forks source link

Comparison fails on dataframes with a single column #253

Open rupertbarton opened 10 months ago

rupertbarton commented 10 months ago

When running a comparison on dataframes with a single column, the following exception is thrown:

/opt/venv/lib/python3.8/site-packages/datacompy/spark.py:356: in rows_both_mismatch
    self._merge_dataframes()
/opt/venv/lib/python3.8/site-packages/datacompy/spark.py:462: in _merge_dataframes
    self._all_rows_mismatched = self.spark.sql(mismatch_query).orderBy(
/opt/spark/python/lib/pyspark.zip/pyspark/sql/session.py:1440: in sql
    return DataFrame(self._jsparkSession.sql(sqlQuery, litArgs), self)
/opt/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/java_gateway.py:1322: in __call__
    return_value = get_return_value(
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
a = ('xro111', <py4j.clientserver.JavaClient object at 0x7f7ae6a7cd90>, 'o32', 'sql')
kw = {}, converted = ParseException()
    def deco(*a: Any, **kw: Any) -> Any:
        try:
            return f(*a, **kw)
        except Py4JJavaError as e:
            converted = convert_exception(e.java_exception)
            if not isinstance(converted, UnknownException):
                # Hide where the exception came from that shows a non-Pythonic
                # JVM exception message.
>               raise converted from None
E               pyspark.errors.exceptions.captured.ParseException: 
E               [PARSE_SYNTAX_ERROR] Syntax error at or near end of input.(line 1, pos 36)
E               
E               == SQL ==
E               SELECT * FROM matched_table A WHERE 
E               ------------------------------------^^^
/opt/spark/python/lib/pyspark.zip/pyspark/errors/exceptions/captured.py:175: ParseException

I believe it's due to this line always including a WHERE statement even when the where_cond is empty (empty because there are no columns apart from the join column).

fdosani commented 10 months ago

Thanks for the report. Would you be able to provide a minimal example so we can reproduce the issue? That would be really helpful to debug here.

rupertbarton commented 10 months ago

Thanks for your reply, here is a sample of some code that throws an exception for us:

df_1 = spark.createDataFrame([{"a": 1}])
df_2 = spark.createDataFrame([{"a": 1}])

compare = datacompy.SparkCompare(
    spark,
    df_1,
    df_2,
    join_columns=["a"],
    cache_intermediates=True,
)

compare.rows_both_mismatch.count()
rupertbarton commented 10 months ago

The error message is:

---------------------------------------------------------------------------
ParseException                            Traceback (most recent call last)
Cell In[7], line 12
      2 df_2 = spark.createDataFrame([{"a": 1}])
      4 compare = datacompy.SparkCompare(
      5     spark,
      6     df_1,
   (...)
      9     cache_intermediates=True,
     10 )
---> 12 compare.rows_both_mismatch.count()

File /opt/venv/lib/python3.8/site-packages/datacompy/spark.py:356, in SparkCompare.rows_both_mismatch(self)
    354 """pyspark.sql.DataFrame: Returns all rows in both dataframes that have mismatches"""
    355 if self._all_rows_mismatched is None:
--> 356     self._merge_dataframes()
    358 return self._all_rows_mismatched

File /opt/venv/lib/python3.8/site-packages/datacompy/spark.py:462, in SparkCompare._merge_dataframes(self)
    458 where_cond = " OR ".join(
    459     ["A." + name + "_match= False" for name in self.columns_compared]
    460 )
    461 mismatch_query = """SELECT * FROM matched_table A WHERE {}""".format(where_cond)
--> 462 self._all_rows_mismatched = self.spark.sql(mismatch_query).orderBy(
    463     self._join_column_names
    464 )

File /opt/spark/python/lib/pyspark.zip/pyspark/sql/session.py:1440, in SparkSession.sql(self, sqlQuery, args, **kwargs)
   1438 try:
   1439     litArgs = {k: _to_java_column(lit(v)) for k, v in (args or {}).items()}
-> 1440     return DataFrame(self._jsparkSession.sql(sqlQuery, litArgs), self)
   1441 finally:
   1442     if len(kwargs) > 0:

File /opt/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/java_gateway.py:1322, in JavaMember.__call__(self, *args)
   1316 command = proto.CALL_COMMAND_NAME +\
   1317     self.command_header +\
   1318     args_command +\
   1319     proto.END_COMMAND_PART
   1321 answer = self.gateway_client.send_command(command)
-> 1322 return_value = get_return_value(
   1323     answer, self.gateway_client, self.target_id, self.name)
   1325 for temp_arg in temp_args:
   1326     if hasattr(temp_arg, "_detach"):

File /opt/spark/python/lib/pyspark.zip/pyspark/errors/exceptions/captured.py:175, in capture_sql_exception.<locals>.deco(*a, **kw)
    171 converted = convert_exception(e.java_exception)
    172 if not isinstance(converted, UnknownException):
    173     # Hide where the exception came from that shows a non-Pythonic
    174     # JVM exception message.
--> 175     raise converted from None
    176 else:
    177     raise

ParseException: 
[PARSE_SYNTAX_ERROR] Syntax error at or near end of input.(line 1, pos 36)

== SQL ==
SELECT * FROM matched_table A WHERE 
------------------------------------^^^
fdosani commented 10 months ago

Thanks for the details. So it seems like I can reproduce it. I also tried using Pandas and Fugue:

Pandas

df_1 = pd.DataFrame([{"a": 1}])
df_2 = pd.DataFrame([{"a": 1}])

compare = datacompy.Compare(df_1, df_2, join_columns=["a"])
print(compare.report())

Results in:

DataComPy Comparison
--------------------

DataFrame Summary
-----------------

  DataFrame  Columns  Rows
0       df1        1     1
1       df2        1     1

Column Summary
--------------

Number of columns in common: 1
Number of columns in df1 but not in df2: 0
Number of columns in df2 but not in df1: 0

Row Summary
-----------

Matched on: a
Any duplicates on match values: No
Absolute Tolerance: 0
Relative Tolerance: 0
Number of rows in common: 1
Number of rows in df1 but not in df2: 0
Number of rows in df2 but not in df1: 0

Number of rows with some compared columns unequal: 0
Number of rows with all compared columns equal: 1

Column Comparison
-----------------

Number of columns compared with some values unequal: 0
Number of columns compared with all values equal: 1
Total number of values which compare unequal: 0

Fugue

df_1 = spark.createDataFrame([{"a": 1}])
df_2 = spark.createDataFrame([{"a": 1}])

print(datacompy.report(df_1, df_2, join_columns=["a"]))

Results in:

DataComPy Comparison                                                            
--------------------

DataFrame Summary
-----------------

  DataFrame  Columns  Rows
0       df1        1     1
1       df2        1     1

Column Summary
--------------

Number of columns in common: 1
Number of columns in df1 but not in df2: 0
Number of columns in df2 but not in df1: 0

Row Summary
-----------

Matched on: a
Any duplicates on match values: No
Absolute Tolerance: 0
Relative Tolerance: 0
Number of rows in common: 1
Number of rows in df1 but not in df2: 0
Number of rows in df2 but not in df1: 0

Number of rows with some compared columns unequal: 0
Number of rows with all compared columns equal: 1

Column Comparison
-----------------

Number of columns compared with some values unequal: 0
Number of columns compared with all values equal: 1
Total number of values which compare unequal: 0

We should align the Spark with Pandas and Fugue.

@rupertbarton Would you be open to using Fugue for your Spark Compare for now? you should be able to run it successfully. I'll need to debug the native Spark compare. I have been debating if we should just remove it in favor of using Fugue moving forward.

rupertbarton commented 10 months ago

I'll create a ticket in our backlog to investigate switching over, thanks!

fdosani commented 10 months ago

@rupertbarton more for my understanding but could you articulate what sort of use case you have where you are just joining on a single column with nothing else to compare?

fdosani commented 10 months ago

@jdawang @rupertbarton I have a WIP fix here

Getting the following back:

In [2]: print(compare.report())

****** Column Summary ******
Number of columns in common with matching schemas: 1
Number of columns in common with schema differences: 0
Number of columns in base but not compare: 0
Number of columns in compare but not base: 0

****** Row Summary ******
Number of rows in common: 2
Number of rows in base but not compare: 0
Number of rows in compare but not base: 0
Number of duplicate rows found in base: 0
Number of duplicate rows found in compare: 0

****** Row Comparison ******
Number of rows with some columns unequal: 0
Number of rows with all columns equal: 2

****** Column Comparison ******
Number of columns compared with some values unequal: 0
Number of columns compared with all values equal: 0
None

Seems like the Column Comparison is different than the Pandas version. I think this is mostly due to the difference in the underlying logic. In Pandas it would say: Number of columns compared with all values equal: 1.

I can kind of see this both ways. This corner case is just a bit odd cause you aren't really comparing anything, just joining on the key (a).

@jdawang Another reason why I'm thinking maybe we just drop the native Spark implementation. The differences are annoying.

rupertbarton commented 10 months ago

@rupertbarton more for my understanding but could you articulate what sort of use case you have where you are just joining on a single column with nothing else to compare?

Hi! Our use case is that we have a large number of tables we are running assertions on, and all of them work fine apart from 1 particular table. This table has multiple columns, but all of the columns apart from 1 are being encrypted so we're excluding them from the comparison as it's awkward trying to work out what the encrypted values will be, hence why the DF only has a single column.

We still would want to compare that all the values in the expected DF and all the values in the actual DF match up, and we're using the same code for every table.

ramakanth1997 commented 2 months ago

Hi everyone I guess this will work for you print(compare.report(sample_count=1000))