tabular-io / iceberg-kafka-connect

Apache License 2.0

Classic Datalake Use Case. Is it supported? #190

Open nicodeg87 opened 5 months ago

nicodeg87 commented 5 months ago

Hi guys,

I'm trying to understand whether this connector can be used for a common scenario: loading data from Kafka into Iceberg. The scenario:

  1. Each Kafka topic should have its own Iceberg table.
  2. Each Kafka topic has its schema defined in a schema registry (e.g. Confluent Schema Registry).
  3. Ideally the configuration should be dynamic. For example, all topics starting with a given prefix would be loaded into Iceberg, so we do not need to change the configuration every time a new topic is added (tables could be named after the topic).
  4. I understand schema evolution is already automated. I just want to confirm that the table schema will be kept in sync with the structure registered in Schema Registry.

According to the config documentation and the examples provided, it looks like this is not supported, but I wanted to double-check to be sure (support for merging several topics into one table looks far more complex than the one topic/one table use case, which is the most common data architecture).

Thanks in advance! Nicolas.

bryanck commented 5 months ago

Yes, this use case is supported. When using dynamic routing, you will need a field in the record that specifies the table, though we have considered adding an option to use the topic name.
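
For illustration, a dynamic-routing setup along these lines might look roughly like the fragment below. This is only a sketch: it assumes the option names `iceberg.tables.dynamic-enabled` and `iceberg.tables.route-field` as listed in the connector's README, and the standard Kafka Connect `topics.regex` setting for subscribing by prefix. The prefix `datalake_` and the field name `target_table` are hypothetical placeholders, and each record would have to carry the destination table name in that field.

```json
{
  "connector.class": "io.tabular.iceberg.connect.IcebergSinkConnector",
  "topics.regex": "datalake_.*",
  "iceberg.tables.dynamic-enabled": "true",
  "iceberg.tables.route-field": "target_table",
  "iceberg.tables.auto-create-enabled": "true",
  "iceberg.tables.evolve-schema-enabled": "true"
}
```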

nicodeg87 commented 5 months ago

Thanks for the reply. Yes, the topic name would definitely be a convenient option for this use case, since carrying a string field on every record is not optimal from a storage perspective (producers might also make mistakes with this field and route messages to the wrong table).

Sorry, another quick question: I did not see a setting to choose between Parquet and Avro as the file format for the created Iceberg tables. Is only Parquet supported?

Thanks. Nicolas.

bryanck commented 5 months ago

You can define any table property to set for auto create by setting iceberg.tables.auto-create-props.* configs, e.g. set iceberg.tables.auto-create-props.write.format.default to parquet, orc, or avro to set the file format.
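
As a rough sketch of the above, the fragment below would ask auto-created tables to use ORC data files. Only the keys named in this thread are used; the value `orc` is just one of the three formats mentioned, picked for illustration.

```json
{
  "iceberg.tables.auto-create-enabled": "true",
  "iceberg.tables.auto-create-props.write.format.default": "orc"
}
```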

ArkaSarkar19 commented 2 months ago

Hi team, I have a similar use case. How can we set the auto-create properties for a Hive catalog? I have tried setting iceberg.tables.auto-create-enabled alongside iceberg.catalog.warehouse and iceberg.catalog.uri. The connector works perfectly fine with existing Iceberg tables on the same cluster, but it can't seem to create a new table on its own.

Can someone point me to the configs required for this?

Also, a quick question: how can we specify the dataset-tags when the table is auto-created?

This is the connector config I am using:

```json
{
  "connector.class": "io.tabular.iceberg.connect.IcebergSinkConnector",
  "errors.log.include.messages": "true",
  "consumer.override.bootstrap.servers": "[bootstrap-server-url]",
  "tasks.max": "1",
  "topics": "test_topic",
  "iceberg.control.commit.interval-ms": "60000",
  "iceberg.control.topic": "test_control",
  "value.converter.value.subject.name.strategy": "io.confluent.kafka.serializers.subject.TopicRecordNameStrategy",
  "value.converter.schema.registry.url": "[SR-url]",
  "key.converter": "org.apache.kafka.connect.storage.StringConverter",
  "value.converter": "io.confluent.connect.avro.AvroConverter",
  "key.converter.schema.registry.url": "[SR-url]",
  "iceberg.tables": "test.iceberg_sink_test_4",
  "name": "iceberg_sink_connector",
  "errors.log.enable": "true",
  "iceberg.catalog.type": "hive",
  "iceberg.catalog.uri": "[thrift-url]",
  "iceberg.catalog.io-impl": "org.apache.iceberg.aws.s3.S3FileIO",
  "iceberg.catalog.s3.region": "us-east-1",
  "iceberg.catalog.s3.sse.key": "AES256",
  "iceberg.catalog.s3.sse.type": "s3",
  "iceberg.catalog": "catalog",
  "iceberg.catalog.warehouse": "s3://XXX/iceberg_sink_test_4/",
  "iceberg.catalog.s3.bucket.name": "XXX/iceberg_sink_test_4/",
  "iceberg.tables.auto-create-enabled": "true",
  "iceberg.tables.evolve-schema-enabled": "true",
  "iceberg.tables.auto-create-props.location": "s3://XXXX/iceberg_sink_test_4",
  "iceberg.tables.auto-create-props.write.data.path": "s3://XXXX/iceberg_sink_test_4/data"
}
```