thieman / dagobah

Simple DAG-based job scheduler in Python
Do What The F*ck You Want To Public License

Run Jobs inside Jobs #60

Open thieman opened 10 years ago

thieman commented 10 years ago

It would be cool to be able to run a Job inside of another Job (basically like it's a Task). For those of you not familiar with the insane jargon, I'm talking about a DAG that could look like, for a simple example:

Job 1: Task 1 -> Job 2 -> Task 2

Job 2: Task 3 -> Task 4

Job 1 would complete Task 1, start Job 2, monitor its completion status, and if it finishes successfully then proceed on to Task 2.

I think the first step is to make Jobs either scheduled or able to be called from inside other Jobs. There will be a lot of edge cases to argue about along the way.

rclough commented 10 years ago

I've corresponded privately on this issue, but would like to note that this is a priority issue for me and my team, and I intend on working on the implementation.

Some thoughts:

Personally, I prefer notifications over polling, but I'm always open to ideas in the other direction.

rclough commented 10 years ago

Another edge case: Failed tasks. If a task fails, and that task was a job reference, then "Start jobs from failed tasks" needs to drill down into the child job, and trigger "start job from failed task" on the child job (theoretically many levels deep)

rclough commented 10 years ago

Another cool (non-essential) idea: double-clicking a Job task to see its steps (and even their statuses), or at least double-clicking to open that job's page.

Which brings up another idea: instead of literally calling the job, the parent could just inherit the tasks and run them "natively" within the single job. Complexities I can think of are command logging and failed-task handling. Not sure if I'm into it, I'm just brainstorming.

thieman commented 10 years ago

DAG cycle detection sounds like a fun problem. Needs to be able to support the same Job being called multiple times within the main Job (as long as it doesn't create a cycle) but also needs to not get stuck in an infinite loop in DAG resolution. Certainly achievable.

I think the idea of inheriting the Tasks of each child job is definitely the simplest approach. This allows us to sidestep a lot of the hard problems with managing child Jobs as first-class citizens.

So, how about this for a simple version: when a Job is started from the beginning, it snapshots the current Task config of all its child Jobs. It inlines these tasks for purposes of its own DAG validation and execution. Changes to the child Job after this initial snapshot will not be propagated up to the parent Job until it is started from scratch again. All Job configuration like email settings comes only from the parent Job.
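A tiny sketch of the snapshot idea (the data shapes here are hypothetical, not dagobah's actual model): deep-copying each child's task config up front means later edits to a child Job can't leak into an in-flight run of the parent.

```python
import copy

def snapshot_children(parent, jobs):
    """Return {child job name: deep-copied task config} for each child reference.

    `parent` is assumed to be a dict with a "children" list of job names, and
    `jobs` a registry mapping job name -> current task config.
    """
    return {name: copy.deepcopy(jobs[name]) for name in parent["children"]}
```

After this runs, mutating `jobs["Job 2"]` has no effect on the snapshot the parent captured, which is exactly the "changes won't propagate until the next start from scratch" behavior described above.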

Some implementation details:

I think this is a pretty simple approach and avoids a lot of intricate coupling between parents and children. The hard work will probably be mostly in the UI and in tweaking reporting (probably want to expand the child job if there's a failure but keep it condensed if everything worked). Thoughts?

rclough commented 10 years ago

Sounds like a fair idea for implementation to me.

General thoughts

Implementation

When it comes to the overall runtime, does this sound correct?

  1. Start Job is pressed on "Job 1"
  2. Job 1 creates the DAG, generating snapshots of each job at that time.
  3. Job runs to completion (expansion on failure, but otherwise follows the normal job workflow)
  4. Snapshots are destroyed (so by the next run, the latest version of the job may be used)

Some edge cases: Job with job-references fails, and then is scheduled to run again before failure is resolved. How is this case currently handled? If it is to run again, do we re-generate the snapshots?

Cycle Detection

I was thinking about cycle detection, and realized that blind inheritance of job references could just lead to us adding tasks ad infinitum, for example:

To mitigate this, we need a cycle detection algorithm that does not require the graph to be "pre-built".

My idea is you could do a simple cycle detection algorithm skipping anything that's not a job reference (Think of this as cycle detection on a meta-graph comprised of just job references).

Basically we modify something like DFS so that "get next node" or "get child nodes" just works by getting the nearest job references (or null, if none). I'd need to flesh this out a bit more, but I think it can be done without too much headache with existing algorithms. It would also help if I came up with some examples, to visualize it.

thieman commented 10 years ago

The logging question is legitimate but we can deal with it when we get there. The implementation you laid out here lines up with what I was thinking. The edge case should follow the current behavior for when a Job hits its scheduled start time:

If Job is waiting or failed => start Job from scratch. This would cause a new snapshot.
If Job is running => do nothing.
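The rule above is simple enough to sketch directly (class and state names are hypothetical, not dagobah's actual API):

```python
class Job(object):
    """Minimal stand-in for a scheduled Job with job references."""
    def __init__(self, state):
        self.state = state
        self.started_from_scratch = False

    def start_from_scratch(self):
        # starting fresh is what regenerates the child-Job snapshots
        self.started_from_scratch = True
        self.state = "running"

def on_scheduled_start(job):
    """waiting/failed -> restart (new snapshot); running -> do nothing."""
    if job.state in ("waiting", "failed"):
        job.start_from_scratch()
```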

I've got an idea for cycle detection and will try to flesh it out and post back here soon. Might be similar to what you're thinking of.

thieman commented 10 years ago

Okay, this is how I think the new cycle detection could work.

You start with a Job which is a DAG composed of Tasks and other Jobs. You also begin with an empty set which I will call the context. The context is the set of all Jobs that you are "inside" at any given point. Trying to expand a Job that is already in the context indicates that you are in a cycle.

  1. Add the current Job's ID to the context. When you start out, this means the parent Job.
  2. Perform a topological sort on the current DAG without expanding any child Jobs. This verifies that the current Job is acyclic before expansion.
  3. Traverse the nodes in topological order. For each node that is a Job, do the following:
    1. If this Job is in the current context, fail.
    2. Run this algorithm again on this node, passing the current context.

I'll try to illustrate the beginning of this algorithm on a simple Job with two child Jobs here.

We have three Jobs. Job 1 will be the parent. Job 1 calls Jobs 2 and 3, Job 2 calls Job 3. These DAGs are all fine. Let's prove it.

We start with Job 1. First, we add Job 1 to the context. This means that anything that tries to call Job 1 inside of Job 1 will result in a cycle. We then topologically sort Job 1 to ensure that there are no cycles in the nodes themselves before expansion. This checks out, so we then start traversing the nodes. When we hit Job 2, we expand it and call the validation algorithm on the expanded nodes with the starting context of [Job 1].

We now run the algorithm again using the expansion of Job 2. We add Job 2 to the context, giving us [Job 1, Job 2]. Any calls to these Jobs inside this block will result in a cycle. We topologically sort without expanding Job 3 to convince ourselves that the nodes themselves are kosher. We then traverse the nodes again until we get to Job 3.

We repeat the same process here, and our context ends up being [Job 1, Job 2, Job 3]. If any of the three jobs are called in here, we have a cycle. We then sort and traverse. Since Job 3 calls no other Jobs, we end up exiting successfully and crawl back up the stack of the original DAG.

The next step is to expand Job 3 from the outermost scope. Just like when we called Job 2 from the outermost scope, the starting context we pass to that call of the algorithm is just [Job 1].

Did any of that make sense? I always feel I have to admit a degree of defeat when I am forced to take pictures to explain a point. :smile:

thieman commented 10 years ago

Oh and of course you can come up with your own approach as long as it meets the same requirements, this is just a suggestion. The one thing that might be missing from your idea (as I was thinking of it, anyway) might be detection of non-Job-related cycles inside of child Jobs. The parent Job cannot trust that the child Jobs are acyclic by themselves and must verify it.

rclough commented 10 years ago

I think this is pretty much the way I was thinking of doing it, but verbalized a bit better (and visualized! :+1: ). I don't think pictures mean defeat, they're a great way to visualize the problem and see any potential issues :smile:

I didn't have non-job related cycles because I assumed that there was some sort of cycle detection in place that would prevent a cyclic job from being saved, in which case it would be safe to assume child jobs are acyclic. But if that's not the case, adding a cycle check on child jobs is a trivial addition.

The only thing that's not quite covered here is what happens when you have to explore the next branch, i.e. how do you know when to remove a job from the context because you're now exploring a different path of the graph? I assume there's some level of magic you can do with the topological sort to figure this out?

thieman commented 10 years ago

It is currently possible to save a Job with a cycle, but a cyclic Job will refuse to start. I originally did this so you wouldn't get errors while performing a bunch of add/delete edge operations to the DAG. This could be reconsidered.

Popping the context isn't actually necessary; you rely on the call stack to do this for you. The calls would look like this, where nodes contains all the nodes and their edges and context is a set of job IDs.

def verify_dag(nodes, context=set()):
    ...

# calls to this function would look like this in my example above
verify_dag(job_1)
    verify_dag(job_2, {job_1_id})
        verify_dag(job_3, {job_1_id, job_2_id}) # when this returns, go all the way back to outer scope
    verify_dag(job_3, {job_1_id})

The context is just keeping track of all of the parent Jobs of the DAG at any given point. It might be more clear to think of these non-sequentially. The calls above are really just doing this:

  1. Job 1 starts validation.
  2. Job 1 needs to call Job 2 and Job 3 at some point. When it does, those need to know that Job 1 was their parent.
  3. When Job 2 gets called, it calls its own version of Job 3. This version of Job 3 needs to know that Jobs 1 and 2 are its parents.
rclough commented 10 years ago

Ah ha. I see now. Good shit.

As long as a job with cycles doesn't start, that's the most important part, but being able to save a job with cycles may be misleading.

thieman commented 10 years ago

I agree that this becomes more confusing with Jobs-in-Jobs. Job 1 refusing to start because it depends on Job 2 which is in a bad state is shitty. I'll add an issue for it.

rclough commented 10 years ago

I'm going to start working on a branch for this. Is there any other prerequisite work that should be done before I start developing this?

thieman commented 10 years ago

I merged the fix for #77, so you should be good to go here.

thieman commented 10 years ago

Another note: feel free to submit small PRs for small components of this system so we can merge them as they come in. I'm fine with having not-quite-used-yet code in master if it helps us have shorter PR feedback cycles.

rclough commented 10 years ago

Dope, will do. I think I'm gonna work on having it available in the backend first, and then tackle wiring it up in the front end, so the backend components could be a separate PR, if not a few different PRs.

thieman commented 10 years ago

Sounds like a plan.

rclough commented 10 years ago

My first step for adding jobs-in-jobs is going to be adjusting/defining the data model, so I was wondering what you thought might be the best way to support a "job" as a "task". I see a few options:

  1. Take the existing data model, and use the existing fields somehow to signify a Job reference. For example, prefixing the "command" with something to signify that it's referencing a job.
  2. Add a column with a job_id that is default null (or non existent in case of mongo). When it is set, this acts both as a signifier that this task is a job, and gives you the ID of that job.
  3. Have a completely new object that extends the Task class

Other considerations: setting start and end times, as well as things like task status, for tasks that are actually a job will require special logic, which may warrant a separate class. I'm not sure if the task handles this logic or if the job does it by polling, which might mean a separate class isn't necessary.

Thoughts?

hmmmm

rclough commented 10 years ago

Also things like stderr and stdout, etc.

thieman commented 10 years ago

I like adding a job_id column to signify that a Task runs a sub-Job. I also agree that you will probably end up wanting to sub-class the Task class to hold all the logic specific to this case.

rclough commented 10 years ago

I think I will make it a job_name column, because although jobs have IDs, they are not readily available from the dagobah object, as jobs are stored by name.
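A minimal sketch of how options 2 and 3 might combine under this job_name approach (the class shapes are hypothetical, not dagobah's actual model): a null job_name marks a plain Task, and a subclass carries the reference plus any job-specific logic.

```python
class Task(object):
    """A plain Task; job_name stays None (option 2's default-null column)."""
    def __init__(self, name, command=None):
        self.name = name
        self.command = command
        self.job_name = None

class JobTask(Task):
    """Option 3: a Task subclass whose non-null job_name references a Job."""
    def __init__(self, name, job_name):
        super(JobTask, self).__init__(name, command=None)
        # a non-null job_name signals "this task runs a Job"
        self.job_name = job_name
```

Status, start/end times, and stdout/stderr handling would then live as overrides on `JobTask`.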

rclough commented 9 years ago

so #160 is in, and I'm starting on "expanding" jobs into one giant graph (when the "snapshot" is initialized).

I'm thinking I'll need to add a predecessors(node) to py-dag in order to implement this. This is because, when we expand a JobTask, we need all of the JobTask's ind_nodes to have edges coming from the JobTask's predecessors ("parents", if this were a tree).

Also, I'd want to add another downstream function, downstreams_final(node) to get all of the nodes which ONLY don't have further nodes downstream (I assume downstream_all gets you all the nodes in between the current and the "final" downstreams as well)

I'm working on code now and it's actually fairly complex (for example, how does one merge the edge and node dictionaries properly?), so any thoughts are welcome.

rclough commented 9 years ago

let me revise that downstreams_final idea.

What's actually needed is all nodes in the DAG that have no downstreams. Not sure what a good name for this is, or if there's a canonical name.

rclough commented 9 years ago

Here's my first pass at coding it. This is untested, and will not run because I haven't implemented some of the aforementioned functions (I decided to call that last one dag_ends() for the "ends" of a DAG). But you get the gist:

    def expand(self, dag=None):
        """
        This method takes some DAG object, and expands all the underlying
        tasks recursively:
            * Starting with ind_nodes, traverse the graph
            * If the node is expandable:
                * Call expand on the JobTask, save expanded version "Expanded"
                * Connect all parents of this JobTask to the ind_nodes of
                  "Expanded"
                * Get a list of all downstream nodes in "Expanded" that don't
                  have a downstream node, and add edges to all downstream
                  nodes of the JobTask.
            * Delete JobTask from graph
            * Continue traversing from the downstreams of the deleted JobTask
        """
        dag = dag.deepcopy()
        traversal_queue = dag.ind_nodes()
        expanded = []
        while traversal_queue:
            task = traversal_queue.pop(0)  # pop from the front: breadth-first
            if not self.implements_expandable(task) or task in expanded:
                continue

            expanded.append(task)
            expanded_dag = dag.tasks[task].expand()

            # Empty Job expansion
            if not expanded_dag:
                pred = dag.predecessors(task)
                children = dag.downstream(task)
                [dag.add_edge(p, c) for p in pred for c in children]
                dag.delete_task(task)
                continue

            # Prepend all expanded task names with "<jobname>_" to prevent
            # task name conflicts
            [expanded_dag.edit_task(t, {"name": "{0}_{1}".format(task, t)})
             for t in expanded_dag.tasks]

            # Merge node and edge dictionaries (creating 2 unconnected DAGs,
            # in one graph)
            final_dict = defaultdict(list)  # from the collections module
            for key, value in dag.graph.items():
                final_dict[key].extend(value)
            for key, value in expanded_dag.graph.items():
                final_dict[key].extend(value)
            dag.graph = final_dict

            # Add edges between predecessors and start nodes
            predecessors = dag.predecessors(task)
            start_nodes = expanded_dag.ind_nodes()
            [dag.add_edge(p, s) for p in predecessors for s in start_nodes]

            # Add edges between the final downstreams and the child nodes
            final_tasks = expanded_dag.dag_ends()
            children = dag.downstream(task)
            [dag.add_edge(f, c) for f in final_tasks for c in children]

            # add children to traversal queue and delete old reference
            traversal_queue.extend(children)
            dag.delete_task(task)

        return dag

EDIT: updated with some slightly cleaner code
EDIT 2: corrections

rclough commented 9 years ago

Also, this is probably obvious, but I did a breadth-first traversal. Not sure if there's any advantage over depth-first, but it's just what my brain defaulted to.

rclough commented 9 years ago

maybe it should be called end_nodes(), which goes nicely with ind_nodes(), since they're basically opposite concepts (nodes with no incoming edges vs nodes with no outgoing edges)

thieman commented 9 years ago

> I'm thinking I'll need to add a predecessors(node) to py-dag in order to implement this. This is because, when we expand a JobTask, we need all of the JobTask's ind_nodes to have edges coming from the JobTask's predecessors ("parents", if this were a tree).

This makes sense to me, if I remember the py-dag structure this should be pretty simple and hopefully O(n).

> What's actually needed is all nodes in the DAG that have no downstreams. Not sure what a good name for this is, or if there's a canonical name.

I believe the standard term here is "leaf node." So maybe all_leaves?

Would review the actual code right now but my brain isn't having it, will take a stab at it tomorrow.

rclough commented 9 years ago

yeah, since the edges are stored in a map of node -> list of downstream nodes, it can be done in O(n) simply by traversing the graph keys once
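For what it's worth, both helpers fall out of a single pass over that map (a sketch under the same assumption, using the `predecessors`/`all_leaves` names from the discussion above):

```python
def predecessors(graph, node):
    """All nodes with an edge pointing at `node` (its 'parents')."""
    return [n for n, downstreams in graph.items() if node in downstreams]

def all_leaves(graph):
    """All nodes with no outgoing edges (the 'ends' of the DAG)."""
    return [n for n, downstreams in graph.items() if not downstreams]
```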

thieman commented 9 years ago

Thoughts after looking at the code:

  1. Does the "prepend expanded tasks with _" thing allow us to have multiple instances of the same JobTask within the Job? Or do we need to add some sort of counter component to allow that?
  2. Does edit_task do any database access or is it just an in-place object mutation?
  3. I don't really understand what the "merge node and edge dictionaries" step is doing or why we need it.
  4. It seems like this could traverse over the same JobTask multiple times if it has many parents, what is accounting for that?
rclough commented 9 years ago
  1. By restriction of dagobah, we cannot have 2 tasks with the same name. So 2 of the same jobtasks would have to have different TASK names. So by prepending the child tasks with the task names, we ensure no name conflicts (mostly)
  2. Good question, I'll need to look into it, but this should be a copy of the dag in any case. good eye though. I'm about to get off work so I'll check soon.
  3. This is adding all of the tasks from the underlying jobtask's job to the current job
  4. I think I need a bit more clarification on this?
rclough commented 9 years ago

RE #4, I think I get it. The expanded list prevents re-expanding a task once it has already been expanded.

rclough commented 9 years ago

So I realized last week that both the underlying DAG (the job.graph, in practice) and the task list need to be kept in sync if we want to allow in-place task expansion; in this implementation, they are basically combined. I'm working on an actual PR now that will manage both, by adding a "snapshot_tasks" list just like we do with a snapshot of the DAG.