confluentinc / kafka-connect-storage-cloud

Kafka Connect suite of connectors for Cloud storage (Amazon S3)

Null values being replaced with default #716

Open andyhuynh3 opened 7 months ago

andyhuynh3 commented 7 months ago

Hello, I'm using Debezium to extract MySQL data into Kafka in Avro format using the Confluent Avro converter, and the Confluent S3 sink connector to land that data in S3 as Avro files. However, I'm running into an issue on the Kafka --> S3 side where null values are being replaced with the MySQL default, even with value.converter.ignore.default.for.nullables=true. More details on my setup below:

Here's what my S3 sink configuration looks like:

{
   "connector.class":"io.confluent.connect.s3.S3SinkConnector",
   "tasks.max":"1",
   "errors.deadletterqueue.context.headers.enable":"true",
   "errors.deadletterqueue.topic.name":"db_ingestion_dead_letter_queue",
   "errors.deadletterqueue.topic.replication.factor":"1",
   "filename.offset.zero.pad.widthrotate_interval_ms":"12",
   "flush.size":"500000",
   "locale":"en",
   "partition.duration.ms":"60000",
   "partitioner.class":"io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
   "path.format": "'\''year'\''=YYYY/'\''month'\''=MM/'\''day'\''=dd/'\''hour'\''=HH",
   "retry.backoff.ms":"5000",
   "rotate.interval.ms":"15000",
   "rotate.schedule.interval.ms":"60000",
   "s3.bucket.name":"my-bucket",
   "s3.part.size":"5242880",
   "s3.region":"us-west-2",
   "schema.generator.class":"io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator",
   "schema.compability":"NONE ",
   "storage.class":"io.confluent.connect.s3.storage.S3Storage",
   "timezone":"UTC",
   "topics.dir":"developer/kafka-connect-avro/data/raw",
   "topics.regex":"dbzium\\.inventory\\..+",
   "format.class":"io.confluent.connect.s3.format.avro.AvroFormat",
   "key.converter": "io.confluent.connect.avro.AvroConverter",
   "key.converter.schema.registry.url": "http://registry:8080/apis/ccompat/v7",
   "key.converter.auto.registry.schemas": "true",
   "key.converter.ignore.default.for.nullables": "true",
   "schema.name.adjustment.mode":"avro",
   "value.converter": "io.confluent.connect.avro.AvroConverter",
   "value.converter.schema.registry.url": "http://registry:8080/apis/ccompat/v7",
   "value.converter.auto.registry.schemas": "true",
   "value.converter.ignore.default.for.nullables": "true"
}
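
(For what it's worth, here is a minimal sketch of how I sanity-check a config like this against the worker before creating the connector; it assumes the Connect REST API is reachable at http://connect:8083 and that the JSON above is saved locally as s3-sink.json, both of which are placeholders for my actual setup.)

import json, urllib.request

# Load the connector config shown above (hypothetical local file name).
with open("s3-sink.json") as f:
    config = json.load(f)
config.setdefault("name", "kafka-to-s3")  # connector name, as seen in the task logs further down

# PUT /connector-plugins/<plugin>/config/validate returns per-property validation results.
req = urllib.request.Request(
    "http://connect:8083/connector-plugins/io.confluent.connect.s3.S3SinkConnector/config/validate",
    data=json.dumps(config).encode("utf-8"),
    headers={"Content-Type": "application/json"},
    method="PUT",
)
result = json.load(urllib.request.urlopen(req))
print("validation errors:", result["error_count"])
for item in result["configs"]:
    for err in item["value"]["errors"]:
        print(item["value"]["name"], "->", err)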

Here's what my schema looks like:

{
  "type": "record",
  "name": "Value",
  "namespace": "dbzium.inventory.my_table",
  "fields": [
    {
      "name": "id",
      "type": "long"
    },
    {
      "name": "my_first_tinyint_col",
      "type": [
        "null",
        "boolean"
      ],
      "default": null
    },
    {
      "name": "test_str",
      "type": [
        {
          "type": "string",
          "connect.default": "test_str"
        },
        "null"
      ],
      "default": "test_str"
    },
    {
      "name": "__deleted",
      "type": [
        "null",
        "string"
      ],
      "default": null
    },
    {
      "name": "__op",
      "type": [
        "null",
        "string"
      ],
      "default": null
    },
    {
      "name": "__ts_ms",
      "type": [
        "null",
        "long"
      ],
      "default": null
    },
    {
      "name": "__source_ts_ms",
      "type": [
        "null",
        "long"
      ],
      "default": null
    },
    {
      "name": "__source_file",
      "type": [
        "null",
        "string"
      ],
      "default": null
    },
    {
      "name": "__source_pos",
      "type": [
        "null",
        "long"
      ],
      "default": null
    }
  ],
  "connect.name": "dbzium.inventory.my_table.Value"
}

And here's what my message looks like in Kafka:

./kaf -b kafka:9092 consume --schema-registry registry:8080/apis/ccompat/v7 dbzium.inventory.my_table
Key:         { "id": 1 }
Partition:   0
Offset:      0
Timestamp:   2024-02-07 16:35:24.59 +0000 UTC
{
  "__deleted": {
    "string": "false"
  },
  "__op": {
    "string": "c"
  },
  "__source_file": {
    "string": "1.000003"
  },
  "__source_pos": {
    "long": 746927
  },
  "__source_ts_ms": {
    "long": 1707323723000
  },
  "__ts_ms": {
    "long": 1707323724020
  },
  "id": 1,
  "my_first_tinyint_col": null,
  "test_str": null
}

And when I try to read the Avro file produced by the S3 connector via Python, this is what I'm seeing:

>>> import copy, json, avro
>>> from avro.datafile import DataFileWriter, DataFileReader
>>> from avro.io import DatumWriter, DatumReader
>>> file_name = "./dbzium.inventory.my_table+0+0000000000.avro"
>>> with open(file_name, 'rb') as f:
...     reader = DataFileReader(f, DatumReader())
...     metadata = copy.deepcopy(reader.meta)
...     schema_from_file = json.loads(metadata['avro.schema'])
...     data = [r for r in reader]
...     reader.close()
... 
>>> data[0]
{'id': 1, 'my_first_tinyint_col': None, 'test_str': 'test_str', '__deleted': 'false', '__op': 'c', '__ts_ms': 1707323724020, '__source_ts_ms': 1707323723000, '__source_file': '1.000003', '__source_pos': 746927}
>>> 

Notice how the value of the test_str key is the default value ("test_str") instead of None/null.
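
For reference, the Avro container format itself has no trouble round-tripping an explicit null here. The following is a minimal standalone sketch (not part of my pipeline) using the same avro package and a trimmed-down copy of the test_str field; a record written with test_str=None comes back as None, which suggests the substitution happens in the connector's Connect-to-Avro conversion rather than in the file encoding.

import io, json
from avro.schema import parse
from avro.datafile import DataFileReader, DataFileWriter
from avro.io import DatumReader, DatumWriter

# Trimmed-down copy of the schema above: just "id" plus the nullable "test_str"
# union that carries a connect.default.
schema = parse(json.dumps({
    "type": "record",
    "name": "Value",
    "fields": [
        {"name": "id", "type": "long"},
        {"name": "test_str",
         "type": [{"type": "string", "connect.default": "test_str"}, "null"],
         "default": "test_str"},
    ],
}))

# Write one record with an explicit null, then read it back from memory.
buf = io.BytesIO()
writer = DataFileWriter(buf, DatumWriter(), schema)
writer.append({"id": 1, "test_str": None})
writer.flush()

reader = DataFileReader(io.BytesIO(buf.getvalue()), DatumReader())
print(list(reader))  # [{'id': 1, 'test_str': None}] -- the null is preserved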

In part of the S3 connector logs, I do see ignore.default.for.nullables = false, so is this setting perhaps not taking effect?

[2024-02-08 00:58:35,672] INFO [kafka-to-s3|task-0] Creating S3 client. (io.confluent.connect.s3.storage.S3Storage:89)
[2024-02-08 00:58:35,673] INFO [kafka-to-s3|task-0] Created a retry policy for the connector (io.confluent.connect.s3.storage.S3Storage:170)
[2024-02-08 00:58:35,673] INFO [kafka-to-s3|task-0] Returning new credentials provider based on the configured credentials provider class (io.confluent.connect.s3.storage.S3Storage:175)
[2024-02-08 00:58:35,673] INFO [kafka-to-s3|task-0] S3 client created (io.confluent.connect.s3.storage.S3Storage:107)
[2024-02-08 00:58:42,099] INFO [kafka-to-s3|task-0] AvroDataConfig values:
    allow.optional.map.keys = false
    connect.meta.data = true
    discard.type.doc.default = false
    enhanced.avro.schema.support = true
    generalized.sum.type.support = false
    ignore.default.for.nullables = false
    schemas.cache.config = 1000
    scrub.invalid.names = false
 (io.confluent.connect.avro.AvroDataConfig:369)
[2024-02-08 00:58:42,099] INFO [kafka-to-s3|task-0] Created S3 sink record writer provider. (io.confluent.connect.s3.S3SinkTask:119)
[2024-02-08 00:58:42,100] INFO [kafka-to-s3|task-0] Created S3 sink partitioner. (io.confluent.connect.s3.S3SinkTask:121)
[2024-02-08 00:58:42,100] INFO [kafka-to-s3|task-0] Started S3 connector task with assigned partitions: [] (io.confluent.connect.s3.S3SinkTask:135)
raphaelauv commented 7 months ago

The feature is available starting with confluent-schema-registry release 7.3. Are you using at least that version of the ser/deser lib in your Kafka Connect?

check the doc -> https://docs.confluent.io/platform/current/schema-registry/connect.html#null-values-replaced-with-default-values
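
One quick way to see which Avro converter version the worker actually loaded (a sketch that assumes the worker runs Kafka 3.2+, so GET /connector-plugins supports connectorsOnly=false, and that the Connect REST API is reachable at http://connect:8083):

import json, urllib.request

# List every installed plugin (connectors, converters, transforms) with its reported version.
url = "http://connect:8083/connector-plugins?connectorsOnly=false"
for plugin in json.load(urllib.request.urlopen(url)):
    if "Avro" in plugin["class"]:
        print(plugin["class"], plugin.get("version", "undefined"))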

andyhuynh3 commented 7 months ago

@raphaelauv Yes, I'm using version 7.6.0. It works on the producer side (e.g. with Debezium), but the issue is with the Confluent Kafka Connect sinks. I have a PR here to address the issue.

raphaelauv commented 7 months ago

"value.converter.ignore.default.for.nullables": "true"

works with

FROM confluentinc/cp-kafka-connect-base:7.6.0
RUN (echo 1 && yes) | confluent-hub install confluentinc/kafka-connect-s3:10.5.0


and the final file dropped on S3 contains

 {"currency":null,"contry":"UNKNOW","_kafka_partition":0,"_kafka_offset":9,"_kafka_timestamp":1707851123401}
andyhuynh3 commented 7 months ago

I'm not using the Confluent Kafka Connect image to begin with, but I can give it a try when I have some time.

More details on my setup -- I'm working with the Strimzi base image (quay.io/strimzi/kafka:0.38.0-kafka-3.5.1) and installing version 10.5.7 of the S3 sink connector:

ARG KAFKA_VERSION="3.5.1"
ARG STRIMZI_VERSION="0.38.0"

FROM quay.io/strimzi/kafka:${STRIMZI_VERSION}-kafka-${KAFKA_VERSION}

ARG CONFLUENT_S3_SINK_VERSION="10.5.7"
ENV CONFLUENT_S3_SINK_VERSION=${CONFLUENT_S3_SINK_VERSION}
# Download the Confluent S3 sink connector and unpack its lib/ into the Strimzi plugin path
RUN mkdir -p /opt/kafka/plugins/s3
RUN curl https://d1i4a15mxbxib1.cloudfront.net/api/plugins/confluentinc/kafka-connect-s3/versions/${CONFLUENT_S3_SINK_VERSION}/confluentinc-kafka-connect-s3-${CONFLUENT_S3_SINK_VERSION}.zip -o /tmp/confluentinc-kafka-connect-s3-${CONFLUENT_S3_SINK_VERSION}.zip
RUN cd /tmp && unzip /tmp/confluentinc-kafka-connect-s3-${CONFLUENT_S3_SINK_VERSION}.zip
RUN cp -r /tmp/confluentinc-kafka-connect-s3-${CONFLUENT_S3_SINK_VERSION}/lib /opt/kafka/plugins/s3/
RUN rm /tmp/confluentinc-kafka-connect-s3-${CONFLUENT_S3_SINK_VERSION}.zip && rm -rf /tmp/confluentinc-kafka-connect-s3-${CONFLUENT_S3_SINK_VERSION}

I do have "value.converter.ignore.default.for.nullables": "true" set in this setup, but nulls are still being replaced with the default in the S3 files.

raphaelauv commented 7 months ago

Check the version of the schema-registry lib JARs in that container image.
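
For example, something along these lines (a sketch; the image tag my-strimzi-connect is hypothetical, and the plugin path comes from the Dockerfile above; ignore.default.for.nullables needs the 7.3.0+ libs):

import subprocess

# List the Avro converter / schema-registry JARs bundled with the S3 connector plugin.
out = subprocess.run(
    ["docker", "run", "--rm", "--entrypoint", "sh", "my-strimzi-connect", "-c",
     "ls /opt/kafka/plugins/s3/lib | grep -Ei 'avro|schema-registry'"],
    capture_output=True, text=True, check=True,
).stdout
print(out)  # expect JARs like kafka-connect-avro-converter-<version>.jar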