activeviam / par-student-spark-atoti

Project with students from CentraleSupelec to explore the Spark API in order to power atoti with Spark

Implement offset #17


sehyod commented 2 years ago

We want the list method to be able to take limit and offset parameters. However, there is no OFFSET keyword in Spark SQL, so we have tried to find some workarounds. Here is a summary of the different potential solutions we have explored.

First approach

Our first idea was to query limit + offset rows and then slice the result array in Java. While this solution does work, it is highly inefficient for large offsets: we fetch numerous useless rows, which wastes both memory and time.
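To make the cost concrete, here is a minimal sketch of this naive approach with the Spark Java API, assuming a `Dataset<Row> df` and `int limit` / `int offset` variables are in scope:

```java
import java.util.List;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

// Fetch the first `offset + limit` rows, then drop the first `offset`
// of them on the driver side.
List<Row> fetched = df.limit(offset + limit).collectAsList();
List<Row> page = fetched.subList(Math.min(offset, fetched.size()), fetched.size());
```

All offset rows are still computed, shipped to the driver and materialized before being thrown away, which is why this does not scale.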

monotonically_increasing_id

Our second idea was to add a temporary auto-generated ID column using the Spark function monotonically_increasing_id, and to filter on it. At first, this solution seemed to work properly and performed much better than our first approach. However, we realized that the code did not always work when running tests on Databricks with a large dataset: as soon as Spark uses more than one partition for the data, it stops working. After checking the documentation, we understood that the generated IDs are not consecutive: they are consecutive within a single partition, but there is an offset of 2^33 (= 8589934592) between partitions.
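For reference, a sketch of the filter we had in mind (again assuming `df`, `limit` and `offset`); it behaves correctly only while the IDs are consecutive, i.e. on a single partition:

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import static org.apache.spark.sql.functions.*;

Dataset<Row> withId = df.withColumn("_id", monotonically_increasing_id());
// On a single partition the IDs are 0, 1, 2, ..., so this range filter
// implements LIMIT/OFFSET. With several partitions the IDs jump by 2^33
// at each partition boundary and the filter selects the wrong rows.
Dataset<Row> page = withId
    .filter(col("_id").geq(offset).and(col("_id").lt((long) offset + limit)))
    .drop("_id");
```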

nth_value

Next, we tried to use the nth_value function (https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.functions.nth_value.html). However, it is not suited to our need, since it is quite tricky to handle the case where there are not enough rows and nth_value returns null.
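For illustration, this is roughly what such an attempt looks like in the Java API; the ordering column `some_key` is a placeholder, not a column from our schema:

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.expressions.Window;
import org.apache.spark.sql.expressions.WindowSpec;
import static org.apache.spark.sql.functions.*;

// An unpartitioned window also pulls all the data into a single partition.
WindowSpec w = Window.orderBy("some_key")
    .rowsBetween(Window.unboundedPreceding(), Window.unboundedFollowing());

// nth_value(..., offset + 1) marks the first row of the requested page,
// but if the dataset has fewer than offset + 1 rows it returns null,
// which is indistinguishable from a genuinely null value in the column.
Dataset<Row> marked = df.withColumn(
    "page_start", nth_value(col("some_key"), offset + 1).over(w));
```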

User Defined Function

Our last solution was to define a "User Defined Function", with the help of this StackOverflow post. The idea is to add an _id column generated from a monotonically_increasing_id column and a partition_id column.
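Since the code itself is not inlined in this issue, here is a hedged reconstruction of the idea in the Java API (variable names and structure are ours, not necessarily those of the StackOverflow post): tag each row with its partition, collect one (first raw ID, row count) pair per partition, and let the UDF shift each mon_id so that the resulting _id is consecutive across partitions.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.api.java.UDF2;
import org.apache.spark.sql.types.DataTypes;
import static org.apache.spark.sql.functions.*;

// 1. Tag each row with its raw ID and its partition. Caching matters:
//    monotonically_increasing_id is recomputed (and may change) if the
//    dataset is evaluated twice without a cache.
Dataset<Row> tagged = df
    .withColumn("mon_id", monotonically_increasing_id())
    .withColumn("partition_id", spark_partition_id())
    .cache();

// 2. One row per partition: its first raw ID and its row count. This
//    result is tiny, so collecting it to the driver is cheap.
List<Row> stats = tagged
    .groupBy("partition_id")
    .agg(min("mon_id").alias("first_mon_id"), count("*").alias("cnt"))
    .orderBy("partition_id")
    .collectAsList();

// 3. For each partition, the shift that turns its raw IDs into a global
//    consecutive numbering: first raw ID minus rows in earlier partitions.
Map<Integer, Long> shift = new HashMap<>();
long rowsBefore = 0L;
for (Row r : stats) {
    shift.put(r.getInt(0), r.getLong(1) - rowsBefore);
    rowsBefore += r.getLong(2);
}

// 4. The UDF maps (mon_id, partition_id) to a consecutive, 0-based _id.
spark.udf().register("consecutive_id",
    (UDF2<Long, Integer, Long>) (monId, partitionId) -> monId - shift.get(partitionId),
    DataTypes.LongType);

Dataset<Row> withId = tagged
    .withColumn("_id", callUDF("consecutive_id", col("mon_id"), col("partition_id")))
    .drop("mon_id", "partition_id");

// 5. OFFSET/LIMIT is now a plain range filter on _id.
Dataset<Row> page = withId
    .filter(col("_id").geq(offset).and(col("_id").lt((long) offset + limit)));
```

Note that this does not hard-code the 2^33 stride anywhere: the per-partition shifts are derived from the observed IDs, which is what makes the approach generic.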

Test

We checked the method with a notebook on Databricks. There are 15160640 entries spread across several partitions, so the monotonically_increasing_id (the mon_id column here) goes up to 68719915055. The _id column is properly generated, with IDs going from 0 to 15160639, and they are all distinct. This means we can use this solution to properly compute the offset!
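The checks from the notebook (shown in screenshots in the original issue) boil down to something like this, reusing the `withId` dataset from the sketch above:

```java
import static org.apache.spark.sql.functions.*;

// min/max of _id, the number of distinct IDs, and the total row count
// should line up: 0, 15160639, 15160640, 15160640 on this dataset.
withId.agg(min("_id"), max("_id"), countDistinct("_id"), count("*")).show();
```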

NB: We could have made a much simpler function that just subtracts partition_id * 2^33 from the monotonically_increasing_id (and then adds the number of rows in the preceding partitions), but the documentation says that the 2^33 stride is only the current implementation, implying it could change in the future. That is why we thought it would be safer to use the more complex but more generic function.
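For completeness, such a simpler mapping might look like the method below; this is a hypothetical sketch, where `rowsBefore` stands for the cumulative row count of the preceding partitions:

```java
// Hard-codes the documented 2^33 stride between partition bases, so it
// would break if the implementation of monotonically_increasing_id changed.
static long consecutiveId(long monId, int partitionId, long rowsBefore) {
    long rowInPartition = monId - (((long) partitionId) << 33); // 2^33 stride
    return rowsBefore + rowInPartition;
}
```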