databricks / iceberg-kafka-connect

Apache License 2.0
220 stars 49 forks source link

Multi-table creation from multiple topics #254

Closed alessandrooliveira-nomad closed 5 months ago

alessandrooliveira-nomad commented 6 months ago

Is it possible to automatically create multiple tables from multiple topics in the same connector?

The configuration below does not appear to work when reading from two topics (topic_A and topic_B), and writing to two tables (glue_database.table_A and glue_database.table_B):

{
    "connector.class": "io.tabular.iceberg.connect.IcebergSinkConnector",
    "consumer.auto.offset.reset": "latest",
    "errors.deadletterqueue.context.headers.enable": "false",
    "iceberg.catalog": "glue",
    "iceberg.catalog.catalog-impl": "org.apache.iceberg.aws.glue.GlueCatalog",
    "iceberg.catalog.client.assume-role.arn": "xxxxxxxxxxxxxxx",
    "iceberg.catalog.client.assume-role.region": "us-east-1",
    "iceberg.catalog.client.assume-role.tags.LakeFormationAuthorizedCaller": "iceberg-connect",
    "iceberg.catalog.glue.account-id": "xxxxxxxxxxxxxxxxx",
    "iceberg.catalog.io-impl": "org.apache.iceberg.aws.s3.S3FileIO",
    "iceberg.catalog.s3.path-style-access": "true",
    "iceberg.catalog.warehouse": "xxxxxxxxxxxxxxx",
    "iceberg.control.topic": "iceberg-control-topic-schema-account",
    "iceberg.tables.auto-create-props.write.parquet.compression-codec": "snappy",
    "iceberg.tables.auto-create-props.format-version": "2",
    "iceberg.tables.default-partition-by": "days(__source_date)",
    "iceberg.tables": "glue_database.table_A, glue_database.table_B",
    "iceberg.tables.auto-create-enabled": "true",
    "iceberg.tables.dynamic-enabled": "false",
    "iceberg.tables.evolve-schema-enabled": "true",
    "iceberg.tables.upsert-mode-enabled": "true",
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.latest.version": "true",
    "key.converter.schema.registry.url": "xxxxxxxxxxxxxxxx",
    "key.converter.schemas.enable": "true",
    "name": "iceberg_sink_connector",
    "tasks.max": "1",
    "topics": "topic_A, topic_B",
    "transforms": "unwrap, timestamp, RenameField",
    "transforms.unwrap.add.fields": "op,source.table,source.db,source.schema,source.ts_ms",
    "transforms.unwrap.drop.tombstones": "false",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.latest.version": "true",
    "value.converter.schema.registry.url": "xxxxxxxxxxxxxxxx",
    "value.converter.schemas.enable": "true",
    "transforms.timestamp.field": "__source_ts_ms",
    "transforms.timestamp.format": "yyyy-MM-dd",
    "transforms.timestamp.target.type": "Date",
    "transforms.timestamp.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
    "transforms.RenameField.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
    "transforms.RenameField.renames": "__source_ts_ms:__source_date",
    "iceberg.table.glue_database.table_A.route-regex": "table_A",
    "iceberg.table.glue_database.table_A.id-columns": "table_A_id",
    "iceberg.table.glue_database.table_B.route-regex": "table_B",
    "iceberg.table.glue_database.table_B.id-columns": "table_B_id"
}

The connector fails with: "Caused by: java.lang.NullPointerException: Cannot invoke "org.apache.iceberg.types.Types$NestedField.fieldId()" because the return value of "org.apache.iceberg.Schema.findField(String)" is null".

I can confirm that this is not a schema-related issue, because the connector works correctly when I create a separate connector for each topic.