tabular-io / iceberg-kafka-connect


How about adding some type-hinting options for the type inference in schema evolution? #141

Closed · okayhooni closed 11 months ago

okayhooni commented 11 months ago

When the ingested records come with a schema (e.g., one provided by a schema registry), it is easy to infer the type of a newly added field.

But when no schema information is provided (i.e., the schema-less ingestion case), it can be hard to infer the field type.

I'm struggling to flatten and load web logs with deeply nested data fields, without any schema info.

But I got errors like the ones below.

ex 1)

java.lang.IllegalArgumentException: Cannot convert to long: java.util.ArrayList

ex 2)

java.lang.IllegalArgumentException: Cannot convert to boolean: java.lang.Long

ex 3)

Caused by: org.apache.kafka.connect.errors.DataException: An error occurred converting record, topic: weblog-common, partition, 29, offset: 1816637121
    at io.tabular.iceberg.connect.data.IcebergWriter.write(IcebergWriter.java:73)
    at io.tabular.iceberg.connect.channel.Worker.lambda$routeRecordStatically$5(Worker.java:197)
    at java.base/java.util.Arrays$ArrayList.forEach(Arrays.java:4204)
    at io.tabular.iceberg.connect.channel.Worker.routeRecordStatically(Worker.java:195)
    at io.tabular.iceberg.connect.channel.Worker.save(Worker.java:184)
    at java.base/java.util.ArrayList.forEach(ArrayList.java:1511)
    at io.tabular.iceberg.connect.channel.Worker.save(Worker.java:171)
    at io.tabular.iceberg.connect.IcebergSinkTask.put(IcebergSinkTask.java:150)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:583)
    ... 11 more
Caused by: java.lang.IllegalArgumentException
    at org.apache.iceberg.relocated.com.google.common.base.Preconditions.checkArgument(Preconditions.java:129)
    at io.tabular.iceberg.connect.data.RecordConverter.convertListValue(RecordConverter.java:275)
    at io.tabular.iceberg.connect.data.RecordConverter.convertValue(RecordConverter.java:117)
    at io.tabular.iceberg.connect.data.RecordConverter.lambda$convertToStruct$0(RecordConverter.java:184)
    at java.base/java.util.LinkedHashMap.forEach(LinkedHashMap.java:721)
    at io.tabular.iceberg.connect.data.RecordConverter.convertToStruct(RecordConverter.java:168)
    at io.tabular.iceberg.connect.data.RecordConverter.convertStructValue(RecordConverter.java:155)
    at io.tabular.iceberg.connect.data.RecordConverter.convert(RecordConverter.java:98)
    at io.tabular.iceberg.connect.data.RecordConverter.convert(RecordConverter.java:93)
    at io.tabular.iceberg.connect.data.IcebergWriter.convertToRow(IcebergWriter.java:82)
    at io.tabular.iceberg.connect.data.IcebergWriter.write(IcebergWriter.java:62)
    ... 19 more
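
For illustration (these are made-up records, not our actual logs), ex 1) can happen when a field whose column was first inferred as a number later arrives as a list:

```json
{"page": "/home", "item_ids": 3}
{"page": "/home", "item_ids": [3, 5]}
```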

I bypassed those issues by re-creating the table with pre-defined STRING fields, instead of relying on auto-inferred field types like BIGINT or BOOLEAN.
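
For reference, this is roughly what that workaround looks like with the Iceberg Java API (the table and field names below are placeholders, not our real ones):

```java
import org.apache.iceberg.Schema;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.types.Types;

public class PrecreateWeblogTable {
  // Pre-create the table with STRING columns so the connector never has to
  // infer types for these fields during schema evolution.
  static void precreateTable(Catalog catalog) {
    Schema schema = new Schema(
        Types.NestedField.optional(1, "event_time", Types.TimestampType.withZone()),
        Types.NestedField.optional(2, "is_mobile", Types.StringType.get()), // was auto-inferred BOOLEAN
        Types.NestedField.optional(3, "item_ids", Types.StringType.get())); // was auto-inferred BIGINT
    catalog.createTable(TableIdentifier.of("logs", "weblog_common"), schema);
  }
}
```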

If a type-hinting option like iceberg.tables.prefer-string-field-type-inference or iceberg.tables.force-string-field-type-inference were added, I would really appreciate it.
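
To make it concrete, the option would slot into a sink config like this (the prefer-string property is only my proposal and does not exist today; the other properties follow the connector's documented config, with placeholder names):

```json
{
  "name": "weblog-iceberg-sink",
  "config": {
    "connector.class": "io.tabular.iceberg.connect.IcebergSinkConnector",
    "topics": "weblog-common",
    "iceberg.tables": "logs.weblog_common",
    "iceberg.tables.evolve-schema-enabled": "true",
    "iceberg.tables.prefer-string-field-type-inference": "true"
  }
}
```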

okayhooni commented 11 months ago

@bryanck

Is there any plan to support this type-inference use case with a dedicated option?

I guess it could be done with some modifications to the inferIcebergType method of the SchemaUtils class, plus the SchemaUpdate class.
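
Roughly along these lines (just a sketch of the idea, not the actual SchemaUtils code; the boolean flag stands in for the proposed config option, and the real method handles many more cases):

```java
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;

public class TypeInferenceSketch {
  static Type inferIcebergType(Object value, boolean preferStringInference) {
    if (preferStringInference) {
      // Type hint: map every newly observed field to STRING, so records whose
      // value types flip between longs, booleans, and lists can still be written.
      return Types.StringType.get();
    }
    // Otherwise fall back to value-based inference (simplified here).
    if (value instanceof Long || value instanceof Integer) {
      return Types.LongType.get();
    } else if (value instanceof Boolean) {
      return Types.BooleanType.get();
    } else if (value instanceof Double || value instanceof Float) {
      return Types.DoubleType.get();
    }
    return Types.StringType.get();
  }
}
```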

If there is no plan to support it, could I contribute the modifications to the type-inference logic and add the options myself?

I'd really rather not maintain our own fork of this repository just to keep up with its very fast updates and frequent releases. And I'm actually looking forward to the DMS/Debezium-related features you are developing now!

We currently use DMS with a Redshift sink for CDC ingestion from MySQL RDS. But if this Tabular Iceberg connector gains features covering that CDC use case, I plan to migrate our current CDC pipeline to Iceberg tables on S3 (NOT Redshift!) with your beautiful connector!

bryanck commented 11 months ago

Right now we're focusing on bug fixes while we move the sink to the Iceberg project; once it's moved, you can open a PR there for consideration. Until then, you could use an SMT to convert values to strings, or generate a record schema.
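
For example, the built-in Cast SMT can coerce named top-level primitive fields to strings (the field names below are placeholders; deeply nested values would need a custom SMT):

```json
"transforms": "castToString",
"transforms.castToString.type": "org.apache.kafka.connect.transforms.Cast$Value",
"transforms.castToString.spec": "is_mobile:string,item_count:string"
```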

okayhooni commented 11 months ago

Thanks for the reply :) I will open the issue/PR after this connector repository is migrated to the Iceberg project!

In my case, I need to apply that type-mapping logic only to newly added fields, NOT to the existing fields that already have a proper type mapping (manually inferred by a human..).

But I think the SMT method cannot differentiate whether a field is newly added or not; the closest I can get is something like the allowlist sketch below.
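
A rough sketch of a custom SMT that leaves allowlisted "known" fields alone and stringifies everything else (handles schema-less Map values only; the class name and the known.fields property are made up):

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.transforms.Transformation;

public class StringifyUnknownFields<R extends ConnectRecord<R>> implements Transformation<R> {

  private Set<String> knownFields;

  @Override
  public void configure(Map<String, ?> props) {
    // Comma-separated list of fields whose types are already mapped correctly.
    knownFields = new HashSet<>(
        Arrays.asList(String.valueOf(props.get("known.fields")).split(",")));
  }

  @Override
  @SuppressWarnings("unchecked")
  public R apply(R record) {
    if (!(record.value() instanceof Map)) {
      return record; // this sketch only handles schema-less (Map) values
    }
    Map<String, Object> out = new HashMap<>();
    for (Map.Entry<String, Object> e : ((Map<String, Object>) record.value()).entrySet()) {
      Object v = e.getValue();
      // Known fields pass through untouched; everything else becomes a string.
      out.put(e.getKey(), knownFields.contains(e.getKey()) || v == null ? v : String.valueOf(v));
    }
    return record.newRecord(record.topic(), record.kafkaPartition(),
        record.keySchema(), record.key(), null, out, record.timestamp());
  }

  @Override
  public ConfigDef config() {
    return new ConfigDef().define("known.fields", ConfigDef.Type.STRING,
        ConfigDef.Importance.HIGH, "Fields to leave untouched");
  }

  @Override
  public void close() {}
}
```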

We already use a schema registry to maintain record schemas on the recently added topics, but we can't apply those schema constraints to some legacy web logs, because the front-end engineers prefer to keep using lots of nested keys with the same name but different value types, as they always have..!