This PR allows users to set the key and timestamp of messages written to Kafka based on fields in the data, similar to how output is controlled for the Redis sink. It does this by adding two new optional configs for the Kafka sink: sink.timestamp_field and sink.key_field.
A (somewhat silly) example of what this looks like in SQL:
create table impulse with (
    connector = 'impulse',
    event_rate = '10'
);

create table sink (
    i BIGINT,
    k TEXT,
    ts TIMESTAMP NOT NULL
) with (
    connector = 'kafka',
    bootstrap_servers = 'localhost:9092',
    format = 'json',
    'sink.timestamp_field' = 'ts',
    'sink.key_field' = 'k',
    type = 'sink',
    topic = 'timestamped'
);

insert into sink
select counter, concat('k', counter), from_unixtime(counter + 1722300386)
from impulse;
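
Since both configs are optional, a sink should be able to set only one of them. A minimal sketch under that assumption, reusing the impulse table above and setting only the key. The table name keyed_sink and the topic keyed are hypothetical, and what timestamp is used when sink.timestamp_field is omitted is not specified in this description:

```sql
-- Hypothetical variant: only the message key is taken from the data.
-- No 'sink.timestamp_field' is set, so the timestamp is left to the
-- sink's default behavior (an assumption, not confirmed here).
create table keyed_sink (
    i BIGINT,
    k TEXT
) with (
    connector = 'kafka',
    bootstrap_servers = 'localhost:9092',
    format = 'json',
    'sink.key_field' = 'k',
    type = 'sink',
    topic = 'keyed'  -- hypothetical topic name
);

insert into keyed_sink
select counter, concat('k', counter)
from impulse;
```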
Addresses #696