data-forge / data-forge-ts

The JavaScript data transformation and analysis toolkit inspired by Pandas and LINQ.
http://www.data-forge-js.com/
MIT License

New idea for Data-Forge core data pipelines in DF v2 #111

Open ashleydavis opened 3 years ago

ashleydavis commented 3 years ago

Just going to document an idea that I had for building data pipelines in Data-Forge.

This is intended for version 2, but I'm thinking it would be more of a "core" library, with a higher-level, more feature-rich library on top that is more like DF v1.

Please feel free to comment and give feedback!

Some examples follow...

Setting up a data source:

const dataSource = dataForgeFs.fromCsvFile(filePath);

Building a pipeline and invoking core operations:

const pipeline = new dataForge.Pipeline()
    .map(...)       // Transform data.
    .filter(...)    // Filter data.
    .otherOp(...)   // ... other core operations.

Apply the pipeline to the data source:

const pipelineApplication = pipeline.apply(dataSource);

Now the pipeline can be invoked and data extracted in various ways.

  1. Simply iterate the pipeline (using for await for async streaming):

    for await (const record of pipelineApplication.toIterator()) {
        // ...
    }

  2. Convert the data to an array (assuming it can fit in memory), asynchronously:

    const data = await pipelineApplication.toArray();

    Or synchronously:

    const data = pipelineApplication.toArraySync();

  3. Pipe the data to a data target, asynchronously:

    const dataTarget = dataForgeFs.toCsvFile(filePath);
    await pipelineApplication.toTarget(dataTarget);

    Or synchronously:

    await pipelineApplication.toTargetSync(dataTarget);

Of course, all the methods can be chained, e.g.:

const data = await new dataForge.Pipeline()
    .map(...)
    .otherOp(...)
    .otherOp(...)
    .apply(dataForgeFs.fromCsvFile(filePath))
    .toArray();

Some notes:

Of course, all this is a major departure from the way DF works currently, but there are multiple enhancements that can build on this new API. So I'm thinking this could be more like a "Data-Forge core" library, with a higher-level library (much like DF v1) that builds on top of it with more features and functionality.

If it were just the core library it would support the following operations:

What other core operations would this core need to support?
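
To summarize the surface area shown in the examples, here is a rough TypeScript sketch of what the core might expose. This is just a reading of the snippets above; every name and signature here is provisional:

// Provisional sketch only; nothing here is a settled API.
// DataSource/DataTarget are given minimal assumed shapes just so the sketch type-checks.
interface DataSource<T> {
    read(): AsyncIterable<T>;
}

interface DataTarget<T> {
    write(records: AsyncIterable<T>): Promise<void>;
}

// What pipeline.apply(dataSource) returns: ways to execute the pipeline and extract data.
interface PipelineApplication<T> {
    toIterator(): AsyncIterable<T>;
    toArray(): Promise<T[]>;
    toArraySync(): T[];
    toTarget(target: DataTarget<T>): Promise<void>;
    toTargetSync(target: DataTarget<T>): void;
}

// The core pipeline: chainable operations plus apply.
// TIn is the record type produced by the data source, TOut is what the pipeline emits.
interface Pipeline<TIn, TOut> {
    map<TNext>(transform: (record: TOut) => TNext): Pipeline<TIn, TNext>;
    filter(predicate: (record: TOut) => boolean): Pipeline<TIn, TOut>;
    apply(source: DataSource<TIn>): PipelineApplication<TOut>;
}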

rat-matheson commented 3 years ago

This is fantastic. At a high level, it is exactly how I hoped a Data-Forge 2 API might look: really basic operations, with more complex ones imported and applied via map, etc. I haven't had a chance to come up with API suggestions yet, but I will. The operations you've mapped out above look like a good start.

What exactly is apply, and why does it occur after a transformation operation like map in the example above? The dataForgeFs.fromCsvFile(filePath) is throwing me off a bit, because I assume we already have a DataFrame-based pipeline, so I'm unsure why we are applying another one from a CSV.

ashleydavis commented 3 years ago

apply is separate from building the pipeline purely to keep the pipeline definition separate from its execution. It means you can define a pipeline once and reuse it on different data sets.
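
For example, using only the calls already shown above (and still completely hypothetical), the same pipeline could run against two different files:

// Define the pipeline once...
const cleanup = new dataForge.Pipeline()
    .map(record => normalize(record))      // normalize is a placeholder transform
    .filter(record => record.isValid);     // isValid is a placeholder field

// ...then reuse it against different data sources.
const dataA = await cleanup.apply(dataForgeFs.fromCsvFile("a.csv")).toArray();
const dataB = await cleanup.apply(dataForgeFs.fromCsvFile("b.csv")).toArray();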

That's just an idea... and I realize it seems a bit backward. Let's think on that idea some more.

I could easily structure it the other way around (more like DF v1), e.g.:

const dataSource = dataForgeFs.fromCsvFile(filePath);
const dataTarget = dataForgeFs.toCsvFile(filePath);
await dataSource.map(...).filter(...).etc(...).toTarget(dataTarget);

This kind of example reads better... but it doesn't cleanly separate the data pipeline from the execution of the pipeline.

Please don't let the CSV files put you off. The data source and data target can be anything; indeed, you'll be able to implement your own data sources and targets just by providing a custom function that returns an object in a conventional format. So you'll be able to have code that looks like this:

const dataSource = createMyDataSource(... inputs ...);
const dataTarget = createMyDataTarget(... inputs ...);
await dataSource.map(...).filter(...).etc(...).toTarget(dataTarget);

A data source is almost like a factory: when requested (as a pipeline is being executed), it will asynchronously load the data stream from some location (a file, REST API, memory location, database, or some custom option).

Same with a data target: it's like a factory. When requested (again, as a pipeline is being executed), it will asynchronously save the data stream to some location (again a file, REST API, memory location, database, or whatever you want).
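
To make that concrete, here's one guess at what the "conventional format" for custom data sources and targets could look like. The read/write names and shapes are assumptions for illustration, not a committed contract:

// Illustrative only: one possible convention for custom data sources and targets.
interface DataSource<T> {
    read(): AsyncIterable<T>;   // called when the pipeline executes; lazily opens the stream
}

interface DataTarget<T> {
    write(records: AsyncIterable<T>): Promise<void>;   // called when the pipeline executes
}

// A custom data source backed by an in-memory array
// (a real one could just as easily stream from a REST API or database).
function createMyDataSource<T>(records: T[]): DataSource<T> {
    return {
        async *read() {
            yield* records;
        },
    };
}

// A custom data target that collects the streamed records into an array.
function createMyDataTarget<T>(sink: T[]): DataTarget<T> {
    return {
        async write(records) {
            for await (const record of records) {
                sink.push(record);
            }
        },
    };
}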

ashleydavis commented 3 years ago

By the way this is all just extreme idea generation... so please give me your wildest ideas.

At some point we'll have to come back to reality and try and figure out if we can make all this backward compatible with DF v1.

rat-matheson commented 3 years ago

Gotcha... so in that one example above, you created a pre-constructed pipeline and then apply applied it to the data source you passed in. I think I get the gist of it, and I had a similar operation in the streaming library I wrote. It might apply here:

interface Stream<T> {
    // Typical streaming/pipeline functions.
    map<T_MAPPED>(transform: (item: T) => T_MAPPED): Stream<T_MAPPED>;
    filter(predicate: (item: T) => boolean): Stream<T>;

    // The juicy bit:
    // the last instance of the stream just prior to the mapStream call gets passed in
    // as the parameter.
    mapStream<T_MAPPED>(mapper: (stream: Stream<T>) => Stream<T_MAPPED>): Stream<T_MAPPED>;

    // ...
}

So in the context of libraries of complex reusable streams/pipelines, the mapStream function was super useful.

Stream.fromJsonArray<Transaction>(path)
    .filter(transaction => transaction.invoicedCompanyId === 'someId')
    .mapStream(MyCustomPipelines.mapToAggregatedTransactions())
    .find(t => ...)
    ...

It was also useful as a base operation for implementing higher-level operations like join.
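
To make that concrete, here's a rough sketch of an inner join packaged as a stream-to-stream function. It's written over plain async iterables rather than the Stream interface above, and every name is made up, but the shape is exactly what mapStream (or apply) would accept:

// Illustrative only: an inner join expressed as a reusable stream-to-stream function.
function innerJoin<L, R, K>(
    rightRecords: R[],
    leftKey: (left: L) => K,
    rightKey: (right: R) => K
): (left: AsyncIterable<L>) => AsyncIterable<[L, R]> {
    // Index the right-hand records by key up front.
    const index = new Map<K, R[]>();
    for (const r of rightRecords) {
        const key = rightKey(r);
        const group = index.get(key);
        if (group) {
            group.push(r);
        } else {
            index.set(key, [r]);
        }
    }

    // The returned sub-pipeline: left records in, joined pairs out.
    return async function* (left) {
        for await (const l of left) {
            for (const r of index.get(leftKey(l)) ?? []) {
                yield [l, r] as [L, R];
            }
        }
    };
}

If the v2 core is backed by async iterables, something like this could plug straight into a mapStream-style operator.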

I've been thinking about Data-Forge use cases, and about using mapStream (or apply(...)) as a way to implement matrix math. It seems great to me because, server side, it could use an N-API implementation, while browser side it could use a browser implementation, with absolutely no change in the base pipeline implementation!

Apologies in advance for not having a better use case on hand. As a simplified example, say we want to determine the total cost of goods for each company that sells Acme factory products. Let's say we have two data sets.

DataSet 1: CompanyUnitSales

companyId   productId  unitsSold someOtherCol ...
c1          p1         10
c1          p2         5
c2          p1         14
c2          p2         0
...

DataSet 2: AcmeFactoryProductPrices

productId wholesalePrice someOtherCol ...
p1         10.99
p2         12.50 
...

So in order to do the matrix math, we need to convert to matrices. Matrix1 for DataSet1 is an MxN matrix, where M is the company index and N is the product index:

[
10   5
14   0
]

Matrix2 for DataSet2 is just an Nx1 vector giving the wholesale price for each of the N products (where the row index maps to the product id):

[
10.99
12.50
]

so Matrix1 x Matrix2 =

[  172.40
   153.86  ]

And we see company1 sells $172.40 worth of products and company2 sells $153.86 worth.

Kind of a silly example, but played out on the pipeline it might look like:

let productPricesMatrix = DataForge.fromJsonFile<ProductPrice>(path)
    .mapPipeline(MatrixOps.toSummaryMatrix({
        expandRowIndices:'productId',
        cellValue: 'wholesalePrice',
        // expandColumnIndices: undefined value just inserts a default column
    }))

let companyUnitSales = DataForge.fromJsonFile<CompanyUnitSale>(path2);

companyUnitSales
    .mapPipeline(MatrixOps.toSummaryMatrix({
        expandRowIndices:'companyId',
        expandColumnIndices: 'productId',
        cellValue:'unitsSold'
    }))

    // do optimized matrix math if loading the n-api version of MatrixOps
    .mapPipeline(MatrixOps.crossProductWith(productPricesMatrix))

    //now back to our good old pipeline operations
    .filter(t => ...)
    .toTarget(Targets.createFile('/someFile.json', Format.json))
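
For reference, here's roughly what those two hypothetical MatrixOps steps would boil down to in plain TypeScript, using the sample data above (no Data-Forge involved, just the raw pivot and multiply):

// Plain-TypeScript sketch of the pivot + matrix multiply from the example above.
interface CompanyUnitSale { companyId: string; productId: string; unitsSold: number; }
interface ProductPrice { productId: string; wholesalePrice: number; }

const sales: CompanyUnitSale[] = [
    { companyId: "c1", productId: "p1", unitsSold: 10 },
    { companyId: "c1", productId: "p2", unitsSold: 5 },
    { companyId: "c2", productId: "p1", unitsSold: 14 },
    { companyId: "c2", productId: "p2", unitsSold: 0 },
];
const prices: ProductPrice[] = [
    { productId: "p1", wholesalePrice: 10.99 },
    { productId: "p2", wholesalePrice: 12.50 },
];

// Pivot sales into an M x N matrix (companies x products); this is the toSummaryMatrix step.
const companies = [...new Set(sales.map(s => s.companyId))];
const products = [...new Set(sales.map(s => s.productId))];
const unitsMatrix = companies.map(c =>
    products.map(p =>
        sales.find(s => s.companyId === c && s.productId === p)?.unitsSold ?? 0));

// N x 1 price vector in the same product order.
const priceVector = products.map(p =>
    prices.find(pr => pr.productId === p)?.wholesalePrice ?? 0);

// Matrix x vector, giving the total cost of goods per company; this is the crossProductWith step.
const totals = unitsMatrix.map(row =>
    row.reduce((sum, units, i) => sum + units * priceVector[i], 0));

console.log(companies, totals); // [ 'c1', 'c2' ] and approximately [ 172.40, 153.86 ]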
ashleydavis commented 3 years ago

Cool example

rat-matheson commented 3 years ago

Would it be helpful to put together a small set of use cases? It could be a good starting point for creating some behavioral tests to help guide/justify the v2 API. I like where I think you are going, and perhaps these types of tests could show it working in the wild (even if they all fail due to the lack of an implementation).

If this is helpful, is there a particular format you would like? If you have a domain suggestion and an approach, I can contribute.

ashleydavis commented 3 years ago

No particular format required; just keep writing down your ideas here. It all helps piece this puzzle together ;)