confluentinc / ksql

The database purpose-built for stream processing applications.
https://ksqldb.io

Update tables and streams without losing consumer groups, intermediate topics or states #9793

Closed paolo-bruno closed 1 year ago

paolo-bruno commented 1 year ago

Provide details of the setup you're running: ksqlDB v0.28.2

Outline your question: When a ksqlDB stream/table needs to be updated, a DROP of the stream/table may be required. A DROP statement removes the consumer group related to the dropped entity as well as its intermediate topics (for example, changelog topics). The stream/table can then be recreated, and it goes back to reading the topic of interest from the earliest or latest offset, depending on auto.offset.reset.

During a deployment, some messages may therefore be read multiple times ('auto.offset.reset' = 'earliest') or lost ('auto.offset.reset' = 'latest').

How can the consumer group and the internal topics be preserved? Is there a well-defined approach for these situations when a stream/table is updated?
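For illustration, the drop-and-recreate cycle described above looks roughly like this (the stream name and columns are hypothetical):

-- Hypothetical sketch of the update cycle described above. Dropping the
-- stream (after terminating its query) also removes its consumer group
-- and internal changelog/repartition topics.
DROP STREAM tasks_enriched;

-- Choose where the recreated query starts reading: 'earliest' risks
-- reprocessing, 'latest' risks skipping messages produced in between.
SET 'auto.offset.reset' = 'earliest';

-- Recreate the (updated) stream; it starts from the configured offset,
-- not from the offsets the dropped query had committed.
CREATE STREAM tasks_enriched AS
  SELECT id, payload
  FROM tasks
EMIT CHANGES;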

mjsax commented 1 year ago

You can use CREATE OR REPLACE to keep the internal state of the query. If CREATE OR REPLACE does not work because of incompatibilities, it's impossible to re-use the internal topics anyway (because the change is incompatible), so you might want to reprocess from the beginning to re-build state. We could conceptually help to filter duplicate output, but it's very hard to automate. You could try to specify a filter based on ROWPARTITION and ROWOFFSET to drop already-processed records.
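A minimal sketch of that filtering idea, assuming the ROWPARTITION/ROWOFFSET pseudo columns can be referenced in the query's WHERE clause; the stream names and the cut-off offset are hypothetical:

-- Hypothetical: evolve a query in place while skipping input rows the
-- previous version of the query had already processed.
CREATE OR REPLACE STREAM output_stream AS
  SELECT id, amount
  FROM input_stream
  -- Cut-off taken from the old query's committed offsets; shown for a
  -- single partition for simplicity (a real filter would need one
  -- cut-off per partition).
  WHERE ROWPARTITION != 0 OR ROWOFFSET > 41999
EMIT CHANGES;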

We could improve this so that one can specify a specific start offset, and thus allow re-using the last committed offsets when the DROP happens. But internal state could not be re-used in this case.

Closing this question for now. Feel free to follow up.

paolo-bruno commented 1 year ago

Would it be possible to define the exact name of the consumer group associated with a stream/table? For example ...query_CTAS_TASK_PERSISTED_FLATTENED instead of ...query_CTAS_TASK_PERSISTED_FLATTENED_413.

Moreover, would it be possible to maintain info about the consumed offsets of the input topic read by the current stream/table?

Given an example stream:

CREATE STREAM tasks_persisted  WITH (kafka_topic = 'input') AS
  SELECT EXPLODE(PERSIST) AS task
  FROM tasks
EMIT CHANGES;

CREATE STREAM tasks_persisted_flattened AS
  SELECT
    task->entityReference AS entityReference,
    task->entity AS entity
  FROM tasks_persisted
  WHERE task->entity->positionInfo->siteCode IS NOT NULL
EMIT CHANGES;

Could ksqlDB maintain the offsets of the input topic in the consumer group ...query_CTAS_TASK_PERSISTED_FLATTENED, so that after an update of the last stream (tasks_persisted_flattened) the application could go back to reading from the first unconsumed message?

Thanks in advance.

mjsax commented 1 year ago

Would it be possible to define the exact name of consumer group associated with a stream/table?

No. That's not supported.

We could maintain offsets related to input topic in consumer group ...query_CTAS_TASK_PERSISTED_FLATTENED and after an update of the last stream (tasks_persisted_flattened ), the application could go back to reading from the first unconsumed message?

That is also not supported.

I guess both are conceptually possible to build, but neither is available in ksqlDB as of now. Sorry.

paolo-bruno commented 1 year ago

Are you planning to add these or similar features to the roadmap?

Rohit-Singh3 commented 6 months ago

@mjsax I'm facing an issue while adding a filter based on ROWOFFSET and ROWPARTITION to skip already-processed data in the CREATE OR REPLACE statement.

My problem is that I want to read data from a particular offset across the partitions, or from a particular timestamp, from a replayability point of view. But I'm not able to achieve this using ROWPARTITION, ROWOFFSET, and ROWTIME, because I don't get the option to reference them in a stream created using CREATE OR REPLACE ... AS SELECT.

Below are the two streams that I have created, one is the source stream for the source topic and another is the derived stream for the output topic.


-- source stream

CREATE STREAM Source_Stream (
    ID STRING,
    Client STRING,
    WebName STRING,
    ActionType STRING,
    ActionDate BIGINT,
    CustomerPin BIGINT
  ) WITH (
    kafka_topic='source-topic',
    value_format='JSON'
  );

-- derived stream

CREATE STREAM Filtered_Data_Stream WITH (kafka_topic='derived-topic') AS
  SELECT
    ID AS KEY,
    AS_VALUE(ID) AS ID,
    Client,
    WebName,
    ActionType,
    ActionDate,
    CustomerPin
  FROM Source_Stream
  WHERE Client IN ('client1', 'client2')
  PARTITION BY ID;

Here is what I observed when using ROWPARTITION, ROWOFFSET, and ROWTIME:

-- queried on the source stream
SELECT ROWPARTITION AS partition, ROWOFFSET AS offset, ROWTIME AS timestamp, *
FROM Source_Stream EMIT CHANGES;

-- queried on the derived stream, which was created using AS SELECT from Source_Stream
SELECT ROWPARTITION AS partition, ROWOFFSET AS offset, ROWTIME AS timestamp, *
FROM Filtered_Data_Stream EMIT CHANGES;

The problem is that the offsets differ between the two topics, so I am not able to filter out the already-processed data from the input topic.

Also, I don't get the option to update the input stream and add filter conditions based on offset and partition:

CREATE OR REPLACE STREAM Source_Stream (
    ID STRING,
    Client STRING,
    WebName STRING,
    ActionType STRING,
    ActionDate BIGINT,
    CustomerPin BIGINT
  ) WITH (
    kafka_topic='source-topic',
    value_format='JSON'
  ) where ROWOFFSET>=1628652;

When executing the above statement, I get an error:

[Error #40001] statement_error ") where ROWOFFSET>=1628652;" line 11:3: Syntax Error Expecting ';'

Please suggest a solution.
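One possible workaround, sketched here under the assumption that ROWOFFSET in a query over Source_Stream refers to the offsets of the source topic (not of the derived topic), would be to put the filter on the derived query rather than on the source stream's schema definition:

-- Hypothetical sketch: skip source-topic rows below offset 1628652 by
-- filtering in the derived query, where ROWOFFSET is the offset of the
-- row in Source_Stream's underlying topic.
CREATE OR REPLACE STREAM Filtered_Data_Stream WITH (kafka_topic='derived-topic') AS
  SELECT
    ID AS KEY,
    AS_VALUE(ID) AS ID,
    Client,
    WebName,
    ActionType,
    ActionDate,
    CustomerPin
  FROM Source_Stream
  WHERE Client IN ('client1', 'client2') AND ROWOFFSET >= 1628652
  PARTITION BY ID;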

One more question: if we stop a query using PAUSE, why does its consumer group state still show as active/RUNNING? I would expect it to be in an EMPTY or STOPPED state.