snowflakedb / snowflake-kafka-connector

Snowflake Kafka Connector (Sink Connector)
Apache License 2.0
137 stars 98 forks source link

Avro decimal #897

Open dechants opened 1 month ago

dechants commented 1 month ago


I have the following settings (snowflakeinc/snowflake-kafka-connector:2.2.2):

class: com.snowflake.kafka.connector.SnowflakeSinkConnector
  snowflake.ingestion.method: "SNOWPIPE_STREAMING"
  snowflake.enable.schematization: "true"
  value.converter.schemas.enable: "true"
  value.converter: io.confluent.connect.avro.AvroConverter
  value.converter.schema.registry.url: http://schemaregistry.kafka.svc.cluster.local:8081

My schema:

    "type": "record",
    "name": "ConnectDefault",
    "namespace": "io.confluent.connect.avro",
    "fields": [
            "name": "TENANTID",
            "type": [
            "default": null
            "name": "INCREMENTALKEY",
            "type": {
                "type": "bytes",
                "scale": 0,
                "precision": 19,
                "connect.version": 1,
                "connect.parameters": {
                    "scale": "0",
                    "connect.decimal.precision": "19"
                "": "",
                "logicalType": "decimal"

The connector creates the table, however INCREMENTALKEY is VARCHAR(16777216).

How can I make sure that the connector automatically creates the table in Snowflake and "maps" numeric values correctly?

sfc-gh-mbobowski commented 3 weeks ago

Hello @dechants and sorry for the delay.

Mapping Avro bytes/decimal into Snowflake VARCHAR was added in this PR and the reason for doing that was the difference in precision between the types.

There are two solutions I can imagine: 1) Check the value of connect.decimal.precision and adjust the Snowflake type, however, I don't know if it is possible to access precision from the code easily. 2) Create a parameter that would switch between VARCHAR and NUMBER. The risk of precision mismatch would be on the user.

@sfc-gh-xhuang what do you think?

dechants commented 3 weeks ago

@sfc-gh-mbobowski no worries, thank you for your reply.

Could you please explain both options and provide a configuration example?

The source is a COTS application managed by another team, so there is no chance that I could make changes there. However, I know that the field is a primary key (integer) which is defined as NUMBER in the source Oracle database without precision and scale. The JDBC source connector is configured with numeric.mapping = best_fit (Confluent doc).