danscales closed this 7 months ago
@danscales I'll be honest, I'm not really loving the shift to RDD. Not that it was perfectly readable before, but I feel like doing things in RDD makes it way more complicated and harder to read/modify. I'm not totally sure we need UDFs, though; I'm pretty sure you can use out-of-the-box Spark functions at the end if you just keep track of the values per year in AFiData.
I asked ChatGPT and got the suggestion that you could do something like this:
// Assuming your dataframe is named df and the years are 2019 to 2021
import org.apache.spark.sql.functions.{col, first, struct}
val years = 2019 to 2021
val nestedDF = df.groupBy("ID")
  .pivot("year")
  .agg(first("value"))
  // collect the pivoted per-year columns into one nested column
  .withColumn("values_per_year",
    struct(years.map(year => col(s"`$year`").alias(s"year_$year")): _*))
  // drop the flat pivoted columns, keeping only the nested one
  .drop(years.map(_.toString): _*)
nestedDF.show()
Which transforms a dataframe like this:
+---+----+-----+
| ID|year|value|
+---+----+-----+
|  1|2019|   10|
|  1|2020|   20|
|  1|2021|   30|
|  2|2019|   15|
|  2|2020|   25|
|  2|2021|   35|
+---+----+-----+
To something like this, which I think is what we need to output it to JSON:
+---+------------------------------------+
| ID|                     values_per_year|
+---+------------------------------------+
|  1|{2019 -> 10, 2020 -> 20, 2021 -> 30}|
|  2|{2019 -> 15, 2020 -> 25, 2021 -> 35}|
+---+------------------------------------+
But this is kind of beyond where I understand how to do things simply in Scala, so up to you if you think this is the best approach.
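For illustration, here's a rough sketch (untested, not from the PR) that produces the map-style output shown above using only built-in Spark functions and no pivot(); df and the column names are the ones from the toy example:

import org.apache.spark.sql.functions.{col, collect_list, map_from_entries, sort_array, struct}

// Collect (year, value) pairs per ID, sort them by year, and turn them into a map column.
val mapDF = df
  .groupBy("ID")
  .agg(
    map_from_entries(
      sort_array(collect_list(struct(col("year"), col("value"))))
    ).as("values_per_year")
  )

mapDF.show(truncate = false)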
OK, thanks for the suggestion! I will try this approach and see how complex it ends up being, assuming I can get it to work.
OK, Justin, I switched back to using DataFrames. It was not easy to figure out! See the message on my new commit. As you suggested, I added lossYear to the DataGroup. Using pivot() seemed complicated and possibly inefficient, but I was able to use another built-in function, map_from_arrays(). However, I still needed a UDF to force the maps to be sorted and fully populated with the years (2021, 2022, 2023), even when some entries are zero.
Let me know what you think about the new version, and feel free to add more comments. Thanks!
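Roughly, the shape of that approach is something like the sketch below (illustrative column and variable names only, not the exact code in the commit): aggregate (year, loss) pairs into a map with built-in functions, then use a small UDF to force the map to be sorted and to contain every year from 2021 to 2023, with zeros for missing years.

import scala.collection.immutable.SortedMap
import org.apache.spark.sql.functions.{col, collect_list, map_from_arrays, udf}

val yearStart = 2021
val yearEnd = 2023

// UDF: fill in any missing years with 0.0 and return a sorted map.
val fillYears = udf { (m: Map[Int, Double]) =>
  SortedMap((yearStart to yearEnd).map(y => y -> m.getOrElse(y, 0.0)): _*)
}

val lossByYear = df
  .groupBy("locationId")
  .agg(
    map_from_arrays(
      collect_list(col("lossYear")),
      collect_list(col("lossArea"))
    ).as("rawLossByYear")
  )
  .withColumn("lossByYear", fillYears(col("rawLossByYear")))
  .drop("rawLossByYear")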
GTC-2747 New columns in AFi for forest loss per year since 2021
Added two new columns that give forest loss for each year from 2021 on, for both the natural forest (SBTN) cover and the JRC forest cover in each location.
AFi was previously written mainly using DataFrames. This worked well because there were only scalar numbers in each column, with no complex data structures like per-year forest loss (ForestChangeDiagnosticDataLossYearly). With this change, I need to aggregate two new values that are per-year forest loss. This is not easy to do with DataFrames, because you have to define user-defined aggregate functions (and possibly user-defined types). I also ran into a run-time bug when Spark compiled the generated Java code for the needed user-defined aggregate function, which I couldn't figure out how to work around.
So, I switched the code to do most of the aggregation in RDDs, similar to forest_change_diagnostic. This means we don't have to define user-defined aggregation functions, and in fact we can use the normal merge function of ForestChangeDiagnosticDataLossYearly.
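Conceptually, the RDD aggregation looks something like the sketch below, where LossYearly is just a stand-in for ForestChangeDiagnosticDataLossYearly and its merge method (an illustration, not the actual PR code): key each record by its data group and reduce within each group with merge (e.g. via reduceByKey), so no user-defined aggregate function is needed.

import org.apache.spark.sql.SparkSession

// Stand-in for ForestChangeDiagnosticDataLossYearly: a per-year loss map with a
// merge method that sums losses year by year.
case class LossYearly(lossByYear: Map[Int, Double]) {
  def merge(other: LossYearly): LossYearly =
    LossYearly(
      (lossByYear.keySet ++ other.lossByYear.keySet)
        .map(y => y -> (lossByYear.getOrElse(y, 0.0) + other.lossByYear.getOrElse(y, 0.0)))
        .toMap
    )
}

object RddMergeSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("rdd-merge-sketch").getOrCreate()
    val sc = spark.sparkContext

    // Key each record by its data group (here just a location id) and merge within groups.
    val records = sc.parallelize(Seq(
      ("loc1", LossYearly(Map(2021 -> 1.5))),
      ("loc1", LossYearly(Map(2022 -> 0.5))),
      ("loc2", LossYearly(Map(2021 -> 2.0)))
    ))

    val aggregated = records.reduceByKey((a, b) => a.merge(b))
    aggregated.collect().foreach(println)
    spark.stop()
  }
}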
I defined the range of years for forest loss in AfiCommand.{TreeCoverLossYearStart,TreeCoverLossYearEnd}. I added extra args to the empty() and prefilled() methods of ForestChangeDiagnosticDataLossYearly so they work for different year ranges. I already set the end year to 2023, since TCL 2023 is coming in at about the same time.
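As a rough idea, year-range-aware helpers of that kind have roughly the following shape (hypothetical signatures; the real methods on ForestChangeDiagnosticDataLossYearly may differ):

import scala.collection.immutable.SortedMap

// Hypothetical helpers parameterized by the year range (e.g. AfiCommand.TreeCoverLossYearStart
// to AfiCommand.TreeCoverLossYearEnd) rather than hard-coded years.
object LossYearlyHelpers {
  // Every year in [start, end] initialized to zero loss.
  def empty(start: Int, end: Int): SortedMap[Int, Double] =
    SortedMap((start to end).map(_ -> 0.0): _*)

  // Same range, seeded with any known per-year values.
  def prefilled(start: Int, end: Int, known: Map[Int, Double]): SortedMap[Int, Double] =
    SortedMap((start to end).map(y => y -> known.getOrElse(y, 0.0)): _*)
}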
Removed a bunch of unused imports in various files.