RTIInternational / teehr

Tools for Exploratory Evaluation in Hydrologic Research
https://rtiinternational.github.io/teehr/
MIT License

Add initial metrics to v0.4 #226

Closed: mgdenno closed this issue 2 weeks ago

mgdenno commented 3 weeks ago

Under this issue we will add initial metrics to the new architecture approach in v0.4.

An initial file with metrics calculations is available in the playground at: https://github.com/RTIInternational/teehr/blob/v0.4-beta/playground/pyspark/pandas_udfs_complete.py Note this file contains both useful UDFs (e.g., NSE, KGE, etc.) and likely not useful ones (e.g., count, min, max), which are certainly already available in PySpark. The file applies the decorator directly to the function definition, but that is likely not a good idea, since testing and using these functions outside of PySpark will be more difficult if we continue that pattern. A better pattern would be to define the plain function and then create a new function with the decorator, something like this:

import pandas as pd
from pyspark.sql.functions import pandas_udf

def nse(obs: pd.Series, sim: pd.Series) -> float:
    # Nash-Sutcliffe efficiency of simulated vs. observed values.
    return 1.0 - ((sim - obs) ** 2).sum() / ((obs - obs.mean()) ** 2).sum()

# Wrap the plain function separately so it stays easy to use outside PySpark.
nse_udf = pandas_udf(nse, "float")
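One benefit of keeping the undecorated function is that it can be unit-tested with plain pandas, no Spark session required. A quick sketch (the values are just illustrative):

import pandas as pd

obs = pd.Series([1.0, 2.0, 3.0, 4.0])
sim = pd.Series([1.1, 1.9, 3.2, 3.8])

# Call the plain function directly in a unit test.
assert 0.9 < nse(obs, sim) <= 1.0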

Then in PySpark we should first try the simplest approach of using the DataFrame API. In some initial tests the SQL API was faster, but I think we should try to utilize and optimize the DataFrame API approach because it is much simpler to understand and extend. Using this approach, we would have something like the following (note the code is lazy, so nothing actually executes until an action is called at the end):

# Read in the joined dataset using methods already in the code.
joined_df = ...

# Apply one or more filters using methods already in the code.
# We are able to loop over filters and apply them one at a time using apply_filters().
joined_df = joined_df.filter(...)

# Apply the user-requested grouping.
grouped_df = joined_df.groupBy("some", "fields")  # our group-by fields

# Assuming we will write a "calculate_metrics" function that loops over the user-requested metrics,
# reads any user-provided input (e.g., name to use, bootstrap parameters, perhaps a field to use for a threshold, etc.),
# and adds them one at a time to the metrics_df using .agg(), like this:
metrics_df = grouped_df.agg(nse_udf("value_field", "value_field").alias("new_field_name"))
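To make that "calculate_metrics" idea a bit more concrete, here is a rough sketch of what the loop could look like. Everything here is hypothetical: the MetricConfig container, the field names, and the function signature are placeholders, not existing TEEHR code.

from dataclasses import dataclass
from typing import Callable, List

from pyspark.sql import DataFrame
from pyspark.sql.group import GroupedData

@dataclass
class MetricConfig:
    # Hypothetical container for one user-requested metric.
    udf: Callable            # e.g., nse_udf
    input_fields: List[str]  # columns passed to the UDF
    output_name: str         # alias for the resulting column

def calculate_metrics(grouped_df: GroupedData, metrics: List[MetricConfig]) -> DataFrame:
    # Build one .agg() call containing all requested metric expressions.
    agg_exprs = [m.udf(*m.input_fields).alias(m.output_name) for m in metrics]
    # A single .agg() with all expressions lets Spark compute every metric
    # in one pass over each group.
    return grouped_df.agg(*agg_exprs)

Usage would then look something like: metrics_df = calculate_metrics(grouped_df, [MetricConfig(nse_udf, ["value_field", "value_field"], "nse")]).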

Note there will also likely be a step to run some "post metric" metrics: for example, a metric that takes other metrics as input. So the flow might be filter -> group_by -> initial_metrics -> metric_postprocessing. I will find an example, but in this case we will need to know the names of the initial_metrics that feed each post-processing step. Also, if the user requests a metric that must be calculated in post-processing, we need to make sure that any initial metrics it requires are added. We probably need some structure around this idea.
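One possible shape for that structure, again purely a sketch with made-up metric names: each post-processing metric declares the initial metrics it depends on, and we expand the user's request so those dependencies are included in the first .agg() pass.

from typing import Dict, List

# Hypothetical registry: post-processing metric -> initial metrics it depends on.
POST_METRIC_DEPENDENCIES: Dict[str, List[str]] = {
    "relative_bias_score": ["bias", "primary_mean"],  # made-up example
}

def resolve_initial_metrics(requested: List[str]) -> List[str]:
    # Expand the user's request so required initial metrics are always computed.
    initial = [m for m in requested if m not in POST_METRIC_DEPENDENCIES]
    for m in requested:
        for dep in POST_METRIC_DEPENDENCIES.get(m, []):
            if dep not in initial:
                initial.append(dep)
    return initial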

I'm sure there is more to say here...

mgdenno commented 3 weeks ago

@samlamont Just FYI in case you didn't see this: https://spark.apache.org/docs/3.1.1/api/python/user_guide/arrow_pandas.html#pandas-udfs-a-k-a-vectorized-udfs