Most of the basics of how to create a kiara pipeline have already been covered by:
https://dharpa.org/kiara.documentation/latest/extending_kiara/pipelines/assemble_pipelines/
So this is a deeper dive into some of the important things around this topic, and an explanation of how describing workflows declaratively makes a difference compared to the traditional, procedural way of doing data science.
As an example, let's use the `logic.nand` pipeline that is included in the `kiara_plugin.core_types` plugin.
It's as simple as it gets: it contains 2 steps, an `and` operation (returns `True` only if both inputs are also `True`), and a `not` operation (negates the input). Both are connected via the output of the `and` step being wired to the input of the `not` step. This means, of course, that the `not` step can't be executed before (or at the same time as) the `and` step, because it needs that input to be available first.
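To make that concrete, here is a rough sketch of what such a pipeline description could look like, written as a Python mapping (the equivalent json/yaml file works the same way); the field names follow the general kiara pipeline format, but the actual logic.nand file shipped with kiara_plugin.core_types may differ in details:

nand_pipeline = {
    "pipeline_name": "my_nand",
    "doc": "Returns 'False' only if both inputs are 'True'.",
    "steps": [
        # stage 1: the 'and' step only needs user-provided inputs
        {"step_id": "and", "module_type": "logic.and"},
        # stage 2: the 'not' step consumes the 'y' output of the 'and' step
        # (the exact field names are assumed here)
        {
            "step_id": "not",
            "module_type": "logic.not",
            "input_links": {"a": "and.y"},
        },
    ],
}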
kiara can figure out the order in which steps have to be run, and you can let it show you what it thinks with:
kiara pipeline explain logic.nand
As you can see from that output, it recognizes that it needs two separate 'stages', and which step (or steps -- for pipelines with more steps than this simple example) it needs to assign to each stage. This can be arbitrarily complex, and in some cases it's not clear in which stage a step should run (imagine a 5-stage pipeline where one step only has a direct user input but needs to run before the last stage: technically it could run in stage 1, 2, 3, or 4).
Job descriptions are implemented in the [JobDesc][kiara.interfaces.python_api.models.job.JobDesc] class. They can be used to hold a reference to an operation, as well as inputs for that specific operation, in a single Python object (or json/yaml file). A simple example job description using our `logic.nand` pipeline would look like this (as a yaml file called `my_nand.yaml`, for example):
operation: logic.nand
inputs:
  a: True
  b: True
kiara can run this easily, via:
kiara run my_nand.yaml
Or via the API:
from kiara.api import KiaraAPI

kiara = KiaraAPI.instance()
result = kiara.run_job('my_nand.yaml')
dbg(result)
Or, more verbosely (of course you can assemble the `JobDesc` instance any way you see fit):
from kiara.api import KiaraAPI
from kiara.interfaces.python_api.models.job import JobDesc

kiara = KiaraAPI.instance()
job = JobDesc.create_from_file('my_nand.yaml')
result = kiara.run_job(job)
dbg(result)
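Since you can assemble the `JobDesc` instance any way you see fit, a minimal sketch that builds one directly from Python data (assuming the model exposes the same `operation` and `inputs` fields that appear in the yaml file above) could look like:

from kiara.api import KiaraAPI
from kiara.interfaces.python_api.models.job import JobDesc

kiara = KiaraAPI.instance()

# assumption: JobDesc can be constructed from the same fields used in my_nand.yaml
job = JobDesc(operation="logic.nand", inputs={"a": True, "b": True})
result = kiara.run_job(job)
dbg(result)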
For more complex examples that use a pipeline file as operation, have a look at the kiara_plugin.tabular example jobs. I'm happy to answer questions that go further into detail, but I don't want to write too much here, so as always, just ping me and ask.
The main thing to understand is that a Job description contains the operation as well as inputs.
The 'code-view' feature that was requested is similar to the `explain` command above. It is implemented as a 'renderer':
This is a minor feature that was implemented mainly for the code-view feature, but I also use it internally for some debug/dev work. It basically provides a small, modular mini-framework where you can write a renderer for an internal kiara model (a pipeline, job description, value, data-type, you get the idea). Except for the renderer(s) I describe below, none of them are in any way 'production' (or even more than a stub in many cases), but feel free to try them out if you are interested. You can see the available renderers via:
kiara render list-renderers
And since we are only interested in renderers that take a 'pipeline' as input, we can filter like:
kiara render --source-type pipeline list-renderers
That still shows quite a few renderers; for now we are interested in the one that has `python_script` as target. We can run it like:
kiara render --source-type pipeline --target-type python_script item <the_pipeline_op_name_or_file>
so:
kiara render --source-type pipeline --target-type python_script item logic.nand
Checking the output of that command, we can see it is a python script, only missing some inputs. We can copy that text, and edit the inputs like:
# A boolean describing this input state.
pipeline_input_a = True
# A boolean describing this input state.
pipeline_input_b = True
Save it in a file `nand.py`, and run it via:
python nand.py
Not super interesting, but we should get a `False` output...
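To give a rough idea of what the rendered script does, here is a hand-written sketch (not the actual renderer output, which will differ); it assumes `run_job` accepts an operation name plus an `inputs` mapping, and that the logic modules use the input fields 'a'/'b' and the output field 'y':

from kiara.api import KiaraAPI

kiara = KiaraAPI.instance()

# A boolean describing this input state.
pipeline_input_a = True
# A boolean describing this input state.
pipeline_input_b = True

# run the 'and' operation first ...
and_result = kiara.run_job("logic.and", inputs={"a": pipeline_input_a, "b": pipeline_input_b})
# ... then feed its 'y' output into the 'not' operation
not_result = kiara.run_job("logic.not", inputs={"a": and_result["y"]})

# for inputs True/True this should print 'False'
print(not_result["y"].data)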
The code-view feature was requested in order to let users who use lumy (our initial GUI) see code that would do the same thing they were doing via the graphical interface, as a Jupyter notebook.
If you install the kiara_plugin.jupyter plugin into your environment, you should also see a `jupyter_notebook` target renderer. We can use it similarly:
kiara render --source-type pipeline --target-type jupyter_notebook item logic.nand > nand.ipynb
jupyter lab nand.ipynb
Edit the inputs again, and you should again be able to run this pipeline as Python code within Jupyter, using the kiara Python API.
Play around a bit more with this, and use more complex pipelines to see how this works in those cases.
The main advantage of using a declarative pipeline structure is that it makes it easier to reason about the workflow: it's a static data structure that can be probed, visualized, etc. You can see at a glance what the pipeline does and what the inputs are. You can also see the order in which steps are executed, and what the dependencies between steps are. With code you don't have that information as readily available, and you can't really do much (meta-)investigation computationally (unless you consider jumping into AST parsing, but that would be a different level of complexity again).
kiara contains two main Python classes that relate to pipelines and their structure: `PipelineStructure` and `Pipeline`. The former represents the (static) structure of a pipeline; the latter wraps such a structure, but it also provides methods to set inputs, run internal steps (all of them, or one by one), check the current internal state of each step, current inputs, outputs, etc. This needs an implementation of a `PipelineController` to do some of the work, and I'll write up some simple examples below, but in general this is not trivial to use, unfortunately, because of the overall complexity of everything involved. Still, I think it's worth trying to understand, as it has advantages over running operations procedurally, in Python code.

All of those Python classes start with a `PipelineConfig`. This is usually created from a file like:
from kiara.models.module.pipeline import PipelineConfig
pc = PipelineConfig.from_file("/home/markus/projects/kiara/kiara_plugin.tabular/examples/pipelines/init.yaml")
dbg(pc)
But also check out the other classmethods of `PipelineConfig` to get an idea of how to create one for your specific circumstance.
The other way to get such a config would be from the `get_operation` kiara API endpoint, if you know the operation is a pipeline (and not created from a `KiaraModule` class):
op = kiara.get_operation("logic.nand")
pc = op.module.config
print(type(pc))
dbg(pc)
Once we have such a `PipelineConfig` instance, we can create a `PipelineStructure` from it:
ps = pc.structure
print(type(ps))
Check the class's source code for every method you have access to, but some of the interesting ones are:
# all step ids
print(f"pipeline steps: {list(ps.step_ids)}")
# a list of lists, each 'root' list representing a stage, and each element in that list
# the steps contained therein
print(f"pipeline stages: {ps.processing_stages}")
for step in ps.steps:
    print(f"Step: {step.step_id}")
    dbg(step)
In the streamlit demo I showed, this is used to automatically render the input forms, one page per stage, for example.
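As a small illustration, a hypothetical helper (not part of kiara) that only uses the attributes shown above, plus each step's `module_type`, to print a per-stage overview could look like:

def describe_stages(ps):
    # index the steps by their id so we can look them up per stage
    steps_by_id = {step.step_id: step for step in ps.steps}
    for stage_nr, stage in enumerate(ps.processing_stages, start=1):
        print(f"stage {stage_nr}:")
        for step_id in stage:
            step = steps_by_id[step_id]
            print(f"  - {step_id} (module: {step.module_type})")

describe_stages(ps)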
Tangential to what is explained here, there are also other pipeline-related API endpoints, for example to register external pipeline files into the current kiara context. If you want to do more with pipelines, make sure to read through the source code there to get an idea what you can do currently. And tell me if there is more you'd like to do.
Now comes the interesting part, the `Pipeline` class itself.
It is most easily created using either a `PipelineConfig` or a `PipelineStructure` instance:
from kiara.models.module.pipeline.pipeline import Pipeline

# from the PipelineConfig:
pipeline = Pipeline.create_pipeline(kiara=kiara, pipeline=pc)
print(type(pipeline))

# or, from the PipelineStructure:
pipeline = Pipeline.create_pipeline(kiara=kiara, pipeline=ps)
print(type(pipeline))
dbg(pipeline)
This class method also accepts a string or a Python mapping, same as we used above to create the `PipelineConfig`, so technically we could have just used this straight away, but I wanted to make sure there is a bit of context about how the classes play together.
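For example, a minimal sketch relying on that note, and assuming the string form also accepts a pipeline operation name like `logic.nand`:

# skip the explicit PipelineConfig/PipelineStructure and pass the operation name directly
pipeline = Pipeline.create_pipeline(kiara=kiara, pipeline="logic.nand")
print(type(pipeline))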
The `Pipeline` class uses a bit of a hybrid strategy in terms of managing state. It contains the pipeline structure itself (which is immutable at this point), but it also holds references to all its inputs, intermediate results and outputs (if there are any already). It does not do any state-management itself, though; for that it needs a `PipelineController`, which checks whether a step or stage (or the whole pipeline) is ready to run, runs it, sets outputs, etc.
As you can see from the output of the `dbg` statement, the pipeline not only contains the structure, but also those inputs and outputs. Don't be confused by the 'value_ids' for fields that are not set; those are just values with their 'is_set' property set to `False`.
All of this is fairly involved, because it gives developers a high level of control. There are different implementations of `PipelineController`s, the most basic and easiest to use being the `SinglePipelineBatchController`. I'll not explain all of this in too much detail, because I'm not at all sure anyone will actually use it. But suffice to say, if you do want to, let me know and I can write up more details following your questions, or help in some other way.
A simple end-to-end example of how this would be used in code is attached to this issue. Of course, the 'logic.nand' pipeline is trivial bordering on boring, but that should make it easier to follow along. Try the same thing with 'logic.xor', or create your own pipeline to see this whole thing become much more interesting and (IMHO) useful.
Hm, zip file is stupid, for reference, here's the source code again directly:
from kiara.api import KiaraAPI
from kiara.models.module.pipeline.controller import SinglePipelineBatchController
from kiara.models.module.pipeline.pipeline import Pipeline
from kiara.utils.cli import terminal_print
kiara = KiaraAPI.instance()
ps = kiara.get_pipeline_structure("logic.nand")
pipeline = Pipeline.create_pipeline(kiara=kiara, pipeline=ps)
terminal_print(pipeline, in_panel="Pipeline details after creation")
# the `job_registry` argument is a bit of a leaky abstraction, but I'm not sure
# it's worth cleaning that one up. Just be aware that this part of the code
# could change at some point (with notice of course)
controller = SinglePipelineBatchController(job_registry=kiara.context.job_registry, pipeline=pipeline)
changed = pipeline.set_pipeline_input(pipeline_input_field="a", input_value=True)
# the result of this is some information about how the internal state of the pipeline changed
# because of your input; check that out at your leisure, but in most cases it's not important
# now let's check the pipeline state
terminal_print(pipeline, in_panel="Status after first input")
# as you can see, the 'a' field now shows a valid input status, but overall
# the pipeline state is still invalid, since we need one more input:
changed = pipeline.set_pipeline_input(pipeline_input_field="b", input_value=True)
# again, look at the internal state
terminal_print(pipeline, in_panel="Status after second input")
# now we see that step 'and' has a new status: `inputs ready`
# this means we can now kick off processing
# the callback is optional, but it lets us see what is happening,
# which is sometimes useful
callback_output = []
callback = lambda x: callback_output.append(x)
job_ids = controller.process_pipeline(event_callback=callback)
terminal_print(callback_output, in_panel="Pipeline processing log")
# now let's look at the result
terminal_print(job_ids, in_panel="Processed job ids")
# this is a map with the jobs that were run; we could have a look at the job records if we wanted:
for step_id, job_id in job_ids.items():
    job = kiara.get_job(job_id)
    # here we could also see if there were any errors while processing, for example
    terminal_print(job, in_panel=f"Processing details for step: {step_id}")
# but much more interestingly, let's look at the state of our pipeline:
terminal_print(pipeline, in_panel="Pipeline state after processing")
# as you can see, this seems to have processed everything (which makes sense, because
# we supplied every pipeline input with a value, and each of those values was valid
# in the context of the pipeline's modules)
# we can look at every aspect of this pipeline, for example what is the internal state of
# the `and` step:
details = pipeline.get_step_details("and")
terminal_print(details, in_panel="Step details: and")
# and what is the intermediate result (its 'y' output field):
intermediate_outputs = pipeline.get_current_step_outputs("and")
values = kiara.get_values(**intermediate_outputs)
terminal_print(values, in_panel="Current step output(s) for step: and")
# which would be the same as the current input of the 2nd stage step 'not':
current_step_inputs = pipeline.get_current_step_inputs("not")
values = kiara.get_values(**current_step_inputs)
terminal_print(values, in_panel="Current step input(s) for step: not")
This issue contains an overview of the central data structure in kiara, the pipeline, and the associated features for declaratively describing a data workflow, the core concept around which kiara is built.