move-coop / parsons

A python library of connectors for the progressive community.

DRAFT PROPOSAL: Add Pipelines Framework to Parsons #980

Open Jason94 opened 5 months ago

Jason94 commented 5 months ago

Overview

This PR implements a draft of the pipelines API that we discussed at the January contributors meeting. The goal of the pipelines API is to add an extra level of abstraction on top of the existing Parsons connectors and Table to make it easier to write and manage ETL pipelines.

Justification

The contributors recently discussed what we can do to take Parsons to the next level and make it even easier to build ETL scripts. Another big goal, one that would benefit TMC and the wider space significantly, is to make it easier for new data staff who aren't yet comfortable with control flow and data structures to assemble ETL scripts.

In my opinion, Parsons connectors already make it very easy to extract and load data. The two things that it does not do much to help the user with are:

During the call we discussed adding a Pipelines system to Parsons. This system would exist on top of the connectors and Parsons Table, and possibly would hide those details from a new user completely. The idea is this:

Based on contributor feedback, we identified two distinct user groups with different sets of concerns about the original proposal.

New Engineers/Analysts

There was concern that the syntax was still too complex for analysts/engineers (new users) and would prove too big a barrier for them to use the framework successfully. To accommodate those concerns, the following changes were made in revision 2:

Removal of lambda syntax.

The original syntax relied heavily on lambda functions, which are a difficult concept for newer users.

Simplification of Pipeline syntax

The previous pipe-chain syntax in the Pipeline constructor relied on repeated function calls, which would have been confusing to newer users:

    load_after_1975 = Pipeline(
        load_from_csv("deniro.csv")
        (
            print_data("Raw Data")
        )(
            clean_year()
        )(
            filter_rows({
                "Year": lambda year: year > 1975
            })
        )(
            print_data("After 1975")
        )(
            write_csv("after_1975.csv")
        )
    )

This has been replaced with a much simpler syntax where the pipes are just listed in the Pipeline constructor:

    load_after_1975 = Pipeline(
        "Load after 1975",
        load_from_csv("deniro.csv"),
        clean_year(),
        filter_rows("{Year} > 1975"),
        write_csv("after_1975.csv")
    )
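
The string predicate syntax maps naturally onto the existing Parsons Table API. One possible implementation (my assumption, not spelled out in this draft) would pass the expression straight through to Table.select_rows, which already accepts petl-style "{Column}" expressions:

@define_pipe("filter_rows")
def filter_rows(data: Table, expression: str) -> Table:
    # Table.select_rows understands petl-style expressions like "{Year} > 1975",
    # so the string predicate can be handed to the existing Parsons Table API as-is.
    return data.select_rows(expression)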

Power Users

The main concern for experienced engineers/analysts with a high degree of Python skill (power users) was that the framework didn't offer enough value to justify the cost of building with it. In particular, data orchestration was lacking.

[My organization] started to build an orchestration platform to meet the needs of semi-technical members and resolve some of the Civis pain points. Long story short, maintaining software is a lot of work and we're hoping to deprecate that eventually... My opinion is that there are really good, well funded, and active open source projects to do those things and nothing super specific about our use case vs. private enterprise.

Prefect integration

To meet that need, this revision of Pipelines incorporates Prefect under the hood, with no additional complexity for anyone using the pipeline framework, whether they are writing pipes or just assembling pipelines.

Each pipe is defined exactly the same as before, but the framework constructs a Prefect task for that pipe behind the scenes.

# This pipe is transformed into a Prefect task named "write_csv", but is written like normal Parsons code.

@define_pipe("write_csv")
def write_csv(data: Table, csv_name: str) -> Table:
    data.to_csv(csv_name)
    return data
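
For the curious, here is a minimal sketch of what define_pipe might do under the hood, assuming Prefect 2's task API; these internals (and the configure/run_with_data helpers) are my illustration, not code from this PR:

from functools import wraps
from prefect import task

def define_pipe(name, **options):
    # Hypothetical sketch: wrap a plain Parsons function in a named Prefect task and
    # defer the data argument so a pipe can be configured before any data exists.
    # (Handling for input_type=PipeResult.Unit source pipes is omitted for brevity.)
    def decorator(func):
        prefect_task = task(name=name)(func)

        @wraps(func)
        def configure(*args, **kwargs):
            # The returned callable receives the upstream Table when the flow runs.
            def run_with_data(data):
                return prefect_task(data, *args, **kwargs)
            return run_with_data

        return configure
    return decorator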

Each Pipeline is transformed into a Prefect Flow and is logged to Prefect Cloud when run:

    load_after_1975 = Pipeline(
        "Load after 1975",
        load_from_csv("deniro.csv"),
        clean_year(),
        filter_rows("{Year} > 1975"),
        write_csv("after_1975.csv")
    )
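
A similarly hedged sketch of how the Pipeline constructor could chain its configured pipes into a Prefect flow; again, these internals are my guess, not the PR's implementation:

from prefect import flow

class Pipeline:
    # Hypothetical sketch: thread each configured pipe's output into the next pipe
    # inside a single named Prefect flow.
    def __init__(self, name, *pipes):
        self.name = name
        self.pipes = pipes

    def run(self):
        @flow(name=self.name)
        def _run():
            data = None  # the first (source) pipe is expected to ignore this
            for pipe in self.pipes:
                data = pipe(data)
            return data

        return _run()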

In addition to providing modern data orchestration out of the box, the Prefect integration will make it possible for us to integrate the extensive Prefect Integrations library with Pipelines.

Revision 2 Demo

This code sets up and runs the same series of pipelines as before:

    clean_year = CompoundPipe(
        filter_rows("{Year} is not None"),
        convert("Year", int)
    )

    load_after_1975 = Pipeline(
        "Load after 1975",
        load_from_csv("deniro.csv"),
        clean_year(),
        filter_rows("{Year} > 1975"),
        write_csv("after_1975.csv")
    )
    split_on_1980 = Pipeline(
        "Split on 1980",
        load_from_csv("deniro.csv"),
        clean_year(),
        split_data("'gte_1980' if {Year} >= 1980 else 'lt_1980'"),
        for_streams({
            "lt_1980": write_csv("before_1980.csv"),
            "gte_1980": write_csv("after_1979.csv")
        })
    )

    save_lotr_books = Pipeline(
        "Save LOTR Books",
        load_lotr_books_from_api(),
        write_csv("lotr_books.csv")
    )

    after_1990_and_all_time = Pipeline(
        "Copy into streams test",
        load_from_csv("deniro.csv"),
        clean_year(),
        copy_data_into_streams("0", "1"),
        for_streams({
            "0": CompoundPipe(
                filter_rows("{Year} > 1990"),
                write_csv("after_1990.csv")
            )(),
            "1": write_csv("all_years.csv")
        })
    )

    dashboard = Dashboard(
        load_after_1975,
        split_on_1980,
        save_lotr_books,
        after_1990_and_all_time,
    )
    dashboard.run()
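
For comparison, here is roughly what the split_on_1980 pipeline does when written directly against today's Parsons Table API (my own example; the expressions and file names mirror the demo):

    from parsons import Table

    deniro = Table.from_csv("deniro.csv")

    # clean_year: drop rows with no year and cast the column to int
    deniro = deniro.select_rows("{Year} is not None")
    deniro.convert_column("Year", int)

    # split_data / for_streams: write each side of the 1980 split to its own file
    deniro.select_rows("{Year} < 1980").to_csv("before_1980.csv")
    deniro.select_rows("{Year} >= 1980").to_csv("after_1979.csv")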

Here are the runs in my Prefect Cloud instance: [screenshot]

The declarative Pipeline syntax is easy to comprehend, and viewing it alongside the Prefect flow viewer makes the data flow easy to see: [screenshot]

Prefect error handling shows you which pipe failed, what the error was, and even what time it failed: [screenshot]


Original Draft - Revision 1

Prototype

This prototype is in the /pipelines folder in the PR branch. All of the code is contained in the parsons_pipelines.py file, which is a runnable file that executes three pipelines based on a stored CSV file and the open-source Lord of the Rings API.

Here are some highlights:

Declarative pipelines syntax, made by composing pipes.

    load_after_1975 = Pipeline(
        load_from_csv("deniro.csv")
        (
            print_data("Raw Data")
        )(
            clean_year()
        )(
            filter_rows({
                "Year": lambda year: year > 1975
            })
        )(
            print_data("After 1975")
        )(
            write_csv("after_1975.csv")
        )
    )
    split_on_1980 = Pipeline(
        load_from_csv("deniro.csv")
        (
            print_data("Raw Data")
        )(
            clean_year()
        )(
            split_data(lambda row: 1 if row["Year"] >= 1980 else 0)
        )(
            all_streams(print_data("Split Data"))
        )(
            for_streams({
                0: write_csv("before_1980.csv"),
                1: write_csv("after_1979.csv")
            })
        )
    )

Pipe definitions are normal Parsons code

@define_pipe("convert")
def convert(data: Table, *args, **kwargs) -> Table:
    return data.convert_column(*args, **kwargs)

@define_pipe("write_csv")
def write_csv(data: Table, csv_name: str) -> Table:
    data.to_csv(csv_name)
    return data

Load data in via pipes, either using Parsons or requests

@define_pipe("load_from_csv", input_type=PipeResult.Unit)
def load_from_csv(filename: str, **kwargs) -> Table:
    return Table.from_csv(filename, **kwargs)

@define_pipe("load_lotr_books", input_type=PipeResult.Unit)
def load_lotr_books_from_api() -> Table:
    # Set up the endpoint and headers
    url = "https://the-one-api.dev/v2/book"
    headers = {}

    # Make the request to the API
    response = requests.get(url, headers=headers)
    response.raise_for_status()  # Raises an HTTPError if the response was an error

    # Convert the JSON response into a Parsons Table
    books_json = response.json().get("docs", [])
    books_table = Table(books_json)

    return books_table

Trivially compose & name commonly-combined pipes

    clean_year = lambda: (
        filter_rows({
            "Year": lambda year: year is not None
        })
    )(
        convert(
            "Year",
            lambda year_str: int(year_str)
        )
    )

Group pipelines together in a Dashboard to facilitate logging, reporting, etc.

    dashboard = Dashboard(
        load_after_1975,
        split_on_1980,
        save_lotr_books
    )
    dashboard.run()

Call Dashboard with a logger to get free logging of the output of every step in every pipeline

    dashboard = Dashboard(
        load_after_1975,
        split_on_1980,
        save_lotr_books,
        logger=CsvLogger()
    )
    dashboard.run()

[screenshot: logging output]
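
A hypothetical sketch of what such a logger could look like (the CsvLogger in the prototype may well differ):

class CsvLogger:
    # Hypothetical sketch: persist every intermediate Table a pipeline produces
    # to its own CSV file, named after the pipeline and the pipe that produced it.
    def log(self, pipeline_name: str, pipe_name: str, data: Table) -> None:
        data.to_csv(f"{pipeline_name}_{pipe_name}.csv")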

Generate an HTML report with your pipelines' results

    dashboard.run()
    dashboard.generate_report("report.html")

[screenshot: pipeline report]
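
Purely as a sketch of the idea (not the prototype's actual implementation), report generation could be as simple as rendering each pipeline's final Table into an HTML summary:

import html

def generate_report(results: dict, path: str) -> None:
    # Hypothetical sketch: `results` maps pipeline names to their final Parsons Tables.
    rows = "".join(
        f"<tr><td>{html.escape(name)}</td><td>{table.num_rows}</td></tr>"
        for name, table in results.items()
    )
    with open(path, "w") as f:
        f.write(
            "<table><tr><th>Pipeline</th><th>Rows</th></tr>" + rows + "</table>"
        )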

Individual pipelines can be run on their own, with their output captured as a Parsons Table

    save_lotr_books_data: Table = Pipeline(
        load_lotr_books_from_api()
        (
            write_csv("lotr_books.csv")
        )
    ).run()
    save_lotr_books_data.to_bigquery(...)

Next Steps

If the Parsons contributors decide to move forward with the proposal, I believe these features are necessary to implement an MVP of the pipelines framework:

These features would be nice to implement, but I don't believe they need to be completed to justify releasing the framework:

austinweisgrau commented 5 months ago

My 2 cents on transformations: in general, I think folks should be discouraged from doing data transformations in Python. Doing transformations in a database is going to be preferred for maintainability, clarity, and performance most of the time.

I could be wrong here, but my expectation is that most Parsons users will be more familiar with SQL than with Python. Especially if transformations need to be formulated as some kind of lambda-function map, as in these examples, I imagine SQL is going to be significantly more accessible.
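
For illustration (my example, not from the thread), the year filter from the demo could be pushed down to the warehouse with a Parsons connector, keeping the transformation in SQL. The films.deniro table name is made up, and Redshift credentials are assumed to come from the usual environment variables:

    from parsons import Redshift

    rs = Redshift()

    # Do the filtering in the database and only pull the result into Python.
    after_1975 = rs.query("SELECT * FROM films.deniro WHERE year > 1975")
    after_1975.to_csv("after_1975.csv")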

shaunagm commented 4 months ago

Finally taking a look (sorry for the delay). We can chat about these questions at the contributor meeting: