ArroyoSystems / arroyo

Distributed stream processing engine in Rust
https://arroyo.dev
Apache License 2.0

Arroyo throws "This feature is not implemented" when attempting to insert data from a CTE #653

Open GoldToothRichards opened 3 months ago

GoldToothRichards commented 3 months ago

Bug

The following query runs in the Arroyo UI and produces a working preview:

WITH trades as (
    SELECT
        extract_json_string(value, '$.source') as source,
        extract_json_string(value, '$.base') as base,
        extract_json_string(value, '$.quote') as quote,
        extract_json_string(value, '$.exchange') as exchange,
        CAST(array_element(extract_json(value, '$.volume_base'), 1) AS DOUBLE) as volume_base,
        CAST(array_element(extract_json(value, '$.volume_quote'), 1) AS DOUBLE) as volume_quote,
        CAST(array_element(extract_json(value, '$.price'), 1) AS DOUBLE) as price,
        extract_json_string(value, '$.timestamp') as 'timestamp'
    FROM coincap_crypto_trades_source
),
prices as (
    SELECT
        TUMBLE(interval '1 second') as window,
        trades.source as source,
        trades.base as base,
        trades.quote as quote,
        trades.exchange as exchange,
        SUM(trades.volume_base) AS total_volume_base,
        SUM(trades.volume_quote) AS total_volume_quote
    FROM trades
    GROUP BY
        window,
        trades.source,
        trades.base,
        trades.quote,
        trades.exchange
)

-- INSERT INTO coincap_crypto_prices_sink
SELECT
    prices.source as source,
    prices.base as base,
    prices.quote as quote,
    prices.exchange as exchange,
    prices.total_volume_base as total_volume_base,
    prices.total_volume_quote as total_volume_quote,
    prices.total_volume_quote / prices.total_volume_base as vwap,
    prices.window as window
FROM prices;

However, if you uncomment the INSERT line, validation fails with the error "This feature is not implemented".
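
For reference, the failing shape can be reduced to a single CTE followed by an INSERT. This is only a sketch: `example_sink` is a hypothetical sink table assumed to have a single `base` column, and this smaller query has not been checked against Arroyo to confirm it triggers the same error.

```sql
-- Hypothetical reduction of the query above: one CTE, then INSERT ... SELECT.
-- `example_sink` is an assumed sink table with a single `base` column.
WITH trades AS (
    SELECT
        extract_json_string(value, '$.base') AS base
    FROM coincap_crypto_trades_source
)
INSERT INTO example_sink
SELECT
    trades.base AS base
FROM trades;
```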

Workaround

Using CREATE VIEW statements instead of a WITH clause gives a working preview even with the INSERT statement included:

CREATE VIEW trades as (
    SELECT
        extract_json_string(value, '$.source') as source,
        extract_json_string(value, '$.base') as base,
        extract_json_string(value, '$.quote') as quote,
        extract_json_string(value, '$.exchange') as exchange,
        CAST(array_element(extract_json(value, '$.volume_base'), 1) AS DOUBLE) as volume_base,
        CAST(array_element(extract_json(value, '$.volume_quote'), 1) AS DOUBLE) as volume_quote,
        CAST(array_element(extract_json(value, '$.price'), 1) AS DOUBLE) as price,
        extract_json_string(value, '$.timestamp') as 'timestamp'
    FROM coincap_crypto_trades_source
);

CREATE VIEW prices as (
    SELECT
        TUMBLE(interval '1 second') as window,
        trades.source as source,
        trades.base as base,
        trades.quote as quote,
        trades.exchange as exchange,
        SUM(trades.volume_base) AS total_volume_base,
        SUM(trades.volume_quote) AS total_volume_quote
    FROM trades
    GROUP BY
        window,
        trades.source,
        trades.base,
        trades.quote,
        trades.exchange
);

INSERT INTO coincap_crypto_prices_sink
SELECT
    prices.source as source,
    prices.base as base,
    prices.quote as quote,
    prices.exchange as exchange,
    prices.total_volume_base as total_volume_base,
    prices.total_volume_quote as total_volume_quote,
    prices.total_volume_quote / prices.total_volume_base as vwap,
    prices.window as window
FROM prices;

GoldToothRichards commented 3 months ago

Unfortunately, my pipeline still failed even after getting the preview working from the UI.

Logs

GoldToothRichards commented 3 months ago

Hey @mwylde. I tested this again from the master branch after the changes from #655. Everything is working as expected on my end now!