eakmanrq / sqlframe

Turning PySpark Into a Universal DataFrame API
https://sqlframe.readthedocs.io/en/stable/
MIT License
290 stars 9 forks

On the DuckDB engine, I get inconsistent results for a full outer join #51

Closed cristian-marisescu closed 4 months ago

cristian-marisescu commented 4 months ago

Pretty self-explanatory from the output: PySpark fills in the nulls, the DuckDB engine doesn't. What's the plan for guaranteeing the same output? It seems the fugue-project offers consistency between the various NULL implementations: https://fugue-tutorials.readthedocs.io/index.html?highlight=guarantees#ibis

What do you think? Do you see a possibility of replicating Spark's exact behavior in the other engines, or would you leave it up to the user to know the different flavors of each execution context?

duckdb


# Note: session setup was omitted from the original snippet. For the DuckDB run,
# `spark` is assumed to be a SQLFrame DuckDB session (roughly:
# from sqlframe.duckdb import DuckDBSession; spark = DuckDBSession()),
# and for the PySpark run a regular SparkSession.
initial: DataFrame = spark.createDataFrame(
    [
        (1, "data"),
        (2, "data"),
    ],
    ["id", "data"],
)

for_join: DataFrame = spark.createDataFrame(
    [
        (1, "other_data"),
        (2, "other_data"),
        (3, "other_data"),
    ],
    ["id", "other_data"],
)

joined = initial.join(for_join, on="id", how="full")

joined.show()

python sqlframe_playground.py
+------+------+------------+
|  id  | data | other_data |
+------+------+------------+
|  1   | data | other_data |
|  2   | data | other_data |
| None | None | other_data |
+------+------+------------+

pyspark

python sqlframe_playground.py
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/06/04 06:33:58 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
+---+----+----------+                                                           
| id|data|other_data|
+---+----+----------+
|  1|data|other_data|
|  2|data|other_data|
|  3|NULL|other_data|
+---+----+----------+
eakmanrq commented 4 months ago

Great question. My instinct right now is that I want to bring the PySpark API to other engines but make it feel like a native DataFrame API for that engine. Basically, I wonder how it would behave if the engine provider had written the API themselves and just happened to write it in a PySpark-compatible way. This is what led, for example, to the current decision to have date format strings accept the engine's format instead of PySpark's format. Also, in BigQuery I have timestamp functions return a timestamp object, although technically a Spark timestamp is a BigQuery datetime.
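As a concrete illustration of that tradeoff (a hedged sketch, not code from this thread: it assumes the session from the snippet above, a pyspark-style functions module, and that date_format is among the functions that currently take engine-format strings):

from pyspark.sql import functions as F  # or the engine-specific functions module

df = spark.createDataFrame([("2024-06-04",)], ["d"])

# Engine-native behavior (current approach, assumed to apply here): the format
# string is DuckDB's strftime-style pattern.
df.select(F.date_format(F.col("d").cast("date"), "%Y/%m/%d").alias("fmt")).show()

# Spark behavior (what a "PySpark defaults" switch would accept instead):
df.select(F.date_format(F.col("d").cast("date"), "yyyy/MM/dd").alias("fmt")).show()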

So, speaking directly to your example: if someone wanted a native PySpark API on DuckDB, then I think they would expect the result you got, since it behaves the way DuckDB behaves.

What do you think? In the long term, if the project got enough adoption, I could see this being configurable but in the short term I need to pick an approach.

cristian-marisescu commented 4 months ago

I understand your current approach of preferring the exact engine's processing, due to familiarity and expected results. This is especially useful for learning PySpark without actually deploying it. In the long term, however, I see a more configurable approach, focused on the functions common across engines that produce different outcomes. Differences in NULL handling, data types, and function defaults can create confusion.

Engineers might use this by unit testing locally or in CI with DuckDB for speed, then deploying to a cluster with PySpark. Or, in another case, you have a mix of big data (large fact tables) and really small data (like business-provided mappings for some dimensions). We could run the facts on the PySpark engine and the small data on DuckDB, and we'd like to share common functions between the two (standard cleaning, standard key creation, common mappings, shared logic, etc.).

You can see where this is going if the unified API doesn't actually deliver a unified result. To start, I would make it configurable with a simple switch, just a true or false to use either the selected engine's defaults or PySpark's defaults. Making it more configurable than that would create a lot of edge cases and combinations between all of the available engines, and I don't see the benefit. I don't see the need for writing against the PySpark API, running it on DuckDB, and using BigQuery defaults; that's asking for trouble. But I do see writing against the PySpark API, running it on a different engine, but with the same defaults PySpark has, for complete portability between backends and no unexpected behavior.
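A minimal sketch of that sharing pattern, assuming SQLFrame's DuckDB session locally and a real SparkSession on the cluster (the helper name and the commented import paths are illustrative assumptions, not project code):

# shared_transforms.py: logic written once against the PySpark DataFrame API.
# `F` is whichever functions module matches the session, so the helper stays
# engine-agnostic.
def standard_key(df, F):
    return df.withColumn("key", F.concat_ws("-", F.col("id"), F.col("data")))

# Local / CI run on DuckDB (SQLFrame session, assumed API):
#   from sqlframe.duckdb import DuckDBSession, functions as F
#   standard_key(DuckDBSession().createDataFrame([(1, "data")], ["id", "data"]), F).show()
#
# Cluster run on Spark:
#   from pyspark.sql import SparkSession, functions as F
#   spark = SparkSession.builder.getOrCreate()
#   standard_key(spark.read.table("facts"), F).show()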

eakmanrq commented 4 months ago

Yeah definitely agree about making it a bool.

I wonder if I could implement this just for DuckDB to start, in order to address this use case. Based on adoption, I could then decide whether it should be expanded to other engines.

cristian-marisescu commented 4 months ago

Personally, I see the pyspark + duckdb combo, just because of the following points:

  1. one engine distributed on the cloud + one for fast local processing
  2. a potentially bigger pool of adoption vs. BigQuery or Snowflake (the dreaded vendor lock-in)
  3. DuckDB and not Postgres just because of the natural OLAP advantage over OLTP in ETL + the growing momentum of DuckDB
eakmanrq commented 4 months ago

The issue you demonstrated at the top is actually a bug in SQLFrame when doing a full outer join: it should have coalesced the two join columns, but instead it just picked the left one. The linked PR fixes that problem.
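Written out by hand against the PySpark API, the intended coalescing looks roughly like this (a sketch of the semantics using the DataFrames from the original report, not the code from the linked PR):

from pyspark.sql import functions as F

# With the explicit join-condition form, both `id` columns survive the full outer
# join, and the result key should be the COALESCE of the two, not just the left one.
joined = (
    initial.join(for_join, initial["id"] == for_join["id"], how="full")
    .select(
        F.coalesce(initial["id"], for_join["id"]).alias("id"),
        initial["data"],
        for_join["other_data"],
    )
)
joined.show()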

Going to create a new issue for a config that allows accepting Spark date format strings instead of the engine's format, to help enable writing in pure Spark. Outside of that, I plan to address issues on an as-reported basis, and if I hit a contradiction between engine behaviors, I will use that future config to allow both behaviors.

Thanks for reporting this and the good discussion!

eakmanrq commented 4 months ago

Included in 1.6.2

cristian-marisescu commented 4 months ago

Thank you for all of your work, truly excited to see how it evolves.