
dagster-stitch


This library provides a Dagster integration for Stitch, a managed batch data ingestion service similar to Fivetran and Airbyte.

Disclaimer

Please note this library is under active development and should be used cautiously! Currently it supports triggering replication jobs and materializing the resulting tables as Dagster assets that can be used in downstream jobs. Reconciliation and other features are not yet included but would be great additions.

Installation

To install the library, run:

$ pip install dagster-stitch

For development, it can be installed locally and tested with:

$ pip install -e .[lint,test]
$ pytest

Configuration

Setup

To use the library, you must configure a stitch resource in your Dagster instance. The resource requires an api_key to authenticate with Stitch. You can find or generate one in the Stitch UI under Account Settings > API Access Keys.
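For example, the resource can be configured without hardcoding the key by reading it from an environment variable. This is a minimal sketch; the STITCH_API_KEY variable name is an assumption, not something the library requires:

import os

from dagster_stitch import stitch_resource

# Read the API key from an environment variable (STITCH_API_KEY is an assumed name)
# instead of committing it to source control.
stitch_instance = stitch_resource.configured(
    {
        "api_key": os.environ["STITCH_API_KEY"],
        "account_id": 12345,
    }
)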

You will also need your Stitch account ID and the ID of the data source you want to replicate; both are used in the asset or operation configuration. You can find them by navigating to the data source in the Stitch UI and looking at the URL. For example, if the URL is https://app.stitchdata.com/client/12345/pipeline/v2/sources/67890/summary, then the account ID is 12345 and the data source ID is 67890.
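If you prefer to extract the IDs programmatically, a small sketch like the following works on URLs of that shape (the regular expression is illustrative, not part of the library):

import re

# Pull the account and data source IDs out of a Stitch UI URL of the form shown above.
url = "https://app.stitchdata.com/client/12345/pipeline/v2/sources/67890/summary"
match = re.search(r"/client/(\d+)/.*/sources/(\d+)/", url)
account_id, data_source_id = int(match.group(1)), int(match.group(2))
print(account_id, data_source_id)  # 12345 67890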

Usage

Here's an example of how to build assets that materialize the table_name table from a given data source:

from dagster import Definitions
from dagster_stitch import stitch_resource, build_stitch_assets

stitch_instance = stitch_resource.configured(
    {
        "api_key": "your_stitch_api_key",
        "account_id": 12345,
    }
)
stitch_assets = build_stitch_assets(
    data_source_id=54321,
    destination_tables=["data_source_name.table_name"]
)

definitions = Definitions(
    assets=stitch_assets,
    resources={"stitch": stitch_instance},
)

Note that each entry in destination_tables must be prefixed with the data source name followed by a period (for example, data_source_name.table_name). If a listed table does not correspond to a table materialized by the replication job, the materialization will raise an error.
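To exercise the assets outside of a deployed Dagster instance, for example in a quick local check, you can materialize them directly with Dagster's materialize utility. This is a sketch that assumes the stitch_instance and stitch_assets defined above:

from dagster import materialize

# Trigger the replication job and materialize the resulting assets in-process.
result = materialize(stitch_assets, resources={"stitch": stitch_instance})
assert result.success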