IBM / data-prep-kit

Open source project for data preparation for LLM application builders
https://ibm.github.io/data-prep-kit/
Apache License 2.0

[Feature] Enable composable/pipelined transforms in the python API #374

Open daw3rd opened 4 months ago

daw3rd commented 4 months ago

Component

Library/core

Feature

It would be useful to be able to define a sequence of transforms that are run one after the other within a single python process. This would probably be implemented as an AbstractBinaryTransform that takes a list of transform instances. The configuration for this class could be either a single dictionary or a list of dictionaries corresponding 1:1 with the list of transforms. This "pipeline transform" should also be runnable in a runtime like any other transform; the latter may mean a single config dict is used to initialize the "pipeline transform".
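A minimal sketch of how those two configuration shapes could be reconciled; `normalize_configs` and the choice to share a single dict across all transforms are assumptions for illustration, not existing data-prep-kit API:

    from __future__ import annotations
    from typing import Any

    def normalize_configs(configs: dict[str, Any] | list[dict[str, Any]],
                          num_transforms: int) -> list[dict[str, Any]]:
        """Return one config dict per transform: a single dict is shared by
        every transform, while a list must line up 1:1 with the transforms."""
        if isinstance(configs, dict):
            return [configs] * num_transforms
        if len(configs) != num_transforms:
            raise ValueError(f"expected {num_transforms} configs, got {len(configs)}")
        return list(configs)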

blublinsky commented 4 months ago

I am not convinced that this is the right thing to do. It will only work if all transforms in the sequence have compatible dependencies, which is not a safe assumption. It could easily lead to library conflicts, which are hard to debug. We intentionally run different transforms in different Python processes to avoid this issue.

daw3rd commented 1 month ago

Starting the formal design discussion, given recent interest in this requirement. The basic requirement is as follows:

  1. Ability to have a series of transforms called in sequence on an in-memory object, independent of run-time.

The base AbstractBinaryTransform (and AbstractTableTransform) already operate on in-memory objects, so I believe we can use these frameworks to implement a transform that calls a list of transforms, passing outputs to inputs as in-memory objects. This same transform could then also be run like any other transform in any of our runtimes to operate on on-disk data and create new output files.
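A minimal, self-contained sketch of that chaining idea. The `BinaryTransform` stand-in below simplifies the real AbstractBinaryTransform interface (which also carries file names and metadata), so an actual implementation would need to follow the real signatures; `PipelineTransform` is a hypothetical name:

    from __future__ import annotations

    class BinaryTransform:
        """Stand-in for AbstractBinaryTransform: bytes in, list of bytes out."""
        def transform(self, data: bytes) -> list[bytes]:
            raise NotImplementedError

        def flush(self) -> list[bytes]:
            return []

    class PipelineTransform(BinaryTransform):
        """Calls a list of transforms in sequence, passing outputs to inputs
        as in-memory objects."""
        def __init__(self, transforms: list[BinaryTransform]):
            self.transforms = transforms

        def transform(self, data: bytes) -> list[bytes]:
            items = [data]
            for t in self.transforms:
                next_items: list[bytes] = []
                for item in items:
                    next_items.extend(t.transform(item))
                items = next_items  # this stage's outputs feed the next stage
            return items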

Some complications/considerations

  1. Creation of the transform in its simplest programmatic form. For example, for code2parquet->noop:
    c2p = Code2ParquetTransform({"domain": "foo"})
    noop = NoopTransform({"sleep": 1})
    p = Pipeline([c2p, noop])

    However, to support CLI configuration for the run-times, a dictionary should be supported.
    Maybe a nested/structured JSON/AST needs to be used to specify the configuration of the transforms.

    {
        "code2parquet": { "domain": "foo" },
        "noop": { "sleep": 1 }
    }

    Then the pipeline CLI would be something like --pipeline_transforms '{ "code2parquet": ...}'. How would we map from transform string references (e.g. "code2parquet") to a python class? In the above, using "code2parquet" would require a registry of mappings from strings to transform classes. Are there alternatives? Maybe the fully qualified python class name? (See the registry sketch after this list.)

  2. How to handle flush(). Probably not too bad: flush() returns N items; for N > 0, call the next transform's transform() method with each item and then its flush() method. Results of that flush() are handled the same way by the next transform(s) in the sequence. (See the flush sketch after this list.)
  3. Some special handling may be needed to make sure that compatible transforms are in the list. For example, an AbstractTableTransform (ATT) followed by an AbstractBinaryTransform (ABT) would first need to convert the ATT's output arrow Table back to byte arrays before passing it to the ABT. Or we could just treat everything as an ABT, but that could be inefficient if all transforms are ATTs.
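On the string-to-class question in point 1, one option is a small registry; `register_transform` and `build_pipeline_transforms` are hypothetical helpers, and the alternative of fully qualified class names resolved via importlib would avoid maintaining such a table:

    from __future__ import annotations
    from typing import Any, Callable

    # Hypothetical registry mapping short names used in CLI/JSON config
    # (e.g. "code2parquet") to transform classes or factories.
    _TRANSFORM_REGISTRY: dict[str, Callable[[dict[str, Any]], Any]] = {}

    def register_transform(name: str, factory: Callable[[dict[str, Any]], Any]) -> None:
        _TRANSFORM_REGISTRY[name] = factory

    def build_pipeline_transforms(config: dict[str, dict[str, Any]]) -> list[Any]:
        """Instantiate transforms from nested config such as
        {"code2parquet": {"domain": "foo"}, "noop": {"sleep": 1}};
        dict insertion order determines pipeline order."""
        return [_TRANSFORM_REGISTRY[name](params) for name, params in config.items()]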
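For the flush() handling in point 2, a possible method on the stand-in PipelineTransform sketched earlier: each transform is flushed in order, and whatever it emits is pushed through the remaining transforms with transform() before those transforms are themselves flushed:

        def flush(self) -> list[bytes]:
            results: list[bytes] = []
            for i, t in enumerate(self.transforms):
                items = t.flush()  # N items, possibly zero
                # propagate the flushed items through the downstream stages
                for downstream in self.transforms[i + 1:]:
                    next_items: list[bytes] = []
                    for item in items:
                        next_items.extend(downstream.transform(item))
                    items = next_items
                results.extend(items)
            return results
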
blublinsky commented 1 month ago

I would rather continue with the launcher, etc.

  1. We do not need anything special to support configuration. Transforms already have unique prefixes, so there is no need for nesting, etc.
  2. Fully agree with flush
  3. There is no need for special handling - all transforms return byte arrays

There are certain limitations on the pipelining. From the point of view of the runtime, the pipeline has to be seen as a single binary transform to ensure that all of the transforms' invocations happen on the same thread.
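A sketch of that flat, prefix-based configuration: one combined dict whose keys carry each transform's existing unique prefix, split per transform when the pipeline is built. The key names and the split_by_prefix helper below are illustrative, not the real parameter names:

    from __future__ import annotations
    from typing import Any

    def split_by_prefix(config: dict[str, Any], prefixes: list[str]) -> dict[str, dict[str, Any]]:
        """Split a flat config into per-transform dicts keyed by prefix."""
        return {
            p: {k[len(p) + 1:]: v for k, v in config.items() if k.startswith(p + "_")}
            for p in prefixes
        }

    flat = {"code2parquet_domain": "foo", "noop_sleep": 1}
    split_by_prefix(flat, ["code2parquet", "noop"])
    # -> {"code2parquet": {"domain": "foo"}, "noop": {"sleep": 1}}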