bddap / dagyo

User-specified dataflow protocol and scheduler.

Definitions

Dagyo

Dagyo is a distributed protocol for running user-constructed data flow trees.

Data Flow Tree

A DAG (Directed Acyclic Graph) where vertices are dagyo programs and edges are data streams.
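
For illustration, a small flow might be described as below. The description format is a sketch of my own, not dagyo's actual schema:

```python
# Hypothetical description of a small data flow tree.
# Vertices are progdefs; edges are typed data streams.
flow = {
    "vertices": {
        "a": "scrape_text",   # hypothetical progdef names
        "b": "tokenize",
        "c": "count_tokens",
    },
    "edges": [
        # (from_vertex, output_name, to_vertex, input_name)
        ("a", "text", "b", "text"),
        ("b", "tokens", "c", "tokens"),
    ],
}
```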

Dagyo Progdef (Program Definition)

Two pieces of data define a dagyo program: a Docker image that implements the program's executor, and a JSON file that describes it (see "Implementing Custom Dagyo Executors" below).

Dagyo Flow

A running data flow tree. Resources allocated to the flow include Dagyo Executors and job queues.

Dagyo Job

A runnable instance of a progdef. Dagyo Jobs are serialized and stored in a queue that Executors pull from. A job describes where to pull inputs and where to push outputs.

In addition to the custom outputs specified by a job's Progdef, jobs are provided with two additional outputs:

In addition to the custom inputs specified by a job's Progdef, jobs are provided with one additional input:
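
As a sketch, a serialized job might carry something like the following; the field names and encoding here are assumptions, not dagyo's actual wire format:

```python
# Hypothetical shape of a serialized Dagyo Job.
# Field names and encoding are illustrative assumptions.
example_job = {
    "progdef": "tokenize",                 # which progdef to instantiate
    "inputs": {"text": "mailbox-a1"},      # where to pull each named input
    "outputs": {"tokens": "mailbox-b2"},   # where to push each named output
}
```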

Dagyo Executor

A runner of Dagyo Jobs. Each executor services exactly one progdef, but may run multiple Jobs at a time, depending on the progdef's specified scaling behavior.

Dagyo Scheduler

A program that takes data flow trees and spawns dagyo workers to execute them. The scheduler is responsible for:

Typename

A string representing the data that is carried over a stream. Every typename defines its own serialization. An edge from one progdef's output to another progdef's input is valid if and only if the two typenames are equal.

Each typename should have human-readable documentation which describes the type and its serialization. Protobufs could be an excellent fit for defining types.

Each type needs to be convertible to and from an array of octets.
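
For illustration, edge validation reduces to a string comparison, and each typename carries a codec to and from octets. A minimal sketch, with hypothetical names:

```python
import json

def edge_is_valid(output_typename: str, input_typename: str) -> bool:
    # An edge is valid iff the two typenames are exactly equal.
    return output_typename == input_typename

# Example codec for a hypothetical "utf8-json" typename: every type
# must be convertible to and from an array of octets.
def encode_utf8_json(value) -> bytes:
    return json.dumps(value).encode("utf-8")

def decode_utf8_json(octets: bytes):
    return json.loads(octets.decode("utf-8"))
```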

Implementing Custom Dagyo Executors

Executors are defined using Docker and described using a JSON file.
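
The JSON description might look something like this (rendered here as a Python dict; every field name is an assumption of mine, not dagyo's actual schema):

```python
# Hypothetical progdef description; the real artifact is a JSON file.
progdef = {
    "name": "tokenize",
    "image": "example/tokenize:1.0",       # the Docker image to run
    "inputs": {"text": "utf8-string"},     # input name -> typename
    "outputs": {"tokens": "utf8-string"},  # output name -> typename
    "scale": {"max_concurrent_jobs": 4},   # scaling behavior (see "Dagyo Executor")
}
```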

When an Executor starts up, it reads the DAGYO_QUEUE environment variable and connects to a message queue at that address. The DAGYO_JOBS environment variable tells the executor which mailbox to pull jobs from.

When a deserialization error occurs, the dagyo executor should fail the job immediately by pushing to the failure queue.
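
Putting the above together, a minimal executor skeleton might look like this, assuming an AMQP broker such as RabbitMQ, the pika client, and a JSON job encoding. The failure-queue name and job handling are placeholders of mine, not dagyo's specified interface:

```python
import json
import os

import pika  # AMQP client; assumes a RabbitMQ-style broker

def run(job: dict) -> None:
    """Placeholder: pull inputs, do the work, push outputs."""

# Connect to the message queue at the address named by DAGYO_QUEUE.
connection = pika.BlockingConnection(pika.URLParameters(os.environ["DAGYO_QUEUE"]))
channel = connection.channel()

def on_job(ch, method, properties, body: bytes) -> None:
    # Ack immediately: each job is processed at most once (see "Recovery").
    ch.basic_ack(delivery_tag=method.delivery_tag)
    try:
        job = json.loads(body)
    except ValueError:
        # Deserialization error: fail the job immediately by pushing
        # to the failure queue ("dagyo-failures" is a guess at the name).
        ch.basic_publish(exchange="", routing_key="dagyo-failures", body=body)
        return
    run(job)

# Pull jobs from the mailbox named by DAGYO_JOBS.
channel.basic_consume(queue=os.environ["DAGYO_JOBS"], on_message_callback=on_job)
channel.start_consuming()
```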

Misc.

https://blog.containerize.com/2021/07/09/top-5-open-source-message-queue-software-in-2021/

Why not X tool?

Benthos: we need dynamic, user-defined data flow trees, not static pipelines.

Apache Beam: we need dynamic, user-defined data flow trees, not static pipelines.

Apache Kafka: we need dynamic, user-defined data flow trees, not static pipelines. We don't need durability.

Apache Flink: we need dynamic, user-defined data flow trees, not static pipelines.

Apache Airflow: Airflow is too expressive. It would grant the author of a data flow tree the ability to run arbitrary code. Airflow is not intended for streaming.

Why not use AWS Lambda to run executors? Timeouts. No support for streaming. No support for GPU acceleration. No support for binary serialization. Extra vendor lock-in.

Kubeflow claims to be specifically for ML, but it may be general enough to use in place of dagyo, or as a layer under dagyo. Warrants further investigation. Some things to check:

What sort of message broker do we need?

RabbitMQ has nice tutorials: https://www.rabbitmq.com/getstarted.html

Planned Upgrade Strategy:

Later we can add lifecycle hooks to the container definitions that tell progdefs to stop taking new jobs. We'll also allocate a large termination grace period, on the order of days, to allow any in-progress jobs to complete before the pod shuts down.

This method should handle auto-scaledowns too. When a pod is being scaled down, the signal will tell it to stop taking new jobs, and due to the large termination grace period, the pod will be allowed to finish any in-progress jobs before it shuts down.
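
In Kubernetes terms, the relevant pod-spec fields might look like the following sketch (written as a Python dict; the preStop command is a placeholder for whatever stop-taking-jobs signal we define):

```python
# Sketch of the relevant Kubernetes pod-spec fields.
executor_pod_spec = {
    # On the order of days, so in-progress jobs can drain.
    "terminationGracePeriodSeconds": 2 * 24 * 60 * 60,
    "containers": [{
        "name": "executor",
        "image": "example/tokenize:1.0",  # hypothetical image
        "lifecycle": {
            "preStop": {
                # Placeholder: signal the executor to stop taking new jobs.
                "exec": {"command": ["/bin/stop-taking-jobs"]},
            },
        },
    }],
}
```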

After an upgrade, how do we ensure jobs in the out-of-date job queues are all processed?

Plan for secrets management

We'll define a map from Progdef name to an environment dictionary. That map will go into the cluster's ConfigMap. We'll then set each container's env_from setting to pull from the ConfigMap.
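
With the official Kubernetes Python client, that might look like the sketch below; the ConfigMap and image names are illustrative:

```python
from kubernetes import client

# The container sources its environment from the ConfigMap entry
# for its progdef (names here are illustrative).
container = client.V1Container(
    name="executor",
    image="example/tokenize:1.0",
    env_from=[
        client.V1EnvFromSource(
            config_map_ref=client.V1ConfigMapEnvSource(name="dagyo-progdef-env")
        )
    ],
)
```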

Compute Resource Requirements

Progdef metadata will eventually include a field for compute resource requirements. We'll use that to set the resources field on the container definition.
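
Continuing the client sketch above, the resources field might be set like this; the numbers are placeholders standing in for progdef metadata:

```python
from kubernetes import client

# Placeholder values; real ones would come from progdef metadata.
resources = client.V1ResourceRequirements(
    requests={"cpu": "1", "memory": "2Gi"},
    limits={"cpu": "2", "memory": "4Gi"},
)
container = client.V1Container(
    name="executor",
    image="example/tokenize:1.0",
    resources=resources,
)
```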

Plan for Failure Tolerance

Failures, AKA "Panics", are aggregated for an entire flow. If one vert fails, the entire flow fails. This leaves the decision of what to do next to the user. Progdefs should be designed to minimize the chance of flow-stopping failures. Here are some tips for reducing Panics.

Progdefs that perform IO should use retries when applicable and helpful.

Progdefs with recoverable failure modes should expose those failure modes explicitly as part of the defined interface. For example, when a transformation on some data might fail, the output type should be Result, not T. This output can be piped into, for example, an assert vert, an ignore_errors vert, or a log_errors vert.

Another potential pattern could be to define an explicit "error" output stream for the progdef. This may be a better fit for some circumstances, such as when a progdef has multiple outputs and it would be unclear on which stream to report the error.
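
As a sketch of the Result pattern, with an encoding I'm assuming rather than one dagyo defines:

```python
# Hypothetical Result encoding for a fallible transformation.
# Downstream verts (assert, ignore_errors, log_errors) branch on "ok" vs "err".
def parse_int(item: str) -> dict:
    try:
        return {"ok": int(item)}
    except ValueError as e:
        # Recoverable failure: report it on the output stream
        # instead of panicking the entire flow.
        return {"err": str(e)}
```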

Of course, when there is an actual show-stopping failure, the flow should fail. An example of a show-stopping failure is when a progdef with a defined interface receives a value that fails to deserialize. Deserialization errors on specified types should be treated as panic-worthy bugs.

Pod Preemption

https://kubernetes.io/docs/concepts/scheduling-eviction/pod-priority-preemption/

Flows are not recoverable, so we never want to kill an executor while it's running a job. We will be setting a high value for the graceful termination period of executor pods, so preemption would not work against these pods in the first place.

Recovery

Once popped from the queue, the state of a dagyo job exists in only one place: the memory of the executor that is processing the job. This is an intentional design choice.

The ordering of elements within dagyo streams is respected. Additionally, dagyo ensures that no job is processed twice: each job is processed by at most one executor. Executors' statefulness is respected; dagyo will never swap out an executor mid-job.

In an alternate universe, resumable executors were implemented by tracking the state of each job. This substantially complicated things for progdef writers: progdefs needed to be written as state machines with serializable state. This paradigm is sometimes referred to as "Checkpoint and Restore". While a nice set of abstractions might have made Checkpoint and Restore less of a burden for progdef implementers, the version of dagyo existing in our universe chooses not to spend any strangeness budget on the issue.

See also: section 3.4, "Fault tolerance and availability", of the Naiad paper for more thoughts on "Checkpoint and Restore".

This design choice is intentional, but you are welcome to get in touch if you have ideas for implementing recoverable jobs. There may be a way to do this without putting too much burden on progdef implementers.

Misc Things Dagyo Needs

License

Licensed under either of

- Apache License, Version 2.0
- MIT license

at your option.

Contribution

Unless you explicitly state otherwise, any contribution intentionally submitted for inclusion in the work by you, as defined in the Apache-2.0 license, shall be dual licensed as above, without any additional terms or conditions.