NVIDIA / fsi-samples

A collection of open-source, GPU-accelerated Python tools and examples for quantitative analyst tasks, leveraging the RAPIDS AI project, Numba, cuDF, and Dask.

[FEA]Generic gQuant Dask Data #105

Open yidong72 opened 4 years ago

yidong72 commented 4 years ago

In the _node_flow.py file, we have the following logic to handle a non-dask dataframe:

    output_df[oport] = [iout.compute() for iout in outputs_dly[oport]][0]

We want to generalize it in the same way that a normal dask dataframe handles delayed objects.
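To make the current behavior concrete, here is a minimal sketch of that line using a tiny stand-in class for dask delayed objects (FakeDelayed is an illustrative stub, not part of the project; the real code operates on dask delayed objects):

```python
class FakeDelayed:
    """Stand-in for a dask Delayed: wraps a value, compute() returns it."""
    def __init__(self, value):
        self._value = value

    def compute(self):
        return self._value


# One output port whose result is a list of delayed partitions.
outputs_dly = {"out": [FakeDelayed([1, 2, 3]), FakeDelayed([4, 5, 6])]}

oport = "out"
output_df = {}
# Current logic: compute every partition, but keep only the first one.
output_df[oport] = [iout.compute() for iout in outputs_dly[oport]][0]
print(output_df[oport])  # only the first partition survives: [1, 2, 3]
```

Note how the trailing [0] silently discards every partition after the first, which is why this only works for the single-partition, non-dask case.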

That's a list of delayed objects, which is just another delayed collection. A dask-cudf or dask dataframe is itself a collection of dataframes, and it behaves like a delayed object since you can call compute on it. So the generalization would be to return the list of delayed objects even when it is not a dask dataframe. We would make a class such as "gQuantDaskData" to use as a port type, so we can handle such a delayed collection as well. Based on the port's type we can return something like:

    output_df[oport] = gQuantDaskData(outputs_dly[oport])

This idea would generalize our ability to handle dask distributed processing.

The npartitions is just the length of the list, i.e. len(outputs_dly[oport]).
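A hypothetical sketch of what gQuantDaskData could look like (the class name comes from this issue; the methods and the FakeDelayed stub are assumptions for illustration):

```python
class FakeDelayed:
    """Stand-in for a dask Delayed object used only for this sketch."""
    def __init__(self, value):
        self._value = value

    def compute(self):
        return self._value


class gQuantDaskData:
    """Wraps a list of delayed objects as a delayed-like collection:
    npartitions is the list length, compute() materializes all parts."""

    def __init__(self, delayed_list):
        self._delayed = list(delayed_list)

    @property
    def npartitions(self):
        # npartitions is just the length of the delayed list.
        return len(self._delayed)

    def compute(self):
        # Materialize every partition. A real version might hand the
        # whole list to dask.compute so it evaluates as one graph.
        return [d.compute() for d in self._delayed]


data = gQuantDaskData([FakeDelayed("part0"), FakeDelayed("part1")])
print(data.npartitions)  # 2
print(data.compute())    # ['part0', 'part1']
```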

Users could inherit from gQuantDaskData and set port types for their particular data. Something like:

class DistributedModel(gQuantDaskData):
    pass  # nothing in particular just indicates this port in/out type can be a distributed model

We check for gQuantDaskData in the delayed processing call, enforce that npartitions match, and add it as a delayed input. On output, we find the port type derived from gQuantDaskData and return it. For the example above:

    output_df[oport] = DistributedModel(outputs_dly[oport])
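The dispatch described above can be sketched as follows. The wrap_output helper, its parameters, and the npartitions check are illustrative assumptions; only gQuantDaskData and DistributedModel are named in this issue:

```python
class gQuantDaskData:
    """Delayed-collection port type: wraps a list of delayed objects."""
    def __init__(self, delayed_list):
        self._delayed = list(delayed_list)

    @property
    def npartitions(self):
        return len(self._delayed)


class DistributedModel(gQuantDaskData):
    pass  # marker subclass: this port carries a distributed model


def wrap_output(port_type, delayed_list, expected_npartitions):
    """Hypothetical output handling: wrap delayed lists whose declared
    port type derives from gQuantDaskData, after checking npartitions."""
    if issubclass(port_type, gQuantDaskData):
        if len(delayed_list) != expected_npartitions:
            raise ValueError("npartitions mismatch on delayed collection")
        return port_type(delayed_list)
    # Non-gQuantDaskData ports fall through to the existing handling.
    return delayed_list


out = wrap_output(DistributedModel, ["dly0", "dly1"], expected_npartitions=2)
print(type(out).__name__, out.npartitions)  # DistributedModel 2
```

Because DistributedModel adds nothing beyond its base class, the subclass works purely as a type tag that the node flow can match on when routing port data.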