ibis-project / ibis

the portable Python dataframe library
https://ibis-project.org
Apache License 2.0

feat(api): Support pattern recognition #8252

Open mfatihaktas opened 4 months ago

mfatihaktas commented 4 months ago

Is your feature request related to a problem?

Pattern recognition in Flink enables searching for a set of event patterns in data streams. Flink ships a complex event processing (CEP) library for detecting patterns in event streams, and it consolidates CEP and the SQL API through the MATCH_RECOGNIZE clause, which brings complex event processing to SQL.

More information is available in the Flink docs.

Describe the solution you'd like

Ibis would support pattern recognition for the Flink backend with an API similar to the following:

table = con.create_table(...)

expr = table.match_recognize(
    measures=...,
    partition_by=...,
    order_by=...,
    pattern=...,
    # ...
)

Note: As the doc shows, Flink provides a rich set of pattern-matching functionality through MATCH_RECOGNIZE, so this issue will likely need to be addressed in multiple iterations.

Other backends

Supporting MATCH_RECOGNIZE:

Not supporting MATCH_RECOGNIZE:

Acceptance criteria

MATCH_RECOGNIZE components in Flink

In Flink, MATCH_RECOGNIZE consists of six components, listed below with brief explanations. The example SQL below serves as a reference while going through the list; a hypothetical sketch after the list shows how these components might map onto the proposed Python API.

SELECT *
FROM Ticker
    MATCH_RECOGNIZE(
        PARTITION BY symbol
        ORDER BY rowtime
        MEASURES
            C.rowtime AS dropTime,
            A.price - C.price AS dropDiff
        ONE ROW PER MATCH
        AFTER MATCH SKIP PAST LAST ROW
        PATTERN (A B* C) WITHIN INTERVAL '1' HOUR
        DEFINE
            B AS B.price > A.price - 10,
            C AS C.price < A.price - 10
    )

  1. Partitioning: Logical partitioning of the rows with the PARTITION BY clause. This divides the rows into partitions and looks for patterns within each partition (similar to GROUP BY for aggregations). If partitioning is not used, the MATCH_RECOGNIZE clause is translated into a non-parallel operator to ensure global ordering, which won't be performant. [Flink doc]

  2. Ordering: Ordering of the rows with the ORDER BY clause. The first argument to ORDER BY must be a time attribute in ascending order. [Flink doc]

  3. Output mode: Describes how many rows should be emitted for every found match. Flink currently supports only the ONE ROW PER MATCH mode, which produces one summary output row for each found match. [Flink doc]

  4. Define: The DEFINE clause specifies the conditions that rows have to fulfill in order to be classified into a corresponding pattern variable. If no condition is defined for a pattern variable, a default condition that evaluates to true for every row is used.

Every match pattern is constructed from basic building blocks, called pattern variables, to which operators (quantifiers and other modifiers) can be applied. The whole pattern must be enclosed in brackets.

An example pattern with pattern variables A, B, C, D could look like:

PATTERN (A B+ C* D)

Pattern composition is complex, with multiple kinds of quantifiers and modifiers; see the Flink doc for the full list.

  5. Measures: The MEASURES clause defines what will be included in the output of a matching pattern. It can project columns and define expressions for evaluation.

  6. After match strategy: The AFTER MATCH SKIP clause specifies where to start a new matching procedure after a complete match was found.

There are four different strategies [Flink doc]:

SKIP PAST LAST ROW: resume pattern matching at the next row after the last row of the current match.
SKIP TO NEXT ROW: continue searching for a new match starting at the row after the starting row of the current match.
SKIP TO LAST variable: resume pattern matching at the last row that is mapped to the specified pattern variable.
SKIP TO FIRST variable: resume pattern matching at the first row that is mapped to the specified pattern variable.
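
To make the mapping concrete, below is a hypothetical sketch of how the Ticker example above might be expressed with the proposed API. None of this exists in Ibis today; the string values for measures, pattern, and define are placeholders for the expression types discussed later.

# Hypothetical sketch only: match_recognize and its argument types do not exist in Ibis.
ticker = con.table("Ticker")

expr = ticker.match_recognize(
    partition_by=ticker.symbol,
    order_by=ticker.rowtime,
    measures={
        "dropTime": "C.rowtime",          # placeholder for a measure expression
        "dropDiff": "A.price - C.price",  # placeholder for a measure expression
    },
    output_mode="one_row_per_match",
    after_match_strategy="skip_past_last_row",
    pattern="(A B* C) WITHIN INTERVAL '1' HOUR",  # placeholder for a pattern expression
    define={
        "B": "B.price > A.price - 10",
        "C": "C.price < A.price - 10",
    },
)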

Other concerns that we might want to consider:

API design: match_recognize()

We would take all the required components of the MATCH_RECOGNIZE clause through function arguments:

expr = table.match_recognize(
    pattern=...,
    measures=...,
    partition_by=...,
    order_by=...,
    after_match_strategy=...,
    output_mode=...,
)

Some of the components, such as partition_by, order_by, after_match_strategy, and output_mode, can be expressed with str or the existing Ibis types (e.g., ir.Column). However, pattern and measures need new expression types, because they are constructed with more complex semantics: a pattern is defined in terms of pattern variables and the relations over and between them, e.g.

        MEASURES
            C.rowtime AS dropTime,
            A.price - C.price AS dropDiff
        PATTERN (A B* C) WITHIN INTERVAL '1' HOUR
        DEFINE
            B AS B.price > A.price - 10,
            C AS C.price < A.price - 10
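
As one possible direction for those new expression types, consider the following hypothetical sketch. None of these constructors (ibis.pattern_var, ibis.pattern, .star(), .within()) exist in Ibis; the sketch only illustrates how pattern variables, quantifiers, and DEFINE conditions might compose in Python.

import ibis

# Hypothetical: pattern variables act as scoped row references that carry
# their DEFINE condition with them.
A = ibis.pattern_var("A")
B = ibis.pattern_var("B", define=lambda b: b.price > A.price - 10)
C = ibis.pattern_var("C", define=lambda c: c.price < A.price - 10)

# Sequencing and quantifiers compose variables into a pattern;
# within() bounds the whole match to a time interval.
pattern = ibis.pattern(A, B.star(), C).within(ibis.interval(hours=1))

# Measures reference the matched pattern variables directly.
measures = {
    "dropTime": C.rowtime,
    "dropDiff": A.price - C.price,
}

Attaching each DEFINE condition to the variable it constrains would keep the conditions and the pattern from drifting apart, at the cost of introducing pattern variables as a new kind of deferred expression.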

TODO: What alternative APIs can/should we consider? One alternative would be letting users build match_recognize with chained method calls, e.g., expr = table.partition_by(...).order_by(...).pattern(...), as sketched below, but I think this would be confusing for users (which components are required and which are optional?) and would not fit the other functions in ibis/expr/api.py.
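
For comparison, a hypothetical chained version might read as follows. Note that order_by already exists on Ibis tables with sorting semantics, which underlines the confusion concern:

# Hypothetical chained builder; these methods do not exist in this form.
expr = (
    table.partition_by(table.symbol)
    .order_by(table.rowtime)  # clashes with the existing Table.order_by sort API
    .pattern(...)
    .define(...)
    .measures(...)
)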


cpcloud commented 4 months ago

Generally, to introduce new APIs, we like to do a survey of other systems to get an idea of whether the functionality is supportable across backends.

I think other systems support match_recognize; can you look into whether some of our non-Flink backends support this functionality?

mfatihaktas commented 4 months ago

> I think other systems support match_recognize; can you look into whether some of our non-Flink backends support this functionality?

Sure, will do and share my findings here.

Edit: Updated the description with the "Other backends" section.