confluentinc / ksql

The database purpose-built for stream processing applications.
https://ksqldb.io

Pull and Push query behavior differs for wrapped keys #7646

Open clemensk1 opened 3 years ago

clemensk1 commented 3 years ago

Describe the bug

A pull query returns 0 rows even though the data exists.

To Reproduce

Steps to reproduce the behavior:

  1. The version of KSQL: 0.17.0, 0.18.0
  2. Sample source data. A sample message looks like this. Kafka message key:
    "xyz"

Kafka message value:

{
"status": "online",
"host": "test",
"timestamp": "1623067935"
}
  3. Any SQL statements you ran. I create a table with this statement:
    CREATE TABLE CONNECTION_STATUS (
    `identity` string primary key, 
    `status` string,
    `host` string,
    `timestamp` timestamp
    ) WITH (kafka_topic='connection-status', key_format='kafka', value_format='json');

Then I create a queryable table with:

CREATE TABLE QA_CONNECTION_STATUS AS SELECT * FROM CONNECTION_STATUS;

Expected behavior

I expect the following output:

ksql> SELECT * FROM QA_CONNECTION_STATUS WHERE `identity`='xyz';
+---------------+--------------------+----------------+---------------------------------+
|identity       |status              |host            |timestamp                        |
+---------------+--------------------+----------------+---------------------------------+
|xyz            |online              |test            |2021-06-07T11:51:09.458          |
Query terminated

Actual behavior

  1. CLI output
ksql> SELECT * FROM QA_CONNECTION_STATUS WHERE `identity`='xyz';
+------------------------------+-------------------+----------------+-----------------------------------+
|identity                      |status             |host            |timestamp                          |
+------------------------------+-------------------+----------------+-----------------------------------+
Query terminated

If I run a push query instead, I continuously receive data:

ksql> SELECT * FROM QA_CONNECTION_STATUS WHERE `identity`='xyz' EMIT CHANGES;
+---------------+--------------------+----------------+---------------------------------+
|identity       |status              |host            |timestamp                        |
+---------------+--------------------+----------------+---------------------------------+
|xyz            |online              |test            |2021-06-07T11:51:09.458          |
|xyz            |online              |test            |2021-06-07T12:31:09.458          |
|xyz            |online              |test            |2021-06-07T13:01:09.458          |
  2. Error messages
  3. KSQL logs


agavra commented 3 years ago

I suspect this is the same issue as https://github.com/confluentinc/ksql/issues/7645. In order to get what you want, you'd need to:

CREATE TABLE CONNECTION_STATUS (
    myKey STRUCT<`identity` STRING> primary key, 
    `status` string,
    `host` string,
    `timestamp` timestamp
    ) WITH (kafka_topic='connection-status', key_format='kafka', value_format='json');
SELECT * FROM QA_CONNECTION_STATUS WHERE myKey = STRUCT(`identity` := 'xyz');

or at least something along those lines; I haven't tested it locally yet.

clemensk1 commented 3 years ago

But why does it work with push queries and not with pull queries? Please look at point 1 under "Actual behavior".

agavra commented 3 years ago

@clemensk1 I had totally missed that part - thanks for pointing it out. That's a mystery to me! I'll add the bug label back and we'll prioritize it soon.

caseypoint commented 3 years ago

I think we may have run across this bug as well. I am actually seeing some data in pull queries for certain keys but not others.

We created our table slightly differently:

CREATE STREAM readings_stream (
    uid STRING key,
    channel STRING,
    name STRING, 
    `group` STRING,
    timestamp BIGINT)
WITH (kafka_topic='readings', value_format='json', timestamp='timestamp');

CREATE TABLE readings_channel_view
AS SELECT uid AS uid,
    COLLECT_SET(CONCAT(name, '¦', channel)) as names,
    COLLECT_SET(CONCAT(`group`, '¦', channel)) as groups
FROM readings_stream
GROUP BY uid
EMIT CHANGES;

(the stream has more fields, omitted for brevity)

In our dev instance we have about 10 unique uid values, and pull queries return results for only two of them.

caseypoint commented 3 years ago

I managed to work around this issue, apparently, by adding extra conditions to the WHERE clause:

SET 'ksql.query.pull.table.scan.enabled'='true';
SELECT * FROM readings_channel_view WHERE uid = 'foo' AND ARRAY_LENGTH(names) > 0;

I had to set ksql.query.pull.table.scan.enabled because otherwise, the query would fail with:

A comparison must directly reference a key column.  See https://cnfl.io/queries for more info.
Add EMIT CHANGES if you intended to issue a push query.
Pull queries require a WHERE clause that:
 - includes a key equality expression, e.g. `SELECT * FROM X WHERE <key-column>=Y;`.
 - in the case of a multi-column key, is a conjunction of equality expressions that cover all key columns.

If more flexible queries are needed, table scans can be enabled by setting ksql.query.pull.table.scan.enabled=true.

I'm confused as to why this error gets printed; I am directly referencing a key column in that query.
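A strict reading of the rule in this error message can be modelled as a small predicate. The following is a hypothetical Python sketch, not ksqlDB's implementation: Comparison and needs_table_scan are illustrative names, and the WHERE clause is assumed to be already flattened into a list of AND-ed comparisons. Under this reading, a key lookup is possible only when every comparison is an equality on a key column and, together, those equalities cover all key columns; anything else falls back to a table scan.

```python
from typing import NamedTuple


class Comparison(NamedTuple):
    """One AND-ed condition from a WHERE clause (hypothetical representation)."""

    column: str  # key column, value column, or expression the condition tests
    op: str  # comparison operator, e.g. "=" or ">"
    value: object  # literal on the right-hand side


def needs_table_scan(conjuncts: list[Comparison], key_columns: set[str]) -> bool:
    """Strict reading of the pull-query rule: a key lookup is possible only if
    every conjunct is an equality on a key column and all key columns are covered."""
    for c in conjuncts:
        if c.op != "=" or c.column not in key_columns:
            return True  # a condition on something other than a key equality
    covered = {c.column for c in conjuncts}
    return covered != key_columns  # some key column is left unconstrained


# Single-column key, as in readings_channel_view (key column: uid).
print(needs_table_scan([Comparison("uid", "=", "foo")], {"uid"}))  # False
print(needs_table_scan(
    [Comparison("uid", "=", "foo"), Comparison("ARRAY_LENGTH(names)", ">", 0)],
    {"uid"},
))  # True
# Hypothetical two-column key (a, b): both columns must be pinned.
print(needs_table_scan([Comparison("a", "=", 1)], {"a", "b"}))  # True
```

This strict reading is an assumption. The error text says a single-column key only needs to "include" a key equality, which is looser, and the planner in 0.17/0.18 may apply checks that this sketch does not model.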

caseypoint commented 3 years ago

Even simpler workaround:

SELECT * FROM readings_channel_view WHERE uid = 'foo' AND uid = 'foo';

Without ksql.query.pull.table.scan.enabled, the query above still fails with the "A comparison must directly reference a key column" error, which seems wrong.

agavra commented 3 years ago

"I'm confused as to why this error gets printed; I am directly referencing a key column in that query."

@caseypoint I haven't had a chance to look at your new information yet. It's definitely interesting that this solved the problem; my guess is that the table scan itself is what makes it work, rather than the extra condition. As for the error message: if you reference anything other than the key column, the query requires a table scan.

We have some work lined up (#6973) to improve this in situations like yours, where it's obvious that no table scan is needed. But you can imagine that a query like id = 'foo' OR other_field = 'bar' could not be answered using only the primary index.

caseypoint commented 3 years ago

Got it, thanks! The error message was a bit vague: I didn't realize it meant that every condition must directly reference the primary key, rather than that at least one must. Of course it makes sense that a disjunction requires a table scan, but it would be nice if a conjunction didn't, as long as one of its clauses directly compares the primary key.
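The suggestion that a conjunction containing a key equality could still use the primary index can be sketched as a key lookup followed by a residual filter. This is a hypothetical Python model over a toy in-memory dict, not ksqlDB's implementation and not necessarily the approach tracked in #6973; KeyEq, Pred, And, Or, pinned_key, and pull_query are illustrative names. A conjunction pins the key if either side does, so only that one row is read and the whole clause is then checked against it. A disjunction such as id = 'foo' OR other_field = 'bar' pins no single key, so every row has to be scanned.

```python
from dataclasses import dataclass
from typing import Callable, Optional, Union


@dataclass(frozen=True)
class KeyEq:
    """Equality between the primary key and a literal, e.g. uid = 'foo'."""
    value: str


@dataclass(frozen=True)
class Pred:
    """Any other condition, checked row by row, e.g. ARRAY_LENGTH(names) > 0."""
    test: Callable[[dict], bool]


@dataclass(frozen=True)
class And:
    left: "Expr"
    right: "Expr"


@dataclass(frozen=True)
class Or:
    left: "Expr"
    right: "Expr"


Expr = Union[KeyEq, Pred, And, Or]


def matches(expr: Expr, key: str, row: dict) -> bool:
    """Evaluate the full WHERE clause against one row."""
    if isinstance(expr, KeyEq):
        return key == expr.value
    if isinstance(expr, Pred):
        return expr.test(row)
    if isinstance(expr, And):
        return matches(expr.left, key, row) and matches(expr.right, key, row)
    return matches(expr.left, key, row) or matches(expr.right, key, row)


def pinned_key(expr: Expr) -> Optional[str]:
    """Return a key every matching row must have, or None if no single key is implied.
    A conjunction pins the key if either side does; a disjunction never does here."""
    if isinstance(expr, KeyEq):
        return expr.value
    if isinstance(expr, And):
        left = pinned_key(expr.left)
        return left if left is not None else pinned_key(expr.right)
    return None  # Pred or Or


def pull_query(table: dict[str, dict], where: Expr) -> tuple[str, list[str]]:
    """Answer a pull query over a toy in-memory table; return the access path and matching keys."""
    key = pinned_key(where)
    if key is not None:
        # Key lookup: read at most one row, then apply the whole clause as a residual filter.
        row = table.get(key)
        hits = [key] if row is not None and matches(where, key, row) else []
        return "key lookup", hits
    # Table scan: evaluate the clause against every row.
    return "table scan", [k for k, row in table.items() if matches(where, k, row)]


# Toy data loosely modelled on readings_channel_view (uid -> names).
table = {"foo": {"names": ["x"]}, "bar": {"names": []}}
non_empty = Pred(lambda row: len(row["names"]) > 0)

print(pull_query(table, And(KeyEq("foo"), non_empty)))  # ('key lookup', ['foo'])
print(pull_query(table, Or(KeyEq("bar"), non_empty)))  # ('table scan', ['foo', 'bar'])
```

In this model both access paths return the same rows for any given clause; the only difference is how many rows are read, which is why a lookup is preferable whenever one conjunct pins the key.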