snowflakedb / snowflake-kafka-connector

Snowflake Kafka Connector (Sink Connector)
Apache License 2.0

SnowflakeJsonConverter converts decimal values to binary hex strings #684

Closed sridevs closed 4 months ago

sridevs commented 1 year ago

Snowflake Kafka Connector version - 1.8.2

SnowflakeJsonConverter converts decimal values to binary hex strings. In the payload below, price is "ITQ=", whereas it should be 8.5.

{
  "payload": {
    "after": {
      "test_code": "NC0016",
      "test_name": "converter test 6",
      "created_by": "76",
      "created_date": "2023-08-03T01:52:12.03Z",
      "id": 42,
      "last_mod_date": "2023-08-03T01:52:12.03Z",
      "modified_by": "76",
      "price": "ITQ="
    }

Schema of the price field received in the payload, for reference:

{
  "field": "price",
  "name": "org.apache.kafka.connect.data.Decimal",
  "optional": true,
  "parameters": {
    "connect.decimal.precision": "19",
    "scale": "3"
  },
  "type": "bytes",
  "version": 1
}

My config spec

 spec:
  class: com.snowflake.kafka.connector.SnowflakeSinkConnector
  tasksMax: {{ $snowflakeCommon.tasksMax }}
  config:
    buffer.flush.time: {{ $snowflakeCommon.bufferFlushTime | quote }}
    value.converter: com.snowflake.kafka.connector.records.SnowflakeJsonConverter
    topics: {{ $tableTopicMap.topicName | quote }}
    snowflake.topic2table.map: {{ printf "%s:%s" $tableTopicMap.topicName $tableTopicMap.tableName | quote }}
    snowflake.url.name: {{ $snowflakeCommon.snowflakeUrl | quote }}
    snowflake.database.name: {{ $snowflakeCommon.snowflakeDatabaseName | quote }}
    snowflake.user.name: ${secrets:{{ $release.Namespace }}/{{ $secrets.snowflakeUsername.secretName }}:{{ $secrets.snowflakeUsername.key }}}
    snowflake.private.key: ${secrets:{{ $release.Namespace }}/{{ $secrets.snowflakePrivateKey.secretName }}:{{ $secrets.snowflakePrivateKey.key }}}
    snowflake.private.key.passphrase: ${secrets:{{ $release.Namespace }}/{{ $secrets.snowflakePrivateKeyPassPhrase.secretName }}:{{ $secrets.snowflakePrivateKeyPassPhrase.key }}}
    snowflake.schema.name: {{ $schema | quote }}
    behavior.on.null.values: IGNORE
    errors.retry.timeout: 10 
    errors.retry.delay.max.ms: 180000 

It looks like Kafka has a fix for this: https://issues.apache.org/jira/browse/KAFKA-8595

    value.converter: org.apache.kafka.connect.json.JsonConverter
    value.converter.decimal.format: NUMERIC

But I'm unable to use this. I tried the Kafka JSON converter, but it doesn't work. The Snowflake sink connector is not using this converter, so the CDC doesn't get captured.

Any solution or workaround?
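
For context, here is a minimal Python sketch of why the payload shows "ITQ=". It assumes the usual Connect behaviour: the Decimal logical type carries the unscaled integer as big-endian two's-complement bytes, and a JSON converter in its default base64 decimal format writes those bytes as a base64 string. The helper name encode_connect_decimal is made up for illustration and is not part of Kafka or the Snowflake connector.

```python
import base64
from decimal import Decimal


def encode_connect_decimal(value: Decimal, scale: int) -> str:
    """Illustrative only: mimic the base64 form of a Connect Decimal."""
    # Shift the decimal point right by `scale` digits to get the unscaled integer.
    # Assumes `value` has no more than `scale` fractional digits.
    unscaled = int(value.scaleb(scale))
    # Reserve room for the sign bit, rounded up to whole bytes.
    length = unscaled.bit_length() // 8 + 1
    raw = unscaled.to_bytes(length, byteorder="big", signed=True)
    return base64.b64encode(raw).decode("ascii")


print(encode_connect_decimal(Decimal("8.5"), scale=3))  # ITQ=
```

With value.converter.decimal.format set to NUMERIC on org.apache.kafka.connect.json.JsonConverter, the same field should be written as a plain JSON number rather than this base64 string.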

sfc-gh-tzhang commented 1 year ago

What's the error when you try to use org.apache.kafka.connect.json.JsonConverter?

sridevs commented 1 year ago

The streams are not being sent to Snowflake. I'm not sure of the exact error, but I found the logs below:

2023-08-04 02:00:23,238 INFO [AdminClient clientId=adminclient-10] Node 3 disconnected. (org.apache.kafka.clients.NetworkClient) [kafka-admin-client-thread | adminclient-10]
2023-08-04 02:00:23,238 INFO [AdminClient clientId=adminclient-10] Node 1 disconnected. (org.apache.kafka.clients.NetworkClient) [kafka-admin-client-thread | adminclient-10]
2023-08-04 02:00:23,390 INFO [AdminClient clientId=adminclient-10] Node 0 disconnected. (org.apache.kafka.clients.NetworkClient) [kafka-admin-client-thread | adminclient-10]

Alternatively, if I set decimal.handling.mode: double in my source connector, which is io.debezium.connector.sqlserver.SqlServerConnector, it correctly converts the value and sends it as a double.

I'm not sure why org.apache.kafka.connect.json.JsonConverter is not working. Maybe there's a compatibility issue between com.snowflake.kafka.connector.SnowflakeSinkConnector and org.apache.kafka.connect.json.JsonConverter.

sfc-gh-tzhang commented 1 year ago

org.apache.kafka.connect.json.JsonConverter should work in theory. Please send us your configuration and full logs through Snowflake support, and we should be able to help you further.

acristu commented 1 year ago

"ITQ=" is not hex, it is probably base64, which from what I see is the root cause for this issue: https://github.com/snowflakedb/snowflake-kafka-connector/issues/704

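The base64 reading can be checked by hand against the schema posted earlier (type bytes, scale 3). A minimal Python sketch, assuming the bytes are the big-endian two's-complement unscaled value used by Connect's Decimal logical type; decode_connect_decimal is a hypothetical helper name, not a connector API:

```python
import base64
from decimal import Decimal


def decode_connect_decimal(encoded: str, scale: int) -> Decimal:
    """Illustrative only: turn a base64 Connect Decimal back into a number."""
    raw = base64.b64decode(encoded)  # "ITQ=" -> bytes 0x21 0x34
    # Interpret the bytes as a signed big-endian integer (the unscaled value).
    unscaled = int.from_bytes(raw, byteorder="big", signed=True)  # 8500
    # Shift the decimal point left by `scale` digits.
    return Decimal(unscaled).scaleb(-scale)


print(decode_connect_decimal("ITQ=", scale=3))  # 8.500
```

So "ITQ=" is the expected 8.5, just in the base64 encoding of the Decimal bytes rather than as a JSON number.
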
sfc-gh-gjachimko commented 4 months ago

Closing due to inactivity. Please reopen if the issue isn't solved and still occurs.