This appears to be a Flink bug that occurs when using the Kafka connector. A workaround would probably be to use a different timestamp column. I tried replacing the function with a trivial implementation and it still fails, so this should likely be escalated to the Flink team.
-- Make the Kafka client, the Flink Kafka connector, and the two SQRL SNAPSHOT
-- library jars (sqrl-lib-common, sqrl-time) available to this SQL session.
-- NOTE(review): every path is absolute to one developer's machine; adjust
-- them before running this script anywhere else.
ADD JAR '/Users/henneberger/Downloads/kafka-clients-3.4.0.jar';
ADD JAR '/Users/henneberger/Downloads/flink-connector-kafka-3.2.0-1.18.jar';
ADD JAR '/Users/henneberger/sqrl/sqrl-flink-lib/sqrl-lib-common/target/sqrl-lib-common-0.5.5-SNAPSHOT.jar';
ADD JAR '/Users/henneberger/sqrl/sqrl-flink-lib/sqrl-time/target/sqrl-time-0.5.5-SNAPSHOT.jar';
-- Register the Java class com.datasqrl.time.EpochMilliToTimestamp as a
-- temporary function (skipped if a function with that name already exists).
-- NOTE(review): presumably the class ships in the sqrl-time jar added above,
-- and its exact return type is not visible from this script -- confirm both.
CREATE TEMPORARY FUNCTION IF NOT EXISTS `EpochMilliToTimestamp` AS 'com.datasqrl.time.EpochMilliToTimestamp' LANGUAGE JAVA;
-- Source table `x`: reads JSON records from Kafka topic 'stu' on
-- localhost:49446, starting from the earliest offset.
--   * `ts` and `src_uuid` are the physical columns read from each record.
--   * `ts2` is a computed column: the result of EpochMilliToTimestamp(`ts`).
--   * The watermark is declared on `ts2` and trails it by 0.01 second.
-- NOTE(review): the failure being reproduced is suspected to involve this
-- computed timestamp / watermark combined with the Kafka connector; keep the
-- statement unchanged when re-running the reproduction.
-- NOTE(review): the broker port looks like an ephemeral test port -- confirm
-- it matches the broker you are running against.
CREATE TEMPORARY TABLE `x` (
`ts` BIGINT NOT NULL,
`src_uuid` STRING NOT NULL,
`ts2` AS `EpochMilliToTimestamp`(`ts`),
WATERMARK FOR `ts2` AS `ts2` - INTERVAL '0.01' SECOND
) WITH (
'format' = 'json',
'properties.bootstrap.servers' = 'localhost:49446',
'properties.group.id' = 'id',
'topic' = 'stu',
'scan.startup.mode' = 'earliest-offset',
'properties.auto.offset.reset' = 'earliest',
'connector' = 'kafka'
);
-- Sink table `stu_2`: uses the 'blackhole' connector and declares the same
-- three column names as source table `x`, with `ts2` as a physical
-- TIMESTAMP(3) NOT NULL column instead of a computed one.
-- NOTE(review): whether TIMESTAMP(3) matches the type returned by
-- EpochMilliToTimestamp cannot be determined from this script -- confirm.
CREATE TEMPORARY TABLE `stu_2` (
`ts` BIGINT NOT NULL,
`src_uuid` STRING NOT NULL,
`ts2` TIMESTAMP(3) NOT NULL
) WITH (
'connector' = 'blackhole'
);
-- View `table$2`: passes every column of source table `x` through unchanged.
-- Unlike the tables above, it is created without the TEMPORARY keyword.
CREATE VIEW `table$2`
AS
SELECT *
FROM `x`;
-- Statement set containing the script's only INSERT: it writes every row and
-- column of view `table$2` (and therefore of source table `x`) into the
-- blackhole sink `stu_2`. Columns are matched by position because both the
-- SELECT and the INSERT omit an explicit column list.
EXECUTE STATEMENT SET BEGIN
INSERT INTO `stu_2`
(SELECT *
FROM `table$2`)
;
END;
This appears to be a Flink bug that occurs when using the Kafka connector. A workaround would probably be to use a different timestamp column. I tried replacing the function with a trivial implementation and it still fails, so this should likely be escalated to the Flink team.
A simple reproduction would be:
./sql-client.sh -f /Users/henneberger/sqrl/issue/build/deploy/flink/src/main/resources/flink.sql
x.log