constantinpape opened this issue 4 years ago
@m-novikov @Tomaz-Vieira and I had a discussion about how to go on with this and came up with some metrics to compare the tools:
We'll look into the list and try to characterize the different tools according to the metrics.

Current distribution of work:
| | Airflow | Snakemake |
|---|---|---|
| Language | Python | Python |
| Extendability | Extendable executors and operators | Runs arbitrary shell commands |
| Activity (github) | 45 pull-requests merged last week | 3 pull-requests merged last week |
| Executors | Has kubernetes executor, seems possible to implement unicore one | Communicates with cluster using templated command |
| Non-python compat | Good, runs arbitrary bash | Good, runs arbitrary bash |
| One-shot / server | Server | One-shot |
45 pull-requests merged last week
Wow!
| | Metaflow |
|---|---|
| implementation language | Python |
| extendability | no developer docs |
| activity (on github) | > 20 commits during past month |
| support for different executors | not really, Titus and AWS |
| non python compatibility | probably yes ;) |
| one-shot / server | it monitors progress |
Here are two lists of workflow management systems:
It's hard to choose because some wfms are somewhat specialized, don't support complex workflows, or are tied to a technology or language (but many are also language-agnostic). I'd suggest looking for those used in the bioinformatics communities, as you're more likely to find people with experience in them in the lab. Have a look at Nextflow. I'd also be interested in trying to move your workflow to Galaxy. Think of Galaxy as a web front-end for an HPC.
Thanks @jkh1. I had a quick look at Nextflow yesterday, and I am not so sure it's such a good fit for us: the workflow specification looks rather static, and for us Python is a pretty big advantage over Groovy, because it makes integration with our tools easier.
That being said, we should have a closer look.
Galaxy is on our "to-explore list", see https://github.com/ilastik/cloud_ilastik/issues/71#issuecomment-631540185. Once we have a first overview, it would be great to hear more of your or Yi's perspective.
As I was looking through Galaxy, I stumbled upon this, which I think is very relevant:
| | CWL |
|---|---|
| Language | YAML (wraps executables rather than libraries/function calls) |
| Extendability | Workflows can be composed and reused. Spec can use vendor extensions |
| Activity (github) | 2 merged PRs last week for the spec repo. The reference runner (cwltool) and Galaxy (which can consume CWL files) are very active |
| Executors | There are several implementations of the spec. Galaxy has partial support, Airflow should be production-ready |
| Non-python compat | Good, runs arbitrary binaries |
| One-shot / server | Implementation-dependent. cwltool seems to be one-shot, the others are mostly servers |
CWL is a specification for running workflows, written in YAML. It is a mechanism for wrapping command line executions, defining their inputs, outputs, environment, required resources and the dependencies between steps of a workflow, and it has enough room in its configuration space to make runs reproducible. There are multiple implementations of the standard, including a reference implementation built by the people responsible for the specification, which can be used to run workflows locally. Other implementations claim to be able to run workflows on top of pretty much anything (Slurm, Torque, Openstack, AWS, Azure). There seems to be decent adoption of the spec in the genomics communities.
It seems to me like this would be the most flexible approach (at least when compared to committing to Galaxy and their custom XML workflow descriptions), since it wouldn't tie anyone to any particular service/framework/runner; instead, users could choose the one they prefer (including, potentially, Galaxy, which is what I was supposed to look into in the first place =P).
I support going with CWL. CWL is promoted by EOSC-Life at least as a standard for capturing adequate descriptions of workflows. It has indeed some traction in bioinformatics where it can be seen as competing with snakemake and nextflow.
to run workflows on top of pretty much anything
This is not limited to CWL. Galaxy, snakemake and nextflow can do this.
| | Dagster |
|---|---|
| implementation language | Python |
| extendability | tasks are composable, pipelines are not |
| activity (on github) | 1.8k stars, 112 active issues last week (76 closed + 36 new), 1 merged PR last week |
| support for different executors | k8s, Celery, dask, Airflow, AWS, GCP; seems possible to implement a custom one |
| non-python compatibility / how to run generic executables | Using subprocess; some integrations exist (e.g. for bash) |
| one-shot / server | both (uses executors as servers) |
Dagster's main idea is to have a functional pipeline description: one defines data dependencies between tasks, not the execution order. All environment-related side effects (e.g. sending a notification or updating a status table) should be moved out of tasks, so that they can be changed depending on the environment. All writes to external sources (filesystem, database, etc.) should be described in the task. The framework also encourages users to check the quality of the data as it flows through the system, using type annotations and validations. Conditional execution is supported: conditionally yield in the task body, but connect all outputs in the pipeline definition. Tasks can be generated programmatically. Also, tasks can be nested. Failed tasks can be re-executed (at least from the UI).
`example_pipeline.py`:

```python
import numpy
import scipy.ndimage
from dagster import execute_pipeline, pipeline, solid


# Define a task. "context" is a required parameter.
@solid
def create_data(context):
    data = numpy.random.default_rng(42).random((300, 200))
    context.log.info(f"created random data with shape {data.shape}")
    # Shortcut for "yield dagster.Output(data, name='result')".
    # Can also return multiple outputs just by using multiple "yield" statements.
    return data


# Type annotations are used as shortcuts for input/output definitions.
@solid
def gaussian(context, data, *, sigma: float):
    return scipy.ndimage.gaussian_filter(data, sigma)


@solid
def difference(context, a, b):
    return a - b


# Pipeline definition: just call solids without "context" as if they were functions.
@pipeline
def my_pipeline():
    data = create_data()
    # Aliases are needed to create 2 different DAG nodes from the same solid.
    # Note that the "sigma" parameters are missing; they are read from external config.
    feature1 = gaussian.alias("gaussian_1")(data)
    feature2 = gaussian.alias("gaussian_2")(data)
    difference(feature1, feature2)
```
`config.yaml`:

```yaml
# Automatically save intermediate results in a filesystem
storage:
  filesystem:

# Configuration for unspecified values.
# Could also be configured from Python with dicts.
# The structure depends on the specific task, but one can define a config schema if needed.
solids:
  gaussian_1:
    inputs:
      sigma:
        value: 5.0
  gaussian_2:
    inputs:
      sigma:
        value: 3.0
```
Invocation: `dagster pipeline execute -f example_pipeline.py -p my_pipeline -e config.yaml`
It's also possible to run this inside a Python script, or start a dagit web UI and click "Execute" there.
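For reference, here is a minimal sketch of the in-script variant, mirroring `config.yaml` above. Note this assumes a dagster version whose `execute_pipeline` takes the config dict as `run_config` (older releases used `environment_dict`):

```python
from dagster import execute_pipeline

from example_pipeline import my_pipeline

# Equivalent of the CLI invocation, with the config passed as a dict
# instead of config.yaml (the argument name depends on the dagster version).
result = execute_pipeline(
    my_pipeline,
    run_config={
        "solids": {
            "gaussian_1": {"inputs": {"sigma": {"value": 5.0}}},
            "gaussian_2": {"inputs": {"sigma": {"value": 3.0}}},
        }
    },
)
assert result.success
```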
dagit UI screenshot (shows an execution error of the same pipeline).
| | Prefect |
|---|---|
| implementation language | Python |
| extendability | tasks are composable, pipelines are not |
| activity (on github) | 2.6k stars, 29 active issues last week (15 closed + 14 new), 30 active PRs last week (23 merged + 7 proposed) |
| support for different executors | dask, k8s, AWS Fargate, custom environment |
| non-python compatibility / how to run generic executables | ShellTask + a lot of integrations |
| one-shot / server | both (has a server backend with dashboard and DB) |
Prefect is "Airflow 2.0", started by people from the original Airflow team. Tasks are connected with data dependencies, and could be parametrized. There is a rich notion of a task state and conditions on which a task is supposed to be run. A task is run depending on it's trigger: it could be triggered by inbound tasks, or always run (suitable for shutting down things or clearing resources). Reference tasks define the success state of the whole pipeline (e.g. if all tasks are OK, but the server failed to shut down, consider it a success). Tasks can also be retried with the specified delay, skipped or paused until explicitly resumed (from within the task itself).
`example_flow.py`:

```python
import numpy
import scipy.ndimage
import prefect
from prefect import task, Flow, Parameter


# Define a task.
# Tasks can also be defined as classes.
@task
def create_data():
    data = numpy.random.default_rng(42).random((300, 200))
    logger = prefect.context.get("logger")
    logger.info(f"created random data with shape {data.shape}")
    return data


@task
def gaussian(data, *, sigma: float):
    return scipy.ndimage.gaussian_filter(data, sigma)


@task
def difference(a, b):
    return a - b


# Functional way to create a pipeline.
# There is also a verbose imperative way.
with Flow("My Flow") as flow:
    data = create_data()
    sigma1 = Parameter("sigma1")
    sigma2 = Parameter("sigma2")
    feature1 = gaussian(data, sigma=sigma1)
    feature2 = gaussian(data, sigma=sigma2)
    difference(feature1, feature2)

# Run the flow with parameters.
state = flow.run(sigma1=3.0, sigma2=5.0)
assert state.is_successful()

# This will register the workflow with the server.
# flow.register()
```
Invocation: just run with `python example_flow.py`.
The web UI is packaged as docker/docker-compose (server, PostgreSQL DB, GraphQL engine) with a total size of ~3GB.
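Since non-python compatibility is one of the criteria in the table above, here is a minimal sketch of wrapping a command line step with prefect's `ShellTask`. The headless ilastik invocation is purely illustrative; the script path and arguments are placeholders, not taken from the example workflow:

```python
from prefect import Flow
from prefect.tasks.shell import ShellTask

# ShellTask runs an arbitrary shell command as a prefect task.
run_ilastik = ShellTask(log_stderr=True)

with Flow("headless-ilastik") as flow:
    # Hypothetical invocation; path and arguments are placeholders.
    run_ilastik(command="./run_ilastik.sh --headless --project=my_project.ilp")

# state = flow.run()
```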
Thanks for posting the two other frameworks @emilmelnikov. I think we can start to vote now which tools we should further explore by implementing the example workflow (I am working on this and will share it soon).
Just use the 👍 symbol to vote for each tool. I would say everyone has 3 votes and in the end we should at least go with one of the python tools (Airflow, Metaflow, Dagster, Prefect) and one of the non-python tools (Snakemake, Galaxy, Nextflow).
Do you have a list of features you require? Does it have to be python-based? I would suggest to look more into those already widely used at EMBL and in the wider bioinformatics communities. It seems to me that many of the features listed for the last two are available from at least some of the others, but maybe would need to be looked at in more detail there. Also I am thinking here about interoperability and who's going to maintain and/or re-use this. Also on the Galaxy/CWL front, there is no CWL runner for Galaxy; it's in the works but won't be ready any time soon. What will soon be available is a CWL description of a Galaxy workflow.
Do you have a list of features you require?
A (probably not exhaustive) list of requirements:
- cloud-ilastik (we are not sure ourselves yet how exactly this will look)

Does it have to be python-based?
It's not strictly necessary, but given that everything we need to support is in python, it would make a lot of sense. If it's not python based, the additional boilerplate code should really be minimal. Also, given that we are all python developers, this is a big maintainability plus.
I would suggest to look more into those already widely used at EMBL and in the wider bioinformatics communities. It seems to me that many of the features listed for the last two are available from at least some of the others, but maybe would need to be looked at in more detail there. Also I am thinking here about interoperability and who's going to maintain and/or re-use this.
Yes, it would be good to have interoperability with these tools. However, my initial impression when looking at something like Galaxy or snakemake was always that they don't really fit our use cases well, and that it would take a lot of boilerplate to port one of my more complex workflows, which contain iterative or recursive job specifications, to them.
In any case, our current plan is to make a prototype implementation for an example workflow (still work in progress) with a couple of tools to actually quantify this.
Also on the Galaxy/CWL front, there is no CWL runner for Galaxy; it's in the works but won't be ready any time soon. What will soon be available is a CWL description of a Galaxy workflow.
Thanks, that's good to know!
We had a mini hackathon where @emilmelnikov @Tomaz-Vieira @k-dominik and @m-novikov worked on implementing the example workflow I put together in some of the tools we discussed here. The proof of concept implementations are here: https://github.com/constantinpape/cloud-ilastik-example-project/pulls.
We have tried the following tools:

- prefect: implementing the workflow was relatively straightforward, the code is easy to understand, and it is also possible to implement the "scatter-gather" style tasks in prefect (a rough mapping sketch is included below). One remark by Maxim: it would be nice to hide some of the complexity of tasks like connected components in some sort of "meta node"; there seems to be a subflow concept for this, but it looks like it's still in the discussion stage. Maxim hasn't tried actually running in parallel mode yet.
- dagster: implementing the high level dependencies of the workflow is easy (i.e. "Ilastik Prediction" -> "Threshold" ...), but for parallelization this still uses a threadpool. Emil thinks that using dagster also for the parallelization part is possible but probably not a perfect fit.
- galaxy: setting up a dev instance of galaxy was quite some work; the galaxy lingo is pretty different from what we are used to. There is a galaxy instance running on the EMBL server, but in order to deploy something there it needs to go through an admin of the instance.
- cwl: yaml based workflow description, supposed to be portable. The default runner seems quite limited in features, but is easy to use for development. The most sophisticated implementation seems to be Arvados. Not meant to express control flow (loops, branching). Can express the software environment in which workflows should be run, but how much the implementations will honor this is still unexplored.

Of course all conclusions are preliminary after only a few hours of working on this.
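As a rough illustration of the "scatter-gather" point for prefect, here is a minimal sketch based on prefect's task mapping. The block logic and the task names (`make_blocks`, `process_block`, `merge_blocks`) are made up for illustration and are not part of the example workflow:

```python
from prefect import task, Flow


@task
def make_blocks(shape, block_size):
    # Scatter: compute the start coordinates of all blocks of the volume.
    return [
        (z, y, x)
        for z in range(0, shape[0], block_size)
        for y in range(0, shape[1], block_size)
        for x in range(0, shape[2], block_size)
    ]


@task
def process_block(block_start):
    # Placeholder for the per-block computation (e.g. thresholding one chunk).
    return block_start


@task
def merge_blocks(block_results):
    # Gather: combine the per-block results again.
    return len(block_results)


with Flow("scatter-gather") as flow:
    blocks = make_blocks((300, 200, 100), 100)
    results = process_block.map(blocks)  # one task run per block
    merged = merge_blocks(results)
```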
We had a rather long discussion about the following point. There are two separate concepts:
In principle both could be handled in the same large computational graph, but it's also possible to split it into one graph for the high-level workflow and separate graphs for the tasks. Having it in one big graph would provide most flexibility on how the computation is run:
However, this approach also comes with downsides:
An important factor for us is that we deal with large 3d volumes, where parallelizing (non-trivial) computation creates dependencies between the tasks (through scatter-gather tasks and dependencies of errors) that are not there for applications where one mostly needs to parallelize over independent files (e.g. a lot of 2d images).
For the first approach (big computational graph), it looks like prefect is the way to go. For the second approach (separate computational graphs), it would probably be best to use a tool more tailored for deploying computations (dask!) for deployment/parallelization and then handle dependencies with one of the workflow managers (we could still go with prefect; but in this case it might also not be so hard to switch the workflow manager).
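To make the second approach more concrete, here is a minimal sketch of the low-level part with dask. The cluster setup, chunk sizes and the filter are placeholders, and a real deployment would swap the `LocalCluster` for a SLURM or kubernetes cluster:

```python
import dask.array as da
from dask.distributed import Client, LocalCluster

# Local cluster as a stand-in; only this part would change for an HPC/cloud deployment.
cluster = LocalCluster(n_workers=4)
client = Client(cluster)

# Block-wise processing of a large 3d volume: dask tracks the scatter-gather
# dependencies between chunks, while the workflow manager would only see this
# whole computation as a single high-level task.
volume = da.random.random((512, 512, 512), chunks=(128, 128, 128))
filtered = volume.map_overlap(
    lambda block: block - block.mean(),  # placeholder for the actual filter
    depth=8,
)
result = filtered.sum().compute()
```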
What usage modes / applications do we actually want to support? Who is the target audience?
Implement the reference workflow with dask, figure out if / how we can implement some of the additional requirements with dask, e.g. handling computation failure. Figure out the scope ;).
The distinction between low/high level is what I was trying to get at, because indeed some workflow management systems are designed for the high level case of chaining tools together. If the workflow is embedded in the application itself, then you need lower level management, which usually means interfacing with the programming language used, which some wfms (e.g. for python) do.
Regarding scope and audience, systems like Galaxy are meant to make HPC accessible to non computer-savvy users. It is essentially a server providing a GUI around tools to compose workflows and transparently dispatch them to an HPC backend. There's a bit of infrastructure admin involved but then I believe this scales better in terms of number of users and it is not limited to one scientific domain which means that one EMBL instance could serve all of EMBL. This should be a consideration if support is handed over to some EMBL support team as we don't want to scatter support over too many different systems. If the wfm is to be embedded in the application then obviously it's for the developers to support :)
Here's my thinking after digesting the discussion today a bit more: For my use-cases the option of separating low-level + high-level computation would make more sense. I think writing some logic around the low level computation framework (most likely dask) for error handling etc. and working on integrating it nicely with the high-level framework is more feasible than having the all-in-one framework solution (for which, as far as I can see, none of the workflow managers is really made). This has the additional benefit that it makes the whole approach more modular, and it might be possible to support more than one high level framework (at least for some use cases). I think this model would fit the ilastik batch prediction use case pretty well too.
However, it is not a good fit for the interactive ilastik use-case. But I think the requirements for this use-case are so different that we would end up with different systems anyway. If we could use dask for the interactive ilastik backend, we could still share functionality though. Let's see.
Regarding scope and audience, systems like Galaxy are meant to make HPC accessible to non computer-savvy users. It is essentially a server providing a GUI around tools to compose workflows and transparently dispatch them to an HPC backend. There's a bit of infrastructure admin involved but then I believe this scales better in terms of number of users and it is not limited to one scientific domain which means that one EMBL instance could serve all of EMBL. This should be a consideration if support is handed over to some EMBL support team as we don't want to scatter support over too many different systems. If the wfm is to be embedded in the application then obviously it's for the developers to support :)
I think the requirements for "More general, create framework for large 3d image processing for members of the lab and other developers."
and "Make large 3d image processing available to users at EMBL (and HBP) through a convenient tool."
are indeed very different.
For the former, Galaxy (and also cwl) are not a good fit. Their model of execution is just not suited to developing and deploying large-scale research code, where neither the dependencies nor the steps of the workflow are fixed.
On the other hand, it could make sense to port some more stable workflows to Galaxy to make them more easily available.
To follow up on #70, here are some related libraries I had a look at while implementing my research code for scalable algorithms in https://github.com/constantinpape/cluster_tools and how I think they relate to what we want to do:
- luigi: this is what I use in cluster_tools, but I find it opaque for expressing more complex job relations and re-running tasks, so I would probably not choose it again. Initially developed at Spotify.

Additional resources mentioned by @wolny and @jkh1: