pydantic / logfire

Uncomplicated Observability for Python and beyond! 🪵🔥
https://docs.pydantic.dev/logfire/
MIT License
1.67k stars 45 forks source link

We should build a polars instrumentation #71

Open samuelcolvin opened 2 months ago

samuelcolvin commented 2 months ago

it would give you query/operation times, and the query plan, see this tweet 🐦 .

Richie says:

I don't have the full context, but the query plan can be serialized to json or visualized via .explain.

adriangb commented 2 months ago

So we probably hook into .collect() for LazyFrame? Really it would be nice for polars to provide a global register of hooks to call when it does work.

samuelcolvin commented 2 months ago

cc @ritchie46 in case you have any thoughts?

adriangb commented 2 months ago

To be clear that integration doesn't have to be logfire specific in any way. It could just be something like:

class PolarsTrace:
    def on_start(self, **payload) -> None: ...
    def on_end(self, **payload) -> None: ...

class PolarsTracer:
    def start_span(self) -> PolarsTrace: ...

I guess the question is what goes in payload, I'd say anything thats "free" to get (presumably a plan, execution time, etc.).

ritchie46 commented 2 months ago

I am not entirely sure I understand what a trace is exactly yet.

Things that might be interesting. .profile gives you the result and a DataFrame containing the timings per operations (optimizer, join, grouping etc).

pl.LazyFrame({"foo": [1, 2, 3]}).serialize() gives you JSON repr of the logical plan, which can be reconstructed later (this is cheap as longs as there are no real in-memory tables in the plan).

Other than that there is a hook where you can get a hold of the IR post optimization (https://github.com/pola-rs/polars/pull/15972). Though this is very much leaking internals. We add it so that we can hook cudf in, but I am not sure it would be wise to build upon this.

adriangb commented 2 months ago

I am not entirely sure I understand what a trace is exactly yet.

A trace is really similar to a log statement. The only differences is that it has a start and end (and hence a duration) and a context (where in the execution of the program it started and where it ended).

The LazyFrame.profile() information seems like the kind of thing we'd want. Here's a hacky version:

from typing import Any
import polars as pl

class Tracer:
    def on_collect(self, input: pl.LazyFrame, output: pl.DataFrame, profile: pl.DataFrame, plan: dict[str, Any]) -> None:
        # this would be sent to a remote server not just printed
        print(profile)

def _collect_patch(df: pl.LazyFrame, *args: Any, **kwargs: Any) -> pl.DataFrame:
    plan = df.serialize()
    res, stats = df.profile(*args, **kwargs)
    for tracer in pl.tracers:
        tracer.on_collect(df, res, stats, plan)
    return res

pl.tracers = [Tracer()]
pl.LazyFrame.collect = _collect_patch

df = pl.DataFrame({'a': [1, 2, 3], 'b': [4, 5, 6]}).lazy()

print(df.sort('a').collect())

The idea would be for polars to provide a hook so that we don't need to monkey patch like this.

I assume a LazyFrame computes a plan on each call chain (e.g. .sort() creates a new LazyFrame with a new plan) so a hook for that that only calls with the .serialize() result would make sense as well.

Is it possible to get similar information for each step of execution to a DataFrame?