JuliaParallel / Dagger.jl

A framework for out-of-core and parallel execution

Using Dagger in an epidemiological analysis: visualising an implied DAG and other examples of usage #512

Open SamuelBrand1 opened 1 month ago

SamuelBrand1 commented 1 month ago

Hi everyone,

I'm implementing an analysis batch for an epidemiological modelling package https://github.com/CDCgov/Rt-without-renewal/tree/main/pipeline .

We're wondering whether there is a way to plot the DAG once it has been implied (e.g. by using Dagger.@spawn to generate a number of Thunks). I've noted DaggerWebDash, but it would be much easier to follow with a complete tutorial example of a workflow and/or some links to example usage in the Julia community.

seabbs commented 1 month ago

To add to this, ideally we would be able to visualise the DAG both before and after running it. Is that supported?

Slightly unrelated, but are you aware of any real-world use cases of relatively complex Dagger workflows? We have had a look but have not been very successful. We are particularly interested in modelling our pipeline (and the dependencies between jobs) and in being able to abstract parallelization without pipeline changes (so we could switch from local -> Slurm -> something cloud-based like Azure Batch), so resources in that direction would be great.

jpsamaroo commented 1 month ago

Hey there! There is definitely a way to do this, but I realize the full example isn't documented. You can do something like this:

using Dagger, GraphViz

# Enable logging, with required logging events for GraphViz
Dagger.enable_logging!(;tasknames=true, taskdeps=true, taskargs=true, taskargmoves=true)

# Run your code that uses `Dagger.@spawn` (or use a `DArray` or `DTable`)
x = Dagger.@spawn 1+1
y = Dagger.@spawn x*2
z = Dagger.@spawn x/3
fetch(z)

# Fetch and show the logs (shows up automatically in Jupyter)
logs = Dagger.fetch_logs!()
Dagger.render_logs(logs, :graphviz)

However, I've noticed that the above example doesn't produce any viewable DAG, because there was an oversight in the dependency calculation that missed some task dependencies - I'm working on fixing this now, along with writing docs for all of this! There are also a number of options for the :graphviz renderer that are passed as keyword arguments - I'll document these shortly. For now, if you do Dagger.render_logs(logs, :graphviz; disconnected=true), you can at least see each individual task as a bubble, but once I get my fixes merged, you'll also be able to see the dependencies between tasks.

SamuelBrand1 commented 1 month ago

Thanks for getting back to us so quickly!

Nice idea to include GraphViz as a dep, look forward to the fix!

jpsamaroo commented 1 month ago

> To add to this ideally we would be able to visualise the DAG both before and after running it - is that supported?

Each time you do logs = Dagger.fetch_logs!(), you get the logs generated since the last time either enable_logging!() or fetch_logs!() was called, whichever was more recent. So you can just call fetch_logs!() each time you want to visualize just the latest logs.

You can also see the whole DAG since the start by manually combining the logs from multiple fetch_logs!() calls, but it's a bit annoying to do (you have to vcat the Vectors in each entry of the logs dictionary) - I can also add a utility for this.
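The manual merge described above could look roughly like this. It is a minimal sketch, assuming the logs are a Dict whose values are themselves Dicts mapping a category to a Vector; `merge_logs` is a hypothetical helper name, not part of Dagger, and the toy data stands in for real fetch_logs!() output.

```julia
# Hypothetical helper, not part of Dagger. The nested layout
# (outer key => category => Vector) is an assumption for illustration.
function merge_logs(a::Dict, b::Dict)
    merged = Dict{Any,Any}()
    # Copy the first batch so the inputs are never mutated
    for (key, entries) in a
        merged[key] = Dict{Any,Any}(k => copy(v) for (k, v) in entries)
    end
    # Append the second batch, concatenating Vectors that share a category
    for (key, entries) in b
        dest = get!(merged, key, Dict{Any,Any}())
        for (k, v) in entries
            dest[k] = haskey(dest, k) ? vcat(dest[k], v) : copy(v)
        end
    end
    return merged
end

# Toy data standing in for two successive batches of logs
first_batch = Dict(1 => Dict(:id => [1, 2], :taskdeps => [10]))
second_batch = Dict(1 => Dict(:id => [3]), 2 => Dict(:id => [4]))
all_logs = merge_logs(first_batch, second_batch)
```

If the real log layout differs from this assumption, only the inner loop should need to change.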

> Slightly unrelated but are you aware of any real world uses cases of relatively complex Dagger workflows?

Depends on what you mean by "complex" - we have an active user who utilizes Dagger for their reinsurance pricing engine (https://www.youtube.com/watch?v=CD7U1IEXWBM), which is reasonably complicated. Our public user base is still small, but I've developed Dagger to support larger applications and use cases. If Dagger fails to work for your use case, it's likely just a bug, and is something I can help investigate and improve!

> We are particularly interested in modelling our pipeline (and the dependencies between jobs) and being able to abstract parallelization without pipeline changes (so we could switch from local -> Slurm -> something cloud like azure batch) so resources in that direction would be great.

This is definitely something Dagger can support - local already works (just use Distributed's addprocs for multi-worker/multi-node parallelism). Slurm can work if you can connect up your Julia processes with Distributed.addprocs, as Dagger will then use them automatically. It's probably the same situation for Azure Batch, although I've not used it before. MPI-based Slurm support is WIP, and may or may not be convenient for all kinds of arbitrary workflows, but it's still something I want to improve.
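One way to keep the pipeline code unchanged while switching backends is to isolate worker setup in a single function. The following is only a sketch: `setup_workers` and its `backend` values are hypothetical names, and only `addprocs` from the Distributed standard library is a real call. The Slurm and cloud branches are deliberately left unimplemented, since they depend on the cluster manager you use.

```julia
using Distributed

# Hypothetical helper: the name and the `backend` symbols are illustrative only.
function setup_workers(backend::Symbol; n::Integer = 2)
    if backend === :local
        # Distributed stdlib: start n worker processes on this machine
        return addprocs(n)
    else
        # A Slurm or cloud backend would pass its cluster manager to `addprocs` here
        throw(ArgumentError("backend $(backend) is not configured in this sketch"))
    end
end
```

Calling `setup_workers(:local)` once at startup would then be the only backend-specific line; the code that spawns tasks stays the same.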

seabbs commented 1 month ago

> I can also add a utility for this.

That sounds like it would be really handy (@SamuelBrand1 is good at Julia but I have no idea so any help is useful).

> we have an active user who utilizes Dagger for their reinsurance pricing engine

Thanks for this, will check it out. Something I have in my packages that I think is useful is a list of known uses; something like that in Dagger.jl might be handy if not already present?

> This is definitely something Dagger can support

We are really excited about this part of the functionality!

jpsamaroo commented 1 month ago

Docstrings and fixes are in https://github.com/JuliaParallel/Dagger.jl/pull/513 - please give these changes a try and let me know if it doesn't work for you!

SmalRat commented 3 weeks ago

Hi!

I have run into problems somewhat related to this topic, so I will describe the fixes here (I have opened the corresponding pull request).

I have been using Dagger 0.18.8 for some time, relying on its functionality to plot the DAG. A simple example using the old API:

using Distributed
using Colors
using GraphViz
using Cairo
using Dagger
using FileIO

# Dummy task: ignores its inputs; `dependencies` only creates edges in the DAG
function taskA(simple_arg, dependencies...)
    return "Result of task A"
end

# Build a three-node DAG (a -> b, a -> c, b -> c) with the old `delayed` API
function oldAPI_graph_setup()
    a = Dagger.delayed(taskA)(1)
    b = Dagger.delayed(taskA)(2, a)
    c = Dagger.delayed(taskA)(3, a, b)

    return c
end

# Render the DOT file at `dot_path` to a PNG at `png_path`
function dot_to_png(dot_path, png_path, width=7000, height=2000)
    dot_code = read(dot_path, String)
    graph = GraphViz.load(IOBuffer(dot_code))
    GraphViz.layout!(graph)

    surface = Cairo.CairoSVGSurface(IOBuffer(), width, height)
    context = Cairo.CairoContext(surface)

    GraphViz.render(context, graph)
    write_to_png(surface, png_path)
end

# Configure LocalEventLog
ctx = Dagger.Sch.eager_context()
ctx.log_sink = Dagger.TimespanLogging.LocalEventLog()

graph_thunk = oldAPI_graph_setup()
collect(ctx, graph_thunk)

logs = Dagger.TimespanLogging.get_logs!(ctx)
open("graph.dot", "w") do io
    Dagger.show_plan(io, logs, graph_thunk)
end

dot_to_png("graph.dot", "graph.png", 1000, 300)

The resulting content of graph.dot :

strict digraph {
graph [layout=dot,rankdir=LR];
n_4 [label="taskA
85.006 ms",color="#000000",shape="ellipse",penwidth=5];
n_5 [label="taskA
20.606 ms",color="#000000",shape="ellipse",penwidth=5];
n_6 [label="taskA
21.005 ms",color="#000000",shape="ellipse",penwidth=5];
n_arg_1_to_5 [label="2",color="black",shape="ellipse",penwidth=5];
n_arg_1_to_5 -> n_5 [label="Move: 7.6 us",color="black;0.5:#000000",penwidth=2];
n_arg_1_to_4 [label="1",color="black",shape="ellipse",penwidth=5];
n_arg_1_to_4 -> n_4 [label="Move: 7.4 us",color="black;0.5:#000000",penwidth=2];
n_arg_1_to_6 [label="3",color="black",shape="ellipse",penwidth=5];
n_arg_1_to_6 -> n_6 [label="Move: 5.6 us",color="black;0.5:#000000",penwidth=2];
n_4 -> n_5 [label="Move: 20.278 ms",color="#000000;0.5:#000000",penwidth=2];
n_4 -> n_6 [label="Move: 22.9 us",color="#000000;0.5:#000000",penwidth=2];
n_5 -> n_6 [label="Move: 7.8 us",color="#000000;0.5:#000000",penwidth=2];
}

graph.png:

image

However, after updating to 0.18.11, this stopped working. As far as I can see, show_plan() became an internal function (now _show_plan()) and was replaced by show_logs(), so I changed:

    Dagger.show_plan(io, logs, graph_thunk)

to:

    Dagger.show_logs(io, graph_thunk, logs, :graphviz_simple)

in my code. That threw an error:

ERROR: LoadError: UndefVarError: `istask` not defined

The reason is that write_dag(io, t::Thunk) uses istask(t) (and dependents(t)), which were not imported in GraphVizSimpleExt. I fixed that.
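The same class of error can be reproduced with two toy modules. This is a sketch with made-up names (`ToyCore`, `ToyExt`, `describe`), not Dagger's actual code: a function that is defined but not exported has to be imported explicitly before another module can call it unqualified.

```julia
module ToyCore
    # Defined but deliberately not exported, like an internal helper
    istask(x) = x isa Task
end

module ToyExt
    # Without this import, calling `istask` below raises an UndefVarError
    import ..ToyCore: istask

    describe(x) = istask(x) ? "task" : "not a task"
end
```

With the import in place, `ToyExt.describe(1)` returns "not a task" instead of failing on the undefined name.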

Next, I noticed that my graph visualization looked like this (compare with the previous picture): image

The reason can be seen in the graph.dot file:

strict digraph {
graph [layout=dot,rankdir=LR];
n_5 [label="taskA - 5"];
n_4 [label="taskA - 4"];
n_6 [label="taskA - 6"];
5 -> 6
4 -> 5
4 -> 6
}

First of all, the node names used in the node declarations (label etc.) differ from those used in the edges. After this was fixed in GraphVizSimpleExt.jl, the generated file was:

strict digraph {
graph [layout=dot,rankdir=LR];
n_5 [label="taskA - 5"];
n_4 [label="taskA - 4"];
n_6 [label="taskA - 6"];
n_5 -> n_6
n_4 -> n_5
n_4 -> n_6
}

image
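The naming mismatch above goes away when declarations and edges derive their identifiers from one function. The sketch below illustrates that idea in isolation; `node_name` and `write_dot` are hypothetical helpers, and only the "n_<id>" scheme is taken from the DOT output in this comment.

```julia
# Hypothetical helpers; only the "n_<id>" naming scheme comes from the DOT output above.
node_name(id::Integer) = "n_$(id)"

function write_dot(io::IO, labels::Dict{Int,String}, edges::Vector{Pair{Int,Int}})
    println(io, "strict digraph {")
    println(io, "graph [layout=dot,rankdir=LR];")
    # Node declarations use node_name ...
    for id in sort(collect(keys(labels)))
        println(io, node_name(id), " [label=\"", labels[id], "\"];")
    end
    # ... and so do the edges, so both always refer to the same node
    for (src, dst) in edges
        println(io, node_name(src), " -> ", node_name(dst))
    end
    println(io, "}")
end

labels = Dict(4 => "taskA - 4", 5 => "taskA - 5", 6 => "taskA - 6")
dot = sprint(write_dot, labels, [4 => 5, 4 => 6, 5 => 6])
```

Because bare numbers such as `5 -> 6` are valid DOT identifiers, Graphviz silently creates new unlabeled nodes for them rather than reporting an error, which is why the mismatch only shows up in the picture.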

The next issue is that the thunks' arguments are not shown. There are two reasons. The first is that show_logs(io::IO, t, logs, vizmode::Symbol; options...) ends up calling write_dag(io, t::Thunk), so the logs argument is effectively dropped:

...
show_logs(io::IO, t, logs, vizmode::Symbol; options...) =
    show_logs(io, t, Val{vizmode}(); options...)
...
show_logs(io::IO, t::Thunk, ::Val{:graphviz_simple}) = _show_plan(io, t)
show_logs(io::IO, logs::Vector{Timespan}, ::Val{:graphviz_simple}) = _show_plan(io, logs)
...
function _show_plan(io::IO, t)
    println(io, """strict digraph {
    graph [layout=dot,rankdir=LR];""")
    write_dag(io, t)
    println(io, "}")
end
...
function write_dag(io, t::Thunk)
...
end

The second is that write_dag(io, t::Thunk) does not print arguments that are neither Thunks nor Chunks. However, having the Thunk should be enough to display such arguments in the DAG. After fixing the second issue, the file looks like this:

strict digraph {
graph [layout=dot,rankdir=LR];
n_5 [label="taskA - 5"];
n_4 [label="taskA - 4"];
n_6 [label="taskA - 6"];
n_5 -> n_6
n_4 -> n_5
n_4 -> n_6
n_arg_1_to_5 [label="2"];
n_arg_1_to_5 -> n_5;
n_arg_1_to_4 [label="1"];
n_arg_1_to_4 -> n_4;
n_arg_1_to_6 [label="3"];
n_arg_1_to_6 -> n_6;
}

image

The first issue (the dropped logs argument) can also be fixed by correcting the function signatures. Result:

strict digraph {
graph [layout=dot,rankdir=LR];
n_4 [label="taskA
49.071 ms",color="#000000",shape="ellipse",penwidth=5];
n_5 [label="taskA
12.332 ms",color="#000000",shape="ellipse",penwidth=5];
n_6 [label="taskA
11.447 ms",color="#000000",shape="ellipse",penwidth=5];
n_arg_1_to_5 [label="2",color="black",shape="ellipse",penwidth=5];
n_arg_1_to_5 -> n_5 [label="Move: 2.2 us",color="black;0.5:#000000",penwidth=2];
n_arg_1_to_4 [label="1",color="black",shape="ellipse",penwidth=5];
n_arg_1_to_4 -> n_4 [label="Move: 3.9 us",color="black;0.5:#000000",penwidth=2];
n_arg_1_to_6 [label="3",color="black",shape="ellipse",penwidth=5];
n_arg_1_to_6 -> n_6 [label="Move: 2.2 us",color="black;0.5:#000000",penwidth=2];
n_4 -> n_5 [label="Move: 10.042 ms",color="#000000;0.5:#000000",penwidth=2];
n_4 -> n_6 [label="Move: 13.9 us",color="#000000;0.5:#000000",penwidth=2];
n_5 -> n_6 [label="Move: 3.1 us",color="#000000;0.5:#000000",penwidth=2];
}

image
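The dropped logs argument described earlier in this comment can be reproduced with toy types. This is a sketch of the dispatch pattern only; `ToyThunk`, `render_dropping` and `render_keeping` are made-up names, not Dagger's methods.

```julia
# Toy stand-in for a thunk; not Dagger's type
struct ToyThunk
    id::Int
end

# Forwarding method that drops `logs`, mirroring the pattern quoted earlier
render_dropping(t, logs, mode::Symbol) = render_dropping(t, Val(mode))
render_dropping(t::ToyThunk, ::Val{:simple}) =
    "plan for thunk $(t.id) without timings"

# Forwarding method that keeps `logs` in the call chain
render_keeping(t, logs, mode::Symbol) = render_keeping(t, logs, Val(mode))
render_keeping(t::ToyThunk, logs::Vector, ::Val{:simple}) =
    "plan for thunk $(t.id) with $(length(logs)) log entries"

without_logs = render_dropping(ToyThunk(6), [1.0, 2.0], :simple)
with_logs = render_keeping(ToyThunk(6), [1.0, 2.0], :simple)
```

In the first pair the caller passes logs but the result never sees them; in the second pair the logs reach the method that does the rendering.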

So currently the usage of

Dagger.show_logs(io, graph_thunk, logs, :graphviz_simple)

yields: image

while omitting logs info:

Dagger.show_logs(io, graph_thunk, :graphviz_simple)

results in: image

SmalRat commented 3 weeks ago

Overall, these fixes apply not only to the old API but to the modern one as well. As it stands, it looks like there was no way to visualize modern-API DAGs, because of the multiple dispatch issue described earlier and the write_dag() signatures: show_logs(io::IO, t, logs, vizmode::Symbol; options...) and show_logs(io::IO, t, vizmode::Symbol; options...) always ended up calling write_dag(io, t::Thunk), whereas the modern API would need something like write_dag(io, t::DTask). Now write_dag(io, logs::Vector, t) can also be used.

Nevertheless, there are still a few problems with visualizing the modern API, so I also tried to address them.

First, the current show_logs() signatures do not accept a DTask. After importing DTask and correcting the signatures, the following code:

using Distributed
using Colors
using GraphViz
using Cairo
using Dagger
using FileIO

# Dummy task: ignores its inputs; `dependencies` only creates edges in the DAG
function taskA(simple_arg, dependencies...)
    return "Result of task A"
end

# Build the same three-node DAG (a -> b, a -> c, b -> c) with `Dagger.@spawn`
function modernAPI_graph_setup()
    a = Dagger.@spawn taskA(1)
    b = Dagger.@spawn taskA(2, a)
    c = Dagger.@spawn taskA(3, a, b)

    return c
end

# Render the DOT file at `dot_path` to a PNG at `png_path`
function dot_to_png(dot_path, png_path, width=7000, height=2000)
    dot_code = read(dot_path, String)
    graph = GraphViz.load(IOBuffer(dot_code))
    GraphViz.layout!(graph)

    surface = Cairo.CairoSVGSurface(IOBuffer(), width, height)
    context = Cairo.CairoContext(surface)

    GraphViz.render(context, graph)
    write_to_png(surface, png_path)
end

# Configure LocalEventLog
ctx = Dagger.Sch.eager_context()
ctx.log_sink = Dagger.TimespanLogging.LocalEventLog()

graph_thunk = modernAPI_graph_setup()
fetch(graph_thunk)

logs = Dagger.TimespanLogging.get_logs!(ctx)
open("graph.dot", "w") do io
    Dagger.show_logs(io, graph_thunk, logs, :graphviz_simple)
end

dot_to_png("graph.dot", "graph.png", 900, 200)

gives: image

However, usage of:

Dagger.show_logs(io, graph_thunk, :graphviz_simple)

or

Dagger.show_logs(io, logs, :graphviz_simple)

still fails (the latter fails for the old API too). Dagger.show_logs(io, logs, :graphviz_simple) is fixed by making t optional in write_dag(io, logs, t), but using DTask instead of Thunk required writing a few conversion functions.

I have also added code to infer the thunk arguments solely from the logs. Getting the non-Thunk arguments out of a DTask seems to be quite unstable, as it requires unwrapping a WeakThunk, which sometimes returns 'nothing' (after being GC'd). As a result, some parts of the DAG disappear from the visualization, which is probably worse than showing nothing. Therefore, I changed the workflow to the following (keeping only the stable parts of the code by default):

Old API

show_logs(io, logs, :graphviz_simple) - arguments and times are inferred
show_logs(io, graph_thunk, logs, :graphviz_simple) - arguments and times are inferred
show_logs(io, graph_thunk, :graphviz_simple) - only compute nodes and arguments are deduced from the thunk

Modern API

show_logs(io, logs, :graphviz_simple) - arguments and times are inferred
show_logs(io, graph_thunk, logs, :graphviz_simple) - arguments and times are inferred
show_logs(io, graph_thunk, :graphviz_simple) - an error suggesting usage of logs is thrown
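The instability of unwrapping a weak reference, mentioned above, can be illustrated with Base's `WeakRef`, whose `value` field becomes `nothing` once the referenced object has been collected. This is only an analogy under stated assumptions: `ToyTask` and `args_or_nothing` are made-up names, and Dagger's WeakThunk is not used here.

```julia
# Toy stand-in for a task holding its arguments; not Dagger's type
mutable struct ToyTask
    args::Vector{Any}
end

# Return the task's arguments, or `nothing` if the task was already collected
function args_or_nothing(ref::WeakRef)
    task = ref.value
    task === nothing && return nothing
    return task.args
end

live_task = ToyTask(Any[3, "a", "b"])   # the strong reference keeps it alive
live_ref = WeakRef(live_task)
empty_ref = WeakRef(nothing)            # models a reference whose target is gone
```

Any caller that reads arguments this way has to handle the `nothing` case, which is why inferring arguments from the logs is the more stable default.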

jpsamaroo commented 2 weeks ago

Nice work on fixing this @smalrat! I'm planning to merge your PR (maybe with some tweaks that I'll suggest in the PR comments), but I also want to provide a few comments for clarity on why this code was neglected:

Regardless, great work on this, I really appreciate the hard work with figuring it all out!

SamuelBrand1 commented 2 weeks ago

Looking forward to the results of this PR!