DataCater / datacater

The developer-friendly ETL platform for transforming data in real time. Based on Apache Kafka® and Kubernetes®.
https://datacater.io

Proposal for a more flexible pipeline specification #14

Closed · flippingbits closed this 1 year ago

flippingbits commented 1 year ago

At the moment, our pipeline spec is very opinionated and limited with regard to defining filters and transforms. It is not flexible enough to support general-purpose streaming data pipelines.

The main limitations of our current spec are:

- Filters and transforms can be applied only to single attributes of a record's value; there is no way to operate on entire records (key, value, and metadata).
- There is no support for plugging user-defined transformation code into a pipeline.

The current pipeline spec looks as follows (example):

---
spec:
  filters: []
  transformationSteps:
  - name: First
    transformations:
    - attributeName: name
      transformation: trim
    - attributeName: email
      transformation: trim
      filter: not-empty
    - attributeName: age
      transformation: add-column
      transformationConfig:
        attributeName: age
        defaultValue: 42
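
To make the field-level restriction concrete, here is a minimal sketch of how such a step could be interpreted. The function names and the plain-dict representation of the record's value are illustrative assumptions, not DataCater's actual engine code:

def passes_filter(filter_key, value):
    # "not-empty" is the only filter used in the example above.
    if filter_key == "not-empty":
        return value is not None and value != ""
    return True

def apply_transformation_step(step, value):
    # Every operation is bound to exactly one attribute of the record's
    # value; there is no hook for key- or metadata-level logic.
    for t in step["transformations"]:
        attribute = t["attributeName"]
        # One plausible reading of the per-transformation filter: it
        # gates whether the transform runs for this record.
        if "filter" in t and not passes_filter(t["filter"], value.get(attribute)):
            continue
        if t["transformation"] == "trim":
            value[attribute] = (value.get(attribute) or "").strip()
        elif t["transformation"] == "add-column":
            config = t["transformationConfig"]
            value[config["attributeName"]] = config.get("defaultValue")
    return value

For instance, apply_transformation_step(step, {"name": " Jane ", "email": ""}) would trim name, skip the trim on email (its not-empty filter fails), and add age with the default value 42.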

I propose changing our pipeline spec as follows:

- Replace the separate filters and transformationSteps with a single, ordered list of steps.
- Give each step a kind: Field steps work on single fields of the record's value, while Record steps work on entire records.
- Reference filters and transforms by a key and configure them through a generic config object.
- Support user-defined code for record-level transformations.

Speaking YAML, the spec could look as follows:

---
spec:
  steps:
  - kind: Field
    fields:
      name:
        transform:
          key: trim
      email:
        transform:
          key: hash
          config:
            algorithm: sha1
        filter:
          key: not-empty
  - kind: Record
    transform: user-defined-record-transformation
    config:
      code: |
        def transform(record):
          return record

Filters and transforms are referenced by their key and can be configured through the config object. Field-level steps apply filters/transforms to single fields of the record's value, while Record-level steps operate on entire records, consisting of a key, a value, and metadata.
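
For illustration, a user-defined record-level transform could then look like the sketch below. The dict-based record shape ("key", "value", "metadata") is an assumption made for this example; the proposal itself only fixes the transform(record) contract:

def transform(record):
    # Record-level steps see the whole record: key, value, and metadata.
    if record.get("value") is None:
        # Pass tombstone records through unchanged.
        return record

    # Derive a value field from the record's key ...
    record["value"]["user_id"] = record["key"]
    # ... and annotate the metadata.
    record["metadata"]["processed"] = True
    return record

Something like this would be impossible to express with the current, purely field-level spec.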