confluentinc / ksql

The database purpose-built for stream processing applications.
https://ksqldb.io

Create Stream from table #9696

Closed TheDarkFlame closed 7 months ago

TheDarkFlame commented 1 year ago

Is your feature request related to a problem? Please describe. At present, it is possible to use the CLI client to view the latest values in a table using something like SELECT * FROM <TABLE> WHERE <KEY> = <VALUE>;, the results of which are updated in real time. It is not possible to get the same results out as a stream.

Describe the solution you'd like A way to run a select from table as a continuous stream.

Describe alternatives you've considered I have looked into subscribing to the internal changelog topic for the table query; however, this does not contain all the aggregations.

Additional context Use case: I'm looking to aggregate a set of events with a session window and emit the events when the window closes. At present this is impossible. Alternatives such as Flink do things like this by building an internal queue and creating watermark events that trigger the closing of a window; there does not seem to be anything in ksqlDB that does a similar job.

suhas-satish commented 1 year ago

You can use CREATE TABLE AS SELECT from the stream and query that.
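For example, something like this (a minimal sketch; the stream and column names here are illustrative, not from the reporter's setup):

CREATE TABLE event_counts AS
    SELECT
        AGGREGATION_KEY,
        COUNT(*) AS event_count
    FROM filtered_events
    GROUP BY AGGREGATION_KEY
    EMIT CHANGES;

-- pull query against the materialized table:
SELECT * FROM event_counts WHERE AGGREGATION_KEY = 'some-key';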

TheDarkFlame commented 1 year ago

Except when said table is built on top of a session window aggregation.

TheDarkFlame commented 1 year ago

Providing a simplified example:

CREATE SOURCE STREAM IF NOT EXISTS filtered_events (
    "AGGREGATION_KEY" VARCHAR,
    "NAME" VARCHAR,
    "EVENT_DATETIME" VARCHAR
    )
WITH (KAFKA_TOPIC='input_topic', VALUE_FORMAT='JSON', TIMESTAMP_FORMAT='yyyy-MM-dd''T''HH:mm:ssX', TIMESTAMP='EVENT_DATETIME', PARTITIONS=1);

CREATE TABLE IF NOT EXISTS aggregated_events AS
    SELECT
        AGGREGATION_KEY,
        COUNT(*) as "event_count",
        COLLECT_LIST("name") as "names",
        EARLIEST_BY_OFFSET("EVENT_DATETIME")
    FROM filtered_events
        WINDOW SESSION (5 SECONDS)
    GROUP BY AGGREGATION_KEY
    EMIT CHANGES;

--This does not work
CREATE STREAM IF NOT EXISTS aggregated_events_stream
        WITH (KAFKA_TOPIC='aggregated-stream', VALUE_FORMAT='JSON', partitions=1) AS
    SELECT * FROM aggregated_events
    EMIT CHANGES;
--Error Message:
--Could not determine output schema for query due to error: KSQL does not support persistent queries on windowed tables.

--But this works in the KSQL interactive CLI
SELECT * FROM aggregated_events
EMIT CHANGES;

@suhas-satish

baliberdin commented 1 year ago

I saw this issue last week. I think there is a "workaround", which is to create a stream over the aggregate table's output topic. After the command CREATE TABLE IF NOT EXISTS aggregated_events ... a new persistent query and topic are created; then you can create a stream without "AS SELECT":

CREATE STREAM xyz(
  AGGREGATION_KEY VARCHAR KEY, 
  event_count BIGINT, ...) 
WITH(kafka_topic='<topic created by last command>', value_format='json', ...)

I don't know anything about the side effects, but I also tested it with WINDOWED EMIT FINAL tables and it looks good.
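If you're unsure which topic the table writes to, ksqlDB's metadata statements can tell you (a sketch; aggregated_events is the table name from the earlier example):

-- list the Kafka topics visible to ksqlDB
SHOW TOPICS;

-- the EXTENDED output includes the table's backing Kafka topic and schema
DESCRIBE aggregated_events EXTENDED;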

TheDarkFlame commented 1 year ago

@baliberdin The information you shared resolved my issue. I found that the aggregation key always shows up as null in the created stream, which was a little odd; however, doing AS_VALUE(AGGREGATION_KEY) AS "agg_key" allowed getting around this. Also noteworthy: it's still not possible to do a SELECT * on the table; instead it is required to manually specify the schema.

For completeness, I've shared a full query-set:

CREATE SOURCE STREAM IF NOT EXISTS filtered_events (
    "AGGREGATION_KEY" VARCHAR,
    "NAME" VARCHAR,
    "EVENT_DATETIME" VARCHAR
    )
WITH (KAFKA_TOPIC='input_topic', VALUE_FORMAT='JSON', TIMESTAMP_FORMAT='yyyy-MM-dd''T''HH:mm:ssX', TIMESTAMP='EVENT_DATETIME', PARTITIONS=1);

CREATE TABLE IF NOT EXISTS aggregated_events 
  WITH (KAFKA_TOPIC='aggregation-table', VALUE_FORMAT='JSON', TIMESTAMP_FORMAT='yyyy-MM-dd''T''HH:mm:ssX', TIMESTAMP='EARLIEST_TIMESTAMP', PARTITIONS=1) AS
    SELECT
        AGGREGATION_KEY as "agg_key",
        AS_VALUE(AGGREGATION_KEY) AS "AGGREGATION_KEY",
        COUNT(*) as "event_count",
        COLLECT_LIST("name") as "names",
        EARLIEST_BY_OFFSET("EVENT_DATETIME") as "earliest_timestamp"
    FROM filtered_events
        WINDOW SESSION (5 SECONDS)
    GROUP BY AGGREGATION_KEY
    EMIT CHANGES;

CREATE STREAM IF NOT EXISTS aggregated_events_stream(
    "agg_key" VARCHAR,
    "event_count" BIGINT,
    "names" ARRAY<VARCHAR>,
    "earliest_timestamp" VARCHAR
  )
  WITH (KAFKA_TOPIC='aggregation-table', VALUE_FORMAT='JSON', TIMESTAMP_FORMAT='yyyy-MM-dd''T''HH:mm:ssX', TIMESTAMP='EARLIEST_TIMESTAMP', PARTITIONS=1);

CREATE STREAM IF NOT EXISTS output_stream
  WITH (KAFKA_TOPIC='aggregated-stream', VALUE_FORMAT='JSON', TIMESTAMP_FORMAT='yyyy-MM-dd''T''HH:mm:ssX', TIMESTAMP='EARLIEST_TIMESTAMP', PARTITIONS=1) AS
    SELECT * FROM aggregated_events_stream;

With the following points being notable:

  1. Define the Kafka topic for the table explicitly.
  2. Create a stream over the same topic as the table, specifying the schema manually (as if it were a new stream over an unknown Kafka topic).
  3. Rename the grouping key AGGREGATION_KEY to "agg_key".
  4. Duplicate the AGGREGATION_KEY value into a new value column in the table using AS_VALUE(AGGREGATION_KEY) AS "AGGREGATION_KEY".
  5. GROUP BY the old name, AGGREGATION_KEY; since GROUP BY uses the field name from the input stream, the field name in the output table is unimportant for the GROUP BY.