confluentinc / kafka-connect-storage-common

Shared software among connectors that target distributed filesystems and cloud storage.

Expose `ignore.default.for.nullables` #342

Open andyhuynh3 opened 7 months ago

andyhuynh3 commented 7 months ago

Problem

https://github.com/confluentinc/schema-registry/pull/2326 introduced the ignore.default.for.nullables Avro converter config property. However, the storage connectors currently cannot take advantage of it because it is not an exposed config. For example, when using the S3 sink connector, null values are still replaced with defaults, as detailed in this issue. Since the config is not exposed, ignore.default.for.nullables always comes in with its default of false:

[2024-02-08 00:58:35,672] INFO [kafka-to-s3|task-0] Creating S3 client. (io.confluent.connect.s3.storage.S3Storage:89)
[2024-02-08 00:58:35,673] INFO [kafka-to-s3|task-0] Created a retry policy for the connector (io.confluent.connect.s3.storage.S3Storage:170)
[2024-02-08 00:58:35,673] INFO [kafka-to-s3|task-0] Returning new credentials provider based on the configured credentials provider class (io.confluent.connect.s3.storage.S3Storage:175)
[2024-02-08 00:58:35,673] INFO [kafka-to-s3|task-0] S3 client created (io.confluent.connect.s3.storage.S3Storage:107)
[2024-02-08 00:58:42,099] INFO [kafka-to-s3|task-0] AvroDataConfig values:
    allow.optional.map.keys = false
    connect.meta.data = true
    discard.type.doc.default = false
    enhanced.avro.schema.support = true
    generalized.sum.type.support = false
    ignore.default.for.nullables = false
    schemas.cache.config = 1000
    scrub.invalid.names = false
 (io.confluent.connect.avro.AvroDataConfig:369)
[2024-02-08 00:58:42,099] INFO [kafka-to-s3|task-0] Created S3 sink record writer provider. (io.confluent.connect.s3.S3SinkTask:119)
[2024-02-08 00:58:42,100] INFO [kafka-to-s3|task-0] Created S3 sink partitioner. (io.confluent.connect.s3.S3SinkTask:121)
[2024-02-08 00:58:42,100] INFO [kafka-to-s3|task-0] Started S3 connector task with assigned partitions: [] (io.confluent.connect.s3.S3SinkTask:135)
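
For context, here is a minimal sketch of why the default sticks (illustrative only, not the connector's actual code; AvroData and AvroDataConfig are the real classes from io.confluent.connect.avro): AvroDataConfig only sees the keys the connector forwards to it, so a key that is never copied over silently resolves to its declared default.

import java.util.HashMap;
import java.util.Map;

import io.confluent.connect.avro.AvroData;
import io.confluent.connect.avro.AvroDataConfig;

public class AvroDataDefaultSketch {
    public static void main(String[] args) {
        // Illustrative: the connector copies some settings into this map, but
        // since "ignore.default.for.nullables" is never forwarded, AvroDataConfig
        // resolves it to false -- which is what the AvroDataConfig log above shows.
        Map<String, Object> forwarded = new HashMap<>();
        forwarded.put("schemas.cache.config", 1000);
        AvroData avroData = new AvroData(new AvroDataConfig(forwarded));
    }
}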

Solution

Expose the ignore.default.for.nullables option so that it can be configured.
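
A minimal sketch of what exposing it could look like (the variable name and doc string are assumptions, not the exact patch; ConfigDef is Kafka's org.apache.kafka.common.config.ConfigDef): define the key on the connector's ConfigDef with a default of false so current behavior is preserved, then forward it into the AvroDataConfig the Avro writer builds.

import org.apache.kafka.common.config.ConfigDef;

// Hypothetical addition to the storage connector's config definition.
ConfigDef configDef = new ConfigDef().define(
    "ignore.default.for.nullables",
    ConfigDef.Type.BOOLEAN,
    false,                      // default preserves current behavior
    ConfigDef.Importance.LOW,
    "Whether to ignore a nullable field's default value when converting records to Avro.");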

Test Strategy

I rebuilt kafka-connect-storage-core-11.2.4.jar with the changes included in this PR, then ran some manual tests with the S3 connector to confirm that the option takes effect. Here's what my S3 sink settings look like:

{
   "connector.class":"io.confluent.connect.s3.S3SinkConnector",
   "tasks.max":"1",
   "errors.deadletterqueue.context.headers.enable":"true",
   "errors.deadletterqueue.topic.name":"db_ingestion_dead_letter_queue",
   "errors.deadletterqueue.topic.replication.factor":"1",
   "filename.offset.zero.pad.widthrotate_interval_ms":"12",
   "flush.size":"500000",
   "locale":"en",
   "partition.duration.ms":"60000",
   "partitioner.class":"io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
   "path.format": "'\''year'\''=YYYY/'\''month'\''=MM/'\''day'\''=dd/'\''hour'\''=HH",
   "retry.backoff.ms":"5000",
   "rotate.interval.ms":"15000",
   "rotate.schedule.interval.ms":"60000",
   "s3.bucket.name":"my-bucket",
   "s3.part.size":"5242880",
   "s3.region":"us-west-2",
   "schema.generator.class":"io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator",
   "schema.compability":"NONE ",
   "storage.class":"io.confluent.connect.s3.storage.S3Storage",
   "timezone":"UTC",
   "topics.dir":"developer/kafka-connect-avro/data/raw",
   "topics.regex":"dbzium\\.inventory\\..+",
   "format.class":"io.confluent.connect.s3.format.avro.AvroFormat",
   "key.converter": "io.confluent.connect.avro.AvroConverter",
   "key.converter.schema.registry.url": "http://registry:8080/apis/ccompat/v7",
   "key.converter.auto.registry.schemas": "true",
   "key.converter.ignore.default.for.nullables": "true",
   "schema.name.adjustment.mode":"avro",
   "value.converter": "io.confluent.connect.avro.AvroConverter",
   "value.converter.schema.registry.url": "http://registry:8080/apis/ccompat/v7",
   "value.converter.auto.registry.schemas": "true",
   "value.converter.ignore.default.for.nullables": "true",
   "ignore.default.for.nullables": "true"
}
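
A note on why the key appears three times above: Connect strips the key.converter. / value.converter. prefixes and passes only those entries to the converters, while the sink's Avro format writer builds its own AvroData from the connector config. So the prefixed keys configure the converters, and the un-prefixed ignore.default.for.nullables is the top-level setting this PR exposes for the writer. Roughly (a sketch; config and valueConverter stand in for the worker's actual objects, while originalsWithPrefix is the real AbstractConfig helper):

// Only keys under "value.converter." ever reach the converter; the storage
// writer never sees them, hence the separate top-level key.
Map<String, Object> converterProps = config.originalsWithPrefix("value.converter.");
valueConverter.configure(converterProps, false /* isKey */);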

After starting the connector, the logs below confirm that the ignore.default.for.nullables setting was correctly applied:

[2024-02-08 17:05:36,672] INFO [kafka-to-s3|task-0] Creating S3 client. (io.confluent.connect.s3.storage.S3Storage:89)
[2024-02-08 17:05:36,672] INFO [kafka-to-s3|task-0] Created a retry policy for the connector (io.confluent.connect.s3.storage.S3Storage:170)
[2024-02-08 17:05:36,672] INFO [kafka-to-s3|task-0] Returning new credentials provider based on the configured credentials provider class (io.confluent.connect.s3.storage.S3Storage:175)
[2024-02-08 17:05:36,672] INFO [kafka-to-s3|task-0] S3 client created (io.confluent.connect.s3.storage.S3Storage:107)
[2024-02-08 17:05:36,921] INFO [kafka-to-s3|task-0] AvroDataConfig values:
    allow.optional.map.keys = false
    connect.meta.data = true
    discard.type.doc.default = false
    enhanced.avro.schema.support = true
    generalized.sum.type.support = false
    ignore.default.for.nullables = true
    schemas.cache.config = 1000
    scrub.invalid.names = false
 (io.confluent.connect.avro.AvroDataConfig:369)
[2024-02-08 17:05:36,921] INFO [kafka-to-s3|task-0] Created S3 sink record writer provider. (io.confluent.connect.s3.S3SinkTask:119)
[2024-02-08 17:05:36,921] INFO [kafka-to-s3|task-0] Created S3 sink partitioner. (io.confluent.connect.s3.S3SinkTask:121)
[2024-02-08 17:05:36,921] INFO [kafka-to-s3|task-0] Started S3 connector task with assigned partitions: [] (io.confluent.connect.s3.S3SinkTask:135)
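
As a sanity check on what the flag changes, here is a self-contained sketch (the schema and field names are made up; the classes are the real ones from io.confluent.connect.avro and org.apache.kafka.connect.data): with the flag off, a null optional field picks up its Connect default during conversion, which is the behavior reported in the linked issue; with the flag on, the null is preserved.

import java.util.Map;

import io.confluent.connect.avro.AvroData;
import io.confluent.connect.avro.AvroDataConfig;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;

public class IgnoreDefaultForNullablesSketch {
    public static void main(String[] args) {
        Schema schema = SchemaBuilder.struct().name("demo.Record")
            .field("note", SchemaBuilder.string().optional().defaultValue("n/a").build())
            .build();
        Struct value = new Struct(schema);  // "note" deliberately left null

        AvroData avroData = new AvroData(new AvroDataConfig(
            Map.of("ignore.default.for.nullables", "true")));
        // Expected: the Avro record keeps note = null; with the default of
        // false it would be written with the fallback "n/a".
        System.out.println(avroData.fromConnectData(schema, value));
    }
}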


cla-assistant[bot] commented 7 months ago

CLA assistant check
All committers have signed the CLA.


raphaelauv commented 7 months ago

I don't understand why we need to add an ignore.default.for.nullables option here.

Do you know why the deserialization in the confluent-schema-registry lib with the option value.converter.ignore.default.for.nullables is not working?

andyhuynh3 commented 7 months ago

I do not, but I suppose it's similar to the reason this PR was needed to expose the scrub.invalid.names config.

The config is picked up and does work with producers (e.g. Debezium), but I wasn't able to get it working with the S3 sink until I introduced the changes in this PR.