
Describe best practices for advanced usage in docs overhaul #2002

Closed · PhilipGarnero closed this issue 3 years ago

PhilipGarnero commented 4 years ago

Hi there,

We're currently using Airflow to handle our ETL pipelines, but we're not really satisfied with it: it is not very stable, has performance issues, and was not designed to handle the kind of DAGs we're launching.

We're probably going to switch to Dagster to try to solve these problems.

However, I've read most of the docs and I'm still unclear on a number of use cases of ours that aren't covered:

  1. Is building dynamic DAGs supported? Say I have a DAG that must run daily for each client we have in the DB. Currently, an API returns the list of clients and we use it to build our Airflow DAGs in a for loop.
  2. Most of our ETL code is very specific to some of our microservices (because it encapsulates a lot of business logic), which means that Airflow has to do some kind of remote code execution. We were using SSHOperator at first and then switched to KubernetesPodOperator for better stability and scalability, so that code executes directly from the microservices' own images. How can I achieve something similar using Dagster?
  3. Subdags and large DAGs are buggy in Airflow. Is there any limitation on pipeline size or composite solid depth/size?
  4. We have to handle task priority in order to prioritise high-potential clients over smaller ones. Previously we did this with Airflow's pools, queues, and priority weights. Is there a similar mechanism in Dagster?
  5. Is there a way to trigger DAGs in Dagster? Apparently this can be done using the GraphQL API, but I haven't seen docs about it anywhere.

I know this is a lot of questions, but we're excited to try out Dagster! Thanks!

schrockn commented 4 years ago

These are great questions. Thanks for engaging with the project so thoughtfully.

  1. Dynamic DAGs: currently our DAGs are static. We do this so that we can easily visualize them before computation. However, to solve your problem you may not need a fully dynamic DAG: you just need to run the DAG N times, once per client, passing the client through config (see the first sketch after this list). This is a use case we are currently working on via an officially supported partitioning API. With it, you will be able to define a set of partitions (in your case, a list of clients) and use it to create a Schedule that creates a run for a single partition every day. We are considering enabling the creation of runs for N partitions every day, which would solve your use case. In the end, each DAG run would operate on one client's data, but the DAG would be run N times every day. The runs would also execute independently, which could actually be useful on a few dimensions, including per-client priorities, resource attribution, etc.

  2. Solids do arbitrary computation, which can include invoking remote code. We are currently building infrastructure to support this. Look for the first example of this in our dagster-pyspark module, which, based on configuration, lets you execute on a local Spark instance or on a remote EMR cluster, leveraging our resource system to do so. You might consider a similar strategy: use the Python Kubernetes library within the body of a solid to launch a Kubernetes Pod via the Job abstraction (see the second sketch after this list).

  3. We have no limits on pipeline size or composite nesting depth beyond practical ones, and supporting large-scale pipelines via composition is a first-class goal of the system. In master, we have recently done substantial work to improve our handling of large DAGs, which will be available in a release shortly. That work has focused both on raw UI performance and on visualizing subsets of the DAG using a dynamic expression language similar to what dbt provides. For example, to display all the predecessors of a particular solid foo, just type “*foo” in the bottom selector.

  4. From Dagster’s perspective, this is execution-engine-specific. We are actively developing a Celery executor that implements pooling, queueing, and priority. Basic versions of these will be in master within a matter of weeks (it would be sooner if not for the holidays) and well supported by the end of January.

  5. We can trigger pipelines on a Schedule via our ScheduleDefinition abstraction, currently a lightweight wrapper around cron. We do not yet officially support sensors or anything similar; for those you would have to manage the triggering yourself and launch the run via our GraphQL API (see the last sketch after this list). There is example usage in our upcoming backfill support: https://dagster.phacility.com/source/dagster/browse/master/python_modules/dagster/dagster/cli/pipeline.py$495-506
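
To make (1) concrete, here is a minimal sketch of the run-the-DAG-N-times approach, with one run per client and the client passed through config. The pipeline, solid, and helper names are made up for illustration, and keyword spellings (config vs. config_schema, environment_dict vs. run_config) have shifted across releases, so treat this as a sketch rather than copy-paste code:

```python
from dagster import execute_pipeline, pipeline, solid


# Hypothetical solid; the config keyword is spelled config_schema in later releases.
@solid(config={"client_id": str})
def sync_client_data(context):
    # All client-specific behavior hangs off config, so the DAG itself stays static.
    context.log.info("syncing data for client %s" % context.solid_config["client_id"])


@pipeline
def per_client_pipeline():
    sync_client_data()


def run_for_all_clients(client_ids):
    # One independent run per client, so priority, retries, and resource
    # attribution can differ per client.
    for client_id in client_ids:
        execute_pipeline(
            per_client_pipeline,
            environment_dict={  # renamed run_config in later releases
                "solids": {"sync_client_data": {"config": {"client_id": client_id}}}
            },
        )
```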
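For (2), a sketch of the Kubernetes strategy suggested above: a solid that uses the official kubernetes Python client to launch a pod through the Job abstraction and block until it finishes. The job name, image, and namespace are illustrative assumptions, not a Dagster-provided integration:

```python
import time

from dagster import solid
from kubernetes import client, config


@solid
def run_microservice_etl(context):
    # In practice the image, namespace, and command would come from solid
    # config or a resource; they are hard-coded here for illustration.
    config.load_kube_config()  # or config.load_incluster_config() inside a cluster
    batch_v1 = client.BatchV1Api()

    job = client.V1Job(
        metadata=client.V1ObjectMeta(name="client-etl"),
        spec=client.V1JobSpec(
            template=client.V1PodTemplateSpec(
                spec=client.V1PodSpec(
                    restart_policy="Never",
                    containers=[
                        client.V1Container(
                            name="etl",
                            image="registry.example.com/my-microservice:latest",
                            command=["python", "-m", "etl.run"],
                        )
                    ],
                )
            )
        ),
    )
    batch_v1.create_namespaced_job(namespace="etl", body=job)

    # Poll the Job status until Kubernetes reports success or failure.
    while True:
        status = batch_v1.read_namespaced_job_status(
            name="client-etl", namespace="etl"
        ).status
        if status.succeeded:
            context.log.info("kubernetes job succeeded")
            return
        if status.failed:
            raise Exception("kubernetes job failed")
        time.sleep(10)
```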
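And for (5), a sketch of launching a run over the GraphQL endpoint that dagit serves at /graphql, using plain HTTP. The mutation and field names below are illustrative of the current schema; check dagit's GraphiQL playground (or the pipeline.py linked above) for the exact shape:

```python
import requests

# Illustrative mutation; verify the exact schema in dagit before relying on it.
START_PIPELINE_EXECUTION = """
mutation StartPipelineExecution($executionParams: ExecutionParams!) {
  startPipelineExecution(executionParams: $executionParams) {
    __typename
  }
}
"""


def trigger_run(dagit_url, pipeline_name, environment_dict):
    # Any sensor-like process (webhook handler, cron job, queue consumer)
    # can launch runs this way.
    response = requests.post(
        dagit_url + "/graphql",
        json={
            "query": START_PIPELINE_EXECUTION,
            "variables": {
                "executionParams": {
                    "selector": {"name": pipeline_name},
                    "environmentConfigData": environment_dict,
                    "mode": "default",
                }
            },
        },
    )
    response.raise_for_status()
    return response.json()
```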

Please sign on to the Dagster Slack to say hi. We’d love to collaborate on this with you.

PhilipGarnero commented 4 years ago

Thanks for taking the time to answer our questions! If we have more, we'll ask them on Slack.