mmcdermott / MEDS_transforms

A simple set of MEDS polars-based ETL and transformation functions
MIT License

Add aggregation for quantile computation of code values #51

Closed: mmcdermott closed this issue 2 months ago

mmcdermott commented 2 months ago

This should go into the aggregate_code_metadata.py file, which means we need to decide how the aggregation is parameterized, how the map and reduce steps work across shards, and what storage format the output uses.

There are a few options for each of these.

Parameters

For the parametrization question, the easiest approach is to expect a dedicated config parameter (e.g., n_quantiles) that lives outside the list of aggregations of interest. This means you could only compute one set of quantiles per aggregation stage, but it is trivial to incorporate into the existing aggregate_code_metadata.py structure, so it is likely the best starting point. The right long-term solution is to make the aggregation names "parameterized constants" instead of pure constants, so that you can write something like "values/quantiles/5" or "values/quantiles/100@0.3" to specify the number of quantiles and (optionally) the error parameter for approximation algorithms (such algorithms are needed because you can't easily compute exact quantiles across a sharded dataframe).
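
As a concrete illustration of the parameterized-constant idea, here is a minimal sketch of a parser for such names. The "values/quantiles/" prefix and the "@"-separated error parameter are hypothetical conventions for this issue, not anything that exists in aggregate_code_metadata.py today:

```python
def parse_quantile_agg(agg_name: str) -> tuple[int, float | None]:
    """Parse a hypothetical parameterized aggregation name.

    E.g., "values/quantiles/5"       -> (5, None)
          "values/quantiles/100@0.3" -> (100, 0.3)
    """
    prefix = "values/quantiles/"
    if not agg_name.startswith(prefix):
        raise ValueError(f"Not a quantile aggregation: {agg_name}")

    params = agg_name[len(prefix):]
    if "@" in params:
        n_quantiles, error = params.split("@", 1)
        return int(n_quantiles), float(error)
    return int(params), None
```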

Mapping and Reducing

To do the mapreduce properly, we can either include an approximate quantile algorithm, such as by polarsifying this ddsketch algorithm, or do something more memory intensive but more accurate, which is likely sufficient and not too expensive for our needs provided we restrict the numerical values observed to a fixed number of digits of precision: simply compute the full count of each unique value observed (or otherwise compute a density or histogram approximation) and use that to compute the quantiles after the reduction step is finalized. This would make the mapping and reduction easier but more expensive computationally, and it would also require a post-reduction step.
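
To make the exact-count variant concrete, here is a rough sketch of what the map and reduce steps could look like. The numeric_value column name, the PRECISION setting, and the output column name are placeholders for illustration, not the existing stage API:

```python
import polars as pl

PRECISION = 2  # hypothetical config: number of decimal digits retained

def map_value_counts(shard: pl.DataFrame) -> pl.DataFrame:
    """Map: count each (rounded) unique value per code within one shard."""
    return (
        shard.with_columns(pl.col("numeric_value").round(PRECISION))
        .group_by("code", "numeric_value")
        .agg(pl.len().alias("count"))
    )

def reduce_and_quantile(shard_counts: list[pl.DataFrame], q: float) -> pl.DataFrame:
    """Reduce: sum counts across shards, then read the q-th quantile off the
    cumulative distribution (the post-reduction step mentioned above)."""
    merged = (
        pl.concat(shard_counts)
        .group_by("code", "numeric_value")
        .agg(pl.col("count").sum())
        .sort("code", "numeric_value")
    )
    return (
        merged.with_columns(
            cum_frac=pl.col("count").cum_sum().over("code")
            / pl.col("count").sum().over("code")
        )
        .filter(pl.col("cum_frac") >= q)
        .group_by("code", maintain_order=True)
        .agg(pl.col("numeric_value").first().alias(f"values/quantile_{q}"))
    )
```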

Storage format

We should likely use a fixed-size array type or a struct type, as that will likely be more efficient than a non-fixed-length list type.

Others who may have inputs

@Oufattole @prenc @EthanSteinberg

Oufattole commented 2 months ago

I think we can use DDSketch to get protobuf binary strings for each shard as shown here. We could store a parquet file for each shard with two columns: code and ddsketch_protobuf_binary_string. Then, we can have a reduce step that loads these parquets, converts the protobuf back to a DDSketch, and reduces them by merging the sketches using ddsketch.merge(another_ddsketch).
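
For illustration, a minimal sketch of the map/merge flow with the DataDog ddsketch package; the protobuf round-trip for on-disk storage between the two steps is omitted here, and the relative_accuracy value is an arbitrary placeholder:

```python
from ddsketch import DDSketch

REL_ACC = 0.01  # arbitrary relative-accuracy parameter for the sketch

def sketch_shard_values(values: list[float]) -> DDSketch:
    """Map: build one sketch per code from the values observed in a single shard."""
    sketch = DDSketch(relative_accuracy=REL_ACC)
    for v in values:
        sketch.add(v)
    return sketch

def merge_and_quantile(sketches: list[DDSketch], quantiles: list[float]) -> list[float]:
    """Reduce: merge the per-shard sketches for one code, then read off quantiles."""
    merged = DDSketch(relative_accuracy=REL_ACC)
    for s in sketches:
        merged.merge(s)
    return [merged.get_quantile_value(q) for q in quantiles]
```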

mmcdermott commented 2 months ago

This seems reasonable. Try it and see, I say!

mmcdermott commented 2 months ago

Another way would be to just aggregate the values via a value_counts call in the aggregation, then write a custom function to merge those structs and sum over the fields (though I'm not sure how that function would look off-hand). To keep the cost from getting too extreme, you could also add a "precision" field that sets how many decimal points are allowed, or how many bins are considered between a max and min value stored in the code_metadata.parquet file, and compute counts in those bins instead.

This would be basically like fitting a histogram to the data then inferring quantiles from the histogram.

mmcdermott commented 2 months ago

So, to summarize a Slack conversation about this issue and future related issues.

There are three strategies here:

1. Quick and dirty, but doesn't scale:

Forget the aggregate_code_metadata.py format and just write a custom transformation function for quantile computation that loads all shards at once and computes quantiles. This is easy to implement but won't scale to large datasets, and it will ultimately likely be replaced by a solution in the mapreduce format.

2. Capture an explicit histogram:

Using pl.Expr.value_counts(), either with or without binning, we can basically compute a histogram for each code in each shard, then have a merge function combine them. In the aggregate_code_metadata.py format, each shard would output a dataframe with columns code and value_counts, with value_counts being a pl.Struct-typed column with the values as keys and counts as values. The challenge is that the merging might be tricky, because structs in polars can be unnested horizontally into columns, but not directly exploded into rows across both keys and values, which would permit an efficient join and aggregation solution.

One solution to the problem with value_counts would be to instead use a combination of pl.Expr.unique(maintain_order=True) to get the unique values in order of appearance in one list, and pl.Expr.unique_counts() to get the counts of those same unique values, also in order of appearance, in another list. That would then permit you, during the reduce, to explode both of those added columns vertically and do a group-by. This is likely the best approach, presuming it can be verified through tests that the orders are properly aligned.
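
A rough sketch of that map/reduce pairing, again with placeholder column names rather than the actual stage wiring (and assuming the two per-code lists really are order-aligned, which the tests would need to confirm):

```python
import polars as pl

def map_unique_counts(shard: pl.DataFrame) -> pl.DataFrame:
    """Map: per code, one list of unique values (in order of appearance) and
    one list of their counts, in the same order."""
    return shard.group_by("code").agg(
        pl.col("numeric_value").unique(maintain_order=True).alias("values/unique"),
        pl.col("numeric_value").unique_counts().alias("values/unique_counts"),
    )

def reduce_unique_counts(shard_outputs: list[pl.DataFrame]) -> pl.DataFrame:
    """Reduce: explode both lists in parallel, then re-aggregate across shards."""
    return (
        pl.concat(shard_outputs)
        .explode("values/unique", "values/unique_counts")
        .group_by("code", "values/unique")
        .agg(pl.col("values/unique_counts").sum())
    )
```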

The solution for this is likely to make the "histogram" aspect clearer from the outset: explicitly pre-compute the min and max value for all codes across all shards using the existing aggregations, then define in a configuration file the number of bins supported for the histogram operation so that bins can be pre-defined from the min and max, then have all shards fill the same bins in the same way in an array column that can be easily summed pointwise in the reduce (this should be supported, as arrays are of fixed size). The challenge with this is that it is not the most precise form of quantiling, as linearly interpolating between the min and max may yield poor bin boundaries if the data are highly skewed or unequally distributed. Note that depending on how aggregate_code_metadata.py works (in particular, if it needs each aggregation to be stored in one column), you might need to put both lists into a two-element struct that you then unnest horizontally before exploding vertically in the reduce; I don't remember.
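
For the binned variant, here is a sketch assuming the global per-code min/max (called values/min and values/max here) have already been computed by the existing aggregations. For simplicity it keeps (code, bin, count) rows rather than packing the counts into a fixed-size array column, but the merge is the same pointwise sum either way:

```python
import polars as pl

N_BINS = 100  # hypothetical config: number of histogram bins per code

def map_binned_counts(shard: pl.DataFrame, bounds: pl.DataFrame) -> pl.DataFrame:
    """Map: assign each value to one of N_BINS equal-width bins between the
    pre-computed global min/max for its code, then count per (code, bin)."""
    return (
        shard.join(bounds, on="code")  # bounds has columns: code, values/min, values/max
        .with_columns(
            bin=(
                (
                    (pl.col("numeric_value") - pl.col("values/min"))
                    / (pl.col("values/max") - pl.col("values/min"))
                    * N_BINS
                )
                .floor()
                .clip(0, N_BINS - 1)
                .cast(pl.Int32)
            )
        )
        .group_by("code", "bin")
        .agg(pl.len().alias("count"))
    )

def reduce_binned_counts(shard_outputs: list[pl.DataFrame]) -> pl.DataFrame:
    """Reduce: counts for the same (code, bin) simply sum across shards."""
    return pl.concat(shard_outputs).group_by("code", "bin").agg(pl.col("count").sum())
```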

3. Use a fancy algorithm:

There are many fancier algorithms, like DDSketch, that track, instead of hard bin endpoints, representative data points weighted by their occurrence rates in a data-dependent manner, thereby enabling density-aware statistics calculation. When properly designed and implemented, these may be more precise and more efficient than either of the two approaches above. But they are generally more complex and harder to implement. Relying on DDSketch or another existing library, in particular, would require user-defined functions, which would significantly slow execution relative to native polars. So the real challenge with these approaches would be seeing whether one could implement them directly without relying on UDFs, though of course that improvement could happen at a later stage.

mmcdermott commented 2 months ago

Actually, a fourth option: just don't even store the counts. Store the observed values (with repetition), or a random sample of N of the values for some parameter N, in a list column; then, in the reduce, concatenate all the lists, explode the output, and compute the quantiles. That's probably the quickest and easiest solution for now. The problem, of course, is that it will be expensive, as the stored lists will be very long (often much longer than the list of unique values). Given the list lengths, it would probably be smarter to explode in the reduce function before actually joining the tables, so that tables are just concatenated vertically during the reduce and you don't need to worry about polars' linear-time list concatenation.
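
A sketch of this fourth option, again with placeholder column names; the per-shard sampling step is optional and the quantile grid is arbitrary:

```python
import polars as pl

N_SAMPLE = 10_000  # hypothetical cap on values kept per code per shard

def map_raw_values(shard: pl.DataFrame) -> pl.DataFrame:
    """Map: keep (a random sample of) the raw observed values per code in a list column."""
    return shard.group_by("code").agg(
        pl.col("numeric_value").shuffle().head(N_SAMPLE).alias("values/sample")
    )

def reduce_quantiles(shard_outputs: list[pl.DataFrame]) -> pl.DataFrame:
    """Reduce: explode each shard's lists before concatenating, so the reduce is a
    plain vertical concat, then compute the quantiles per code."""
    exploded = [df.explode("values/sample") for df in shard_outputs]
    return pl.concat(exploded).group_by("code").agg(
        [
            pl.col("values/sample").quantile(q).alias(f"values/quantile/{q}")
            for q in (0.25, 0.5, 0.75)
        ]
    )
```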

mmcdermott commented 2 months ago

Closing for now, as the "stupid" solution has been implemented. If it proves too inefficient, we will need to re-open this and use a better solution.