confluentinc / ksql

The database purpose-built for stream processing applications.
https://ksqldb.io

Joined streams: pair with the most recent update only #4611

Open ShahOdin opened 4 years ago

ShahOdin commented 4 years ago

I have a stream join as below:

CREATE STREAM FOO AS
  SELECT c.title, s.fromInstant, s.toInstant
  FROM CATALOGUE_STREAM c
  FULL OUTER JOIN SCHEDULE_STREAM s WITHIN 48 HOURS
  ON s.ROWKEY = c.ROWKEY
  EMIT CHANGES;

Here, SCHEDULE_STREAM and CATALOGUE_STREAM are streams based on my Kafka topics. Since I could get multiple messages from either stream for a given key, I tried the following to pair each record with only the latest update:

CREATE STREAM BAR AS
  SELECT F1.*
  FROM FOO AS F1 LEFT JOIN FOO AS F2
  ON (F1.ROWKEY = F2.ROWKEY AND F1.ROWTIME < F2.ROWTIME)
  WHERE F2.ROWTIME IS NULL
  EMIT CHANGES;

See here for an explanation of the idea used, or see the following gist.

But I get:

io.confluent.ksql.execution.expression.tree.LogicalBinaryExpression cannot be cast to io.confluent.ksql.execution.expression.tree.ComparisonExpression

I also tried the same join using tables instead of streams:

CREATE STREAM BAZ AS
  SELECT c.title, s.fromInstant, s.toInstant
  FROM CATALOGUE_TABLE c
  FULL OUTER JOIN SCHEDULE_TABLE s
  ON s.ROWKEY = c.ROWKEY
  EMIT CHANGES;

Here, SCHEDULE_TABLE and CATALOGUE_TABLE are tables based on my Kafka topics. Unfortunately, I got:

Invalid result type. Your SELECT query produces a TABLE. Please use CREATE TABLE AS SELECT statement instead.
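
Following the hint in that error message, a minimal sketch of the table-based variant would swap CREATE STREAM for CREATE TABLE. It assumes the same CATALOGUE_TABLE and SCHEDULE_TABLE definitions as above and a ksqlDB version that still exposes ROWKEY, and it has not been run against a live server:

```sql
-- Sketch only: same join as BAZ, but declared as a table because
-- a table-table join produces a TABLE, not a STREAM.
-- Assumes CATALOGUE_TABLE and SCHEDULE_TABLE are keyed by ROWKEY.
CREATE TABLE BAZ AS
  SELECT c.title, s.fromInstant, s.toInstant
  FROM CATALOGUE_TABLE c
  FULL OUTER JOIN SCHEDULE_TABLE s
  ON s.ROWKEY = c.ROWKEY
  EMIT CHANGES;
```

Since a table holds only the latest value per key, each row of the result should reflect the most recent update from either side, which is the pairing I was after with the stream join.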

ShahOdin commented 4 years ago

Just came across https://github.com/confluentinc/ksql/issues/3985, which covers a more general discussion of this.