DataLinkDC / dinky

Dinky is a real-time data development platform based on Apache Flink, enabling agile data development, deployment and operation.
http://www.dinky.org.cn
Apache License 2.0
3.08k stars 1.13k forks source link

When will dinky's flink sql support flink sql cep? #3845

Open jxchanghe opened 6 days ago

jxchanghe commented 6 days ago

Search before asking

Description

No response

Use case

No response

Related issues

No response

Are you willing to submit a PR?

Code of Conduct

Zzm0809 commented 6 days ago

现在就能 直接写

jxchanghe commented 6 days ago

现在就能 直接写

我在Dinky的Flink sql作业开发窗口,用 LAST,FIRST 这种函数,检查时就通不过,报calcite解析不了。直接用sql-client去跑,报同样的错误。用的是最新版的Dinky, Flink用的是1.18版本的

Zzm0809 commented 5 days ago

You can write directly now

When I use functions such as LAST and FIRST in the Flink sql job development window of Dinky, they fail to pass the check and the report is that calcite cannot be parsed. I ran it directly with sql-client and got the same error. I am using the latest version of Dinky, and Flink is using version 1.18.

Then what you wrote is wrong> > 现在就能 直接写

我在Dinky的Flink sql作业开发窗口,用 LAST,FIRST 这种函数,检查时就通不过,报calcite解析不了。直接用sql-client去跑,报同样的错误。用的是最新版的Dinky, Flink用的是1.18版本的

那你写的不对, 原生 sql-client 都语法校验不通过,说明你的 cep sql 有问题

如下可以正常提交(此demo 根据 flink 官网样例而来,此处不注重结果,只注重逻辑), 请对比你的 sql 是否存在问题 链接: 模式匹配


DROP table if EXISTS Ticker;
CREATE TABLE if not EXISTS Ticker (
    symbol STRING,
    price BIGINT,
    tax BIGINT,
    rowtime as proctime()
) WITH (
    'connector' = 'datagen',
    'rows-per-second'='5',
    'fields.symbol.kind'='random',
    'fields.symbol.length'='5',
    'fields.price.min'='100',
    'fields.price.max'='500',
    'fields.tax.min'='0',
    'fields.tax.max'='50'
);

SELECT *
FROM Ticker
    MATCH_RECOGNIZE (
        PARTITION BY symbol
        ORDER BY rowtime
        MEASURES
            START_ROW.rowtime AS start_tstamp,
            LAST(PRICE_DOWN.rowtime) AS bottom_tstamp,
            LAST(PRICE_UP.rowtime) AS end_tstamp
        ONE ROW PER MATCH
        AFTER MATCH SKIP TO LAST PRICE_UP
        PATTERN (START_ROW PRICE_DOWN+ PRICE_UP)
        DEFINE
            PRICE_DOWN AS
                (LAST(PRICE_DOWN.price, 1) IS NULL AND PRICE_DOWN.price < START_ROW.price) OR
                    PRICE_DOWN.price < LAST(PRICE_DOWN.price, 1),
            PRICE_UP AS
                PRICE_UP.price > LAST(PRICE_DOWN.price, 1)
    ) MR;
jxchanghe commented 4 days ago

You can write directly now

When I use functions such as LAST and FIRST in the Flink sql job development window of Dinky, they fail to pass the check and the report is that calcite cannot be parsed. I ran it directly with sql-client and got the same error. I am using the latest version of Dinky, and Flink is using version 1.18.

Then what you wrote is wrong> > 现在就能 直接写

我在Dinky的Flink sql作业开发窗口,用 LAST,FIRST 这种函数,检查时就通不过,报calcite解析不了。直接用sql-client去跑,报同样的错误。用的是最新版的Dinky, Flink用的是1.18版本的

那你写的不对, 原生 sql-client 都语法校验不通过,说明你的 cep sql 有问题

如下可以正常提交(此demo 根据 flink 官网样例而来,此处不注重结果,只注重逻辑), 请对比你的 sql 是否存在问题 链接: 模式匹配

DROP table if EXISTS Ticker;
CREATE TABLE if not EXISTS Ticker (
    symbol STRING,
    price BIGINT,
    tax BIGINT,
    rowtime as proctime()
) WITH (
    'connector' = 'datagen',
    'rows-per-second'='5',
    'fields.symbol.kind'='random',
    'fields.symbol.length'='5',
    'fields.price.min'='100',
    'fields.price.max'='500',
    'fields.tax.min'='0',
    'fields.tax.max'='50'
);

SELECT *
FROM Ticker
    MATCH_RECOGNIZE (
        PARTITION BY symbol
        ORDER BY rowtime
        MEASURES
            START_ROW.rowtime AS start_tstamp,
            LAST(PRICE_DOWN.rowtime) AS bottom_tstamp,
            LAST(PRICE_UP.rowtime) AS end_tstamp
        ONE ROW PER MATCH
        AFTER MATCH SKIP TO LAST PRICE_UP
        PATTERN (START_ROW PRICE_DOWN+ PRICE_UP)
        DEFINE
            PRICE_DOWN AS
                (LAST(PRICE_DOWN.price, 1) IS NULL AND PRICE_DOWN.price < START_ROW.price) OR
                    PRICE_DOWN.price < LAST(PRICE_DOWN.price, 1),
            PRICE_UP AS
                PRICE_UP.price > LAST(PRICE_DOWN.price, 1)
    ) MR;

可以了。感谢。确实是cep sql写的有问题。建议文档中着重补充下flink cep sql相关部分。搜索网上一些最近的帖子中,有的还说dinky的flink sql目前不支持cep。