WordPress / openverse

Openverse is a search engine for openly-licensed media. This monorepo includes all application code.
https://openverse.org
MIT License

Separate the batched update DAG into manually and automatically triggered DAGs #4457

Open AetherUnbound opened 3 weeks ago

AetherUnbound commented 3 weeks ago

Description

Currently we have one batched_update DAG which runs all the batched updates: https://airflow.openverse.org/dags/batched_update/grid. This includes both the batched updates triggered by the popularity refresh every month and the batched update runs kicked off by maintainers (e.g. #4091). Since all the DAG runs show as "manually triggered", it can be hard to see a history of the latter cases that isn't muddied by the regular, automatic batched update runs.

We should split the batched update DAG into two separate DAGs: automated_batched_update and batched_update. automated_batched_update will be the DAG which is triggered by the popularity refresh and any other similar refresh/update processes we may include in the future. batched_update would serve as the entrypoint for running one-off cleanup or update operations which are triggered by maintainers. Note: The DAG names are split this way because we may want to preserve the existing manual-run history for aggregated reporting down the line, and it's easier to filter out the popularity runs from the existing DAG's history than it is to select runs across multiple DAGs.

Both DAGs would be functionally equivalent for the time being, although we may add new functionality to the manual batched update DAG eventually. This means we should be able to convert the batched update DAG definition into a dynamic DAG which produces both DAGs under different names. The popularity refresh DAGs will also need to be updated to trigger the automated_batched_update DAG rather than the current one.
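
For illustration, the dynamic DAG definition could look roughly like this (a sketch only; the factory name, schedule, and other arguments are placeholders rather than the actual Openverse code):

from airflow.decorators import dag


def create_batched_update_dag(dag_id: str, **dag_kwargs):
    # Hypothetical factory: produces one batched update DAG per call, so the
    # manual and automated variants share a single definition.
    @dag(dag_id=dag_id, schedule=None, catchup=False, **dag_kwargs)
    def _batched_update():
        # ... the existing batched update tasks would be defined here ...
        ...

    return _batched_update()


# Entrypoint for one-off cleanup/update operations run by maintainers.
batched_update = create_batched_update_dag("batched_update")
# Triggered by the popularity refresh and similar automated processes, e.g. via
# TriggerDagRunOperator(..., trigger_dag_id="automated_batched_update").
automated_batched_update = create_batched_update_dag("automated_batched_update")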

sarayourfriend commented 3 weeks ago

@AetherUnbound just wondering about this: is there a way (and if so, does it make a difference) to inline the tasks from one DAG into another DAG, rather than triggering a remote DAG and waiting for it?

For example, in https://github.com/WordPress/openverse/pull/4429, it almost seems like there could be some way of calling batched_update_dag.function() to get the tasks inlined into the trim_and_deduplicate_tags DAG, rather than remotely triggering it. If that doesn't work because of the batched update DAG's templated param references, what about an intermediary function that creates the tasks and accepts each of the params as a real argument, which the inlining DAGs would call with direct arguments and batched_update_dag would call with the template strings? Would that work, without needing to separate the DAGs and to separate the history of popularity refreshes from the DAG that controls them? I don't know whether this is actually better or worse for the goal of this issue; it's mostly a curiosity rather than a specific suggestion. If it is possible, then I'm curious about the trade-offs from your perspective (no need to go into too much depth either way).

Thinking something like this:

from airflow.decorators import dag


def batched_update(select_query, update_query, *etc, **etcetc):
    # Build the batched update tasks inline in whichever DAG calls this.
    ...


@dag(...)
def batched_update_dag():
    batched_update(
        select_query="{{ params.select_query }}",
        update_query="{{ params.update_query }}",
        # ...etc.
    )


@dag(...)
def trim_and_deduplicate_tags():
    for media_type in media_types:
        batched_update(select_query=..., update_query=...)  # ...etc.


# etc. for the popularity refresh

One thing I can see is that it's clearly not as well suited for mapped tasks (like the media type loop in the example).

Is it better practice to have separate DAGs like this, rather than putting the tasks into a "parent" DAG? Having a bunch of isolated DAGs triggered by each other has interesting parallels to the actor model of programming, compared to this inlining, which might be analogous to some other kind of composition (subroutines?).

Again, just a curiosity, not trying to make a recommendation about this specific issue, and I can move this somewhere else if you prefer to discuss this elsewhere, just let me know!

AetherUnbound commented 4 days ago

Thanks for asking this @sarayourfriend! There used to be a more canonical way to do this using SubDAGs, but that approach has been deprecated in favor of TaskGroups. You can read more about each in the Airflow documentation.

In my mind, what this approach comes down to is how easy it is for us to access the run history in Airflow. With the trim_and_deduplicate_tags example where the tasks are inlined: once we delete the code for that DAG, the DAG itself will still exist (I believe) in the DAG list. We could remove it from the list too, but doing so would also discard its run history, which would make it much harder to see the batched update runs if those tasks had been inlined into the trim_and_deduplicate_tags DAG itself. Having separate batched_update DAGs which can be triggered by other DAGs means that we can add or remove those other DAGs and their history as much as we like, without having to worry about preserving the history of the batched runs themselves. That's much more advantageous than having (for example) a dozen different DAGs existing in perpetuity, each of which has called the batched update tasks inline and thus needs to stay in the DAGs list forever. Hope that makes sense!

To answer your general question though, TaskGroups are indeed the way to go for that sort of thing, even if it's not as useful in this case.
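
For anyone curious what that would look like, a TaskGroup version of the inlining idea might be roughly like this (a sketch only; the task and query names are made up for illustration, not the actual Openverse tasks):

from airflow.decorators import dag, task, task_group


@task_group
def batched_update_group(select_query, update_query):
    # The batched update steps live here as ordinary tasks and are inlined,
    # grouped together, into whichever DAG calls this.
    @task
    def select_rows(query):
        ...

    @task
    def update_rows(query, rows):
        ...

    update_rows(update_query, select_rows(select_query))


@dag(schedule=None, catchup=False)
def trim_and_deduplicate_tags():
    # Each call inlines the group's tasks into this DAG; Airflow suffixes the
    # group IDs to keep them unique. On Airflow 2.5+ the group could instead be
    # mapped over media types with .expand().
    for media_type in ("image", "audio"):
        batched_update_group(
            select_query=f"SELECT ... WHERE media_type = '{media_type}'",
            update_query="UPDATE ...",
        )


trim_and_deduplicate_tags()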

sarayourfriend commented 4 days ago

Makes sense, thanks.

I wonder if we're taking a roundabout approach to getting a historical record of the runs (for the specific information we hope to retain and learn from a run history). I know the DAG run history is nice because it just exists automatically and feels implicit, but could the batched update history be recorded elsewhere? If we pushed batched update history to a different location, we wouldn't need to make manual runs so awkward, and it might even be easier to understand the history at a glance (we could give every run a meaningful name, and perhaps output interesting information like the number of affected records directly into the history, instead of needing to dig into the logs). It could be a good idea if this history incorporated (or was itself incorporated into) a broader history of transformations to the data model and the dataset itself.

I also wanted to ask whether concurrency would need any special consideration here. The existing batched update DAG allows 10 concurrent runs. Would these need to be split between the automated and manual DAGs?

AetherUnbound commented 3 days ago

Your question about an execution log actually came up during the monthly priorities meeting this last month, which is what prompted the creation of this issue:

A number of issues around possible changes to the batched update, including splitting manual & automated runs into separate DAGs, increased reporting for manually run batched updates, a document defining how batched updates should be run, and possibly a document which serves as an “execution log” for batched updates (we might be able to use airflow directly for this, especially if we split the runs out).

Splitting these runs out into two separate DAGs is the easiest way, right now, to separate this information and make it simpler to access going forward. There's a difference between the automated, regular updates to the database (e.g. the popularity refresh, and maybe dead link ETL down the line) and the updates we perform manually to correct something. Having that separation not only in the way we look back at histories but also in Airflow itself would be useful, hence the separate DAGs. I'm not against having the manual runs documented somewhere else, too!

The concurrency question is a good one too. It's hard to manage concurrency in a simple way because we can't be certain which batched updates might conflict, unless we force batched updates to operate by provider as well and manage concurrency that way (or something similar). Given the regularity of the automated updates and the irregularity of the manually triggered ones, I think keeping the same concurrency limit on each DAG is fine.
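
Concretely, with the hypothetical factory sketched in the issue description above, keeping the current limit on each DAG independently would just mean passing it through (max_active_runs is the standard Airflow DAG argument that caps concurrent runs of a single DAG):

# Each DAG keeps its own limit of 10 concurrent runs rather than sharing one pool;
# create_batched_update_dag is the illustrative factory from the earlier sketch.
for dag_id in ("batched_update", "automated_batched_update"):
    globals()[dag_id] = create_batched_update_dag(dag_id, max_active_runs=10)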