ccao-data / data-architecture

Codebase for CCAO data infrastructure construction and management
https://ccao-data.github.io/data-architecture/

Refactor ratio stats for build speed increase #521

Open wagnerlmichael opened 5 days ago

wagnerlmichael commented 5 days ago

Summary

This is not a final PR for review, but a progress update to determine next steps.

Currently, all of the assesspy functions used in this script are copied into it. If we were to move forward with this solution, they would need to be refactored in the actual package rather than copy/pasted and modified in this script.

With these changes, the build time for the reporting.ratio_stats table is ~15 minutes, a large improvement over the previous ~1 hour. All of this speedup came from editing the boot_ci function; I'm not sure how much more speed we could get from editing the other functions. Changing sampling from pandas to numpy index sampling contributed about ~10% of the speedup, whereas parallel processing contributed about ~90%.
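To make the two optimizations concrete, here is a minimal sketch of a bootstrap CI in that style: sampling via numpy integer indices rather than pandas row sampling, and mapping replicates across a multiprocessing pool. The names (`boot_ci`, `n_boot`, `_one_replicate`) and the median statistic are illustrative assumptions, not the actual assesspy signatures.

```python
import multiprocessing as mp

import numpy as np


def _one_replicate(args):
    """One bootstrap replicate: resample by integer index, return the statistic."""
    ratios, seed = args
    rng = np.random.default_rng(seed)
    # numpy index sampling: draw row positions directly instead of pandas .sample()
    idx = rng.integers(0, len(ratios), size=len(ratios))
    return float(np.median(ratios[idx]))


def boot_ci(ratios, n_boot=500, alpha=0.05, processes=2):
    """Bootstrap CI for the median, with replicates fanned out across processes."""
    ratios = np.asarray(ratios, dtype=float)
    jobs = [(ratios, seed) for seed in range(n_boot)]
    with mp.Pool(processes) as pool:
        stats = pool.map(_one_replicate, jobs)
    lo, hi = np.quantile(stats, [alpha / 2, 1 - alpha / 2])
    return lo, hi


if __name__ == "__main__":
    rng = np.random.default_rng(0)
    sample = rng.normal(1.0, 0.1, size=500)
    print(boot_ci(sample, n_boot=200))
```

Because each replicate is seeded independently, results are reproducible regardless of how many worker processes are used.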

Dev table here: "z_ci_436_refactor_ratio_stats_job_to_use_pyspark_reporting"."ratio_stats"

Other strategies tried

Spark

I tried for a while with different Spark strategies. First I attempted to convert the data frame to a Spark data frame and sample on that, but that didn't work: it was extremely slow, which I assume was due to computationally intensive transformations from pandas to Spark and back to pandas.

I tried to get around this issue by using a pandas UDF. Supposedly, this allows the Spark API to operate on the pandas data frame in a columnar format, maintaining the speed increases from distributed processing. This also resulted in much longer build times, or in errors I couldn't work through.

I also tried a single pandas-to-Spark data frame conversion, then editing the remaining data structures in boot_ci so that they were all Spark-compatible, but I could not get this speedup working either.

I am new to spark, so it is very possible I missed something obvious or there are remaining workable solutions.

Numba and Dask

I tried basic Numba parallelization and Dask parallelization, but neither could be imported properly. I believe this is because they both have C bindings and Athena doesn't allow for this with third-party package additions.

concurrent.futures

I tried using this built-in Python module, but the parallelization was failing due to a pickling error; I switched to multiprocessing and that finally worked.
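For context on the pickling error: process pools (both `concurrent.futures.ProcessPoolExecutor` and `multiprocessing.Pool`) serialize the callable with pickle, which only works for functions importable by name. A common cause of the failure is passing a lambda or a locally defined function; a small standalone demonstration (not code from this PR):

```python
import pickle


def top_level(x):
    """Module-level functions pickle by reference, so process pools accept them."""
    return x * 2


def make_local():
    def local(x):
        return x * 2
    return local


# A module-level function round-trips through pickle fine
assert pickle.loads(pickle.dumps(top_level))(3) == 6

# Lambdas and nested functions cannot be pickled, which surfaces as an
# error when a process pool tries to ship them to workers
for bad in (lambda x: x * 2, make_local()):
    try:
        pickle.dumps(bad)
        picklable = True
    except (pickle.PicklingError, AttributeError):
        picklable = False
    assert not picklable
```

Restructuring the worker as a top-level function (plus passing only picklable arguments) is usually enough to make either pool API work.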

Considerations on current strategy

If we were to move forward with this solution, we would need to decide how to reconcile the changed boot_ci function with the assesspy build. One option is to edit the package itself and include a boolean param that turns parallel processing on/off. Another option is to just keep the copy-pasted functions in this script, but that creates two sources of truth for the assesspy functions, which isn't ideal.
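One way the boolean-param option could look is a small helper that every bootstrap loop routes through, so a single `parallel=True/False` flag toggles the process pool without duplicating any statistical code. This is a hypothetical sketch, not the actual assesspy API; `run_replicates` and `square` are illustrative names.

```python
import multiprocessing as mp


def run_replicates(fn, jobs, parallel=False, processes=None):
    """Apply fn to each job, optionally across a process pool."""
    if parallel:
        with mp.Pool(processes) as pool:
            return pool.map(fn, list(jobs))
    return [fn(job) for job in jobs]


def square(x):
    """Stand-in for a single bootstrap replicate; must be top-level to pickle."""
    return x * x


if __name__ == "__main__":
    serial = run_replicates(square, range(5))
    parallel = run_replicates(square, range(5), parallel=True, processes=2)
    # Both paths produce identical results; only the execution strategy differs
    assert serial == parallel == [0, 1, 4, 9, 16]
```

Defaulting to `parallel=False` would keep existing assesspy behavior unchanged while letting this build opt in.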

One potential upside of not using Spark is that we could maintain these functions in assesspy rather than building out an entirely new set of Spark-based assesspy functions.

Other ways forward

We could also continue to develop here. Two other paths forward for me could be: