phaser

A library that simplifies automated, batch-oriented, complex data integration pipelines by organizing steps and column definitions into phases, and by offering utilities for transforming, sorting, validating and viewing changes to data.

Goals and Scope

This library is designed to help developers run a series of steps on batch-oriented, record-oriented, un-indexed data. A batch of record-oriented data is a set of records that are intended to be processed together, in which each record has more or less the same fields and those fields are the same type across records. Often record-oriented data can be expressed in CSV files, where the first line contains the column names.
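For illustration, here is a small hypothetical batch of record-oriented data as a Python list of dicts, with the equivalent CSV shown in comments (the field names and values are invented for this example):

# A small batch of record-oriented data: three records processed together,
# each with the same fields of the same types.
records = [
    {"Employee ID": 1, "First name": "Ada", "Last name": "Lovelace", "Pay rate": 75.0},
    {"Employee ID": 2, "First name": "Grace", "Last name": "Hopper", "Pay rate": 80.0},
    {"Employee ID": 3, "First name": "Alan", "Last name": "Turing", "Pay rate": 77.5},
]

# The same batch expressed as a CSV file, column names on the first line:
#   Employee ID,First name,Last name,Pay rate
#   1,Ada,Lovelace,75.0
#   2,Grace,Hopper,80.0
#   3,Alan,Turing,77.5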

Record-oriented data can be stored or expressed in various formats and objects, including CSV files and JSON documents.

In this project, record consistency is somewhat forgiving: the library does not insist that each record have a value for every column. Some records may lack some fields, i.e. 'sparse' data. Sparse data may be represented in a format that isn't columnar (a JSON format might easily list only the fields that have values for each record). Sparse record-oriented data should be trivial to handle in this library, although by default checkpoint data is saved in a columnar CSV that shows all the null values.
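
To illustrate, a sparse record might arrive in JSON with only the fields that have values, and then appear in a columnar CSV checkpoint with empty cells for the rest (data invented for this example):

# A 'sparse' record as it might appear in JSON input: only fields with
# values are listed.
sparse_record = {"Employee ID": 4, "First name": "Edsger"}

# The same record in a columnar CSV checkpoint, where the missing
# "Last name" and "Pay rate" fields show up as empty (null) cells:
#   Employee ID,First name,Last name,Pay rate
#   4,Edsger,,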

The goals of Phaser are to offer an opinionated framework for complex data pipelines, with a structure that makes them easier to debug, test and maintain.

The mechanisms that we think will help phaser meet these goals include organizing work into separately runnable phases, declaring columns and their constraints up front, and saving checkpoint output between phases.

Simple Example

from phaser import Phase, Column, FloatColumn, Pipeline, check_unique

# Custom steps (drop_rows_with_no_id_and_not_employed, combine_full_name,
# calculate_annual_salary, calculate_bonus_percent) are defined in the full
# example in the project's tests directory.

class Validator(Phase):
    columns = [
        Column(name="Employee ID", rename="employeeNumber"),
        Column(name="First name", rename="firstName"),
        Column(name="Last name", rename="lastName", blank=False),
        FloatColumn(name="Pay rate", min_value=0.01, rename="payRate", required=True),
        Column(name="Pay type",
               rename="payType",
               allowed_values=["hourly", "salary", "exception hourly", "monthly", "weekly", "daily"],
               on_error=Pipeline.ON_ERROR_DROP_ROW,
               save=False),
        Column(name="Pay period", rename="paidPer")
    ]
    steps = [
        drop_rows_with_no_id_and_not_employed,
        check_unique("Employee ID")
    ]

class Transformer(Phase):
    columns = [
        FloatColumn(name='Pay rate'),
        FloatColumn(name="bonusAmount")
    ]
    steps = [
        combine_full_name,
        calculate_annual_salary,
        calculate_bonus_percent
    ]

class EmployeeReviewPipeline(Pipeline):
    phases = [Validator, Transformer]

The example above defines a validation phase that renames a number of columns and declares constraints on their values, a transformer phase that performs calculations, and a pipeline that combines both phases. The full example can be found in the tests directory of the project, including the sample data and the definitions of the custom steps.
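
As a sketch of what one of those custom steps might look like, assuming phaser's row_step decorator (the body below is invented for illustration; the real definitions are in the tests directory):

from phaser import row_step

# A row step receives one record and returns the (possibly modified) record.
@row_step
def combine_full_name(row, **kwargs):
    row["Full name"] = f"{row['First name']} {row['Last name']}"
    return row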

The benefit of even such a simple pipeline, expressed as two phases, is that the phases can be debugged, tested and run separately. A developer can run the Validator phase once, then work on adding features to the Transformer phase, or narrow down a production error by comparing the checkpoint output of each phase. In addition, the code is readable and supports team collaboration.
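
A minimal sketch of running the pipeline, assuming the Pipeline constructor accepts a source file and a working directory for checkpoint output (the argument names here are assumptions; consult the phaser documentation for the exact interface):

# Hypothetical invocation: runs the Validator and Transformer phases in
# order, saving a checkpoint CSV after each phase into working_dir so the
# per-phase output can be inspected and compared.
pipeline = EmployeeReviewPipeline(source="employees.csv", working_dir="out/")
pipeline.run()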

Advanced Example

For a real, working advanced example, see the phaser-example repository on GitHub. You should be able to clone that repository, fetch the Boston and Seattle bike-trail sensor data, and run the pipelines to get the source data into a consistent format.

The pipelines in the phaser-example project demonstrate these features: