Open TheDarkFlame opened 1 year ago
Can you check the topic's retention time? The broker may be deleting records after they expire.
This isn't a topic retention issue.
It can also be reproduced by replaying historical data (record replay), with the timestamp provided by a field in the message.
e.g. I took two hours' worth of data from a partition and used kcat to send it all into a topic, which triggers the problem. When the messages are sent in a live-streaming fashion, it works.
Let me know if there's any other information I can provide to help clear things up.
What is your auto.offset.reset config? By default, it's set to "latest"; you might need to change it to "earliest" using SET before starting the query.
The other question is: do you expect every input record to produce an update record in the changelog for a window? If yes, this seems to be related to caching, and you might want to disable caching for the query by using SET to change the cache size to zero.
Last: if you replay old data and the timestamps extracted from the payload are older than the output topic's retention time, the data might get purged from the output topic quickly. Retention compares the broker's wall-clock time to the event time.
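The retention point above can be illustrated with a minimal sketch. It is a simplified model, not the broker's implementation: real brokers apply time-based retention per log segment, using the largest record timestamp in the segment. The function name `retention_status` and its arguments are hypothetical and exist only for this illustration.

```sh
#!/bin/sh
# Hypothetical helper (not part of Kafka or ksqlDB): simplified view of
# time-based retention. A record becomes eligible for deletion once the
# broker wall-clock time exceeds the record timestamp by more than the
# retention period.
# Usage: retention_status <record_ts_ms> <retention_ms> <broker_now_ms>
retention_status() {
  record_ts_ms=$1
  retention_ms=$2
  broker_now_ms=$3
  # The arithmetic expansion evaluates to 1 when the comparison holds, else 0.
  if [ $(( (broker_now_ms - record_ts_ms) > retention_ms )) -eq 1 ]; then
    echo "eligible for deletion"
  else
    echo "retained"
  fi
}
```

For example, `retention_status 1673337620000 604800000 1700000000000` checks a record stamped 2023-01-10T08:00:20Z against a 7-day retention period. A replayed record that carries an old event time as its record timestamp can be past retention the moment it is written.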
> What is your auto.offset.reset config? By default, it's set to "latest" -- you might need to change it to "earliest" using SET before starting the query?
This is set to earliest. Worth noting that the transformation is as follows:
-> Set parameters, including auto.offset.reset set to earliest.
-> Source stream (topic: source_topic).
-> Transform as necessary and copy into a new stream 'Common' (topic: copy_topic).
-> Aggregate into a table with EMIT FINAL and a small window; the window is small enough that some events should be emitted by themselves (topic: table_topic).
-> Create stream 'aggregations' on top of the table topic (topic: table_topic).
-> Transform for final cleanup into stream 'output' (topic: output).
Running in interactive mode, the transform from source to common works fine and populates events for the entire set of data that flows through, but the table either does not populate, or grabs an event close to the end of the time series and emits only that (haven't figured out the exact behaviour here). It definitely doesn't produce anywhere close to the expected set of events. At first I thought this was an issue of sessions not closing properly due to parallelism, but I set parallelism on all topics to 1 in order to rule this out.
> The other question is: do you expect that every input record produces an update record in the changelog for a window? If yes, it seems to be related to caching and you might want to disable caching for the query via SET changing the cache size to zero.
I expect the following: a single emit for each aggregated session. I know the windows close because the data is a heartbeat with some less frequent events occurring, and the session windows are set well below the heartbeat interval (a window of 10s and a heartbeat interval of 5 minutes, with the grace period set to 5 or 0).
This is the behaviour I see when working with real-time streamed data.
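To make the expectation concrete, here is a minimal sketch that counts how many sessions a final-only emit should produce for a given input. It is a simplified model under stated assumptions, not ksqlDB's implementation: a single partition, in-order input, stream time equal to the largest timestamp seen so far, and a session treated as final once stream time has passed its last event by more than gap + grace. The function name `session_summary` is hypothetical, and the parsing assumes the fixed field order of the replay.jsonl records with no double quotes inside values.

```sh
#!/bin/sh
# Hypothetical helper: simplified model of session windows with a final-only
# emit. Reads JSONL in the replay.jsonl layout on stdin.
# Usage: session_summary <gap_seconds> <grace_seconds> < replay.jsonl
session_summary() {
  awk -F'"' -v gap="$1" -v grace="$2" '
    # Convert a UTC timestamp of the form YYYY-MM-DDTHH:MM:SSZ to epoch seconds.
    function epoch(t,    p, y, m, d, era, yoe, doy, doe) {
      split(t, p, /[-T:Z]/)
      y = p[1] + 0; m = p[2] + 0; d = p[3] + 0
      if (m <= 2) y--
      era = int(y / 400); yoe = y - era * 400
      doy = int((153 * (m > 2 ? m - 3 : m + 9) + 2) / 5) + d - 1
      doe = yoe * 365 + int(yoe / 4) - int(yoe / 100) + doy
      return (era * 146097 + doe - 719468) * 86400 + p[4] * 3600 + p[5] * 60 + p[6]
    }
    {
      # With the double quote as field separator, field 8 is session_id
      # and field 12 is event_time.
      key = $8; t = epoch($12)
      if (t > stream_time) stream_time = t   # stream time only moves forward
      if (!(key in current) || t - last_ts[current[key]] > gap)
        current[key] = ++n                   # inactivity gap exceeded: new session
      last_ts[current[key]] = t              # extend the session to this event
    }
    END {
      # Assumed rule: a session is final once stream time has passed its
      # last event by more than gap + grace.
      for (i = 1; i <= n; i++)
        if (stream_time > last_ts[i] + gap + grace) closed++
      printf "closed=%d open=%d\n", closed, n - closed
    }'
}
```

Running `session_summary 4 0 < replay.jsonl` against the replay file in this thread reports `closed=103 open=1` under this model: the records are 5 seconds apart, so with a 4-second gap each record forms its own session, and only the last one is still waiting for stream time to advance.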
Notably, looking at the server logs, I do not see any messages about records being skipped because their timestamp is too old. I do, however, see these messages when I attempt to send messages into the system with older timestamps. This confirms that the stream time does indeed progress as expected.
> Last: if you replay old data, if the extracted timestamps from the payload are older than output topic retention time, the data might get purged from the output topic quickly. Retention compares "broker wall-clock" to "event-time".
While this may be true, I'm not sure that Kafka is purging the data, because the data transforms fine using a standard non-aggregating transform. However, I may be missing something in your response.
I should reiterate that I'm using a field in the message for the timestamp, not the broker's timestamp field. I'm not sure that Kafka's retention is a factor here: the broker isn't aware that the messages are expired, since the timestamp in the message header should be the wall-clock time.
I'll spend a little bit of time coming up with a simplified example sql and some dummy data in a following response.
I've spent some time compiling a test. I've added the three relevant files below.
Tests to be run as follows:
kcat -C -b localhost:29092 -t <topic>
kcat -P -b localhost:29092 -t replay -l -p 0 replay.jsonl
correlated
datagen.sh
correlated
If you prefer, you could use datagen.sh to produce more recent input data, generating replay.jsonl by running the following:
datagen.sh > replay.jsonl
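Since the datagen.sh posted in this thread pipes each record into kcat, redirecting its output to a file would capture nothing. A minimal sketch of a stdout-only variant follows; the function name `emit_records` and its arguments are hypothetical, and the record layout is copied from replay.jsonl.

```sh
#!/bin/sh
# Hypothetical stdout-only variant of datagen.sh: prints records instead of
# producing them with kcat, so the output can be redirected into a file.
# Usage: emit_records <count> <interval_seconds> [session_id]
emit_records() {
  count=$1
  interval=$2
  session_id=${3:-1}
  i=0
  while [ "$i" -lt "$count" ]; do
    # Each record is stamped with the current UTC wall-clock time.
    printf '{"data":"lorem ipsum","session_id":"%s","event_time":"%s"}\n' \
      "$session_id" "$(date -u +"%Y-%m-%dT%H:%M:%SZ")"
    i=$((i + 1))
    # Pause between records, but not after the last one.
    if [ "$i" -lt "$count" ]; then
      sleep "$interval"
    fi
  done
}
```

For example, `emit_records 11 5 > replay.jsonl` writes eleven records five seconds apart, and the resulting file can then be replayed later with the kcat -P command shown above.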
When replaying data (by sending old data in, or by starting ksqlDB after the data already exists, such as during a ksqlDB restart), the data is not produced to the table's underlying topic the way it is when running with live data.
SET 'auto.offset.reset' = 'earliest';
-- read in source for replay
CREATE SOURCE STREAM IF NOT EXISTS REPLAY (
    `data` VARCHAR,
    `session_id` VARCHAR,
    `event_time` VARCHAR
)
WITH (KAFKA_TOPIC='replay', VALUE_FORMAT='JSON', TIMESTAMP='`event_time`', TIMESTAMP_FORMAT='yyyy-MM-dd''T''HH:mm:ssX', PARTITIONS=1);

-- read in source for live
CREATE SOURCE STREAM IF NOT EXISTS LIVE (
    `data` VARCHAR,
    `session_id` VARCHAR,
    `event_time` VARCHAR
)
WITH (KAFKA_TOPIC='live', VALUE_FORMAT='JSON', TIMESTAMP='`event_time`', TIMESTAMP_FORMAT='yyyy-MM-dd''T''HH:mm:ssX', PARTITIONS=1);

-- perform basic stream merge between replay and live
CREATE STREAM IF NOT EXISTS ALL_INPUT
WITH (KAFKA_TOPIC='input', VALUE_FORMAT='JSON', TIMESTAMP='`event_time`', TIMESTAMP_FORMAT='yyyy-MM-dd''T''HH:mm:ssX', partitions=1)
AS SELECT
    `data`,
    `session_id`,
    `event_time`
FROM REPLAY;

INSERT INTO ALL_INPUT
SELECT `data`, `session_id`, `event_time`
FROM LIVE;

-- aggregate into table
CREATE TABLE IF NOT EXISTS CORRELATED_EVENTS_TABLE
WITH (KAFKA_TOPIC='correlated', VALUE_FORMAT='JSON', TIMESTAMP='`earliest_timestamp`', TIMESTAMP_FORMAT='yyyy-MM-dd''T''HH:mm:ssX', partitions=1) AS
SELECT
    EARLIEST_BY_OFFSET(`event_time`) as `earliest_timestamp`,
    COLLECT_LIST(STRUCT(`data` := `data`, `session_id` := `session_id`, `event_time` := `event_time`)) as `events`,
    COUNT(*) as `num_rows`,
    `session_id` as `session_id`
FROM ALL_INPUT
WINDOW SESSION (4 SECONDS, GRACE PERIOD 0 SECONDS)
GROUP BY `session_id`
EMIT FINAL;

-- create a source stream over the table changelog topic
CREATE SOURCE STREAM IF NOT EXISTS CORRELATED_EVENTS_STREAM(
    `earliest_timestamp` VARCHAR,
    `events` ARRAY<STRUCT<`data` VARCHAR, `session_id` VARCHAR, `event_time` VARCHAR>>,
    `num_rows` INT
) WITH (KAFKA_TOPIC='correlated', TIMESTAMP='`earliest_timestamp`', TIMESTAMP_FORMAT='yyyy-MM-dd''T''HH:mm:ssX', VALUE_FORMAT='JSON', partitions=1);

-- perform cleanup of output and send it on output topic
CREATE STREAM IF NOT EXISTS output_stream
WITH (KAFKA_TOPIC='output', VALUE_FORMAT='JSON', TIMESTAMP='`earliest_timestamp`', TIMESTAMP_FORMAT='yyyy-MM-dd''T''HH:mm:ssX', partitions=1)
AS SELECT
    `earliest_timestamp`,
    `events`
FROM correlated_events_stream;
{"data":"lorem ipsum","session_id":"1","event_time":"2023-01-10T08:00:20Z"}
{"data":"lorem ipsum","session_id":"1","event_time":"2023-01-10T08:00:25Z"}
{"data":"lorem ipsum","session_id":"2","event_time":"2023-01-10T08:00:30Z"}
{"data":"lorem ipsum","session_id":"2","event_time":"2023-01-10T08:00:35Z"}
{"data":"lorem ipsum","session_id":"1","event_time":"2023-01-10T08:00:40Z"}
{"data":"lorem ipsum","session_id":"3","event_time":"2023-01-10T08:00:45Z"}
{"data":"lorem ipsum","session_id":"4","event_time":"2023-01-10T08:00:50Z"}
{"data":"lorem ipsum","session_id":"1","event_time":"2023-01-10T08:00:55Z"}
{"data":"lorem ipsum","session_id":"2","event_time":"2023-01-10T08:01:00Z"}
{"data":"lorem ipsum","session_id":"3","event_time":"2023-01-10T08:01:05Z"}
{"data":"lorem ipsum","session_id":"4","event_time":"2023-01-10T08:01:10Z"}
{"data":"lorem ipsum","session_id":"1","event_time":"2023-01-10T08:01:15Z"}
{"data":"lorem ipsum","session_id":"1","event_time":"2023-01-10T08:01:20Z"}
{"data":"lorem ipsum","session_id":"1","event_time":"2023-01-10T08:01:25Z"}
{"data":"lorem ipsum","session_id":"1","event_time":"2023-01-10T08:01:30Z"}
{"data":"lorem ipsum","session_id":"1","event_time":"2023-01-10T08:01:35Z"}
{"data":"lorem ipsum","session_id":"1","event_time":"2023-01-10T08:01:40Z"}
{"data":"lorem ipsum","session_id":"2","event_time":"2023-01-10T08:01:45Z"}
{"data":"lorem ipsum","session_id":"1","event_time":"2023-01-10T08:01:50Z"}
{"data":"lorem ipsum","session_id":"1","event_time":"2023-01-10T08:01:55Z"}
{"data":"lorem ipsum","session_id":"1","event_time":"2023-01-10T08:02:00Z"}
{"data":"lorem ipsum","session_id":"1","event_time":"2023-01-10T08:02:05Z"}
{"data":"lorem ipsum","session_id":"1","event_time":"2023-01-10T08:02:10Z"}
{"data":"lorem ipsum","session_id":"1","event_time":"2023-01-10T08:02:15Z"}
{"data":"lorem ipsum","session_id":"2","event_time":"2023-01-10T08:02:20Z"}
{"data":"lorem ipsum","session_id":"1","event_time":"2023-01-10T08:02:25Z"}
{"data":"lorem ipsum","session_id":"1","event_time":"2023-01-10T08:02:30Z"}
{"data":"lorem ipsum","session_id":"1","event_time":"2023-01-10T08:02:35Z"}
{"data":"lorem ipsum","session_id":"1","event_time":"2023-01-10T08:02:40Z"}
{"data":"lorem ipsum","session_id":"1","event_time":"2023-01-10T08:02:45Z"}
{"data":"lorem ipsum","session_id":"2","event_time":"2023-01-10T08:02:50Z"}
{"data":"lorem ipsum","session_id":"1","event_time":"2023-01-10T08:02:55Z"}
{"data":"lorem ipsum","session_id":"1","event_time":"2023-01-10T08:03:00Z"}
{"data":"lorem ipsum","session_id":"1","event_time":"2023-01-10T08:03:05Z"}
{"data":"lorem ipsum","session_id":"1","event_time":"2023-01-10T08:03:10Z"}
{"data":"lorem ipsum","session_id":"1","event_time":"2023-01-10T08:03:15Z"}
{"data":"lorem ipsum","session_id":"1","event_time":"2023-01-10T08:03:20Z"}
{"data":"lorem ipsum","session_id":"2","event_time":"2023-01-10T08:03:25Z"}
{"data":"lorem ipsum","session_id":"1","event_time":"2023-01-10T08:03:30Z"}
{"data":"lorem ipsum","session_id":"1","event_time":"2023-01-10T08:03:35Z"}
{"data":"lorem ipsum","session_id":"1","event_time":"2023-01-10T08:03:40Z"}
{"data":"lorem ipsum","session_id":"1","event_time":"2023-01-10T08:03:45Z"}
{"data":"lorem ipsum","session_id":"1","event_time":"2023-01-10T08:03:50Z"}
{"data":"lorem ipsum","session_id":"1","event_time":"2023-01-10T08:03:55Z"}
{"data":"lorem ipsum","session_id":"1","event_time":"2023-01-10T08:04:00Z"}
{"data":"lorem ipsum","session_id":"1","event_time":"2023-01-10T08:04:05Z"}
{"data":"lorem ipsum","session_id":"1","event_time":"2023-01-10T08:04:10Z"}
{"data":"lorem ipsum","session_id":"1","event_time":"2023-01-10T08:04:15Z"}
{"data":"lorem ipsum","session_id":"1","event_time":"2023-01-10T08:04:20Z"}
{"data":"lorem ipsum","session_id":"1","event_time":"2023-01-10T08:04:25Z"}
{"data":"lorem ipsum","session_id":"1","event_time":"2023-01-10T08:04:30Z"}
{"data":"lorem ipsum","session_id":"2","event_time":"2023-01-10T08:04:35Z"}
{"data":"lorem ipsum","session_id":"1","event_time":"2023-01-10T08:04:40Z"}
{"data":"lorem ipsum","session_id":"1","event_time":"2023-01-10T08:04:45Z"}
{"data":"lorem ipsum","session_id":"1","event_time":"2023-01-10T08:04:50Z"}
{"data":"lorem ipsum","session_id":"1","event_time":"2023-01-10T08:04:55Z"}
{"data":"lorem ipsum","session_id":"1","event_time":"2023-01-10T08:05:00Z"}
{"data":"lorem ipsum","session_id":"2","event_time":"2023-01-10T08:05:05Z"}
{"data":"lorem ipsum","session_id":"1","event_time":"2023-01-10T08:05:10Z"}
{"data":"lorem ipsum","session_id":"1","event_time":"2023-01-10T08:05:15Z"}
{"data":"lorem ipsum","session_id":"1","event_time":"2023-01-10T08:05:20Z"}
{"data":"lorem ipsum","session_id":"1","event_time":"2023-01-10T08:05:25Z"}
{"data":"lorem ipsum","session_id":"1","event_time":"2023-01-10T08:05:30Z"}
{"data":"lorem ipsum","session_id":"1","event_time":"2023-01-10T08:05:35Z"}
{"data":"lorem ipsum","session_id":"1","event_time":"2023-01-10T08:05:40Z"}
{"data":"lorem ipsum","session_id":"1","event_time":"2023-01-10T08:05:45Z"}
{"data":"lorem ipsum","session_id":"1","event_time":"2023-01-10T08:05:50Z"}
{"data":"lorem ipsum","session_id":"1","event_time":"2023-01-10T08:05:55Z"}
{"data":"lorem ipsum","session_id":"1","event_time":"2023-01-10T08:06:00Z"}
{"data":"lorem ipsum","session_id":"1","event_time":"2023-01-10T08:06:05Z"}
{"data":"lorem ipsum","session_id":"1","event_time":"2023-01-10T08:06:10Z"}
{"data":"lorem ipsum","session_id":"2","event_time":"2023-01-10T08:06:15Z"}
{"data":"lorem ipsum","session_id":"1","event_time":"2023-01-10T08:06:20Z"}
{"data":"lorem ipsum","session_id":"1","event_time":"2023-01-10T08:06:25Z"}
{"data":"lorem ipsum","session_id":"1","event_time":"2023-01-10T08:06:30Z"}
{"data":"lorem ipsum","session_id":"1","event_time":"2023-01-10T08:06:35Z"}
{"data":"lorem ipsum","session_id":"2","event_time":"2023-01-10T08:06:40Z"}
{"data":"lorem ipsum","session_id":"1","event_time":"2023-01-10T08:06:45Z"}
{"data":"lorem ipsum","session_id":"1","event_time":"2023-01-10T08:06:50Z"}
{"data":"lorem ipsum","session_id":"1","event_time":"2023-01-10T08:06:55Z"}
{"data":"lorem ipsum","session_id":"1","event_time":"2023-01-10T08:07:00Z"}
{"data":"lorem ipsum","session_id":"1","event_time":"2023-01-10T08:07:05Z"}
{"data":"lorem ipsum","session_id":"1","event_time":"2023-01-10T08:07:10Z"}
{"data":"lorem ipsum","session_id":"1","event_time":"2023-01-10T08:07:15Z"}
{"data":"lorem ipsum","session_id":"1","event_time":"2023-01-10T08:07:20Z"}
{"data":"lorem ipsum","session_id":"1","event_time":"2023-01-10T08:07:25Z"}
{"data":"lorem ipsum","session_id":"1","event_time":"2023-01-10T08:07:30Z"}
{"data":"lorem ipsum","session_id":"1","event_time":"2023-01-10T08:07:35Z"}
{"data":"lorem ipsum","session_id":"1","event_time":"2023-01-10T08:07:40Z"}
{"data":"lorem ipsum","session_id":"1","event_time":"2023-01-10T08:07:45Z"}
{"data":"lorem ipsum","session_id":"1","event_time":"2023-01-10T08:07:50Z"}
{"data":"lorem ipsum","session_id":"1","event_time":"2023-01-10T08:07:55Z"}
{"data":"lorem ipsum","session_id":"1","event_time":"2023-01-10T08:08:00Z"}
{"data":"lorem ipsum","session_id":"2","event_time":"2023-01-10T08:08:05Z"}
{"data":"lorem ipsum","session_id":"1","event_time":"2023-01-10T08:08:10Z"}
{"data":"lorem ipsum","session_id":"1","event_time":"2023-01-10T08:08:15Z"}
{"data":"lorem ipsum","session_id":"1","event_time":"2023-01-10T08:08:20Z"}
{"data":"lorem ipsum","session_id":"1","event_time":"2023-01-10T08:08:25Z"}
{"data":"lorem ipsum","session_id":"1","event_time":"2023-01-10T08:08:30Z"}
{"data":"lorem ipsum","session_id":"1","event_time":"2023-01-10T08:08:35Z"}
{"data":"lorem ipsum","session_id":"1","event_time":"2023-01-10T08:08:40Z"}
{"data":"lorem ipsum","session_id":"1","event_time":"2023-01-10T08:08:45Z"}
{"data":"lorem ipsum","session_id":"1","event_time":"2023-01-10T08:08:50Z"}
{"data":"lorem ipsum","session_id":"2","event_time":"2023-01-10T08:08:55Z"}
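#!/bin/sh
# Produce 11 records, one every 5 seconds, stamped with the current UTC time.
# The topic name after -t is missing from the original post; pass it as $1.
topic="${1:?usage: datagen.sh <topic>}"
for i in $(seq 0 10)
do
    echo "{\"data\":\"lorem ipsum\",\"session_id\":\"1\",\"event_time\":\"$(date -u +"%Y-%m-%dT%H:%M:%SZ")\"}" | kcat -P -b localhost:29092 -t "$topic"
    sleep 5
done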
@suhas-satish @mjsax
Have you managed to find time to review the above re-created results?
Describe the bug
Using a session window (other window types untested), backlogged messages are not loaded into the table the way they are when processed live.
To Reproduce
The following two scenarios detail the behaviour:
Scenario 1 (unexpected result):
Result: messages from during the gap are not loaded into the table's aggregation topic as would be expected
Scenario 2 (expected result):
Result: messages are loaded into the table's aggregation topic as would be expected, with aggregations processing as normal
Expected behavior
All messages from session windows should be output into the table's underlying changelog topic.
Actual behaviour
If messages are produced before ksqlDB starts, they are not output to the table's topic; during normal operation, messages are output.
Quirks:
This also appears to occur when producing a large number of messages onto a topic all at once for data replay over a period of time significantly greater than that of the session window.
Additional context
Other potentially valuable information: