confluentinc / ksql

The database purpose-built for stream processing applications.
https://ksqldb.io

ksqldb 0.15.0 - PARTITION BY re-keying stream stops working after some time #7214

Open konstan opened 3 years ago

konstan commented 3 years ago

Describe the bug

The stream that re-keys another one using PARTITION BY stops producing results after running for some time. The queries that read from that stream receive no results.

To Reproduce

ksqlDB 0.15.0, image confluentinc/ksqldb-server:0.15.0

The ksql statements are defined here: https://github.com/nuvla/deployment/blob/master/streams/ksqldb/statements.replicas-1.sql#L273. CREATE STREAM NB_TELEM_RESOURCES_REKYED_S is the one that stops working. The source (FROM) stream that the failing stream re-keys works with no problems.

Expected behavior

The stream should be working.

Actual behaviour

The stream stops working after some time.

Attempting to query the stream from ksql shell produces no results.
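
For illustration, a check of this kind might look like the sketch below. The exact statement that was run is not recorded in this report, so the query text is an assumption; it also assumes the CLI's default offset reset, under which a push query shows only records produced after it starts.

```sql
-- Hypothetical check (not the reporter's verbatim command).
-- A push query on the re-keyed stream; on the affected instance
-- no rows appear even though the source stream keeps receiving data.
SELECT * FROM NB_TELEM_RESOURCES_REKYED_S EMIT CHANGES;
```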

Here is one of many such warnings in ksql-streams.log. CSAS_NB_TELEM_RESOURCES_REKYED_S_637 is the query that writes to the re-keying stream that stops working.

[2021-03-11 19:18:51,727] WARN stream-thread [_confluent-ksql-default_query_CSAS_NB_TELEM_RESOURCES_REKYED_S_637-90b18f01-fa5a-449c-a813-bcd6f5d9a6c9-StreamThread-2] Detected that the thread is being fenced. This implies that this thread missed a rebalance and dropped out of the consumer group. Will close out all assigned tasks and rejoin the consumer group. (org.apache.kafka.streams.processor.internals.StreamThread:613)
org.apache.kafka.streams.errors.TaskMigratedException: Consumer committing offsets failed, indicating the corresponding thread is no longer part of the group; it means all tasks belonging to this thread should be migrated.
        at org.apache.kafka.streams.processor.internals.TaskManager.commitOffsetsOrTransaction(TaskManager.java:1143)
        at org.apache.kafka.streams.processor.internals.TaskManager.commit(TaskManager.java:1064)
        at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:945)
        at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:722)
        at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:559)
        at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:539)
Caused by: org.apache.kafka.clients.consumer.CommitFailedException: Offset commit cannot be completed since the consumer is not part of an active group for auto partition assignment; it is likely that the consumer was kicked out of the group.
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:1139)
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:1004)
        at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1486)
        at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1434)
        at org.apache.kafka.streams.processor.internals.TaskManager.commitOffsetsOrTransaction(TaskManager.java:1141)
        ... 5 more
[2021-03-11 19:18:51,729] INFO stream-thread [_confluent-ksql-default_query_CSAS_NB_TELEM_RESOURCES_REKYED_S_637-90b18f01-fa5a-449c-a813-bcd6f5d9a6c9-StreamThread-2] task [0_0] Suspended running (org.apache.kafka.streams.processor.internals.StreamTask:279)
[2021-03-11 19:18:51,745] INFO stream-thread [_confluent-ksql-default_query_CSAS_NB_TELEM_RESOURCES_REKYED_S_637-90b18f01-fa5a-449c-a813-bcd6f5d9a6c9-StreamThread-2] task [0_0] Closing record collector dirty (org.apache.kafka.streams.processor.internals.RecordCollectorImpl:276)
[2021-03-11 19:18:51,745] INFO stream-thread [_confluent-ksql-default_query_CSAS_NB_TELEM_RESOURCES_REKYED_S_637-90b18f01-fa5a-449c-a813-bcd6f5d9a6c9-StreamThread-2] task [0_0] Closed dirty (org.apache.kafka.streams.processor.internals.StreamTask:501)
[2021-03-11 19:18:51,745] INFO stream-thread [_confluent-ksql-default_query_CSAS_NB_TELEM_RESOURCES_REKYED_S_637-90b18f01-fa5a-449c-a813-bcd6f5d9a6c9-StreamThread-2] at state RUNNING: partitions [_confluent-ksql-default_query_CSAS_NB_TELEM_RESOURCES_REKYED_S_637-Join-repartition-0, NUVLABOX_REKYED_S-0] lost due to missed rebalance.
        lost active tasks: []
        lost assigned standby tasks: []
 (org.apache.kafka.streams.processor.internals.StreamThread:101)

Restarting ksqldb doesn't help.

Dropping the stream directly returns the error below, whereas I expected a message listing the source/sink queries to terminate first.

ksql> drop stream NB_TELEM_RESOURCES_REKYED_S;
class io.confluent.ksql.query.QueryId cannot be cast to class java.lang.Comparable (io.confluent.ksql.query.QueryId is in unnamed module of loader 'app'; java.lang.Comparable is in module java.base of loader 'bootstrap')
ksql>
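
A possible manual sequence, sketched under the assumption that terminating the attached queries first avoids the code path that raises the cast error (this is not confirmed anywhere in this report). The query IDs are the ones listed in the DESCRIBE EXTENDED output included in this report.

```sql
-- Assumption: with no queries attached, DROP STREAM no longer needs to
-- build the list of source/sink queries, which is where the error seems to arise.

-- Queries that read from the stream.
TERMINATE INSERTQUERY_639;
TERMINATE INSERTQUERY_641;
TERMINATE INSERTQUERY_643;
TERMINATE INSERTQUERY_645;
TERMINATE INSERTQUERY_647;
TERMINATE INSERTQUERY_649;
TERMINATE INSERTQUERY_651;

-- Query that writes to the stream.
TERMINATE CSAS_NB_TELEM_RESOURCES_REKYED_S_637;

-- Drop the stream once nothing references it.
DROP STREAM NB_TELEM_RESOURCES_REKYED_S;
```

Note that terminating the INSERT queries also stops the notifications they feed into NOTIFICATIONS_S, so they would need to be recreated afterwards.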

Here are the logs from the ksqldb instance: ksqldb.logs.tgz.zip. Please remove the .zip extension and unpack ksqldb.logs.tgz. I'm new to ksqldb and don't know how to interpret the logs, so I cannot tell exactly when the problem appears. The logs span the period from when the stream was working fine to the point where it stopped producing results.

Here is the extended description of the stream.

ksql> describe extended NB_TELEM_RESOURCES_REKYED_S;

Name                 : NB_TELEM_RESOURCES_REKYED_S
Type                 : STREAM
Timestamp field      : Not set - using <ROWTIME>
Key format           : KAFKA
Value format         : JSON
Kafka topic          : NB_TELEM_RESOURCES_REKYED_S (partitions: 1, replication: 1)
Statement            : CREATE STREAM NB_TELEM_RESOURCES_REKYED_S WITH (KAFKA_TOPIC='NB_TELEM_RESOURCES_REKYED_S', PARTITIONS=1, REPLICAS=1) AS SELECT
  TLM.PARENT ID,
  NB.NAME NAME,
  NB.DESCRIPTION DESCRIPTION,
  TLM.ONLINE ONLINE,
  TLM.`online-prev` ONLINE_PREV,
  TLM.RESOURCES RESOURCES,
  ((TLM.RESOURCES->CPU->`load` * 100) / TLM.RESOURCES->CPU->`capacity`) RESOURCES_CPU_LOAD_PERS,
  ((TLM.RESOURCES->RAM->`used` * 100) / TLM.RESOURCES->RAM->`capacity`) RESOURCES_RAM_USED_PERS,
  ((TLM.RESOURCES->DISKS[1]->`used` * 100) / TLM.RESOURCES->DISKS[1]->`capacity`) RESOURCES_DISK1_USED_PERS,
  TLM.`resources-prev` RESOURCES_PREV,
  ((TLM.`resources-prev`->CPU->`load` * 100) / TLM.`resources-prev`->CPU->`capacity`) RESOURCES_PREV_CPU_LOAD_PERS,
  ((TLM.`resources-prev`->RAM->`used` * 100) / TLM.`resources-prev`->RAM->`capacity`) RESOURCES_PREV_RAM_USED_PERS,
  ((TLM.`resources-prev`->DISKS[1]->`used` * 100) / TLM.`resources-prev`->DISKS[1]->`capacity`) RESOURCES_PREV_DISK1_USED_PERS,
  TLM.`current-time` TIMESTAMP,
  TLM.ACL ACL
FROM NB_TELEM_RESOURCES_S TLM
INNER JOIN NUVLABOX_T NB ON ((TLM.PARENT = NB.ID))
PARTITION BY TLM.PARENT
EMIT CHANGES;

 Field                          | Type
------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 ID                             | VARCHAR(STRING)  (key)
 NAME                           | VARCHAR(STRING)
 DESCRIPTION                    | VARCHAR(STRING)
 ONLINE                         | BOOLEAN
 ONLINE_PREV                    | BOOLEAN
 RESOURCES                      | STRUCT<CPU STRUCT<load DOUBLE, capacity BIGINT, topic VARCHAR(STRING)>, RAM STRUCT<used BIGINT, capacity BIGINT, topic VARCHAR(STRING)>, DISKS ARRAY<STRUCT<used BIGINT, capacity BIGINT, device VARCHAR(STRING)>>>
 RESOURCES_CPU_LOAD_PERS        | DOUBLE
 RESOURCES_RAM_USED_PERS        | BIGINT
 RESOURCES_DISK1_USED_PERS      | BIGINT
 RESOURCES_PREV                 | STRUCT<CPU STRUCT<load DOUBLE, capacity BIGINT, topic VARCHAR(STRING)>, RAM STRUCT<used BIGINT, capacity BIGINT, topic VARCHAR(STRING)>, DISKS ARRAY<STRUCT<used BIGINT, capacity BIGINT, device VARCHAR(STRING)>>>
 RESOURCES_PREV_CPU_LOAD_PERS   | DOUBLE
 RESOURCES_PREV_RAM_USED_PERS   | BIGINT
 RESOURCES_PREV_DISK1_USED_PERS | BIGINT
 TIMESTAMP                      | VARCHAR(STRING)
 ACL                            | STRUCT<owners ARRAY<VARCHAR(STRING)>, view-data ARRAY<VARCHAR(STRING)>>
------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

Queries that read from this STREAM
-----------------------------------
INSERTQUERY_643 (RUNNING) : INSERT INTO NOTIFICATIONS_S SELECT     subs.subs_id as id,     AS_VALUE(subs.subs_id) as subs_id,     subs_t.name as subs_name,     subs_t."method-ids" as method_ids,     subs_t.description as subs_description,     CONCAT('edge/', SPLIT(AS_VALUE(nb_tlm.id), '/')[2]) as resource_uri,     nb_tlm.name as resource_name,     nb_tlm.description as resource_description,     'NB load %' as metric,     subs_t.criteria->condition as condition,     CAST(subs_t.criteria->"value" as VARCHAR) as condition_value,     CAST(nb_tlm.resources_cpu_load_pers as VARCHAR) as "VALUE",     nb_tlm.timestamp as timestamp,     (nb_tlm.resources_prev->cpu->"load" < nb_tlm.resources->cpu->"load") as recovery FROM NB_TELEM_RESOURCES_REKYED_S AS nb_tlm JOIN SUBS_NB_LOAD_BELOW_T AS subs ON subs."resource-id" = nb_tlm.id JOIN SUBSCRIPTION_T AS subs_t ON subs_t.id = subs.subs_id WHERE     subs_t.enabled = true     AND subs_t.category = 'notification'     AND subs_t."resource-kind" = 'nuvlabox'     AND (ARRAY_CONTAINS(nb_tlm.acl->"owners", subs.owner) OR ARRAY_CONTAINS(nb_tlm.acl->"view-data", subs.owner))     AND ((nb_tlm.resources_prev_cpu_load_pers >= CAST(subs_t.criteria->"value" AS INTEGER)            AND nb_tlm.resources_cpu_load_pers < CAST(subs_t.criteria->"value" AS INTEGER))         OR         (nb_tlm.resources_cpu_load_pers > CAST(subs_t.criteria->"value" AS INTEGER)            AND nb_tlm.resources_prev_cpu_load_pers <= CAST(subs_t.criteria->"value" AS INTEGER))) EMIT CHANGES;
INSERTQUERY_645 (RUNNING) : INSERT INTO NOTIFICATIONS_S SELECT     subs.subs_id as id,     AS_VALUE(subs.subs_id) as subs_id,     subs_t.name as subs_name,     subs_t."method-ids" as method_ids,     subs_t.description as subs_description,     CONCAT('edge/', SPLIT(AS_VALUE(nb_tlm.id), '/')[2]) as resource_uri,     nb_tlm.name as resource_name,     nb_tlm.description as resource_description,     'NB ram %' as metric,     subs_t.criteria->condition as condition,     CAST(subs_t.criteria->"value" as VARCHAR) as condition_value,     CAST(nb_tlm.resources_ram_used_pers as VARCHAR) as "VALUE",     nb_tlm.timestamp as timestamp,     (nb_tlm.resources_prev->ram->"used" > nb_tlm.resources->ram->"used") as recovery FROM NB_TELEM_RESOURCES_REKYED_S AS nb_tlm JOIN SUBS_NB_RAM_ABOVE_T AS subs ON subs."resource-id" = nb_tlm.id JOIN SUBSCRIPTION_T AS subs_t ON subs_t.id = subs.subs_id WHERE     subs_t.enabled = true     AND subs_t.category = 'notification'     AND subs_t."resource-kind" = 'nuvlabox'     AND (ARRAY_CONTAINS(nb_tlm.acl->"owners", subs.owner) OR ARRAY_CONTAINS(nb_tlm.acl->"view-data", subs.owner))     AND ((nb_tlm.resources_prev_ram_used_pers <= CAST(subs_t.criteria->"value" AS INTEGER)            AND nb_tlm.resources_ram_used_pers > CAST(subs_t.criteria->"value" AS INTEGER))         OR         (nb_tlm.resources_ram_used_pers < CAST(subs_t.criteria->"value" AS INTEGER)            AND nb_tlm.resources_prev_ram_used_pers >= CAST(subs_t.criteria->"value" AS INTEGER))) EMIT CHANGES;
INSERTQUERY_651 (RUNNING) : INSERT INTO NOTIFICATIONS_S SELECT     subs.subs_id as id,     AS_VALUE(subs.subs_id) as subs_id,     subs_t.name as subs_name,     subs_t."method-ids" as method_ids,     subs_t.description as subs_description,     CONCAT('edge/', SPLIT(AS_VALUE(nb_tlm.id), '/')[2]) as resource_uri,     nb_tlm.name as resource_name,     nb_tlm.description as resource_description,     'NB disk %' as metric,     subs_t.criteria->condition as condition,     CAST(subs_t.criteria->"value" as VARCHAR) as condition_value,     CAST(nb_tlm.resources_disk1_used_pers as VARCHAR) as "VALUE",     nb_tlm.timestamp as timestamp,     (nb_tlm.resources_prev->disks[1]->"used" < nb_tlm.resources->disks[1]->"used") as recovery FROM NB_TELEM_RESOURCES_REKYED_S AS nb_tlm JOIN SUBS_NB_DISK_BELOW_T AS subs ON subs."resource-id" = nb_tlm.id JOIN SUBSCRIPTION_T AS subs_t ON subs_t.id = subs.subs_id WHERE     subs_t.enabled = true     AND subs_t.category = 'notification'     AND subs_t."resource-kind" = 'nuvlabox'     AND (ARRAY_CONTAINS(nb_tlm.acl->"owners", subs.owner) OR ARRAY_CONTAINS(nb_tlm.acl->"view-data", subs.owner))     AND ((nb_tlm.resources_prev_disk1_used_pers >= CAST(subs_t.criteria->"value" AS INTEGER)            AND nb_tlm.resources_disk1_used_pers < CAST(subs_t.criteria->"value" AS INTEGER))         OR         (nb_tlm.resources_disk1_used_pers > CAST(subs_t.criteria->"value" AS INTEGER)            AND nb_tlm.resources_prev_disk1_used_pers <= CAST(subs_t.criteria->"value" AS INTEGER))) EMIT CHANGES;
INSERTQUERY_641 (RUNNING) : INSERT INTO NOTIFICATIONS_S SELECT     subs.subs_id as id,     AS_VALUE(subs.subs_id) as subs_id,     subs_t.name as subs_name,     subs_t."method-ids" as method_ids,     subs_t.description as subs_description,     CONCAT('edge/', SPLIT(AS_VALUE(nb_tlm.id), '/')[2]) as resource_uri,     nb_tlm.name as resource_name,     nb_tlm.description as resource_description,     'NB load %' as metric,     subs_t.criteria->condition as condition,     CAST(subs_t.criteria->"value" as VARCHAR) as condition_value,     CAST(nb_tlm.resources_cpu_load_pers as VARCHAR) as "VALUE",     nb_tlm.timestamp as timestamp,     (nb_tlm.resources_prev->cpu->"load" > nb_tlm.resources->cpu->"load") as recovery FROM NB_TELEM_RESOURCES_REKYED_S AS nb_tlm JOIN SUBS_NB_LOAD_ABOVE_T AS subs ON subs."resource-id" = nb_tlm.id JOIN SUBSCRIPTION_T AS subs_t ON subs_t.id = subs.subs_id WHERE     subs_t.enabled = true     AND subs_t.category = 'notification'     AND subs_t."resource-kind" = 'nuvlabox'     AND (ARRAY_CONTAINS(nb_tlm.acl->"owners", subs.owner) OR ARRAY_CONTAINS(nb_tlm.acl->"view-data", subs.owner))     AND ((nb_tlm.resources_prev_cpu_load_pers <= CAST(subs_t.criteria->"value" AS INTEGER)            AND nb_tlm.resources_cpu_load_pers > CAST(subs_t.criteria->"value" AS INTEGER))         OR         (nb_tlm.resources_cpu_load_pers < CAST(subs_t.criteria->"value" AS INTEGER)            AND nb_tlm.resources_prev_cpu_load_pers >= CAST(subs_t.criteria->"value" AS INTEGER))) EMIT CHANGES;
INSERTQUERY_647 (RUNNING) : INSERT INTO NOTIFICATIONS_S SELECT     subs.subs_id as id,     AS_VALUE(subs.subs_id) as subs_id,     subs_t.name as subs_name,     subs_t."method-ids" as method_ids,     subs_t.description as subs_description,     CONCAT('edge/', SPLIT(AS_VALUE(nb_tlm.id), '/')[2]) as resource_uri,     nb_tlm.name as resource_name,     nb_tlm.description as resource_description,     'NB ram %' as metric,     subs_t.criteria->condition as condition,     CAST(subs_t.criteria->"value" as VARCHAR) as condition_value,     CAST(nb_tlm.resources_ram_used_pers as VARCHAR) as "VALUE",     nb_tlm.timestamp as timestamp,     (nb_tlm.resources_prev->ram->"used" < nb_tlm.resources->ram->"used") as recovery FROM NB_TELEM_RESOURCES_REKYED_S AS nb_tlm JOIN SUBS_NB_RAM_BELOW_T AS subs ON subs."resource-id" = nb_tlm.id JOIN SUBSCRIPTION_T AS subs_t ON subs_t.id = subs.subs_id WHERE     subs_t.enabled = true     AND subs_t.category = 'notification'     AND subs_t."resource-kind" = 'nuvlabox'     AND (ARRAY_CONTAINS(nb_tlm.acl->"owners", subs.owner) OR ARRAY_CONTAINS(nb_tlm.acl->"view-data", subs.owner))     AND ((nb_tlm.resources_prev_ram_used_pers >= CAST(subs_t.criteria->"value" AS INTEGER)            AND nb_tlm.resources_ram_used_pers < CAST(subs_t.criteria->"value" AS INTEGER))         OR         (nb_tlm.resources_ram_used_pers > CAST(subs_t.criteria->"value" AS INTEGER)            AND nb_tlm.resources_prev_ram_used_pers <= CAST(subs_t.criteria->"value" AS INTEGER))) EMIT CHANGES;
INSERTQUERY_649 (RUNNING) : INSERT INTO NOTIFICATIONS_S SELECT     subs.subs_id as id,     AS_VALUE(subs.subs_id) as subs_id,     subs_t.name as subs_name,     subs_t."method-ids" as method_ids,     subs_t.description as subs_description,     CONCAT('edge/', SPLIT(AS_VALUE(nb_tlm.id), '/')[2]) as resource_uri,     nb_tlm.name as resource_name,     nb_tlm.description as resource_description,     'NB disk %' as metric,     subs_t.criteria->condition as condition,     CAST(subs_t.criteria->"value" as VARCHAR) as condition_value,     CAST(nb_tlm.resources_disk1_used_pers as VARCHAR) as "VALUE",     nb_tlm.timestamp as timestamp,     (nb_tlm.resources_prev->disks[1]->"used" > nb_tlm.resources->disks[1]->"used") as recovery FROM NB_TELEM_RESOURCES_REKYED_S AS nb_tlm JOIN SUBS_NB_DISK_ABOVE_T AS subs ON subs."resource-id" = nb_tlm.id JOIN SUBSCRIPTION_T AS subs_t ON subs_t.id = subs.subs_id WHERE     subs_t.enabled = true     AND subs_t.category = 'notification'     AND subs_t."resource-kind" = 'nuvlabox'     AND (ARRAY_CONTAINS(nb_tlm.acl->"owners", subs.owner) OR ARRAY_CONTAINS(nb_tlm.acl->"view-data", subs.owner))     AND ((nb_tlm.resources_prev_disk1_used_pers <= CAST(subs_t.criteria->"value" AS INTEGER)            AND nb_tlm.resources_disk1_used_pers > CAST(subs_t.criteria->"value" AS INTEGER))         OR         (nb_tlm.resources_disk1_used_pers < CAST(subs_t.criteria->"value" AS INTEGER)            AND nb_tlm.resources_prev_disk1_used_pers >= CAST(subs_t.criteria->"value" AS INTEGER))) EMIT CHANGES;
INSERTQUERY_639 (RUNNING) : INSERT INTO NOTIFICATIONS_S SELECT     subs.subs_id as id,     AS_VALUE(subs.subs_id) as subs_id,     subs_t.name as subs_name,     subs_t."method-ids" as method_ids,     subs_t.description as subs_description,     CONCAT('edge/', SPLIT(AS_VALUE(nb_tlm.id), '/')[2]) as resource_uri,     nb_tlm.name as resource_name,     nb_tlm.description as resource_description,     'NB online' as metric,     CAST((nb_tlm.online = true AND nb_tlm.online_prev = false) as VARCHAR) as condition,     '' as condition_value,     '' as "VALUE",     nb_tlm.timestamp as timestamp,     (nb_tlm.online = true AND nb_tlm.online_prev = false) as recovery FROM NB_TELEM_RESOURCES_REKYED_S AS nb_tlm JOIN SUBS_NB_STATE_T AS subs ON subs."resource-id" = nb_tlm.id JOIN SUBSCRIPTION_T AS subs_t ON subs_t.id = subs.subs_id WHERE     subs_t.enabled = true     AND subs_t.category = 'notification'     AND subs_t."resource-kind" = 'nuvlabox'     AND (ARRAY_CONTAINS(nb_tlm.acl->"owners", subs.owner) OR ARRAY_CONTAINS(nb_tlm.acl->"view-data", subs.owner))     AND ((nb_tlm.online = true AND nb_tlm.online_prev = false)           OR (nb_tlm.online = false AND nb_tlm.online_prev = true)) EMIT CHANGES;

For query topology and execution plan please run: EXPLAIN <QueryId>

Queries that write from this STREAM
-----------------------------------
CSAS_NB_TELEM_RESOURCES_REKYED_S_637 (RUNNING) : CREATE STREAM NB_TELEM_RESOURCES_REKYED_S WITH (KAFKA_TOPIC='NB_TELEM_RESOURCES_REKYED_S', PARTITIONS=1, REPLICAS=1) AS SELECT   TLM.PARENT ID,   NB.NAME NAME,   NB.DESCRIPTION DESCRIPTION,   TLM.ONLINE ONLINE,   TLM.`online-prev` ONLINE_PREV,   TLM.RESOURCES RESOURCES,   ((TLM.RESOURCES->CPU->`load` * 100) / TLM.RESOURCES->CPU->`capacity`) RESOURCES_CPU_LOAD_PERS,   ((TLM.RESOURCES->RAM->`used` * 100) / TLM.RESOURCES->RAM->`capacity`) RESOURCES_RAM_USED_PERS,   ((TLM.RESOURCES->DISKS[1]->`used` * 100) / TLM.RESOURCES->DISKS[1]->`capacity`) RESOURCES_DISK1_USED_PERS,   TLM.`resources-prev` RESOURCES_PREV,   ((TLM.`resources-prev`->CPU->`load` * 100) / TLM.`resources-prev`->CPU->`capacity`) RESOURCES_PREV_CPU_LOAD_PERS,   ((TLM.`resources-prev`->RAM->`used` * 100) / TLM.`resources-prev`->RAM->`capacity`) RESOURCES_PREV_RAM_USED_PERS,   ((TLM.`resources-prev`->DISKS[1]->`used` * 100) / TLM.`resources-prev`->DISKS[1]->`capacity`) RESOURCES_PREV_DISK1_USED_PERS,   TLM.`current-time` TIMESTAMP,   TLM.ACL ACL FROM NB_TELEM_RESOURCES_S TLM INNER JOIN NUVLABOX_T NB ON ((TLM.PARENT = NB.ID)) PARTITION BY TLM.PARENT EMIT CHANGES;

For query topology and execution plan please run: EXPLAIN <QueryId>

Local runtime statistics
------------------------
consumer-messages-per-sec:         0 consumer-total-bytes:1555535269 consumer-total-messages:   1618068 messages-per-sec:         0   total-messages:    232244     last-message: 2021-03-12T07:16:54.337Z

(Statistics of the local KSQL server interaction with the Kafka topic NB_TELEM_RESOURCES_REKYED_S)

Consumer Groups summary:

Consumer Group       : _confluent-ksql-default_query_CSAS_NB_TELEM_RESOURCES_REKYED_S_637

Kafka topic          : _confluent-ksql-default_query_CSAS_NB_TELEM_RESOURCES_REKYED_S_637-Join-repartition
Max lag              : 119654

 Partition | Start Offset | End Offset | Offset | Lag
---------------------------------------------------------
 0         | 92346        | 212000     | 92346  | 119654
---------------------------------------------------------

Kafka topic          : es_nuvla-nuvlabox-status
Max lag              : 0

 Partition | Start Offset | End Offset | Offset  | Lag
-------------------------------------------------------
 0         | 1528619      | 2094206    | 2094206 | 0
-------------------------------------------------------

Kafka topic          : NUVLABOX_REKYED_S
Max lag              : 124

 Partition | Start Offset | End Offset | Offset | Lag
------------------------------------------------------
 0         | 0            | 2574       | 2450   | 124
------------------------------------------------------
ksql>

konstan commented 3 years ago

Update: the instance of ksqldb was running fine for about 9 hours and then the problem described above happened again.

ChenZhaobin commented 3 years ago

I hit the same problem when trying to drop a stream that has a query attached to it, even without using PARTITION BY. Dropping a table also throws the exception: io.confluent.ksql.query.QueryId cannot be cast to java.lang.Comparable