An extensible task specification and compiler for local and distributed workflows.
To install with test dependencies, in a virtual environment with Python >= 3.10, run:
```console
$ pip install -e ".[test]"
```
In `ecoscope-workflows`, tasks (Python functions) are composed into DAGs via compilation specs defined in YAML. These specs can then be compiled, via the `ecoscope-workflows` CLI, to various targets, including serial Python scripts (to run locally) and Airflow DAGs (to run on Kubernetes).
The extensible task registry supports registration of user-defined and third-party tasks.
Data connections are a special type of task parameter that automatically resolves a client for an external data source at runtime, from pre-configured fields and/or environment variables.
Tasks are strongly typed Python functions, each wrapping a unit of work to be completed as a node of a Directed Acyclic Graph (DAG). In pseudocode, with comments describing the key features of a task definition:
```python
# python's builtin `Annotated` type lets us attach extra metadata to
# function parameters, such as descriptions, which can be used if the
# task is to be configured through a form-based web interface. it is
# also essential for leveraging pydantic's validation machinery.
from typing import Annotated

# we leverage pydantic to augment that function metadata for ease of
# introspection and type parsing/coercion in the distributed context
from pydantic import Field

# we define some of our own custom subscriptable annotations as well.
# the `DataFrame` type allows us to require that the input and output
# data of our task conform to expected schemas
from ecoscope_workflows.annotations import DataFrame

# finally, our `@distributed` decorator gives us the ability to auto-
# magically deserialize inputs and/or serialize outputs of the task,
# if the workflow compilation target requires it. it also lets us
# attach top-level metadata to the task, such as an `image`, container
# resource requirements, and `tags` for sorting and categorization.
from ecoscope_workflows.decorators import distributed

# if a task handles tabular data, then both the input and output
# data require a schema, which is subscripted to our `DataFrame`
# annotation. this allows us to check task compatibility at DAG
# compile time, prevent mismatching of tasks, and validate data
# for correctness before releasing it to downstream tasks.
# (placeholders here: substitute real schemas in an actual task)
InputSchema = ...
OutputSchema = ...


@distributed(image="custom-image:latest", tags=["foo"])
def my_cool_analysis_task(
    input_dataframe: DataFrame[InputSchema],
    some_int_parameter: Annotated[
        int,
        Field(default=1, description="An integer used for math."),
    ],
    another_float_parameter: Annotated[
        float,
        Field(default=2.0, description="A float, for math."),
    ],
) -> DataFrame[OutputSchema]:
    # contrary to standard python convention, imports of any heavy
    # data analysis dependencies are deferred to the function scope.
    # this allows the compiler to import tasks and introspect their
    # call signatures, without requiring `heavy_duty_pydata_package` to
    # exist in the compilation/parsing environment. this becomes
    # especially valuable when you consider that each task is capable
    # of being run in a totally isolated container environment, so
    # these "inner" task dependencies (versions, etc.) do not necessarily
    # need to be compatible with those of other tasks in the same DAG.
    from heavy_duty_pydata_package import cool_analysis

    return cool_analysis(
        input_dataframe,
        some_int_parameter,
        another_float_parameter,
    )
```
Compilation specs define a DAG of known tasks, which can be either built-ins or tasks registered via the extensible task registry.
The inline comments in this example explain what each line means:
```yaml
# the workflow name
name: calculate_time_density
# for distributed compilation targets (e.g. Airflow), this is where task
# results will be serialized for passing between nodes (i.e. tasks)
cache_root: gcs://my-bucket/ecoscope/cache/dag-runs
# the tasks. these names must either be built-ins or third-party registrations
# via the "ecoscope_workflows.tasks" entry point. to see a list of known tasks for
# a given python environment, run `ecoscope-workflows tasks` from the terminal to
# dump the current task registry to stdout.
tasks:
  # root tasks (which have no dependencies on the output of other tasks) are specified
  # by their name, followed by empty curly braces, like so:
  get_subjectgroup_observations: {}
  # this next task has a dependency defined within it...
  process_relocations:
    # this means that the `process_relocations` task takes an argument `observations`,
    # and we want the value passed to this argument to be the return value of the task
    # `get_subjectgroup_observations`, which is the root task of this workflow.
    observations: get_subjectgroup_observations
  relocations_to_trajectory:
    # the pattern continues: `relocations_to_trajectory` has an argument named
    # `relocations`. populate its value from the return value of `process_relocations`
    relocations: process_relocations
  calculate_time_density:
    # etc.
    trajectory_gdf: relocations_to_trajectory
  draw_ecomap:
    # and so on, until the last task!
    geodataframe: calculate_time_density
```
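Each task's arguments name the upstream tasks whose outputs they consume, so a spec like this one already encodes a dependency graph. The following is a minimal sketch, not the compiler's actual implementation. It assumes the spec's `tasks` mapping has already been parsed into a plain dict (e.g. by a YAML parser) and that every argument value in it names an upstream task. The standard library's `graphlib.TopologicalSorter` then derives an execution order and rejects cycles. `execution_order` is a hypothetical helper.

```python
from graphlib import CycleError, TopologicalSorter

# the `tasks` section of the spec above, as it might look after YAML parsing
spec_tasks = {
    "get_subjectgroup_observations": {},
    "process_relocations": {"observations": "get_subjectgroup_observations"},
    "relocations_to_trajectory": {"relocations": "process_relocations"},
    "calculate_time_density": {"trajectory_gdf": "relocations_to_trajectory"},
    "draw_ecomap": {"geodataframe": "calculate_time_density"},
}


def execution_order(tasks: dict[str, dict[str, str]]) -> list[str]:
    """Return task names ordered so that every task comes after its dependencies."""
    graph = {}
    for name, args in tasks.items():
        # each argument value names an upstream task; unknown names are an error
        for arg, upstream in args.items():
            if upstream not in tasks:
                raise ValueError(f"{name}.{arg} depends on unknown task {upstream!r}")
        graph[name] = set(args.values())
    try:
        return list(TopologicalSorter(graph).static_order())
    except CycleError as err:
        # CycleError carries the offending cycle as its second argument
        raise ValueError(f"spec is not a DAG: {err.args[1]}") from err
```

For this linear workflow the only valid order is the one the spec is written in. A branching spec would admit several valid orders, and `static_order` picks one of them.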
Note that any task arguments not specified in the spec must be passed when the workflow (script, Airflow DAG, etc.) is invoked. The following command:

```console
$ ecoscope-workflows get-params --spec ${PATH_TO_SPEC}
```

will return these invocation-time parameters either as JSON Schema (for programmatic consumption) or as a fillable YAML form (for humans); use the `--format` option to choose!
```console
$ ecoscope-workflows --help
usage: ecoscope-workflows [-h] {compile,tasks,get-params} ...

options:
  -h, --help            show this help message and exit

subcommands:
  {compile,tasks,get-params}
    compile             Compile workflows
    tasks               Manage tasks
    get-params          Get params
```
Don't see the task you want here already? First, check the `task-request` label on the issue tracker to see if anyone else has requested this task. If not, please raise an issue describing your requested task! This will give you an opportunity to crowdsource implementation ideas from the core team and community.
It's possible your request will become a built-in task in `ecoscope-workflows` one day! But until then, we provide a means for extending the tasks visible to the compilation environment via Python's entry points.
To extend the task registry, simply:

1. Create a Python package with a top-level `tasks` module.
2. In `tasks`, define your extension tasks using the `@distributed` decorator, adhering to the other style conventions described in the task definition example above.
3. In your package's `pyproject.toml`, use `project.entry-points` to associate the fully qualified (i.e., absolute) import path for your `tasks` module with the `"ecoscope_workflows".tasks` entry point. For example:

```toml
# pyproject.toml
[project]
dependencies = ["ecoscope_workflows"]

[project.entry-points."ecoscope_workflows"]
# here we are imagining that your extension package is importable as `my_extension_package`,
# but you will no doubt think of a catchier name than that! note that you must provide a
# top-level `.tasks` module, i.e.:
tasks = "my_extension_package.tasks"
```
With these steps in place, simply install your package in the compilation Python environment, and then run:

```console
$ ecoscope-workflows tasks
```

You should see your extension tasks listed, and you can now freely use them in compilation specs.
This same mechanism is how the built-in tasks are collected as well! If you're curious how this works, check out the `pyproject.toml` for our package, as well as the `registry` module. This design is inspired by `fsspec`.
Data connections are configuration that allows us to resolve clients for external data services at runtime. To create a new connection, run `ecoscope-workflows connections create`; for example:

```console
$ ecoscope-workflows connections create --type earthranger --name amboseli
```

In this case, we are creating an `EarthRangerConnection` named `amboseli`. This connection can then be resolved into a client at runtime in any task that takes an argument annotated with the type `EarthRangerClient`, for example:
```python
from ecoscope_workflows.annotations import DataFrame, EarthRangerClient
from ecoscope_workflows.decorators import distributed

OutputSchema = ...  # placeholder: substitute a real schema


@distributed
def fetch_data_from_earthranger(
    # the `EarthRangerClient` annotation here tells us what type this
    # parameter *will be* by the time we enter the body of this task.
    client: EarthRangerClient,
) -> DataFrame[OutputSchema]:
    # once we get here, we have a real client
    # (`get_some_data` is a placeholder for an actual client method)
    return client.get_some_data()
```
When configuring this task to run, the name of a pre-configured connection can then be passed to the `client` argument as a string. It will be resolved automatically into an actual client object, based on the pre-configured connection of the same name. For example:

```yaml
# params.yaml
fetch_data_from_earthranger:
  client: "amboseli"
```
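Resolving a string into a client before the task body runs can be illustrated with pydantic's `BeforeValidator`. The sketch below is not how `ecoscope-workflows` implements connections. It assumes pydantic v2. `FakeClient`, `CONNECTIONS`, `resolve_client`, `ClientParam`, and `fetch_data` are hypothetical. Connection settings live in an in-memory dict, and a hypothetical `<NAME>_TOKEN` environment variable may override the stored token.

```python
import os
from typing import Annotated

from pydantic import BeforeValidator, ConfigDict, validate_call


class FakeClient:
    """Hypothetical stand-in for a real EarthRanger client."""

    def __init__(self, server: str, token: str) -> None:
        self.server = server
        self.token = token


# hypothetical pre-configured connections, keyed by connection name
CONNECTIONS = {"amboseli": {"server": "https://example.org", "token": "stored-token"}}


def resolve_client(value: object) -> object:
    """Turn a connection name into a client; pass anything else through unchanged."""
    if isinstance(value, str):
        if value not in CONNECTIONS:
            # pydantic converts ValueError into a ValidationError for the caller
            raise ValueError(f"no connection named {value!r}")
        fields = dict(CONNECTIONS[value])
        # hypothetical convention: an env var such as AMBOSELI_TOKEN overrides the token
        fields["token"] = os.environ.get(f"{value.upper()}_TOKEN", fields["token"])
        return FakeClient(**fields)
    return value


# the validator runs first, then pydantic checks the result is a FakeClient instance
ClientParam = Annotated[FakeClient, BeforeValidator(resolve_client)]


@validate_call(config=ConfigDict(arbitrary_types_allowed=True))
def fetch_data(client: ClientParam) -> str:
    # by the time we enter the body, `client` is a real client object
    return client.server
```

Calling `fetch_data("amboseli")` mirrors the `params.yaml` example above: the caller supplies only the connection name, and the task body receives a client object. Passing an existing `FakeClient` also works, because the validator returns it unchanged.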
To develop a new example workflow:

1. Place any new tasks it requires in the `.tasks` submodule that best fits their use:
   - `io`: Fetching data from third parties. Anything requiring a client, token, credentials, etc. should go here.
   - `preprocessing`: Munging data prior to analysis.
   - `analysis`: Performing an analytical function.
   - `results`: Encapsulating the output of analyses as maps, summary tables, etc.
2. Add a compilation spec for the workflow to `examples/compilation-specs`.
Once you have a compilation spec, you may want to interact with your compiled workflow in the form of a Jupyter notebook.
To do so, first compile your spec to jupytext with:
```console
$ ecoscope-workflows compile \
    --spec examples/compilation-specs/${WORKFLOW_NAME}.yaml \
    --template jupytext.jinja2 \
    --outpath examples/dags/${WORKFLOW_NAME}_dag.jupytext.py
```

The generated file `examples/dags/${WORKFLOW_NAME}_dag.jupytext.py` can be opened directly in Jupyter Lab or Notebook using the jupytext extension, or converted into `.ipynb` with the jupytext CLI.
Running your workflow interactively in a Jupyter environment may be useful for development.
Once you are satisfied that your workflow runs as expected in the interactive environment, you may generate example DAGs in any applicable form, along with parameter files. For example, as a sequential Python script:
```console
$ ecoscope-workflows compile \
    --spec examples/compilation-specs/${WORKFLOW_NAME}.yaml \
    --template script-sequential.jinja2 \
    --outpath examples/dags/${WORKFLOW_NAME}_dag.script_sequential.py
```
Parameters as JSON Schema:

```console
$ ecoscope-workflows get-params \
    --spec examples/compilation-specs/${WORKFLOW_NAME}.yaml \
    --format json \
    --outpath examples/dags/${WORKFLOW_NAME}_params.json
```
Parameters as a fillable YAML form:

```console
$ ecoscope-workflows get-params \
    --spec examples/compilation-specs/${WORKFLOW_NAME}.yaml \
    --format yaml \
    --outpath examples/dags/${WORKFLOW_NAME}_params.yaml
```
Finally, fill in the YAML parameter form and pass it to the sequential script:

```console
$ python3 examples/dags/${WORKFLOW_NAME}_dag.script_sequential.py \
    --config ${PATH_TO_FILLED_YAML_PARAM_FORM}
```
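Conceptually, a sequential script runs each task once, in dependency order. It feeds upstream return values into downstream arguments and takes the remaining arguments from the filled parameter form. The sketch below illustrates only that idea. It is not the code generated by `script-sequential.jinja2`. `run_sequential`, the toy tasks, and the `params` dict (standing in for the parsed YAML form) are hypothetical, and every task referenced in the spec is assumed to appear in it.

```python
from collections.abc import Callable
from graphlib import TopologicalSorter
from typing import Any


def run_sequential(
    tasks: dict[str, Callable[..., Any]],
    spec_tasks: dict[str, dict[str, str]],
    params: dict[str, dict[str, Any]],
) -> dict[str, Any]:
    """Run every task in dependency order and return all task results by name."""
    graph = {name: set(args.values()) for name, args in spec_tasks.items()}
    results: dict[str, Any] = {}
    for name in TopologicalSorter(graph).static_order():
        # arguments wired in the spec come from upstream results...
        kwargs = {arg: results[upstream] for arg, upstream in spec_tasks[name].items()}
        # ...and everything else comes from the invocation-time params
        kwargs.update(params.get(name, {}))
        results[name] = tasks[name](**kwargs)
    return results


# hypothetical toy tasks standing in for real ones
def load(n: int) -> list[int]:
    return list(range(n))


def double(values: list[int], factor: int = 2) -> list[int]:
    return [v * factor for v in values]


def total(values: list[int]) -> int:
    return sum(values)
```

With the spec `{"load": {}, "double": {"values": "load"}, "total": {"values": "double"}}` and params `{"load": {"n": 4}, "double": {"factor": 3}}`, the run computes `[0, 1, 2, 3]`, then `[0, 3, 6, 9]`, then `18`. If `factor` is omitted from the params, the task's own default applies.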
TODO: `ecoscope-workflows compile` offers a useful `--testing` mode which allows for
mocking the output of tasks in a compiled DAG using example data packaged alongside
the task definition. Documentation for this feature is forthcoming!