apache / airflow

Apache Airflow - A platform to programmatically author, schedule, and monitor workflows
https://airflow.apache.org/
Apache License 2.0

Support External Dataset Updates #29162

Open michaelmicheal opened 1 year ago

michaelmicheal commented 1 year ago

Description

To increase the extensibility and integration capabilities of Datasets, I think we should

  1. Add a register_external_dataset_change(self, dataset: Dataset, extra=None, session: Session) method to DatasetManager. This would allow updating a dataset without a task_instance, which is necessary for updating datasets across Airflow instances.
    def register_external_dataset_change(
        self, *, dataset: Dataset, session: Session, extra=None, **kwargs
    ) -> None:
        """
        For local datasets, look them up, record the dataset event, and queue
        dagruns for any consuming DAGs.
        """
        dataset_model = session.query(DatasetModel).filter(DatasetModel.uri == dataset.uri).one_or_none()
        if not dataset_model:
            # Nothing to do if this instance has never seen the dataset URI
            self.log.info("DatasetModel for uri %s not found", dataset.uri)
            return
        session.add(
            DatasetEvent(
                dataset_id=dataset_model.id,
                extra=extra,
            )
        )
        if dataset_model.consuming_dags:
            self._queue_dagruns(dataset_model, session)
        session.flush()

    Alternatively, task_instance could be an optional parameter in the default register_dataset_change method

  2. Add an update_dataset endpoint. This endpoint would call the register_external_dataset_change method to register a dataset change without a task_instance or dag_id (a rough sketch of a client call to it follows this list).
  3. Formalize the concept of an "external" dataset update, and possibly even add a parameter to the Dataset definition to indicate whether or not it should accept external dataset updates. This would allow the externally-triggered nature of a particular Dataset to be displayed in the Datasets graph in the UI.
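
For illustration, here is a rough sketch of what a client call to the proposed endpoint could look like. The route, payload fields, and credentials are placeholder assumptions, since no such endpoint exists yet:

# Hypothetical client-side call to the proposed update_dataset endpoint.
# The path, payload fields, and credentials below are illustrative assumptions.
import requests

response = requests.post(
    "https://airflow.example.com/api/v1/datasets/events",  # hypothetical route
    json={
        "dataset_uri": "s3://bucket/orders/",          # Dataset to mark as updated
        "extra": {"triggered_by": "external-system"},  # optional DatasetEvent extra
    },
    auth=("api_user", "api_password"),  # assumes the basic-auth API backend
)
response.raise_for_status()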

Use case/motivation

This year we moved to a multi-instance Airflow architecture, where we deploy multiple instances of Airflow in production. With Datasets, each instance of Airflow has its own set of datasets, but we need to manage dataset dependencies across instances.

We've taken advantage of the great extensibility of the configurable DatasetManager (kudos to whoever designed that) by overriding the register_dataset_change method to broadcast the DatasetEvent to each instance.

from sqlalchemy.orm import Session

from airflow.datasets import Dataset
from airflow.datasets.manager import DatasetManager
from airflow.models import TaskInstance


class CustomDatasetManager(DatasetManager):
    def register_dataset_change(self, *, task_instance: TaskInstance, dataset: Dataset, extra=None, session: Session, **kwargs) -> None:
        # Record the dataset event and trigger DAGs in the local instance
        super().register_dataset_change(task_instance=task_instance, dataset=dataset, extra=extra, session=session, **kwargs)

        # Send a request to the other instances to trigger DAGs that depend on the dataset
        for instance_url in instance_urls:  # base URLs of the other Airflow deployments
            url = f"{instance_url}/api/v1/shopify/admin/custom_endpoint"
            # execute request

To make this work, we added a custom endpoint for registering a dataset change. Since a separate Airflow instance doesn't know about the DAG or task_instance outlet that triggered the dataset update, our endpoint calls the custom external-change method (register_external_dataset_change) that we added to our custom DatasetManager.

This works because the dag- and task_instance-related columns on DatasetEvent are nullable.
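
To sketch how the pieces fit together, the custom endpoint can be exposed through an Airflow plugin and simply delegate to the custom manager. This is a minimal illustration rather than our actual code; the blueprint route and payload fields are assumptions, and CustomDatasetManager refers to the class from the snippet above:

# Sketch of a plugin-provided endpoint that registers an external dataset change.
# The route and payload shape are assumptions; CustomDatasetManager is the class
# shown above (such a manager is typically wired in via [core] dataset_manager_class).
from flask import Blueprint, request

from airflow.datasets import Dataset
from airflow.plugins_manager import AirflowPlugin
from airflow.utils.session import create_session

external_dataset_bp = Blueprint("external_dataset_events", __name__)


@external_dataset_bp.route("/api/custom/datasets/events", methods=["POST"])
def register_external_dataset_event():
    payload = request.get_json()
    manager = CustomDatasetManager()
    with create_session() as session:
        manager.register_external_dataset_change(
            dataset=Dataset(uri=payload["dataset_uri"]),
            extra=payload.get("extra"),
            session=session,
        )
    return {"status": "registered"}, 200


class ExternalDatasetEventsPlugin(AirflowPlugin):
    name = "external_dataset_events"
    flask_blueprints = [external_dataset_bp]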

Related issues

No response

Are you willing to submit a PR?

Code of Conduct

jalajk24 commented 1 year ago

Can I work on this? I am pretty new to open source

michaelmicheal commented 1 year ago

@jalajk24 Before working on it, I want to see what @potiuk and others think of this feature, and whether there is appetite for this change. Once we have a game plan, I'm happy to collaborate on this. How Datasets evolve upstream is pretty important to my team, so I'd like to stay pretty involved.

michaelmicheal commented 1 year ago

Any thoughts on this @potiuk, @dstandish, @ashb?

MatrixManAtYrService commented 1 year ago

It would be pretty handy to have this.

I'd like to use it to "pass the baton" from a CI/CD pipeline into Airflow. Like:

data which depends on the newly updated service should be recalculated so that it reflects the new logic

I suppose I could have the external pipeline use the REST API to manually trigger the requisite DAGs, but I like the loose coupling that comes with updating a dataset and leaving it to the DAG authors to consume it or not.
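
For reference, the "trigger it via the REST API" alternative would look roughly like this; the host, credentials, and dag_id are placeholders, and the basic-auth API backend is assumed:

# Trigger a downstream DAG directly through Airflow's stable REST API.
import requests

response = requests.post(
    "https://airflow.example.com/api/v1/dags/recalculate_service_data/dagRuns",
    json={"conf": {"reason": "service redeployed"}},  # passed to the triggered run
    auth=("api_user", "api_password"),
)
response.raise_for_status()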

cmarteepants commented 1 year ago

Yes! I love the idea of formalizing the concept of external datasets and having a proper (extensible) endpoint. I've come across some custom solutions that work around this; it would be great to have something official.

dstandish commented 1 year ago

@jalajk24 Can I work on this? I am pretty new to open source

Yes you can!

@michaelmicheal @jalajk24 Before working on it, I want to see whether or not @potiuk and others think of this feature, to see whether there is appetite for this change. Once we have a game plan, I'm happy to collaborate on this. How Datasets evolve upstream is pretty important to my team, so I'd like to be pretty involved though.

Yeah I think that adding this feature, generally speaking, is pretty uncontroversial. The question is just what it should look like.

If you are interested and want to work on it, then of course one option is to simply make a PR. But it might be better (and this is what I'd recommend) to form a group, perhaps call it a special interest group (we've done it before), for dataset development. Then you can have occasional meetings to discuss and formulate a plan. If it starts to feel AIP-worthy, you can make an AIP. If not, you can just proceed with development. I would be interested in participating in such a group.

Thanks for your interest.

dstandish commented 1 year ago

I should add, regarding the special interest group and how to go about it: for one, you could ask in the dev channel on the community Slack for a channel sig-datasets (or perhaps sig-event-driven, or something similar) to be created.

And/or you could just start a thread in the dev channel, figure out a time to meet with a small group of interested folks, and plan a meeting. Then, optionally, advertise the meeting on the dev mailing list. Post-meeting, you can share meeting notes on the dev list so the community stays informed.

Something like this would be a reasonable way to approach an issue like this, where the exact solution may not be so easy to design and it would benefit from collaboration.

Additionally, with respect to the concern that you don't want to put effort in if it will not be accepted, doing things a bit more formally and methodically like this helps: before you do any dev work, you get community buy-in (or at least some ability to say "you snooze you lose" down the road).

dstandish commented 1 year ago

I see there was already a PR. Looks like there were a few points of uncertainty that need to be sorted out.

michaelmicheal commented 1 year ago

For context, I was working on a PR to ensure that a dataset endpoint would align with my team's multi-instance architecture. We no longer planned to use our in-house solution, so I became a lot less opinionated on it, and since there seemed to be a lot of strong opinions, it became less of a priority for me. Happy to be part of a discussion if it's in AIP territory.

edumuellerFSL commented 10 months ago

Hi, we use Airflow in our team, and this is something we also feel is currently missing in Airflow, especially to facilitate cross-instance dataset awareness and custom integrations.

I did raise a related issue since I plan to start working on this: #36686