OpenMined / PipelineDP

PipelineDP is a Python framework for applying differentially private aggregations to large datasets using batch processing systems such as Apache Spark, Apache Beam, and more.
https://pipelinedp.io/
Apache License 2.0

DataFrame style API #256

Open dvadym opened 2 years ago

dvadym commented 2 years ago

Context

PipelineDP currently supports 3 execution modes: Apache Spark, Apache Beam, and without frameworks (here is an example of how to run on the different frameworks).

Basically, the current API works with unstructured collections (RDD for Spark, PCollection for Beam, an iterable without frameworks), and the data semantics are specified with corresponding extractor functions (DataExtractors; a usage example is in DPEngine.aggregate).
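
For illustration, here is a minimal sketch of this extractor-based flow with the framework-free (local) backend, roughly following the existing examples; exact parameter names may differ slightly between versions:

import pipeline_dp

# Toy collection of (user_id, movie_id, rating) tuples.
rows = [(1, "movie_1", 4.0), (1, "movie_2", 5.0), (2, "movie_1", 3.0)]

budget_accountant = pipeline_dp.NaiveBudgetAccountant(total_epsilon=1, total_delta=1e-6)
dp_engine = pipeline_dp.DPEngine(budget_accountant, pipeline_dp.LocalBackend())

# Data semantics are supplied via extractor functions, not via column names.
data_extractors = pipeline_dp.DataExtractors(
    privacy_id_extractor=lambda row: row[0],
    partition_extractor=lambda row: row[1],
    value_extractor=lambda row: row[2])

params = pipeline_dp.AggregateParams(
    noise_kind=pipeline_dp.NoiseKind.LAPLACE,
    metrics=[pipeline_dp.Metrics.COUNT],
    max_partitions_contributed=2,
    max_contributions_per_partition=1)

dp_result = dp_engine.aggregate(rows, params, data_extractors)
budget_accountant.compute_budgets()
print(list(dp_result))  # lazy iterable of (partition_key, metrics) with LocalBackend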

DataFrame is a very common API, and it would be great if PipelineDP supported it natively. There are DataFrame APIs for both Beam and Spark.

Note: currently it's possible to apply PipelineDP transformations to DataFrames by specifying extractors that return the corresponding column values (example; see also the sketch after the list below), but this approach has downsides:

  1. The result is not a DataFrame.
  2. PipelineDP can't optimize using column operations.
  3. The DataFrame API is usually more expressive.
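
As a sketch of that workaround for Spark (column names are illustrative; it reuses params and a budget accountant as in the sketch above, with pipeline_dp.SparkRDDBackend as the RDD backend):

# Go through the RDD of Rows behind the DataFrame and read columns in the extractors.
dp_engine = pipeline_dp.DPEngine(budget_accountant,
                                 pipeline_dp.SparkRDDBackend(spark.sparkContext))
data_extractors = pipeline_dp.DataExtractors(
    privacy_id_extractor=lambda row: row.user_id,
    partition_extractor=lambda row: row.movie_id,
    value_extractor=lambda row: row.rating)
dp_result = dp_engine.aggregate(df.rdd, params, data_extractors)
# dp_result is an RDD of (partition_key, metrics), not a DataFrame (downside 1),
# and every element passes through per-row Python lambdas (downside 2).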

PipelineDP APIs

PipelineDP has 2 APIs:

  1. The low-level API of DPEngine; the main function is DPEngine.aggregate.
  2. The high-level APIs for Beam and Spark, which introduce PrivatePCollection and PrivateRDD (see the sketch after this list).
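
For reference, the high-level Spark API currently looks roughly like this (adapted from the README-style examples; the exact parameter set may vary by version):

import pipeline_dp
from pipeline_dp.private_spark import make_private

# movie_views is an RDD of objects with user_id / movie_id / rating fields.
private_rdd = make_private(movie_views, budget_accountant, lambda mv: mv.user_id)

dp_sum = private_rdd.sum(
    pipeline_dp.SumParams(max_partitions_contributed=2,
                          max_contributions_per_partition=1,
                          min_value=1,
                          max_value=5,
                          partition_extractor=lambda mv: mv.movie_id,
                          value_extractor=lambda mv: mv.rating))
budget_accountant.compute_budgets()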

Goals

The idea of this issue is to design a DataFrame API for PipelineDP. There are the following ideas:

  1. It should be similar for Pandas, Spark and Beam DataFrames.
  2. The low-level API (i.e. DPEngine.aggregate) might take column names such as private_id_column, partition_key_column, value_column instead of the corresponding extractors, and take DataFrames as input and return DataFrames (see the hypothetical sketch after this list).
  3. Ideally (not necessarily in the 1st version) PipelineDP performs column-based operations on DataFrames instead of per-element operations (which should provide a performance speed-up).
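
A purely hypothetical shape for such a column-name-based call (none of these parameters exist yet):

# Hypothetical only - illustrating idea 2 above, not an existing API.
dp_result_df = dp_engine.aggregate(
    df,                                   # Pandas / Spark / Beam DataFrame
    params,
    private_id_column="user_id",
    partition_key_column="movie_id",
    value_column="rating")
# dp_result_df would be a DataFrame of the same flavour as the input.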

Note: this is a very high-level issue; it will require design discussions. I'm happy to participate and help with that.

SemyonSinchenko commented 1 year ago

For Apache Spark it could be done by converting the initial pyspark.sql.DataFrame to RDD[Row] and then to List[RDD[Any]]. So it is not so hard to add syntactic sugar like make_private_df(df: DataFrame, budget_accountant, privacy_id_columns) with groupBy and aggregate methods.

So it will look like:

from pipeline_dp.private_spark import make_private_df, count, sum, average, toDF

df = spark.createDataFrame(...)

# wrap the DataFrame, attaching the budget accountant and the privacy-unit column
private_df = make_private_df(df, budget_accountant, "user_id")

# Spark-like groupBy/aggregate over the private data
agg_private_df = private_df.groupBy("movie_id").aggregate(
  count("*").alias("count"),
  sum("watch_time").alias("total_time"),
  average("rating").alias("avg_rating"),
)

toDF(agg_private_df) # regular pyspark.sql.DataFrame

Under the hood it will convert the initial pyspark.sql.DataFrame to a list of PrivateRDDs and also store the schema of the initial data. Aggregations will be applied to the separate RDDs, with a corresponding schema update. toDF will combine the List[RDD] into an RDD[Row] and convert it to a Spark DataFrame using the inner schema.
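
A very rough sketch of what that wrapper could look like (helper names and signatures are simplified and hypothetical; budget bookkeeping, the wrapper class and the schema update after aggregation are omitted):

from pyspark.sql import DataFrame, SparkSession
from pipeline_dp.private_spark import make_private

def make_private_df(df: DataFrame, budget_accountant, privacy_id_column: str):
    # DataFrame -> RDD[Row]; the wrapper keeps the schema of the initial data.
    private_rdd = make_private(df.rdd, budget_accountant,
                               lambda row: row[privacy_id_column])
    return private_rdd, df.schema  # groupBy/aggregate would live on a wrapper class

def toDF(spark: SparkSession, agg_rdd, updated_schema):
    # Combine the aggregated RDD of tuples back into a regular DataFrame
    # using the schema tracked (and updated) by the wrapper.
    return spark.createDataFrame(agg_rdd, schema=updated_schema)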

It is a "spark-like" syntax. Also, it could be done in the same way for Pandas but without neccesarity of storing the schema cause Pandas is not lazy and schema inference is doing in runtime. But Im not sure about Beam because I'm not familiar with Beam...

SemyonSinchenko commented 1 year ago

I can make a simplified draft and an example and open a pull request. But maybe there is already an understanding of how it should be done?