DataSQRL / sqrl

Flexible development framework for building streaming data applications in SQL with Kafka, Flink, Postgres, GraphQL, and more.
https://www.datasqrl.com/

Timestamp precision in avro throws 'Unsupported to derive Schema' #793

Open henneberger opened 2 months ago

henneberger commented 2 months ago

Avro does not support timestamp with local time zone, but our functions convert to local time zone.

"logicalType": "timestamp-millis" in Avro causes a precision error when writing to Kafka/JDBC.

Throws:

Unsupported to derive Schema for type: TIMESTAMP_LTZ(3)
flink-job-submitter-1  | Caused by: java.lang.UnsupportedOperationException: Unsupported to derive Schema for type: TIMESTAMP_LTZ(3) NOT NULL
flink-job-submitter-1  |        at org.apache.flink.formats.avro.typeutils.AvroSchemaConverter.convertToSchema(AvroSchemaConverter.java:429)
flink-job-submitter-1  |        at org.apache.flink.formats.avro.typeutils.AvroSchemaConverter.convertToSchema(AvroSchemaConverter.java:400)
flink-job-submitter-1  |        at org.apache.flink.formats.avro.typeutils.AvroSchemaConverter.convertToSchema(AvroSchemaConverter.java:306)
flink-job-submitter-1  |        at org.apache.flink.formats.avro.AvroRowDataSerializationSchema.<init>(AvroRowDataSerializationSchema.java:60)
flink-job-submitter-1  |        at org.apache.flink.formats.avro.AvroFormatFactory$2.createRuntimeEncoder(AvroFormatFactory.java:90)
flink-job-submitter-1  |        at org.apache.flink.formats.avro.AvroFormatFactory$2.createRuntimeEncoder(AvroFormatFactory.java:85)
flink-job-submitter-1  |        at org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicSink.createSerialization(KafkaDynamicSink.java:388)

henneberger commented 2 months ago

The Avro format in Flink 1.19 now supports a timestamp_mapping.legacy option.

Use the legacy mapping of timestamps in Avro. Before 1.19, the default behavior of Flink wrongly mapped both the SQL TIMESTAMP and TIMESTAMP_LTZ types to Avro TIMESTAMP.
The correct behavior is that Flink SQL TIMESTAMP maps to Avro LOCAL TIMESTAMP and Flink SQL TIMESTAMP_LTZ maps to Avro TIMESTAMP. You can obtain the correct mapping by disabling this legacy mapping.
The legacy behavior is used by default for compatibility.

henneberger commented 1 month ago

This flag has been added. It applies to the 'avro' format only, so the configuration key is avro.timestamp_mapping.legacy. The 'avro-confluent' format does not yet support this flag. See https://github.com/apache/flink/pull/25439.
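
As a rough illustration of where the flag goes, the following Flink SQL sketch defines a Kafka sink that uses the plain 'avro' format with the legacy mapping disabled. The table name, column names, topic, and broker address are hypothetical placeholders, not taken from this project. Only the avro.timestamp_mapping.legacy key comes from the discussion above. This assumes Flink 1.19 or later.

```sql
-- Hypothetical sink table; all names and addresses are placeholders.
CREATE TABLE orders_sink (
  id BIGINT,
  event_time TIMESTAMP_LTZ(3)
) WITH (
  'connector' = 'kafka',
  'topic' = 'orders',
  'properties.bootstrap.servers' = 'localhost:9092',
  'format' = 'avro',
  -- Disable the legacy mapping so that TIMESTAMP_LTZ maps to Avro TIMESTAMP.
  -- Only the 'avro' format accepts this key; 'avro-confluent' does not yet.
  'avro.timestamp_mapping.legacy' = 'false'
);
```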

henneberger commented 1 day ago

Until Flink adds avro.timestamp_mapping.legacy support to avro-confluent, the workaround is to remove the "logicalType": "timestamp-millis" entry from the Avro schema, so that the timestamp is deserialized as epoch time. If a timestamp type is needed, it can be added to the metadata field, or converted in SQL.
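
A minimal sketch of the "converted in SQL" option, assuming the Avro field is a plain long holding epoch milliseconds once the logical type is removed. The table name, column names, topic, and URLs are hypothetical. TO_TIMESTAMP_LTZ is the built-in Flink SQL function used here to turn the epoch value back into a timestamp.

```sql
-- Hypothetical source table; all names and addresses are placeholders.
CREATE TABLE orders_source (
  id BIGINT,
  -- Read as a plain long because the Avro field has no logicalType.
  event_time_millis BIGINT,
  -- Computed column: interpret the long as epoch milliseconds (precision 3).
  event_time AS TO_TIMESTAMP_LTZ(event_time_millis, 3)
) WITH (
  'connector' = 'kafka',
  'topic' = 'orders',
  'properties.bootstrap.servers' = 'localhost:9092',
  'format' = 'avro-confluent',
  'avro-confluent.url' = 'http://localhost:8081'
);
```

The computed column is not part of the serialized record, so the Avro schema only needs to describe the long field.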