confluentinc / ksql

The database purpose-built for stream processing applications.
https://ksqldb.io

Not being able to query non-materialized tables should be better explained #7169

Open yuranos opened 3 years ago

yuranos commented 3 years ago

It's clear from the documentation that it's not possible to query non-materialized tables:

If a table is created directly on top of a Kafka topic, it's not materialized. Non-materialized tables can't be queried, because they would be highly inefficient.

However, it's not clear what exactly happens if one tries to. In my case, I tried different types of queries against non-materialized collections and just got no result. No complaints, no warnings, just an empty result. What's more, a lot of the documentation can mislead beginners. For example, this one about partitioning for JOINs:

-- clicks stream, with no or unknown key.
-- the schema of stream clicks is: USERID BIGINT | URL STRING
CREATE STREAM clicks (
    userId BIGINT, 
    url STRING
  ) WITH (
    kafka_topic='clickstream', 
    value_format='json'
  );

-- users table, with userId primary key. 
-- the schema of table users is: USERID BIGINT PRIMARY KEY | FULLNAME STRING
CREATE TABLE users (
    id BIGINT PRIMARY KEY, 
    fullName STRING
  ) WITH (
    kafka_topic='users', 
    value_format='json'
  );

-- join of users table with clicks stream, joining on the table's primary key and the stream's userId column:
-- join will automatically repartition clicks stream:
SELECT 
  c.userId,
  c.url, 
  u.fullName 
FROM clicks c
  JOIN users u ON c.userId = u.id;

Such a select wouldn't work. Well, it wouldn't work even with materialized tables without EMIT CHANGES, but even with it, it just didn't return anything. And again: no complaints, no warnings, just an empty result.

I can just say, as the lead of a team that is now actively adopting ksqlDB, that it takes some time for people to start paying attention to materialized vs. non-materialized collections, especially when they begin with a small amount of data.

So, the two documents linked in the ticket, along with any other documents that provide similar examples, should be more verbose about materialization and persistent queries.

yuranos commented 3 years ago

The CLI will actually complain if I DON'T add EMIT CHANGES at the end, and it gives a very explicit message (screenshot). But if I do add EMIT CHANGES at the end, the query just hangs and gives no result back (screenshot).

For streams it's different. Without EMIT CHANGES I get the same verbose error message, but with EMIT CHANGES I get an instantaneous result.

yuranos commented 3 years ago

Now, I don't even know if it's a documentation issue or a bug that I need to report separately. I have tried creating mat streams/tables from the corresponding non-mat streams/tables, and while mat streams were queryable, mat tables were not (when created from non-mat tables):

CREATE TABLE mxDimValueTableRekeyed (
    DimensionValue_Key bigint PRIMARY KEY,
    Dimension_Code string,
    Code string,
    Name string)
    WITH (kafka_topic='dimension_value', value_format='avro');

CREATE TABLE mxDimValueTableRekeyedMat AS SELECT * FROM mxDimValueTableRekeyed EMIT CHANGES;

select * from mxDimValueTableRekeyedMat emit changes;
+--------------------------------+--------------------------------+--------------------------------+--------------------------------+
|DIMENSIONVALUE_KEY              |DIMENSION_CODE                  |CODE                            |NAME                            |
+--------------------------------+--------------------------------+--------------------------------+--------------------------------+
....
...
......

Can wait forever to no avail.

agavra commented 3 years ago

Hello @yuranos, thanks for filing the issue and pointing out some documentation holes. We'll try to get those fixed ASAP.

Note that when you issue EMIT CHANGES the default behavior is to ignore anything that happened before you issued the query. If you want to see changes from the beginning of time issue SET 'auto.offset.reset' = 'earliest'.
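A minimal sketch of that setting in a CLI session, reusing the clicks stream from the issue description. It assumes the property is set in the same session before the push query is started:

```sql
-- Start reading from the earliest available offset instead of only new records.
SET 'auto.offset.reset' = 'earliest';

-- Push query: with the setting above it also emits rows that were
-- written to the topic before the query was issued.
SELECT * FROM clicks EMIT CHANGES;
```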

I have tried creating mat streams/tables from the corresponding non-mat streams/tables

The example you gave doesn't materialize the table, you need to use some kind of aggregation to create a queryable state store (well, that will change in 0.17 after https://github.com/confluentinc/ksql/pull/7085 but as of the latest releases that's the case). You can read https://docs.ksqldb.io/en/latest/how-to-guides/convert-changelog-to-table/ for more information.

yuranos commented 3 years ago

Hi @agavra, I'm aware of the SET 'auto.offset.reset' = 'earliest' thing and I always do this when I start my CLI (I work in a local docker deployment for now). What's more, as I explained in the ticket, all my stream queries work; only the table queries don't.

yuranos commented 3 years ago

As for the link you've provided: if it's the only way to work with mat tables, it makes them useful for only a very narrow set of cases, because every time I use GROUP BY, I need to list all the fields I want to get in my table in that grouping clause.

As a result, the primary key of the table will be inferred from all those fields. It's explained very well in https://docs.ksqldb.io/en/latest/developer-guide/ksqldb-reference/create-table-as-select/, mentioned in your link, and also touched upon in https://www.confluent.io/blog/ksqldb-0-10-updates-key-columns/. Without those manipulations I would get something like:

Non-aggregate SELECT expression(s) not part of GROUP BY: AS_VALUE(DIMENSION_CODE), AS_VALUE(CODE), AS_VALUE(NAME), DIMENSION_CODE, CODE, NAME
Either add the column to the GROUP BY or remove it from the SELECT.

But such a composite PRIMARY KEY is of no use in our case because I need to JOIN this table with other collections (streams). So, I need a way to get a mat table from the topic with a PRIMARY KEY of my choosing and at the same time get the other fields from the underlying topic into the table.

agavra commented 3 years ago

What's more, as I explained in the ticket all my stream queries work, only the table queries don't.

That's interesting, can you PRINT the topic for the table that you're trying to read from? There might be some other issues going on (e.g. deserialization errors). You can also look at the processing log to see if anything happened when reading individual messages (https://docs.ksqldb.io/en/latest/reference/processing-log/#processing-log)

Because every time I use GROUP BY, I need to list all the fields I want to get in my table in that grouping clause.

This isn't necessary, you can use LATEST_BY_OFFSET on the columns that you don't want to group by (I believe the guide references that).

yuranos commented 3 years ago

Ok, let's see. First of all, yes, PRINT works perfectly well. After all, why wouldn't it if all queries on streams also do. I have tried creating both a stream and a table from the same topic.

As for LATEST_BY_OFFSET, I'm not sure I understand. Can you elaborate? Every time I use any function in the selection, ksqlDB also forces me to use all the fields that are included in the GROUP BY. When I do wrap all the needed fields, no matter what functions are used, I always get an exception akin to:

ksql> CREATE TABLE dimValueTable AS SELECT
    AS_VALUE(DimensionValue_Key) AS DimensionValue_Key,
    AS_VALUE(Dimension_Code) AS Dimension_Code,
    AS_VALUE(Code) AS Code,
    AS_VALUE(Name) AS Name,
    LATEST_BY_OFFSET(DimensionValue_Key) AS aggDimensionValue_Key,
    LATEST_BY_OFFSET(Dimension_Code) AS aggDimension_Code,
    LATEST_BY_OFFSET(Code) AS aggCode,
    LATEST_BY_OFFSET(Name) AS aggName,
    COUNT(*)
  FROM mxDimValueStream
  GROUP BY DimensionValue_Key, Dimension_Code, Code, Name EMIT CHANGES;
Keys missing from projection.
The query used to build `DIMVALUETABLE` must include the grouping expressions DIMENSIONVALUE_KEY, DIMENSION_CODE, CODE and NAME in its projection.

The fields inside GROUP BY are the fields I will subsequently need in my JOINs on the table. I can't remove them from GROUP BY because then I can't get them into the message (like with AS_VALUE(Code) AS Code).

As for the processing logs, I might try to look into that today, that's a good idea, just requires a bit more time to investigate.

yuranos commented 3 years ago

The only other suspicion I have, and I don't know if it's related, is that I can't use DESCRIBE on either tables or streams I create:

ksql> describe dimValueTable;
Failed to deserialise object
Caused by: Cannot construct instance of
    `io.confluent.ksql.rest.entity.SourceDescription`, problem:
    queryOffsetSummaries
 at [Source:
    (byte[])"[{"@type":"sourceDescription","statementText":"describe
    dimValueTable;","sourceDescription":{"name":"DIMVALUETABLE","windowType":nul
    l,"readQueries":[],"writeQueries":[{"queryString":"CREATE TABLE DIMVALUETABLE
    WITH (KAFKA_TOPIC='DIMVALUETABLE', PARTITIONS=3, REPLICAS=3) AS SELECT\n 
    AS_VALUE(DIMVALUESTREAM.DIMENSIONVALUE_KEY) DIMENSIONVALUE_KEY,\n 
    AS_VALUE(DIMVALUESTREAM.DIMENSION_CODE) DIMENSION_CODE,\n 
    AS_VALUE(DIMVALUESTREAM.CODE) CODE,\n  AS_VALUE(DIMVALUESTREAM.NAME)
    NAME,"[truncated 1990 bytes]; line: 1, column: 2474] (through reference chain:
    io.confluent.ksql.rest.entity.KsqlEntityList[0]->io.confluent.ksql.rest.entity.S
    ourceDescriptionEntity["sourceDescription"])
Caused by: queryOffsetSummaries
ksql> describe dimValueStream;
Failed to deserialise object
Caused by: Cannot construct instance of
    `io.confluent.ksql.rest.entity.SourceDescription`, problem:
    queryOffsetSummaries
 at [Source:
    (byte[])"[{"@type":"sourceDescription","statementText":"describe
    dimValueStream;","sourceDescription":{"name":"DIMVALUESTREAM","windowType":n
    ull,"readQueries":[{"queryString":"CREATE TABLE DIMVALUETABLE WITH
    (KAFKA_TOPIC='DIMVALUETABLE', PARTITIONS=3, REPLICAS=3) AS SELECT\n 
    AS_VALUE(DIMVALUESTREAM.DIMENSIONVALUE_KEY) DIMENSIONVALUE_KEY,\n 
    AS_VALUE(DIMVALUESTREAM.DIMENSION_CODE) DIMENSION_CODE,\n 
    AS_VALUE(DIMVALUESTREAM.CODE) CODE,\n  AS_VALUE(DIMVALUESTREAM.NAME) NAME,\n
    DIMVALUEST"[truncated 1985 bytes]; line: 1, column: 2469] (through reference
    chain:
    io.confluent.ksql.rest.entity.KsqlEntityList[0]->io.confluent.ksql.rest.entity.S
    ourceDescriptionEntity["sourceDescription"])
Caused by: queryOffsetSummaries

agavra commented 3 years ago

First of all, yes, PRINT works perfectly well. After all, why wouldn't it if all queries on streams also do. I have tried creating both a stream and a table from the same topic.

Can you paste the output of the first few rows and the "guessed" key/value types? I was looking to make sure the serialization matched up.

As for LATEST_BY_OFFSET, I'm not sure I understand. Can you elaborate?

The point here is that you don't need to group by all the fields. You can group by only the fields that you want and then use LATEST_BY_OFFSET for the other ones. For the ones that you do group by, you don't need to use AS_VALUE (in fact, you shouldn't use AS_VALUE unless you also want to duplicate them into the value of the Kafka message).

For example, if you wanted (I know you want all, but this is for example) only to group by DimensionValue_Key you could do this (note that I do not use AS_VALUE for it):

CREATE TABLE dimValueTable AS SELECT DimensionValue_Key, LATEST_BY_OFFSET(Dimension_Code) AS aggDimension_Code, LATEST_BY_OFFSET(Code) AS aggCode, LATEST_BY_OFFSET(Name) AS aggName, COUNT(*) FROM mxDimValueStream
 GROUP BY DimensionValue_Key EMIT CHANGES;

ksql> describe dimValueStream; Failed to deserialise object Caused by: Cannot construct instance of

This almost certainly means your CLI and your server are mismatched versions. Can you share the ksql splash screen that you have?

yuranos commented 3 years ago

Thanks, @agavra I can drop you both the splash screen and the docker compose snippet.

version: '2'

services:
  ksqldb-server:
    image: confluentinc/cp-ksqldb-server
    hostname: ksqldb-server
    container_name: ksqldb-server
    ports:
      - "8088:8088"
    environment:
      KSQL_LISTENERS: http://0.0.0.0:8088

  ksqldb-cli:
    image: confluentinc/ksqldb-cli
    container_name: ksqldb-cli
    depends_on:
      - ksqldb-server
    entrypoint: /bin/sh
    tty: true
    environment:
      ...

Here's the splash screen:

yuranos@xps-9570:~/.../ksql (master) $ docker-compose exec ksqldb-cli ksql http://ksqldb-server:8088
OpenJDK 64-Bit Server VM warning: Option UseConcMarkSweepGC was deprecated in version 9.0 and will likely be removed in a future release.

                  ===========================================
                  =       _              _ ____  ____       =
                  =      | | _____  __ _| |  _ \| __ )      =
                  =      | |/ / __|/ _` | | | | |  _ \      =
                  =      |   <\__ \ (_| | | |_| | |_) |     =
                  =      |_|\_\___/\__, |_|____/|____/      =
                  =                   |_|                   =
                  =  Event Streaming Database purpose-built =
                  =        for stream processing apps       =
                  ===========================================

Copyright 2017-2020 Confluent Inc.

CLI v0.12.0, Server v6.0.0 located at http://ksqldb-server:8088
Server Status: <unknown>

Having trouble? Type 'help' (case-insensitive) for a rundown of how things work!

ksql> 

I don't remember if I mentioned that before, but I run it against Confluent Cloud. So, that's pretty much my entire docker-compose file (I just removed the creds and other configs in the env vars). Do you think there might be a version incompatibility because of the implicit tag (latest)?

Back to the GROUP BY clause. I need all the values I put into GROUP BY in my resulting table. I don't need grouping at all. I don't need any aggregation for that matter. I just can't find another way to create a mat table with rekeyed topic. Ultimately, I would want a join by a foreign key, but I guess it's still in progress.

PeterLindner commented 3 years ago

I need all the values I put into GROUP BY in my resulting table. I don't need grouping at all. I don't need any aggregation for that matter. I just can't find another way to create a mat table with rekeyed topic. Ultimately, I would want a join by a foreign key, but I guess it's still in progress.

@yuranos to work around foreign key joins you could try to aggregate all values into an array (with COLLECT_LIST or COLLECT_SET) then join to your stream and use EXPLODE afterwards. This workaround has many limitations and gotchas, but may work for simple usecases until foreign key joins are supported properly.

Hope it helps.
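A rough sketch of the COLLECT_LIST / EXPLODE workaround described above. All stream, table, column, and topic names here are hypothetical and only illustrate the shape of the queries; it also assumes the two source topics already exist:

```sql
-- Hypothetical sources: one row per order, and many item rows per order.
CREATE STREAM orders (order_id BIGINT, customer STRING)
  WITH (kafka_topic='orders', value_format='json');

CREATE STREAM order_items (order_id BIGINT, item_name STRING)
  WITH (kafka_topic='order_items', value_format='json');

-- 1. Collapse the "many" side into one row per join key,
--    gathering the values into an array.
CREATE TABLE items_by_order AS
  SELECT order_id, COLLECT_LIST(item_name) AS item_names
  FROM order_items
  GROUP BY order_id
  EMIT CHANGES;

-- 2. Ordinary stream-table join on the table's primary key.
CREATE STREAM orders_with_items AS
  SELECT o.order_id AS order_id,
         o.customer AS customer,
         i.item_names AS item_names
  FROM orders o
    JOIN items_by_order i ON o.order_id = i.order_id
  EMIT CHANGES;

-- 3. Fan the array back out into one row per element.
CREATE STREAM orders_items_flat AS
  SELECT order_id, customer, EXPLODE(item_names) AS item_name
  FROM orders_with_items
  EMIT CHANGES;
```

One of the gotchas is that each stream row only sees the array as it was when that row was joined, so items arriving later are not applied retroactively.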

yuranos commented 3 years ago

Hi @PeterLindner, thank you. I also saw your message in #4424, but I'm not sure what you suggest is applicable to my use case. I'm not so much interested in a one-to-many relation as I am in rekeying the table so that I can at least somehow join it with a stream to get a non-windowed result.

yuranos commented 3 years ago

@agavra, I made some progress. At least I was able to get a rekeyed table. I'm not sure it's the most idiomatic way to do it, because initially I wanted to operate only on streams and tables and abstract away from topics. But I think in my initial example I misinterpreted how tables work. I tried:

CREATE TABLE mxDimValueTableRekeyed (
    DimensionValue_Key bigint PRIMARY KEY,
    Dimension_Code string,
    Code string,
    Name string)
    WITH (kafka_topic='dimension_value', value_format='avro');

on the 'unkeyed' topic, that is, one where all the keys were null. I thought a table would be able to extract a key by field name from the message body. That was naive of me.

Anyway, what I did was create a stream from the initial unkeyed topic and then rekey it:

CREATE STREAM mxDimValueStream (DimensionValue_Key bigint, Dimension_Code string, Code string, Name string)
    WITH (kafka_topic='dimension_value', value_format='avro');

CREATE STREAM mxDimValueStreamRekeyed
  WITH (partitions = 3, value_format='avro', key_format='avro', kafka_topic='pksqlc-12j9j.dimension_value.rekeyed')
  AS SELECT * FROM mxDimValueStream PARTITION BY DimensionValue_Key EMIT CHANGES;

As a result, I've got a topic with the right key, in my case DimensionValue_Key. Now, I just used CREATE TABLE off that topic:

CREATE TABLE mxDimValueTable (
    DimensionValue_Key bigint PRIMARY KEY,
    Dimension_Code string,
    Code string,
    Name string)
    WITH (kafka_topic='pksqlc-12j9j.dimension_value.rekeyed',
        value_format='avro',
        key_format='avro');

I don't like the fact that I have to create a table by providing a topic that I also had to name beforehand, since based on Kafka Streams principles I shouldn't really need to bother with internal topics. And this one was supposed to be an internal topic.

In any case, now I have to come up with one last step. I have successfully joined mxDimValueTable with another table by DimensionValue_Key and need to add one last entity to the mix. That last entity should be joined with the joined table, which has to be rekeyed somehow in the process. The joined table has DimensionValue_Key as its key and a JobTask_Key field in its body. I need to join by that JobTask_Key field. What's the most straightforward way to do that? I cannot rekey a table directly, so what comes to mind is converting the table to a stream, rekeying that stream, and converting it back to a table, because as a final result I need a join of 3 entities, something like this:

SELECT *
FROM mxJobTaskStream AS jtask
         JOIN mxJobTaskDimTable AS jtaskdim ON jtask.JobTask_Key = jtaskdim.JobTask_Key
         JOIN mxDimValueTable AS dimVal ON jtaskdim.DimensionValue_Key = dimVal.DimensionValue_Key EMIT CHANGES;

I have managed to join mxDimValueTable and mxJobTaskDimTable using DimensionValue_Key. Now I need to get the result joined with mxJobTaskStream by a different key (mxJobTaskStream doesn't have to be a stream; I can create a table from it). The key point here is that I need to rekey a table with the minimum number of steps. I'm also really disappointed by this:

Joins cause data re-partitioning of a stream only if the stream was marked for re-partitioning. If both streams are marked, both are re-partitioned.

...because I don't see anything in the dev guides about marking a stream for re-partitioning.
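The table -> stream -> rekey -> table route mentioned above could look roughly like the sketch below, which follows the LATEST_BY_OFFSET approach suggested earlier in the thread. The topic name 'JOINED_TABLE_TOPIC', the new stream and table names, the key format, and the BIGINT type of JobTask_Key are all assumptions, not taken from the actual setup:

```sql
-- 1. Read the joined table's changelog topic as a plain stream.
--    (Hypothetical topic name; column types are assumed.)
CREATE STREAM jobTaskDimChangelog (
    DimensionValue_Key BIGINT KEY,
    JobTask_Key BIGINT,
    Dimension_Code STRING
  ) WITH (kafka_topic='JOINED_TABLE_TOPIC', value_format='avro');

-- 2. Group by the new key. ksqlDB repartitions the stream internally,
--    and LATEST_BY_OFFSET keeps the most recent value seen per key.
CREATE TABLE jobTaskDimByJobTask AS
  SELECT JobTask_Key,
         LATEST_BY_OFFSET(DimensionValue_Key) AS DimensionValue_Key,
         LATEST_BY_OFFSET(Dimension_Code) AS Dimension_Code
  FROM jobTaskDimChangelog
  GROUP BY JobTask_Key
  EMIT CHANGES;
```

Two caveats with this shape: if several source rows share the same JobTask_Key, only the latest one survives in the new table, and deletes (tombstones) in the source changelog are not carried over.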

yuranos commented 3 years ago

@agavra, I just found a similar workaround posted by @rmoff: https://github.com/confluentinc/ksql/issues/1405