tabular-io / iceberg-kafka-connect

Apache License 2.0

Auto-create table using Hive catalog not working due to an unwanted slash before the table name in the S3 location #237

Open ArkaSarkar19 opened 2 months ago

ArkaSarkar19 commented 2 months ago

Hi Team,

I have a use case to land data from a Kafka stream into an Iceberg table using this connector, similar to a classic S3 lander but targeting Iceberg tables. Therefore, I need the connector to create the Iceberg table in the Hive Metastore automatically.

How can we set the auto-create properties for a Hive catalog? I have tried setting iceberg.tables.auto-create-enabled alongside iceberg.catalog.warehouse and iceberg.catalog.uri. The connector works perfectly fine on existing Iceberg tables in the same cluster but can't seem to create a new table on its own.

Can someone point me to the necessary configs required for the above?

Also, a quick question: how can we specify the dataset tags during auto-creation of the table?

This is the connector config I am using

{
    "connector.class": "io.tabular.iceberg.connect.IcebergSinkConnector",
    "errors.log.include.messages": "true",
    "consumer.override.bootstrap.servers": "[bootstrap-server-url]",
    "tasks.max": "1",
    "topics": "test_topic",
    "iceberg.control.commit.interval-ms": "60000",
    "iceberg.control.topic": "test_control",
    "value.converter.value.subject.name.strategy": "io.confluent.kafka.serializers.subject.TopicRecordNameStrategy",
    "value.converter.schema.registry.url": "[SR-url]",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.schema.registry.url": "[SR-url]",
    "iceberg.tables": "test.iceberg_sink_test_4",
    "name": "iceberg_sink_connector",
    "errors.log.enable": "true",
    "iceberg.catalog.type": "hive",
    "iceberg.catalog.uri": "[thrift-url]",
    "iceberg.catalog.io-impl": "org.apache.iceberg.aws.s3.S3FileIO",
    "iceberg.catalog.s3.region": "us-east-1",
    "iceberg.catalog.s3.sse.key": "AES256",
    "iceberg.catalog.s3.sse.type": "s3",
    "iceberg.catalog": "catalog",
    "iceberg.catalog.warehouse": "s3://XXX/iceberg_sink_test_4/",
    "iceberg.catalog.s3.bucket.name": "XXX/iceberg_sink_test_4/",
    "iceberg.tables.auto-create-enabled": "true",
    "iceberg.tables.evolve-schema-enabled": "true",
    "iceberg.tables.auto-create-props.location": "s3://XXXX/iceberg_sink_test_4",
    "iceberg.tables.auto-create-props.write.data.path": "s3://XXXX/iceberg_sink_test_4/data"
}
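As an aside, before submitting a config like the one above, a quick local check can confirm the keys this thread converges on for Hive-catalog auto-create are present. This is only a sketch, not an official validation step; the thrift URI and bucket name are placeholders:

```python
import json

# Placeholder config covering the Hive-catalog auto-create keys discussed
# in this thread (values here are hypothetical).
config = json.loads("""
{
    "iceberg.catalog.type": "hive",
    "iceberg.catalog.uri": "thrift://metastore-host:9083",
    "iceberg.catalog.warehouse": "s3://bucket/warehouse",
    "iceberg.tables.auto-create-enabled": "true"
}
""")

required = [
    "iceberg.catalog.type",
    "iceberg.catalog.uri",
    "iceberg.catalog.warehouse",
    "iceberg.tables.auto-create-enabled",
]
missing = [key for key in required if key not in config]
print("missing keys:", missing)  # missing keys: []
```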
fqtab commented 2 months ago

How can we set the auto-create properties for a Hive catalog? I have tried setting iceberg.tables.auto-create-enabled alongside iceberg.catalog.warehouse and iceberg.catalog.uri. The connector works perfectly fine on existing Iceberg tables in the same cluster but can't seem to create a new table on its own.

Those settings look correct to me. What errors do you see when the connector tries to create tables?

ArkaSarkar19 commented 2 months ago

Hi, there are no apparent errors in the logs, nor is the connector failing. The connector is creating the S3 prefix at the desired location (but doesn't create the /data and /metadata folders), and I can also see the table with its metastore info attached. However, I am not able to query it: it says Metadata not found in metadata location for table egdp_test_test.iceberg_sink_test_9. I believe this is because /metadata is not present at the S3 location.

Do you have any idea why this might be happening? Also, can you tell me how to register partitions using the Kafka Connect config? I want to apply a transform to one of the columns before partitioning on it, similar to the DDL below:

    USING iceberg
    PARTITIONED BY (date_hour(`datetime_utc`))
    LOCATION 's3://a
ArkaSarkar19 commented 2 months ago

Hi @fqtab

We found the issue: in the config we are passing the warehouse location as s3://bucket/table, but in the table metastore we can see that the location is being stored as s3://bucket//table/metadata (for example, s3://[bucket_name]//iceberg_sink_test_11/ whereas it should be s3://[bucket_name]/iceberg_sink_test_11/).

Hence, when we checked, we were able to find all the files under a folder literally named / in S3 (the empty path segment created by the double slash).

Can you tell us what we should use so that files are no longer written under the / folder? This causes an issue because we cannot query the table while the files are in the wrong location.
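One plausible way such a double slash can appear (an assumption on our part, not confirmed against the connector source) is a warehouse value that already ends with "/" being joined to the table name with another "/". A minimal sketch:

```python
# Warehouse configured with a trailing slash, as in the first config above;
# naive joining then produces an empty path segment ("//").
warehouse = "s3://bucket_name/"
table_location = warehouse + "/" + "iceberg_sink_test_11"
print(table_location)  # s3://bucket_name//iceberg_sink_test_11

# Stripping the trailing slash before joining avoids the duplicate separator:
fixed_location = warehouse.rstrip("/") + "/" + "iceberg_sink_test_11"
print(fixed_location)  # s3://bucket_name/iceberg_sink_test_11
```

If this is what is happening, removing the trailing slash from the configured warehouse (or from the Hive database location stored in the metastore) would be worth trying.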

This is the current connector config we are using.

{
    "connector.class": "io.tabular.iceberg.connect.IcebergSinkConnector",
    "errors.log.include.messages": "true",
    "consumer.override.bootstrap.servers": "[redacted]",
    "tasks.max": "1",
    "topics": "topic",
    "iceberg.control.commit.interval-ms": "60000",
    "iceberg.control.topic": "topic_2",
    "value.converter.value.subject.name.strategy": "io.confluent.kafka.serializers.subject.TopicRecordNameStrategy",
    "value.converter.schema.registry.url": "[redacted]",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.schema.registry.url": "[redacted]",
    "iceberg.tables": "test.iceberg_sink_test_17",
    "name": "iceberg_sink_connector_t",
    "errors.log.enable": "true",
    "iceberg.catalog": "spark_catalog",
    "iceberg.catalog.type": "hive",
    "iceberg.catalog.uri": "[redacted]",
    "iceberg.catalog.io-impl": "org.apache.iceberg.aws.s3.S3FileIO",
    "iceberg.catalog.client.region": "us-east-1",
    "iceberg.catalog.s3.region": "us-east-1",
    "iceberg.catalog.s3.sse.key": "AES256",
    "iceberg.catalog.s3.sse.type": "s3",
    "iceberg.catalog.warehouse": "s3://bucket_name",
    "iceberg.tables.auto-create-enabled": "true",
    "iceberg.tables.evolve-schema-enabled": "true",
    "iceberg.catalog.s3.path-style-access" : "true"
}
fqtab commented 2 months ago

Can you tell us what we should use so that files are no longer written under the / folder?

Can you try setting "iceberg.catalog.warehouse": "s3://bucket_name/warehouse"?

ArkaSarkar19 commented 2 months ago

Hi @fqtab, I have tried setting "iceberg.catalog.warehouse": "s3://bucket_name/lander_warehouse", but it still goes to the / folder. From the logs I can see this:

DEBUG Seek with new stream for s3://bucket_name//iceberg_sink_test_25/metadata/00000-a5ad1845-3e06-4bc7-a3f7-f036426c8d61.metadata.json to offset 0 (org.apache.iceberg.aws.s3.S3InputStream:175)

fqtab commented 2 months ago

That's extremely unusual; it has to respect "iceberg.catalog.warehouse" if it's set. It sounds like it didn't pick up your setting at all. Did you try this with a clean environment, i.e., was the table deleted before you changed the "iceberg.catalog.warehouse" setting?

Otherwise, I think there's something strange about your specific setup; it would help if you could reproduce the issue for me with a small example somewhere.

ArkaSarkar19 commented 2 months ago

Hi @fqtab

This behaviour was strange to me as well. I am updating the same connector config again and again, always providing a new table name. The connector creates the table with the new name, but in the wrong location.

One more thing: I am also adding the dataset tags like this in the config:

    "iceberg.tables.auto-create-props.description": "Testing iceberg table for poc auto create",
    "iceberg.tables.auto-create-props.owner": "arka",

Does this change anything?

ArkaSarkar19 commented 2 months ago

I noticed that the class org.apache.iceberg.BaseMetastoreCatalog in iceberg-core:1.4.2 resolves the base location and the table properties like this inside the create() function:

    // Falls back to the warehouse-derived default when no explicit location is given:
    String baseLocation = location != null ? location : defaultWarehouseLocation(identifier);
    // Catalog-level table-override properties are merged in last, so they take precedence:
    tableProperties.putAll(tableOverrideProperties());

And we checked that this location is always passed as null from org.apache.iceberg.catalog.Catalog.createTable() in iceberg-api:1.4.2.
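The fallback described above can be sketched in Python. This is a simplified mirror of the quoted Java, not the actual Iceberg implementation; the real defaultWarehouseLocation() also involves the database name, which is omitted here:

```python
def base_location(location, warehouse, table):
    # Mirrors `location != null ? location : defaultWarehouseLocation(identifier)`:
    # when the connector passes no explicit location, the catalog falls back
    # to a default derived from the warehouse setting.
    if location is not None:
        return location
    return warehouse + "/" + table  # stand-in for defaultWarehouseLocation(identifier)

# With location always null (as observed), the warehouse-derived default is used:
print(base_location(None, "s3://bucket_name", "iceberg_sink_test_25"))
# s3://bucket_name/iceberg_sink_test_25
```

This is consistent with the observation that whatever the warehouse-derived default produces (including any stray slash) ends up as the table location.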

ArkaSarkar19 commented 2 months ago

Hi @fqtab

How can I build the source code with all the Hive dependencies in place?

fqtab commented 2 months ago

Run ./gradlew clean build. You'll find two zip archives under your kafka-connect-runtime/build/distributions/ folder: one with Hive dependencies and one without.

ArkaSarkar19 commented 2 months ago

Hi @fqtab

While debugging the source code we found that, when the table-override properties are passed, the connector takes the right paths for the data and metadata folders. With these set, the connector works fine along with the auto-create functionality.

We added these additional configs:

    "iceberg.catalog.table-override.write.data.path": "s3://bucket_name/table/data",
    "iceberg.catalog.table-override.write.metadata.path": "s3://bucket_name/table/metadata",

Note that the table-override properties are passed under iceberg.catalog.* and not under the write props. It would be good to add this information to the documentation for other users as well.
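Our reading of why this works, sketched in simplified Python (an interpretation of the BaseMetastoreCatalog.create() excerpt earlier in the thread, not the actual implementation): the catalog-level table-override properties are merged in last, so they win over paths derived from the possibly malformed default location.

```python
# Paths derived from the (double-slashed) default warehouse location:
derived_props = {"write.data.path": "s3://bucket_name//table/data"}

# Catalog-level overrides, as configured under iceberg.catalog.table-override.*:
table_override_props = {
    "write.data.path": "s3://bucket_name/table/data",
    "write.metadata.path": "s3://bucket_name/table/metadata",
}

table_properties = {}
table_properties.update(derived_props)
table_properties.update(table_override_props)  # tableProperties.putAll(tableOverrideProperties())
print(table_properties["write.data.path"])  # s3://bucket_name/table/data — the override wins
```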

ArkaSarkar19 commented 2 months ago

Hi @fqtab, we were able to create an Iceberg table using the connector's auto-create configuration with the appropriate tags. How can we update the tags of the same table using the connector?

For reference, here is the config we are using now:

{
    "connector.class": "io.tabular.iceberg.connect.IcebergSinkConnector",
    "errors.log.include.messages": "true",
    "consumer.override.bootstrap.servers": "URL",
    "tasks.max": "1",
    "topics": "topic",
    "iceberg.control.commit.interval-ms": "60000",
    "iceberg.control.topic": "control_topic",
    "value.converter.value.subject.name.strategy": "io.confluent.kafka.serializers.subject.TopicRecordNameStrategy",
    "value.converter.schema.registry.url": "URL",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.schema.registry.url": "URL",
    "name": "iceberg_sink_connector_t",
    "errors.log.enable": "true",
    "iceberg.catalog": "spark_catalog",
    "iceberg.catalog.type": "hive",
    "iceberg.catalog.uri": "URL",
    "iceberg.catalog.io-impl": "org.apache.iceberg.aws.s3.S3FileIO",
    "iceberg.catalog.client.region": "us-east-1",
    "iceberg.catalog.s3.region": "us-east-1",
    "iceberg.catalog.s3.sse.key": "AES256",
    "iceberg.catalog.s3.sse.type": "s3",
    "iceberg.catalog.warehouse": "s3://bucket_name",
    "iceberg.tables.auto-create-enabled": "true",
    "iceberg.tables.evolve-schema-enabled": "true",
    "iceberg.tables.auto-create-props.tag1" : "value1",
    "iceberg.tables.auto-create-props.tag2" : "value2",
    "iceberg.tables.auto-create-props.tag3" : "value3",
     .
     .
     .
     .
    "iceberg.tables.auto-create-props.tag_n" : "value_n",

    "iceberg.tables.auto-create-props.write.parquet.compression-codec":"snappy",
    "iceberg.catalog.s3.path-style-access" : "true",
    "iceberg.catalog.table-override.write.data.path": "s3://bucket_name/iceberg_sink_test_42/data",
    "iceberg.catalog.table-override.write.metadata.path": "s3://bucket_name/iceberg_sink_test_42/metadata",
    "iceberg.table.test.iceberg_sink_test_42.partition-by": "hours(datetime_utc)"
}

If I need to update the values of tag1 and tag2, how can that be done using the connector configurations?

fqtab commented 2 months ago

Note that the table-override properties are passed under iceberg.catalog.* and not under the write props. It would be good to add this information to the documentation for other users as well.

It sounds like you were able to solve the issue that was reported when the ticket was initially opened. Feel free to open a PR to update the docs as appropriate.

For new problems, feel free to open new issues. If you have general questions about the connector, it's better to ask in the apache-iceberg #kafka-connect channel, as this repo is in the process of being donated to the Apache Software Foundation and there are more people who can answer questions there.

If I need to update the values of tag1 and tag2, how can that be done using the connector configurations?

I don't think you can do this through connector configurations after the table has been created. You'll have to do it via Spark, the low-level Java API, or something else.
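For example, with Spark the standard way to change table properties on an existing Iceberg table is ALTER TABLE ... SET TBLPROPERTIES (documented Iceberg Spark DDL). The helper below just builds the statement string; the table name and tag values are placeholders, and the result would be run via spark.sql(...) against the same Hive catalog:

```python
def set_tblproperties_sql(table, props):
    # Builds an Iceberg/Spark "ALTER TABLE ... SET TBLPROPERTIES" statement.
    # Sorted for a deterministic property order.
    settings = ", ".join(f"'{k}' = '{v}'" for k, v in sorted(props.items()))
    return f"ALTER TABLE {table} SET TBLPROPERTIES ({settings})"

print(set_tblproperties_sql("test.iceberg_sink_test_42", {"tag1": "new_value1"}))
# ALTER TABLE test.iceberg_sink_test_42 SET TBLPROPERTIES ('tag1' = 'new_value1')
```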