*Closed by @tobiasraabe 1 year ago*
Hey @tobiasraabe, I'm not sure if this is the right place for this discussion, but I just noticed this issue so I thought I'd chime in.
I have wrapped pytask in my own way, so that the underlying processing function is independent of the task:

```python
# in lib.py
def process(df: pd.DataFrame, nrows: int, annotations: Path) -> pd.DataFrame:
    ...
```
Separately, the tasks can be defined, along with how to load and save the inputs and outputs:
```python
# tasks.py
class Loader(typing.Protocol):
    @property
    def path(self) -> Path:
        ...

    def load(self) -> Any:
        ...


class Saver(typing.Protocol):
    @property
    def path(self) -> Path:
        ...

    def save(self, data: Any) -> None:
        ...


class DataFrameLoader:
    ...


class DataFrameSaver:
    ...


class JsonLoader:
    ...


class MyTask:
    def __init__(self, func, *, loaders: dict[str, Loader], saver: Saver):
        self.func = func
        self.loaders = loaders
        self.saver = saver

    def to_pytask(self):
        depends_on = [l.path for l in self.loaders.values()]

        @pytask.mark.depends_on(depends_on)
        @pytask.mark.produces(self.saver.path)
        def my_task(depends_on, produces):
            kwargs = {name: loader.load() for name, loader in self.loaders.items()}
            result = self.func(**kwargs)
            self.saver.save(result)

        return my_task


task_process = MyTask(
    func=lib.process,
    loaders={
        "df": DataFrameLoader("input.csv"),
        "nrows": JsonLoader("config.json", "nrows"),
        "annotations": "annotations.txt",  # IDK how to support passing in literal paths; we could probably avoid a PathLoader if we were clever.
    },
    saver=DataFrameSaver("output.csv"),
).to_pytask()
```
This could also be turned into a decorator API. I have also glossed over a few things: I have an additional layer that makes it possible to use positional args in addition to kwargs, and a layer for translating the raw output of a function (in case the function returns two things), so that you can have a separate saver for each.

I didn't suggest this as a feature earlier because it would be a breaking change to the whole API, but if we are going to make breaking changes anyway, could we lean all the way into direct dependency injection?
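The decorator API mentioned above could be a thin wrapper around the same machinery. A minimal self-contained sketch (the name `task_def` is hypothetical, and the actual pytask marker attachment is elided to a comment):

```python
from pathlib import Path
from typing import Any, Callable, Protocol


class Loader(Protocol):
    @property
    def path(self) -> Path: ...
    def load(self) -> Any: ...


class Saver(Protocol):
    @property
    def path(self) -> Path: ...
    def save(self, data: Any) -> None: ...


def task_def(*, loaders: dict[str, Loader], saver: Saver) -> Callable:
    """Hypothetical decorator form of ``MyTask(...).to_pytask()``."""

    def decorate(func: Callable) -> Callable:
        def run() -> None:
            # Load every named input, call the wrapped function, save the result.
            kwargs = {name: loader.load() for name, loader in loaders.items()}
            saver.save(func(**kwargs))

        # A real version would attach @pytask.mark.depends_on / @pytask.mark.produces
        # here, exactly as MyTask.to_pytask does.
        return run

    return decorate
```

Usage would then be `@task_def(loaders={...}, saver=...)` directly above the processing function, instead of constructing a `MyTask` by hand.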
Hi @NickCrews! It is definitely the right time to talk about changes! I started collecting ideas in this discussion. We should probably move it there so other people are also notified.
Unfortunately, the Python API is immature, almost nonexistent even. Sorry that you had to work around it like this. But I am really interested in your use case, and I am trying to figure out what the desired interface would be for you.
There are multiple options:

- `pytask.main()`?
- A `Task` class in pytask. A user could create such a class and set all necessary attributes like `depends_on` and `produces`. Since some validation happens before the object is instantiated, it might not be a super safe way.
- The `@pytask.mark.task` decorator. Your example could become this (note: I am using some features from the new pytask interface in v0.4):

I hope this example still covers everything that you need. If I have missed an important detail of your workflow, please tell me.
This solution would also be cool because, except for the `run_tasks` function, everything should already be there in the v0.4 release.
How do you run this workflow? Do you use the CLI or `pytask.main()`?
I use the CLI, but only because there isn't a good Python API. In addition to `run_tasks`, a basic need would be to filter tasks, similar to the `-k` option on the command line (but ideally more flexible, since we have the full power of Python). With those two things I could make a functional API for my app to execute certain tasks. A later bonus would be an API to interrogate the task graph: find upstream/downstream tasks, the file deps and outputs of a task, etc. But those would be nice-to-haves.

Your example looks good! I think it would work for me. I like your use of `typing.Annotated`, but I'm not quite sure why that is required? Is there a comprehensive overview of all the changes coming? I would love to browse.
There are a few things that I think could be improved though:

**Don't require me to import and inherit from `FilePathNode`.** A Saver only needs to provide two things (I think, correct me if I'm missing something): a path known at scheduling time, and a function to do the saving at run time. A Loader likewise only needs two things: a path known at scheduling time, and a function to do the loading at run time. Since that is so simple, I would prefer the simplicity, future-proofing, and decoupledness of a duck-typed Protocol. When creating a Task, we can add some translation code that turns this external representation into the `FilePathNode` objects that get used internally.

Am I missing something with the purpose of the `.value` property of a Saver? Is that just boilerplate required to satisfy the requirements of a `FilePathNode`, or could it return something other than `self` for some Savers? If it's boilerplate, that is another argument for what I propose above.
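The duck-typed-Protocol idea can be illustrated in isolation: pytask could structurally check any object with a `.path` and a `.load()` and translate it into its internal node type, with no pytask import on the user side. A minimal sketch (`_InternalPathNode` is a made-up stand-in for `FilePathNode`):

```python
from pathlib import Path
from typing import Any, Callable, Protocol, runtime_checkable


@runtime_checkable
class LoaderProto(Protocol):
    """What pytask would require of a loader, structurally."""

    @property
    def path(self) -> Path: ...
    def load(self) -> Any: ...


class _InternalPathNode:
    """Made-up stand-in for pytask's internal FilePathNode."""

    def __init__(self, path: Path, load: Callable[[], Any]):
        self.path = path
        self._load = load

    def value(self) -> Any:
        return self._load()


def collect_node(obj: Any) -> _InternalPathNode:
    # Structural check: any object with .path and .load() qualifies; the user
    # never imports or inherits anything from pytask.
    if isinstance(obj, LoaderProto):
        return _InternalPathNode(obj.path, obj.load)
    raise TypeError(f"{obj!r} does not look like a loader")
```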
**Can we avoid passing in the saver to `pytask.mark.task()`?** In fact, I actually have the output handling separated into two steps:

a. Take the raw output of the function (which might be a single value, a tuple of two values, a dict, etc.) and translate it into a dict of `output_name: value`, so every bit of output has a name.

b. Take this dict of values and, for each output, dispatch it to the proper Saver.

Here is my full wrapper code that shows the above. It also takes the extra step of assigning a unique name to every single input and output (in this case there is a simple 1:1 mapping from name to `Path`, but in theory these could be separated), and adds a `DataStore` that acts as a cache, so that once data is loaded or saved you don't have to load it again. The `DataStore` also separates the concerns of IO from the `Task` object: the `Task` just says `self.datastore.load(name_of_data)` and it gets the data. The `DataStore` is the one that actually holds and orchestrates the Loader and Saver objects. Hopefully there is something in there that you find interesting.
```python
from __future__ import annotations

from collections.abc import Mapping
from functools import cached_property
import logging
from pathlib import Path
from typing import Any, Callable, Hashable, Iterable, Protocol, TypeVar

import fire
import ibis
from ibis.expr.types import Table
import pytask

from noatak import io

logger = logging.getLogger(__name__)


class PInputs(Protocol):
    """The specification for the inputs to a task function.

    This is used to convert input data from a name:value mapping to
    the args and kwargs that the task function expects.
    """

    @property
    def names(self) -> frozenset[str]:
        """The names of all inputs."""
        raise NotImplementedError

    def convert_input(
        self, input: Mapping[str, Any]
    ) -> tuple[tuple[Any], dict[str, Any]]:
        """Convert a k:v mapping to args and kwargs suitable for passing to a function."""
        raise NotImplementedError

    def __repr__(self) -> str:
        return f"{self.__class__.__name__}({set(sorted(self.names))})"


class Inputs(PInputs):
    def __init__(self, args: Iterable[str], kwargs: Mapping[str, str]):
        self.args = frozenset(args)
        # Map of arg_name: input_name.
        # arg_name must be the key because one input might be used for multiple args.
        self.kwargs = dict(kwargs)  # TODO make this immutable

    @cached_property
    def names(self) -> frozenset[str]:
        return frozenset(self.args) | frozenset(self.kwargs.values())

    def convert_input(
        self, input: Mapping[str, Any]
    ) -> tuple[tuple[Any], dict[str, Any]]:
        args = tuple(input[arg] for arg in self.args)
        kwargs = {arg: input[name] for arg, name in self.kwargs.items()}
        return args, kwargs

    @classmethod
    def make(
        cls,
        inputs: str | Iterable[str] | Mapping[str, str] | None,
    ):
        if inputs is None:
            args = tuple()
            kwargs = {}
            return cls(args, kwargs)
        elif isinstance(inputs, str):
            args = (inputs,)
            kwargs = {}
            return cls(args, kwargs)
        try:
            kwargs = dict(inputs.items())
            args = tuple()
            return cls(args, kwargs)
        except AttributeError:
            args = tuple(inputs)
            kwargs = {}
            return cls(args, kwargs)


class POutputs(Protocol):
    """The specification for the outputs of a task function."""

    @property
    def names(self) -> frozenset[str]:
        """The names of all outputs."""
        return self._names

    def convert_output(self, output: Any) -> dict[str, Any]:
        """Convert the output of a task function to a name:value mapping."""
        raise NotImplementedError

    @classmethod
    def make(cls, names: str | Iterable[str] | Mapping[str, Hashable] | None):
        if names is None:
            return NoneOutputs()
        elif isinstance(names, str):
            return ScalarOutputs(names)
        try:
            kwouts = dict(names.items())
            return MappingOutputs(kwouts)
        except AttributeError:
            return IterableOutputs(names)

    def __repr__(self) -> str:
        return f"{self.__class__.__name__}({set(sorted(self.names))})"


class NoneOutputs(POutputs):
    _names = frozenset()

    def convert_output(self, output: Any) -> dict[str, Any]:
        return dict()


class ScalarOutputs(POutputs):
    def __init__(self, name: str):
        self._name = name
        self._names = frozenset((name,))

    def convert_output(self, output: Any) -> dict[str, Any]:
        return {self._name: output}


class IterableOutputs(POutputs):
    def __init__(self, names: Iterable[str]):
        self._names = tuple(names)

    def convert_output(self, output: Any) -> dict[str, Any]:
        return dict(zip(self._names, output))


class MappingOutputs(POutputs):
    def __init__(self, names: Mapping[str, str]):
        self._kwargs = dict(names)
        self._names = frozenset(names.keys())

    def convert_output(self, output: Any) -> dict[str, Any]:
        return {name: output[raw_name] for name, raw_name in self._kwargs.items()}


class PLoader(Protocol):
    @property
    def names(self) -> frozenset[str]:
        """The names of the data that can be loaded."""
        raise NotImplementedError

    def path(self, name: str) -> Path:
        raise NotImplementedError

    def load(self, name: str) -> Any:
        raise NotImplementedError


class PSaver(Protocol):
    @property
    def names(self) -> frozenset[str]:
        """The names of the data that can be saved."""
        raise NotImplementedError

    def path(self, name: str) -> Path:
        raise NotImplementedError

    def save(self, name: str, data: Any) -> None:
        raise NotImplementedError


_LoaderOrSaverT = TypeVar("_LoaderOrSaverT", PLoader, PSaver)


class DataStore:
    def __init__(
        self,
        loaders: Iterable[PLoader] = [],
        savers: Iterable[PSaver] = [],
        data: dict[str, Any] | None = None,
    ):
        self._loaders = frozenset(loaders)
        self._savers = frozenset(savers)
        self.datas = dict(data) if data is not None else {}
        try:
            self._loader_map = self.build_map(loaders)
        except ValueError as e:
            raise ValueError("Duplicate loader names") from e
        try:
            self._saver_map = self.build_map(savers)
        except ValueError as e:
            raise ValueError("Duplicate saver names") from e

    def evolve(self, loaders=None, savers=None, data=None):
        return DataStore(
            loaders=loaders or self._loaders,
            savers=savers or self._savers,
            data=data or self.datas,
        )

    def get_loader(self, name: str) -> PLoader:
        try:
            return self._loader_map[name]
        except KeyError as e:
            raise ValueError(f"No loader for {name}") from e

    def get_saver(self, name: str) -> PSaver:
        try:
            return self._saver_map[name]
        except KeyError as e:
            raise ValueError(f"No saver for {name}") from e

    def load(self, name: str) -> Any:
        if name in self.datas:
            return self.datas[name]
        result = self.get_loader(name).load(name)
        self.datas[name] = result
        return result

    def save(self, name: str, data: Any) -> None:
        saver = self.get_saver(name)
        saver.save(name, data)
        self.datas[name] = data

    @staticmethod
    def build_map(ls: Iterable[_LoaderOrSaverT]) -> dict[str, _LoaderOrSaverT]:
        m = {}
        duplicates = set()
        for loader in ls:
            for name in loader.names:
                if name in m:
                    duplicates.add(name)
                else:
                    m[name] = loader
        if duplicates:
            raise ValueError(duplicates)
        return m


class Task:
    def __init__(
        self,
        func,
        *,
        inputs: PInputs,
        outputs: POutputs,
        data_store: DataStore,
        name: str | None = None,
    ):
        self.func = func
        self.inputs = inputs
        self.outputs = outputs
        self.data_store = data_store
        self.name = name or func.__name__

    def load(self) -> tuple[tuple[Any], dict[str, Any]]:
        logger.info(f"Reading {self.inputs}")
        input = {name: self.data_store.load(name) for name in self.inputs.names}
        return self.inputs.convert_input(input)

    def save(self, raw_output: Any):
        output = self.outputs.convert_output(raw_output)
        logger.info(f"Writing {self.outputs}")
        for name, data in output.items():
            self.data_store.save(name, data)

    def __call__(self, *args, **kwargs):
        return self.func(*args, **kwargs)

    def cli(self, verbose: bool = False):
        level = logging.DEBUG if verbose else logging.INFO
        logging.basicConfig(level=level, force=True)
        self.run()

    def fire(self):
        fire.Fire(self.cli)

    def run(self):
        logger.info(f"Running {self.func.__name__}")
        args, kwargs = self.load()
        result = self.func(*args, **kwargs)
        self.save(result)

    def to_pytask(self):
        """Convert into a pytask task function."""
        input_paths = frozenset(
            self.data_store.get_loader(name).path(name) for name in self.inputs.names
        )
        output_paths = frozenset(
            self.data_store.get_saver(name).path(name) for name in self.outputs.names
        )

        @pytask.mark.depends_on(input_paths)
        @pytask.mark.produces(output_paths)
        def my_task(depends_on, produces):
            self.cli()

        return my_task


class _TableLoaderSaverBase:
    def __init__(self, names: Iterable[str], path_getter):
        self.names = frozenset(names)
        self.path_getter = path_getter

    def path(self, name: str) -> Path:
        return self.path_getter(name)


class TableLoader(_TableLoaderSaverBase):
    def load(self, name: str) -> Table:
        return ibis.read_parquet(self.path(name))


class TableSaver(_TableLoaderSaverBase):
    def save(self, name: str, data: Table) -> None:
        io.to_parquet(data, self.path(name))


def make_task(ins, outs, path_getter) -> Callable[[Callable], Task]:
    """Decorator that creates a task that reads and writes tables with the given names."""
    i = Inputs.make(ins)
    o = POutputs.make(outs)
    loader = TableLoader(i.names, path_getter)
    saver = TableSaver(o.names, path_getter)
    data_store = DataStore(loaders=[loader], savers=[saver])

    def wrapper(func):
        return Task(func, inputs=i, outputs=o, data_store=data_store)

    return wrapper


def run_task(name: str):
    # The "s" flag means don't capture stdout, so
    # we get logging output as tasks run.
    session = pytask.main({"k": name, "s": True})
    if session.exit_code != 0:
        raise RuntimeError(f"Task {name} failed", session)
```
Also, an interesting API worth exploring would be for Loaders and Savers to expose their path via the `__fspath__` protocol used by `os.PathLike`. But maybe an explicit `.path` attribute/method would be better.
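The `__fspath__` idea is simple to demonstrate: any loader implementing it becomes an `os.PathLike`, so it can be handed directly to `os.fspath()`, `open()`, or `Path()` (the class name here is invented):

```python
import os
from pathlib import Path


class CsvLoader:
    """Hypothetical loader that is also an os.PathLike."""

    def __init__(self, path: str):
        self._path = Path(path)

    def __fspath__(self) -> str:
        # Lets os.fspath(), open(), and Path() accept the loader directly.
        return str(self._path)


loader = CsvLoader("input.csv")
assert os.fspath(loader) == "input.csv"
assert Path(loader) == Path("input.csv")
```

The trade-off the comment hints at: `__fspath__` is implicit and only carries a string, while an explicit `.path` attribute is more discoverable and can return a richer object.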
> a basic need would be to filter tasks

Should be possible by passing `main({"expression": "..."})`, shouldn't it? A more flexible approach is implemented in https://github.com/darrenburns/ward, for example. I think it even allows you to search for code in the test function body.
> A later bonus would be an API to be able to interrogate the task graph

I am sure you found it already, but just in case: for now, you can find everything under `session.dag`, which is a `networkx.DiGraph`. We could build on top of it.
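Since `session.dag` is a plain `networkx.DiGraph`, the "interrogate the graph" wish list is mostly covered by networkx itself. A toy sketch (the node names are invented; a real `session.dag` mixes task and file nodes like this):

```python
import networkx as nx

# Toy stand-in for session.dag: files and tasks as nodes, edges pointing
# in execution order (dependency -> task -> product).
dag = nx.DiGraph()
dag.add_edge("input.csv", "task_clean")
dag.add_edge("task_clean", "clean.csv")
dag.add_edge("clean.csv", "task_plot")
dag.add_edge("task_plot", "plot.png")

# Everything task_plot depends on, directly or transitively.
upstream = nx.ancestors(dag, "task_plot")

# Everything affected if input.csv changes.
downstream = nx.descendants(dag, "input.csv")
```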
> Is there a comprehensive overview of all the changes coming?

I created #400 to track the changes in v0.4, which are developed on branch `v4` to keep the main branch clean for patches. In #392, you find preliminary documentation for the new features. Follow the RTD link in the GitHub Actions checks for a built version.
> Don't require me to import and inherit from `FilePathNode`.

That is a good point, and I really like how you use protocols and how they might better fit pytask. Let me explain what is happening:

- There is a `MetaNode` that all nodes inherit from. It requires nodes to have a `name` and a `state` method (to produce a value for detecting changes, like hashes or modification timestamps), and it should probably also require `.value`, which is maybe missing.
- Your custom node inherits from `FilePathNode` for code sharing and as a quick solution. Thereby, your node reuses the necessary methods, which works because it also relies on paths. And we get some other benefits that are only implemented for `FilePathNode`s, like prettier formatting of paths.
- `MetaNode` could also be a protocol and save an import.

I further have to think about it, but it sounds like protocols are preferable over ABCs.
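The "MetaNode could also be a protocol" idea can be sketched with stdlib typing alone. The member names follow the description above (`name`, `state`, `.value`); the exact semantics of `state` are an assumption, and `PathNode` is an invented user class:

```python
from typing import Any, Protocol, runtime_checkable


@runtime_checkable
class MetaNode(Protocol):
    """What pytask could require of any node, checked structurally."""

    @property
    def name(self) -> str: ...

    def state(self) -> Any:
        """A value (hash, mtime, ...) used to detect changes."""
        ...

    @property
    def value(self) -> Any:
        """What gets injected into the task function."""
        ...


class PathNode:
    """An invented user-defined node; note: no pytask import or base class needed."""

    def __init__(self, name: str, content: str):
        self.name = name
        self._content = content

    def state(self) -> int:
        return hash(self._content)

    @property
    def value(self) -> str:
        return self._content
```

With `@runtime_checkable`, pytask could still validate nodes at collection time via `isinstance(obj, MetaNode)`, which checks only that the members exist.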
> Am I missing something with the purpose of the `.value` property of a Saver?

`.value` determines what is injected into the task. It should be part of the protocol for nodes.
> Can we avoid passing in the saver ...

I have to think further about this one. I find it super interesting, and allowing task functions to return could make some things a lot simpler for users. Thanks for sharing your code examples! I will take a look soon.
> `main({"expression": "..."})`

I don't see `"expression"` as one of the options on the CLI? Per https://pytask-dev--392.org.readthedocs.build/en/392/reference_guides/api.html#pytask.main, I thought that this expects the same args as the CLI. What would `"expression"` do? I think making `main()` actually have typed args would help here. But exposing `main()` might not even be the best idea; ideally it would be more granular, with `pytask.find_tasks()`, so you could filter tasks like `filtered = [t for t in pytask.find_tasks() if is_good_task(t)]` and then call `pytask.run_tasks(filtered)`.
Note that my example was one of my first uses of Protocols, so I'm not quite using them correctly in my example; I am using them more like ABCs.

Thanks for the explanation of `MetaNode`, etc. That makes sense, and I like how you did that internally. I think those should stay internal details though: the public interface is that you give pytask something with a `.path()` and a `.load()`/`.save()` method, and pytask creates Nodes from those.
I agree it is quite opaque; I had to check myself. Internally, the config is a big dictionary, and this line determines the name of the option internally. So, `"expression"` should be the key for `-k` and `"marker_expression"` for `-m`. I partially agree with a typed `main()`. It needs to be somewhat flexible, since plugins can add more options, but we cannot dynamically change the signature. Something like `**kwargs` might help.
Your examples made me reread this amazing guide (https://hynek.me/articles/python-subclassing-redux/) and I think now I understand protocols a lot better.
The new release looks awesome @tobiasraabe ! Thanks so much for your work on this! The new release solves many of the missing features I was looking for.
Thank you for the nice words, @NickCrews. Our discussions helped to shape this release significantly. Thanks for your ideas and time!
Todo

- `PythonNode` as the default. #384
- `NamedTuple`s are supported for `@pytask.mark.task(kwargs=...)`. #397
- `Product` to get rid of `@pytask.mark.produces`. #394
- `@pytask.mark.depends_on` and `@pytask.mark.produces`.
- `PythonNode`, restrictions and possible hashing mechanisms.
- `@pytask.task` over deprecated usage.
- `collect_utils.py`.
- `main()` with builtin arguments. Rest stays `kwargs`. Rename `main()` to `build()`. #411
- Discussion