pola-rs / polars

Dataframes powered by a multithreaded, vectorized query engine, written in Rust
https://docs.pola.rs

[Time series] Add functions to analyze metrics counters and gauges #3616

Closed volodymyrprokopyuk closed 2 years ago

volodymyrprokopyuk commented 2 years ago

Thank you for the amazing and highly useful work you are doing!

Motivation

Polars data frames plus Parquet data files in a cloud object store can be a simpler and much more convenient solution for time series analysis in general, and metrics (e.g. Prometheus) analysis in particular, than TimescaleDB, which requires running a PostgreSQL server. That is quite often overkill if you only need to generate ad-hoc metrics analysis reports.

Could you please consider extending Polars' support for time series analysis in general, and metric counter and gauge analysis in particular? Polars already has groupby_dynamic and groupby_rolling.

Behavior

Right now I am missing functions for working with metric counters.

Overall, Polars' time series capabilities could take inspiration from the TimescaleDB hyperfunctions to provide comprehensive time series analysis out of the box.

Thank you for your excellent work!

ritchie46 commented 2 years ago

Thanks, I am doing my best. ;)

Could you explain a bit which features you want? I am not a time-series expert, so counter_agg is not immediately clear to me.

Maybe a 10 row dataframe as input and the expected result might help me.

volodymyrprokopyuk commented 2 years ago

Hi @ritchie46!

Below is a simple TimescaleDB example, along with my reasoning, that showcases the counter_agg, delta, time_delta and rate functions used to analyze metric counters. A metric counter is an ever increasing time series (e.g. the number of processed API requests), but sometimes a counter value may reset (e.g. due to an API server crash). See the explanation below for an example.

SQL code

BEGIN;

CREATE TABLE metric (
  ts timestamptz NOT NULL,
  val integer NULL,
  PRIMARY KEY (ts)
);

SELECT create_hypertable('metric', 'ts');

INSERT INTO metric (ts, val) VALUES
('2022-06-08T00:00:01', 0),
('2022-06-08T00:00:06', 2),
('2022-06-08T00:00:11', 3),
('2022-06-08T00:00:16', 6),
('2022-06-08T00:00:21', 2), -- counter reset
('2022-06-08T00:00:26', 4),
('2022-06-08T00:00:31', 7),
('2022-06-08T00:00:36', 9),
('2022-06-08T00:00:41', 3), -- counter reset
('2022-06-08T00:00:46', 4),
('2022-06-08T00:00:51', 6),
('2022-06-08T00:00:56', 7);

SELECT time_bucket('10 sec', m.ts) tsb, min(m.val), max(m.val),
  delta(counter_agg(m.ts, m.val)),
  time_delta(counter_agg(m.ts, m.val)),
  rate(counter_agg(m.ts, m.val))
FROM metric m
GROUP BY tsb ORDER BY tsb;

ROLLBACK;

Execution result

          tsb           │ min │ max │ delta │ time_delta │ rate
────────────────────────┼─────┼─────┼───────┼────────────┼──────
 2022-06-08 00:00:00+00 │   0 │   2 │     2 │          5 │  0.4
 2022-06-08 00:00:10+00 │   3 │   6 │     3 │          5 │  0.6
 2022-06-08 00:00:20+00 │   2 │   4 │     2 │          5 │  0.4
 2022-06-08 00:00:30+00 │   7 │   9 │     2 │          5 │  0.4
 2022-06-08 00:00:40+00 │   3 │   4 │     1 │          5 │  0.2
 2022-06-08 00:00:50+00 │   6 │   7 │     1 │          5 │  0.2
(6 rows)

Counter reset handling

0,  2,  3,  6,  2,  4,  7,  9,  3,  4,  6,  7 <= counter with 2 resets (as captured from a metric endpoint)
0,  2,  3,  6,  8, 10, 13, 15, 18, 19, 21, 22 <= ever increasing counter (after counter_agg)
    2,      3,      2,      2,      1,      1 <= counter delta for the 10 secs time bucket
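To make the intended semantics concrete, here is a minimal plain-Python sketch (illustrative only, not a proposed Polars implementation) that rebuilds the ever increasing counter from the raw samples:

raw = [0, 2, 3, 6, 2, 4, 7, 9, 3, 4, 6, 7]

monotonic = []
total = 0
prev = None
for v in raw:
    if prev is None:
        total = v          # first sample starts the counter
    elif v >= prev:
        total += v - prev  # normal monotonic increase
    else:
        total += v         # reset: the new value is the increment since the restart
    monotonic.append(total)
    prev = v

print(monotonic)  # [0, 2, 3, 6, 8, 10, 13, 15, 18, 19, 21, 22]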

I hope this helps!

Thank you!

cbilot commented 2 years ago

Let me see if I can help by expressing these concepts in Polars. Let me know if I'm on the right track. =)

We would load the data from your example into Polars as:

import polars as pl
df = pl.DataFrame(
    data=[
        ("2022-06-08T00:00:01", 0),
        ("2022-06-08T00:00:06", 2),
        ("2022-06-08T00:00:11", 3),
        ("2022-06-08T00:00:16", 6),
        ("2022-06-08T00:00:21", 2),
        ("2022-06-08T00:00:26", 4),
        ("2022-06-08T00:00:31", 7),
        ("2022-06-08T00:00:36", 9),
        ("2022-06-08T00:00:41", 3),
        ("2022-06-08T00:00:46", 4),
        ("2022-06-08T00:00:51", 6),
        ("2022-06-08T00:00:56", 7),
    ],
    columns=["ts", "val"],
).with_column(pl.col("ts").str.strptime(pl.Datetime).keep_name())
df
shape: (12, 2)
┌─────────────────────┬─────┐
│ ts                  ┆ val │
│ ---                 ┆ --- │
│ datetime[μs]        ┆ i64 │
╞═════════════════════╪═════╡
│ 2022-06-08 00:00:01 ┆ 0   │
├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌┤
│ 2022-06-08 00:00:06 ┆ 2   │
├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌┤
│ 2022-06-08 00:00:11 ┆ 3   │
├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌┤
│ 2022-06-08 00:00:16 ┆ 6   │
├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌┤
│ 2022-06-08 00:00:21 ┆ 2   │
├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌┤
│ 2022-06-08 00:00:26 ┆ 4   │
├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌┤
│ 2022-06-08 00:00:31 ┆ 7   │
├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌┤
│ 2022-06-08 00:00:36 ┆ 9   │
├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌┤
│ 2022-06-08 00:00:41 ┆ 3   │
├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌┤
│ 2022-06-08 00:00:46 ┆ 4   │
├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌┤
│ 2022-06-08 00:00:51 ┆ 6   │
├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌┤
│ 2022-06-08 00:00:56 ┆ 7   │
└─────────────────────┴─────┘

From here, we want to capture any time the counter (val) resets (i.e., rolls backward in value). We can accomplish this in Polars using the diff expression nested inside a when/then/otherwise.

df = df.with_column(
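    # a non-negative diff is a normal increase, so we keep the increment;
    # a negative diff marks a counter reset, in which case the raw value itself
    # is the increment (the counter restarted from 0)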
    pl.when(pl.col("val").diff() >= 0)
    .then(pl.col("val").diff())
    .otherwise(pl.col("val"))
    .alias("delta_resets")
).with_column(pl.col("delta_resets").cumsum().alias("cum_counter"))
df
shape: (12, 4)
┌─────────────────────┬─────┬──────────────┬─────────────┐
│ ts                  ┆ val ┆ delta_resets ┆ cum_counter │
│ ---                 ┆ --- ┆ ---          ┆ ---         │
│ datetime[μs]        ┆ i64 ┆ i64          ┆ i64         │
╞═════════════════════╪═════╪══════════════╪═════════════╡
│ 2022-06-08 00:00:01 ┆ 0   ┆ 0            ┆ 0           │
├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌┤
│ 2022-06-08 00:00:06 ┆ 2   ┆ 2            ┆ 2           │
├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌┤
│ 2022-06-08 00:00:11 ┆ 3   ┆ 1            ┆ 3           │
├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌┤
│ 2022-06-08 00:00:16 ┆ 6   ┆ 3            ┆ 6           │
├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌┤
│ 2022-06-08 00:00:21 ┆ 2   ┆ 2            ┆ 8           │
├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌┤
│ 2022-06-08 00:00:26 ┆ 4   ┆ 2            ┆ 10          │
├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌┤
│ 2022-06-08 00:00:31 ┆ 7   ┆ 3            ┆ 13          │
├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌┤
│ 2022-06-08 00:00:36 ┆ 9   ┆ 2            ┆ 15          │
├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌┤
│ 2022-06-08 00:00:41 ┆ 3   ┆ 3            ┆ 18          │
├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌┤
│ 2022-06-08 00:00:46 ┆ 4   ┆ 1            ┆ 19          │
├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌┤
│ 2022-06-08 00:00:51 ┆ 6   ┆ 2            ┆ 21          │
├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌┤
│ 2022-06-08 00:00:56 ┆ 7   ┆ 1            ┆ 22          │
└─────────────────────┴─────┴──────────────┴─────────────┘

The values in the cum_counter column should match your row labeled ever increasing counter (after counter_agg).

0, 2, 3, 6, 8, 10, 13, 15, 18, 19, 21, 22 <= ever increasing counter (after counter_agg)

From this point, I believe we can capture the values in the larger table as follows:

df.groupby_dynamic(
    index_column="ts", every="10s", period="10s", offset="-1s", include_boundaries=True
).agg(
    [
        pl.col("val").min().alias("min_val"),
        pl.col("val").max().alias("max_val"),
        (pl.col("cum_counter").max() - pl.col("cum_counter").min()).alias("delta"),
        (pl.col("ts").max() - pl.col("ts").min()).alias("time_delta"),
    ]
).with_column(
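    # time_delta cast to Int64 is in microseconds; scaling by 1_000_000 yields a per-second rate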
    (pl.col("delta") / pl.col("time_delta").cast(pl.Int64) * 1_000_000).alias("rate")
)
shape: (6, 8)
┌─────────────────────┬─────────────────────┬─────────────────────┬─────────┬─────────┬───────┬──────────────┬──────┐
│ _lower_boundary     ┆ _upper_boundary     ┆ ts                  ┆ min_val ┆ max_val ┆ delta ┆ time_delta   ┆ rate │
│ ---                 ┆ ---                 ┆ ---                 ┆ ---     ┆ ---     ┆ ---   ┆ ---          ┆ ---  │
│ datetime[μs]        ┆ datetime[μs]        ┆ datetime[μs]        ┆ i64     ┆ i64     ┆ i64   ┆ duration[μs] ┆ f64  │
╞═════════════════════╪═════════════════════╪═════════════════════╪═════════╪═════════╪═══════╪══════════════╪══════╡
│ 2022-06-07 23:59:59 ┆ 2022-06-08 00:00:09 ┆ 2022-06-08 00:00:00 ┆ 0       ┆ 2       ┆ 2     ┆ 5 seconds    ┆ 0.4  │
├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌┤
│ 2022-06-08 00:00:09 ┆ 2022-06-08 00:00:19 ┆ 2022-06-08 00:00:10 ┆ 3       ┆ 6       ┆ 3     ┆ 5 seconds    ┆ 0.6  │
├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌┤
│ 2022-06-08 00:00:19 ┆ 2022-06-08 00:00:29 ┆ 2022-06-08 00:00:20 ┆ 2       ┆ 4       ┆ 2     ┆ 5 seconds    ┆ 0.4  │
├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌┤
│ 2022-06-08 00:00:29 ┆ 2022-06-08 00:00:39 ┆ 2022-06-08 00:00:30 ┆ 7       ┆ 9       ┆ 2     ┆ 5 seconds    ┆ 0.4  │
├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌┤
│ 2022-06-08 00:00:39 ┆ 2022-06-08 00:00:49 ┆ 2022-06-08 00:00:40 ┆ 3       ┆ 4       ┆ 1     ┆ 5 seconds    ┆ 0.2  │
├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌┤
│ 2022-06-08 00:00:49 ┆ 2022-06-08 00:00:59 ┆ 2022-06-08 00:00:50 ┆ 6       ┆ 7       ┆ 1     ┆ 5 seconds    ┆ 0.2  │
└─────────────────────┴─────────────────────┴─────────────────────┴─────────┴─────────┴───────┴──────────────┴──────┘

(I've included boundary values, just to show which values are included in each time segment.)

Does the above correctly capture the concepts of a counter_agg and a rate?

volodymyrprokopyuk commented 2 years ago

@cbilot you are the master! Thank you for your time and detailed step-by-step instructions. Your solution correctly implements the requested semantics for counter_agg and rate!

Let me ask a few related questions (I've just started using the Node.js bindings for Polars, but I'm quite enthusiastic about them).

Thank you very much!

cbilot commented 2 years ago

Parallelization

In order to take advantage of parallel execution, all computations must exclusively use Polars expressions. User-defined functions in the host language (e.g. Python, JavaScript) won't work. Is this correct?

That is essentially correct. Here's what the Polars Cookbook says under the heading "Do not kill the parallelization!"

We have all heard that Python is slow, and does "not scale." Besides the overhead of running "slow" bytecode, Python has to remain within the constraints of the Global Interpreter Lock (GIL). This means that if you were to use a lambda or a custom Python function to apply during a parallelized phase, Polars' speed is capped at running Python code, preventing multiple threads from executing the function.

This all feels terribly limiting, especially because we often need those lambda functions in a .groupby() step, for example. This approach is still supported by Polars, but keep in mind that the bytecode and GIL costs have to be paid.

To mitigate this, Polars implements a powerful syntax defined not only in its lazy API, but also in its eager API.

The above is Python-specific. Nonetheless, I highly recommend "staying within the Polars API" - that is, try to express your objectives using the Expressions API, without using apply or map to call custom host language functions or functions in external libraries. This gives Polars the greatest chance to parallelize your calculations.
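As a rough sketch of the contrast (reusing the delta_resets column built earlier; the map version is shown only for illustration), the cumulative counter could be computed either way:

from itertools import accumulate

# expression form: runs entirely in Polars' native engine
df.with_column(pl.col("delta_resets").cumsum().alias("cum_counter"))

# map() into a Python function: works, but executes Python code under the GIL
df.with_column(
    pl.col("delta_resets")
    .map(lambda s: pl.Series(list(accumulate(s))))
    .alias("cum_counter")
)

The first form stays within the Expressions API; the second is functionally equivalent but forces Polars to call back into single-threaded Python for that step.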

Abstraction mechanisms

The proposed solutions could get quite verbose in terms of Polars expressions and difficult to apply repeatedly. What abstraction mechanisms does Polars provide? Can the host languages be used to abstract out the computations expressed in Polars expressions? Will such abstractions impact parallelization and hence performance? What would the most concise implementation of the proposed solution look like in terms of such abstractions?

You can certainly use the host language to abstract and automate your Expressions and code generation. I regularly use Python code to generate lists of Expressions that I need - outside of any context. For example, in this StackOverflow response, I dynamically generate a list of expressions that I will use in a later with_columns context.

expr_list = [
    pl.col("txt")
    .str.extract_all(r".{10}" + search_term + r".{10}")
    .alias("col_" + str(col_num))
    for col_num, search_term in enumerate(search_terms)
]

result = (
    txt_df.with_columns(expr_list)
    .select(pl.exclude("txt"))
    .melt()
    .select(["value"])
    .with_column(pl.Series(values=search_terms).alias("search_item"))
    .explode("value")
)

In this instance, generating the list of expressions does not significantly impact performance. The Python code simply builds the necessary Polars Expressions; it does not use apply or map to apply external calculations to the data itself.

Pipelines

Another really easy tool is pipelines. This blog post contains several examples of using the pipe method to abstract, compartmentalize, and reuse code. For example (from the blog post):

(df
 .pipe(set_types)
 .pipe(sessionize, threshold=20 * 60 * 1000)
 .pipe(add_features)
 .pipe(remove_bots, threshold=24))

Each pipe calls a function that takes a DataFrame (plus parameters) and returns a DataFrame. For example, the add_features function above is defined as follows:

def add_features(dataf):
    return (dataf
             .with_columns([
                 pl.col("char").count().over("session").alias("session_length"),
                 pl.col("session").n_unique().over("char").alias("n_sessions")
             ]))

Using this method, you could easily create functions that generate and append counter_agg and rate columns to a DataFrame.
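For instance, here is a sketch of such a function (add_counter_agg is just a hypothetical name; it assumes the ts/val column names and the 10 second buckets used above):

def add_counter_agg(dataf, every="10s"):
    # rebuild the ever increasing counter, bucket it, then derive delta, time_delta and rate
    return (
        dataf.with_column(
            pl.when(pl.col("val").diff() >= 0)
            .then(pl.col("val").diff())
            .otherwise(pl.col("val"))
            .alias("delta_resets")
        )
        .with_column(pl.col("delta_resets").cumsum().alias("cum_counter"))
        .groupby_dynamic(index_column="ts", every=every, period=every, offset="-1s")
        .agg(
            [
                pl.col("val").min().alias("min_val"),
                pl.col("val").max().alias("max_val"),
                (pl.col("cum_counter").max() - pl.col("cum_counter").min()).alias("delta"),
                (pl.col("ts").max() - pl.col("ts").min()).alias("time_delta"),
            ]
        )
        .with_column(
            (pl.col("delta") / pl.col("time_delta").cast(pl.Int64) * 1_000_000).alias("rate")
        )
    )

df.pipe(add_counter_agg, every="10s")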

Node-JS Development

I'm not familiar with Node-JS development. (Indeed, I'm merely a member of the Polars user community, not a member of the Polars development team.) I would recommend checking out the Discord channel listed on the front page, where both users and developers chat and exchange information. You may find answers to your questions about Node-JS development there.

Going forward

I would recommend reading the Cookbook (if you haven't already) and familiarizing yourself with the Expressions API. (Expressions are the heart of fast performance in Polars.)

If you get stuck trying to implement something, feel free to post your question on Stack Overflow, tagged with [python-polars] and/or [nodejs-polars]. (The Polars development team prefers that questions are posted to Stack Overflow, not to GitHub.) You'll get the best response on Stack Overflow if you include some sample data, the result you are trying to achieve, and some code that shows what you attempted. Indeed, looking through existing questions/responses on Stack Overflow will provide some very useful tips and tricks when coding with Polars. (I myself learned a great deal that way.)

Good luck!

volodymyrprokopyuk commented 2 years ago

Thank you, @cbilot for the explanation! I'm reading the Cookbook and the Expression API documentation and I'll post my Polars related questions on StackOverflow.

For now I am closing the issue, as there is a solution to my problem even though it is not packaged as a native, time-series-specific Polars function; I favor the generic, more verbose approach over bloating the Polars interface with time-series-specific functions. However, I think that extending Polars with time series analysis functions would be a great addition to the toolbox and would bring more users to Polars.