databricks / koalas

Koalas: pandas API on Apache Spark

Question: how to avoid all data being merged in the driver? #2059

Open lfdversluis opened 3 years ago

lfdversluis commented 3 years ago

I have some code that performs a groupby + apply:

import os
import databricks.koalas as ks

# hdfs_path and folder are defined elsewhere in the script.
df = ks.read_parquet(os.path.join(hdfs_path, folder, "tasks", "schema-1.0"),
                     columns=["workflow_id", "id", "task_id", "children",
                              "parents", "ts_submit", "runtime"],
                     pandas_metadata=False)

grouped_df = df.groupby("workflow_id")
toposort_df = grouped_df.apply(toposort)

# Reduce the number of files being output to 10 instead of hundreds.
toposort_df.spark.repartition(10).to_parquet(output_location_topo, compression='snappy', engine='pyarrow')

All it has to do is perform some computation on each group and write the outcome to Parquet. However, checking the Spark UI, it seems that all data is first collected on the driver. Can I somehow make each executor write its groups' data directly, without it having to go through the driver?

I tried the following approaches without success:

  1. Running df.to_parquet() inside the toposort function, but that did not really work for some odd reason; it was as if the executors were not waiting for the write to finish.
  2. Skipping the .spark.repartition(10) part. It wrote many more files, but still aggregated all data on the driver.
  3. Running grouped_df.apply(toposort).spark.repartition(10).to_parquet(output_location_topo, compression='snappy', engine='pyarrow') also does not work.

Would running grouped_df.apply(toposort).spark.repartition(10).to_parquet(output_location_topo, compression='snappy', engine='pyarrow') (i.e., without first assigning the output to toposort_df) do the trick?

ueshin commented 3 years ago

Does the function toposort have a return type annotation? If not, Koalas collects some amount of data into the driver to infer the return type.
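For illustration, a minimal hedged sketch of an annotated function (the name, body, and columns are placeholders; the slice-style "name": type hints follow the same pattern the reporter confirms below):

import databricks.koalas as ks

# With an explicit return annotation, Koalas can build the output schema
# from the hint instead of collecting sample rows to the driver to infer it.
def per_workflow(pdf) -> ks.DataFrame["workflow_id": int, "task_id": int, "task_slack": int]:
    # Placeholder body; the real function would topologically sort the group.
    return pdf.assign(task_slack=0)[["workflow_id", "task_id", "task_slack"]]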

See also:

Also, what default index type are you using? Since it seems you don't specify the index_col parameter when reading, you have to choose the default index type carefully.
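For reference, a minimal sketch of switching the default index type via the Koalas options API. The "sequence" default computes a globally sequential index, which can funnel all the data through a single node; "distributed" avoids that at the cost of non-consecutive index values:

import databricks.koalas as ks

# Use a fully distributed default index instead of the "sequence" default.
ks.set_option("compute.default_index_type", "distributed")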

Thanks.

lfdversluis commented 3 years ago

@ueshin good pointers (thanks!). I did check the type hints, and my function has its return type annotated: def toposort(df) -> pd.DataFrame["workflow_id": int, "task_id": int, "task_slack": int]

This function applies topological sorting to a DAG (much like https://pypi.org/project/toposort/ does) and then, per group, computes the difference in finish times, which I named task_slack.

Indeed, I do not specify index_col. As I group by workflow_id, I guess I should set it to that column? I will try this out.
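For reference, a hedged sketch of what that could look like, reusing the paths from the original snippet (assuming workflow_id is then dropped from columns, since it becomes the index):

df = ks.read_parquet(os.path.join(hdfs_path, folder, "tasks", "schema-1.0"),
                     columns=["id", "task_id", "children",
                              "parents", "ts_submit", "runtime"],
                     index_col="workflow_id",
                     pandas_metadata=False)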

lfdversluis commented 3 years ago

Looking at https://koalas.readthedocs.io/en/latest/reference/api/databricks.koalas.DataFrame.groupby.html#databricks.koalas.DataFrame.groupby, I do not see how you can group by the index. Specifying workflow_id, index, or level=0 is not supported, while https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.groupby.html does explicitly mention this. Is the best way to approach this to run reset_index(drop=False) and then run the groupby? (Sketched below.)
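A minimal sketch of that workaround, assuming workflow_id is currently the index:

# Move the index back into a regular column (drop=False is the default),
# then group on it as before.
df = df.reset_index()
toposort_df = df.groupby("workflow_id").apply(toposort)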