ArroyoSystems / arroyo

Distributed stream processing engine in Rust
https://arroyo.dev
Apache License 2.0

Unable to use `UNNEST` on json arrays #755

Closed: ahassany closed this issue 1 month ago

ahassany commented 1 month ago

I'm trying to unnest a JSON array; however, among the currently supported JSON functions (https://doc.arroyo.dev/sql/scalar-functions/json), I can't find one that returns an array.

I tried several approaches, but they all failed.

```sql
CREATE TABLE flows (
    ts TIMESTAMP NOT NULL,
    payload JSON,
    watermark TIMESTAMP GENERATED ALWAYS AS (ts - INTERVAL '10' SECOND) STORED
) WITH (
  connector = 'kafka',
  format = 'json',
  bootstrap_servers = 'kafka-1:19091',
  topic = 'raw-flows',
  type = 'source',
  'source.offset' = 'earliest',
  'source.read_mode' = 'read_uncommitted',
  event_time_field = 'ts',
  watermark_field = 'watermark'
);
```

```sql
SELECT
    ts,
    json_get_int(payload, 'sequence_number') AS sequence_number,
    UNNEST(json_get_json(payload, 'sets')) AS flow_set
FROM
    flows;
```

This failed with: Error during planning: unnest may only be called on arrays

Then I tried casting the result:

```sql
SELECT
    ts,
    json_get_int(payload, 'sequence_number') AS sequence_number,
    UNNEST(CAST(extract_json(flows.payload, 'sets') AS JSON ARRAY)) as flow_set
FROM
    flows;
```

That failed with: SQL error: ParserError("Expected ), found: ARRAY at Line: 22, Column 61")

mwylde commented 1 month ago

Apologies for the confusion here, as this behavior is not well documented.

We have two sets of JSON functions (https://doc.arroyo.dev/sql/scalar-functions/json): the `json_get` functions, which were added in 0.12, and the older `extract_json` and `extract_json_string` functions. The `json_get` functions are much faster and generally easier to use, but they currently can't return arrays.
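To make the first failure concrete, here is a small Python model (not Arroyo's implementation) of why `UNNEST(json_get_json(...))` is rejected. The names `json_get_json_model` and `unnest_model` are hypothetical, the payload is invented because the real shape of `sets` isn't shown, and the sketch assumes that a `json_get_*` lookup hands back the array as a single JSON string rather than as a SQL array.

```python
import json
from typing import List, Optional

# Hypothetical payload; the real shape of `sets` is not shown in the issue.
PAYLOAD = '{"sequence_number": 7, "sets": [{"id": 1}, {"id": 2}]}'


def json_get_json_model(payload: str, key: str) -> Optional[str]:
    """Model of a json_get_* lookup: one key in, at most one value out.

    Even when the value under `key` is an array, it comes back as one
    JSON string, not as a SQL array.
    """
    doc = json.loads(payload)
    if key not in doc:
        return None
    return json.dumps(doc[key])


def unnest_model(value: object) -> List[object]:
    """Model of UNNEST's input check: only list (array) values are accepted."""
    if not isinstance(value, list):
        raise TypeError("unnest may only be called on arrays")
    return list(value)


sets_text = json_get_json_model(PAYLOAD, "sets")
print(type(sets_text).__name__, sets_text)  # str [{"id": 1}, {"id": 2}]

try:
    unnest_model(sets_text)
except TypeError as err:
    print(err)  # unnest may only be called on arrays
```

The point of the model is the type mismatch: the value looks like an array when printed, but to the planner it is a scalar string.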

So here we want `extract_json`. However, `JSON ARRAY` isn't a valid SQL data type, which is why you're getting that syntax error; the array type you'd want is `TEXT[]`. In this case the cast is also unnecessary, because `extract_json` always returns an array of all matches. Note too that `extract_json` takes a JSONPath expression as its second argument, not a plain key. So you might try something like:

```sql
SELECT
    ts,
    json_get_int(payload, 'sequence_number') AS sequence_number,
    UNNEST(extract_json(flows.payload, '$.sets')) as flow_set
FROM
    flows;
```
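The row-level effect of `UNNEST` in that query can be sketched in Python as below. This is a hypothetical model, not Arroyo code: `unnest_rows` and the sample rows are invented, and it assumes that an empty array produces no output rows, as with PostgreSQL-style set-returning `UNNEST` (worth confirming for Arroyo). For each input row, every element of the array becomes its own output row, and the other selected columns are repeated alongside it.

```python
from typing import Any, Dict, Iterable, Iterator

# Hypothetical rows: `sets` stands in for the TEXT[] value produced by
# extract_json; `ts` and `sequence_number` mirror the query's other columns.
Row = Dict[str, Any]


def unnest_rows(rows: Iterable[Row], array_col: str, out_col: str) -> Iterator[Row]:
    """Yield one output row per element of `array_col`, repeating the other columns.

    In this sketch an empty array yields no rows for that input row.
    """
    for row in rows:
        # Columns that are carried through unchanged.
        others = {k: v for k, v in row.items() if k != array_col}
        for element in row[array_col]:
            yield {**others, out_col: element}


rows = [
    {"ts": 1, "sequence_number": 7, "sets": ['{"id": 1}', '{"id": 2}']},
    {"ts": 2, "sequence_number": 8, "sets": []},
]
for out in unnest_rows(rows, "sets", "flow_set"):
    print(out)
# {'ts': 1, 'sequence_number': 7, 'flow_set': '{"id": 1}'}
# {'ts': 1, 'sequence_number': 7, 'flow_set': '{"id": 2}'}
```

As in the SQL, `ts` and `sequence_number` appear once per array element rather than once per Kafka message.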

When working with the JSON functions, I'd recommend experimenting with them directly in preview (for example, running `SELECT extract_json(payload, '$.sets') FROM flows;` on its own) to get a feel for their outputs.
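Previewing matters here because of how JSONPath counts matches. Under standard JSONPath semantics (RFC 9535), `$.sets` matches the `sets` array once, as a single node, while `$.sets[*]` matches each element separately. The Python sketch below models that with a tiny JSONPath subset (`$`, `.name`, `[*]`); it is not Arroyo's implementation, and `query_paths`, `extract_json_model`, and the payload are hypothetical. If Arroyo's `extract_json` follows the same semantics, `UNNEST(extract_json(payload, '$.sets'))` would yield one row holding the whole array, and `'$.sets[*]'` would yield one row per set; running both in preview is the quickest way to confirm.

```python
import json
import re
from typing import Any, List

# Hypothetical payload; the real shape of `sets` is not shown in the issue.
PAYLOAD = '{"sequence_number": 7, "sets": [{"id": 1}, {"id": 2}]}'

# Tokens for a tiny JSONPath subset: `.name` child access and `[*]` wildcard.
TOKEN = re.compile(r"\.([A-Za-z_][A-Za-z0-9_]*)|\[\*\]")


def query_paths(doc: Any, path: str) -> List[Any]:
    """Return every node matched by `path` (subset: '$', '.name', '[*]')."""
    if not path.startswith("$"):
        raise ValueError("path must start with '$'")
    nodes = [doc]
    pos = 1
    while pos < len(path):
        m = TOKEN.match(path, pos)
        if m is None:
            raise ValueError(f"unsupported path syntax at {path[pos:]!r}")
        name = m.group(1)
        next_nodes: List[Any] = []
        for node in nodes:
            if name is not None:
                # Child access: keep only objects that have the key.
                if isinstance(node, dict) and name in node:
                    next_nodes.append(node[name])
            elif isinstance(node, list):
                # Wildcard on an array: every element becomes its own match.
                next_nodes.extend(node)
            elif isinstance(node, dict):
                # Wildcard on an object: every member value is a match.
                next_nodes.extend(node.values())
        nodes = next_nodes
        pos = m.end()
    return nodes


def extract_json_model(payload: str, path: str) -> List[str]:
    """Model of extract_json: all matches, each re-serialized as JSON text."""
    return [json.dumps(node) for node in query_paths(json.loads(payload), path)]


print(extract_json_model(PAYLOAD, "$.sets"))     # ['[{"id": 1}, {"id": 2}]']
print(extract_json_model(PAYLOAD, "$.sets[*]"))  # ['{"id": 1}', '{"id": 2}']
```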

We hope to add array support to `json_get` in the next release and deprecate the `extract_json` functions, which should make this less confusing.

ahassany commented 1 month ago

Thank you @mwylde, that explains it.