confluentinc / ksql

The database purpose-built for stream processing applications.
https://ksqldb.io

earliest/latest_by_offset don't work with structs, maps, and arrays #5437

Closed: MichaelDrogalis closed this issue 2 years ago

MichaelDrogalis commented 4 years ago

Describe the bug

earliest/latest_by_offset are critical functions for converting changelogs to streams, but they don't work with all the data types. Namely, they don't support structs, maps, or arrays. These functions are among the few mechanisms for directly turning a stream into a table, so the lack of coverage can really hurt.

To Reproduce

ksqlDB 0.9.0:

ksql> CREATE STREAM s1 (
>    k VARCHAR,
>    b STRUCT<
>        e VARCHAR,
>        f INT
>    >,
>    c MAP<VARCHAR, INT>,
>    d ARRAY<INT>
>) WITH (
>    kafka_topic = 's1',
>    partitions = 1,
>    value_format = 'avro',
>    key = 'k'
>);

 Message        
----------------
 Stream created 
----------------
ksql> CREATE TABLE t1 AS
>    SELECT k,
>           LATEST_BY_OFFSET(b),
>           LATEST_BY_OFFSET(c),
>           LATEST_BY_OFFSET(d)
>    FROM s1
>    GROUP BY k
>    EMIT CHANGES;
Function 'LATEST_BY_OFFSET' does not accept parameters (STRUCT<E STRING, F INTEGER>).
Valid alternatives are:
LATEST_BY_OFFSET(BOOLEAN val)
LATEST_BY_OFFSET(DOUBLE val)
LATEST_BY_OFFSET(INT val)
LATEST_BY_OFFSET(BIGINT val)
LATEST_BY_OFFSET(VARCHAR val)
For detailed information on a function run: DESCRIBE FUNCTION <Function-Name>;

Both earliest + latest by offset should support structs, maps, and arrays.
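
For the struct case only, one possible interim workaround (not proposed in this thread, and only a sketch) is to aggregate each primitive field separately, since the error output above lists VARCHAR and INT overloads. The table name `t1_fields` and the aliases `b_e` and `b_f` are illustrative. The result may differ from a true struct-level LATEST_BY_OFFSET when individual fields are null, depending on how nulls are handled, and this does nothing for the map and array columns.

```sql
-- Sketch: take the latest value of each struct field individually.
-- t1_fields, b_e and b_f are hypothetical names chosen for this example.
CREATE TABLE t1_fields AS
    SELECT k,
           LATEST_BY_OFFSET(b->e) AS b_e,  -- resolves to the VARCHAR overload
           LATEST_BY_OFFSET(b->f) AS b_f   -- resolves to the INT overload
    FROM s1
    GROUP BY k
    EMIT CHANGES;
```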

purplefox commented 4 years ago

I don't think this is a bug. I'm not sure there's an easy way of supporting this using the current UDAF framework.

MichaelDrogalis commented 4 years ago

@purplefox Maybe bug was the wrong label. But you can see how you're basically toast if you can't use this with all your data types, right?

purplefox commented 4 years ago

Maybe @agavra can chime in here - he is the expert on the UDAF type system. Perhaps there is some esoteric way of doing this.

big-andy-coates commented 4 years ago

And decimals too!

wlaforest commented 4 years ago

Until this is supported we should indicate when a type isn't supported. Currently I get an uncaught exception.

big-andy-coates commented 4 years ago

> Until this is supported we should indicate when a type isn't supported. Currently I get an uncaught exception.

That's unexpected. It should return an error message. Can you provide example statements to recreate and the actual result please?

big-andy-coates commented 4 years ago

I've investigated this. Fixing this will involve enhancing the UDAF framework to support something similar to the schemaProvider field of the @Udf annotation. I'll look to raise a KLIP. Moving to next milestone, as this is now beyond what is possible in v0.11.

agavra commented 4 years ago

I'm going to drop this from the milestone and mark it as P1. When we decide to pull in this enhancement, we can add it to the milestone that is current at that point.

agavra commented 3 years ago

Bumping down the priority of this because of #5652

bunkerdives commented 2 years ago

Perhaps the docs should be updated to note this limitation, as they already do for COLLECT_LIST and COLLECT_SET?

Namely, the docs for those two functions state "currently only works for simple types (not Map, Array, or Struct)", and the docs for earliest/latest_by_offset should say the same.

josh-endries commented 2 years ago

Just ran into this due to decimal. :(

jnh5y commented 2 years ago

https://github.com/confluentinc/ksql/pull/8878 should close this out!

adminnz commented 2 years ago

When will this be available in Confluent Cloud?

colinhicks commented 2 years ago

@adminnz, we don't have a public release date to share yet, but I can say that this feature will be available quite soon in Confluent Cloud, as part of ksqlDB 0.25.1.