dagster-io / dagster

An orchestration platform for the development, production, and observation of data assets.
https://dagster.io
Apache License 2.0

Airtable Integration #2407

Open yuhan opened 4 years ago

yuhan commented 4 years ago

fire commented 3 years ago

Is it possible to prioritize this?

catherinewu commented 3 years ago

Hi @fire, great to hear from you! Could you tell us a bit more about your use case and whether there are specific actions you would like to see supported? It seems like we're currently leaning towards adding an Airtable resource (for reading from and writing to Airtable) and an Airtable sensor.
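For concreteness, here is a minimal sketch of what such a resource might look like (the config keys and the AirtableClient helper are illustrative, not a shipped integration):

import requests

from dagster import resource


class AirtableClient:
    """Thin wrapper around the Airtable REST API."""

    def __init__(self, api_key, base_id):
        self._api_key = api_key
        self._base_id = base_id

    def _headers(self):
        return {"Authorization": f"Bearer {self._api_key}"}

    def list_records(self, table_name):
        # GET https://api.airtable.com/v0/{base_id}/{table_name}
        resp = requests.get(
            f"https://api.airtable.com/v0/{self._base_id}/{table_name}",
            headers=self._headers(),
        )
        resp.raise_for_status()
        return resp.json()["records"]

    def create_record(self, table_name, fields):
        # POST a new record with the given field values.
        resp = requests.post(
            f"https://api.airtable.com/v0/{self._base_id}/{table_name}",
            headers=self._headers(),
            json={"fields": fields},
        )
        resp.raise_for_status()
        return resp.json()


@resource(config_schema={"api_key": str, "base_id": str})
def airtable_resource(init_context):
    return AirtableClient(
        api_key=init_context.resource_config["api_key"],
        base_id=init_context.resource_config["base_id"],
    )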

fire commented 3 years ago

I was thinking of using Airtable as a submission form and then using Dagster to process it.

Imagine a form with various fields and attachments. A Dagster sensor would watch the Airtable table for changes, pick up the attachment and the other fields, run some operations on them, and then submit the result to a different Airtable table so it can be interpreted according to access permissions.

I am also researching Baserow (https://baserow.io/). It is probably not worth the effort to create a resource for it, but the two are fairly similar in design and the whole system is self-hostable.

helloworld commented 3 years ago

Hey @fire, this would be pretty easy to do on your own with the sensor API. The result is that you get one Dagster pipeline run, configured with the record ID, for every new Airtable form submission. Once you define the sensor appropriately, Dagster ensures each row is processed exactly once, since it skips any RunRequest whose run_key it has already seen.

Here's more info: https://docs.dagster.io/overview/schedules-sensors/sensors#main

Here's what the code could look like. You can use the Python requests library to call the Airtable API wherever there is a TODO:

from dagster import solid, pipeline, sensor, RunRequest

def get_new_airtable_record_ids(last_record_id=None):
    # TODO: Get all record ids in the table past last_record_id
    return [...]

@sensor(pipeline_name="process_airtable_record_pipeline")
def airtable_sensor(context):
    # context.last_run_key is the run_key of the most recent run this sensor
    # requested, so it serves as a cursor into the table.
    new_airtable_record_ids = get_new_airtable_record_ids(last_record_id=context.last_run_key)
    for record_id in new_airtable_record_ids:
        yield RunRequest(
            run_key=record_id,
            run_config={"solids": {"read_airtable_record": {"config": {"record_id": record_id}}}},
        )

@solid(config_schema={"record_id": str})
def read_airtable_record(context):
    record_id = context.solid_config["record_id"]
    record = ...  # TODO: Query Airtable API and get record
    return record

@solid
def process_airtable_record(_context, record):
    # TODO: Do something with the record
    pass

@pipeline
def process_airtable_record_pipeline():
    process_airtable_record(read_airtable_record())
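
And here is one way the two TODOs could be filled in with requests (just a sketch: the base ID, table name, and "Created" sort field are placeholders for your own values, and pagination is ignored for brevity):

import os

import requests

# Placeholders -- substitute your own credentials, base, and table.
AIRTABLE_API_KEY = os.environ["AIRTABLE_API_KEY"]
AIRTABLE_BASE_ID = "appXXXXXXXXXXXXXX"
AIRTABLE_TABLE = "Submissions"
API_URL = f"https://api.airtable.com/v0/{AIRTABLE_BASE_ID}/{AIRTABLE_TABLE}"
HEADERS = {"Authorization": f"Bearer {AIRTABLE_API_KEY}"}


def get_new_airtable_record_ids(last_record_id=None):
    # Sort by a "Created" field (assumed to exist on the table) so the record
    # order is stable, then return only the ids after the last-seen one.
    # Airtable returns at most 100 records per request plus an "offset" token
    # for the next page; a real implementation would follow that token.
    resp = requests.get(
        API_URL,
        headers=HEADERS,
        params={"sort[0][field]": "Created", "sort[0][direction]": "asc"},
    )
    resp.raise_for_status()
    record_ids = [record["id"] for record in resp.json()["records"]]
    if last_record_id in record_ids:
        return record_ids[record_ids.index(last_record_id) + 1:]
    return record_ids


def fetch_airtable_record(record_id):
    # Fetch a single record by id; the submitted values live under "fields".
    resp = requests.get(f"{API_URL}/{record_id}", headers=HEADERS)
    resp.raise_for_status()
    return resp.json()["fields"]

fetch_airtable_record is a hypothetical helper -- you would call it in place of the TODO inside read_airtable_record.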