tenzir / public-roadmap

The public roadmap of Tenzir
https://docs.tenzir.com/roadmap

Run Pipelines from Python #124

Open dominiklohmann opened 10 months ago

dominiklohmann commented 10 months ago

To enable downstream use of pipeline results in Python, we want to adjust our Python bindings so that they can run any kind of pipeline.

Specifically, we envision three mechanisms here (sketched after the list):

  1. The option to run a closed pipeline and wait for it to finish.
  2. The option to run a pipeline that ends with bytes, with the ability to read byte chunks incrementally.
  3. The option to run a pipeline that ends with events, with the ability to read Pandas dataframes incrementally.
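
As a rough sketch (method names and shapes are hypothetical; the interface we actually agreed on appears further below), the three mechanisms could surface as:

```python
# Hypothetical signatures for the three mechanisms; illustrative only.
from typing import AsyncIterator, Tuple

import pyarrow


class PipelineSketch:
    async def wait(self) -> None:
        """(1) Run a closed pipeline and wait for it to finish."""

    def read_bytes(self) -> AsyncIterator[bytes]:
        """(2) Pipeline ending in bytes: read byte chunks incrementally."""

    def read_events(self) -> AsyncIterator[Tuple[str, pyarrow.Table]]:
        """(3) Pipeline ending in events: read tables incrementally."""
```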

For the scope of this roadmap item, we accept the restriction that the Python bindings require a locally available tenzir binary, and we restrict ourselves to mechanism (3).
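
Given that restriction, one plausible transport is to spawn the local tenzir binary as a subprocess and read results from its stdout. The flags below mirror the CLI mapping in the pseudocode that follows; the function itself is purely illustrative, not the bindings API:

```python
# Illustrative only: spawn a locally available `tenzir` binary and expose its
# stdout as the result stream. Flag names mirror the CLI mapping shown below.
import subprocess


def spawn_pipeline(definition: str, endpoint: str) -> subprocess.Popen:
    return subprocess.Popen(
        [
            "tenzir",
            f"--endpoint={endpoint}",
            "--implicit-events-sink=to stdout write python-ipc",
            "--implicit-bytes-sink=",
            definition,
        ],
        stdout=subprocess.PIPE,
    )


proc = spawn_pipeline('export | where #schema == "suricata.dns"', "localhost:5000")
```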

In a meeting between @dominiklohmann, @tobim, @mavam, and @lava, we agreed on the following pseudocode interface for the bindings:

```python
from typing import AsyncGenerator, Awaitable, List, Tuple

import pyarrow

import tenzir
from tenzir import Pipeline

# Streaming interface returning one batch at a time (run inside an async
# context, e.g. a notebook):
pipeline = Pipeline('export | where #schema == "suricata.dns"')
generator: AsyncGenerator[Tuple[str, pyarrow.Table], None] = pipeline.exec(endpoint="localhost:5000")
async for schema, arrow_table in generator:
    df = arrow_table.to_pandas()

# The streaming interface effectively maps onto this CLI:
#   tenzir \
#     --endpoint="localhost:5000" \
#     --implicit-events-sink="to stdout write python-ipc" \
#     --implicit-bytes-sink='' \
#     'export | where #schema == "suricata.dns"'

# Oneshot interface returning one table per schema all at once:
dataframes: List[Tuple[str, pyarrow.Table]] = Pipeline("...").exec().collect()

# Future node API (not in scope for this roadmap item):
node = tenzir.Node(tenant_id="t-xxx", node_id="n-yyy", access_key="xxx")
node.deploy(pipeline)

# Future async dataframe API (not in scope for this roadmap item):
dataframes_async: Awaitable[List[Tuple[str, pyarrow.Table]]] = generator.collect_async()
```

### Definition of Done
- [ ] Agree on the transport medium for events
- [ ] Agree on the desired API for the Python bindings
- [ ] Implement the required changes

lava commented 10 months ago

In a meeting between @rdettai, @tobim, and me, @rdettai raised the point that making the API return an AsyncGenerator by default:

```python
generator: AsyncGenerator[Tuple[str, pyarrow.Table], None] = pipeline.exec(endpoint="localhost:5000")
```

could make the implementation considerably more complicated, since pyarrow has no support for async operations. A regular Generator would already suffice for working with pipeline results in a notebook.
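
To make this concrete, here is a minimal sketch of the synchronous variant, assuming the results arrive as an Arrow IPC stream on a file-like object and that the schema name travels in the Arrow schema metadata under a hypothetical `schema_name` key; pyarrow's stream reader is blocking, which is why a plain Generator fits naturally:

```python
# Minimal sketch of a synchronous streaming interface. Assumes `source` is a
# file-like object carrying an Arrow IPC stream (e.g. the stdout of a local
# tenzir process) and that the schema name is stored in the Arrow schema
# metadata under the hypothetical key b"schema_name".
from typing import Generator, Tuple

import pyarrow
import pyarrow.ipc


def iter_tables(source) -> Generator[Tuple[str, pyarrow.Table], None, None]:
    reader = pyarrow.ipc.open_stream(source)  # blocking; pyarrow has no async API
    for batch in reader:
        table = pyarrow.Table.from_batches([batch])
        metadata = table.schema.metadata or {}
        yield metadata.get(b"schema_name", b"").decode(), table
```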

We also agreed that it would probably make sense to add some minimal framing (in particular, a length prefix and a schema hash) to the planned bitz wire format, so that we can transport individual table frames without having to rely on the Arrow readers.
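
For illustration, a minimal sketch of such framing, assuming an 8-byte little-endian length prefix followed by an 8-byte schema hash (the field widths, byte order, and layout are placeholders, not the bitz specification):

```python
# Minimal framing sketch: each frame is <length><schema hash><payload>.
# Field widths and byte order are illustrative placeholders only.
import struct
from typing import BinaryIO, Optional, Tuple

HEADER = struct.Struct("<QQ")  # (payload length, schema hash), both u64, little-endian


def write_frame(sink: BinaryIO, schema_hash: int, payload: bytes) -> None:
    sink.write(HEADER.pack(len(payload), schema_hash))
    sink.write(payload)


def read_frame(source: BinaryIO) -> Optional[Tuple[int, bytes]]:
    header = source.read(HEADER.size)
    if len(header) < HEADER.size:
        return None  # clean end of stream
    length, schema_hash = HEADER.unpack(header)
    return schema_hash, source.read(length)
```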