databricks / iceberg-kafka-connect

Apache License 2.0

Iceberg tables on S3 have an extra directory level that was not configured #118

Closed marc0-p closed 1 year ago

marc0-p commented 1 year ago

Hi, I set up this sink connector and it's working but I have an issue.

I'm getting an extra directory level under the data directory, before the partition folders, almost like some kind of sharding. It looks something like this:

s3://my_bucket/topics/some.topic/data/00u8bQ/partition_field=xyz/part_file_000.parquet

Where the "00u8bQ" folder is one of many 6-character folders under the data directory.

bryanck commented 1 year ago

Yes, that's the object store file layout feature of Iceberg. See https://iceberg.apache.org/docs/latest/aws/#object-store-file-layout for more info. You can disable it but I'd leave it on for the reasons given in the docs.
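
For readers unfamiliar with the feature, here is a minimal sketch of the idea behind the object store layout: a short, deterministic hash directory is placed in front of the partition path so that files spread across many key prefixes. This is an illustration only. The hash function (SHA-256 here), the `object_store_path` helper, and the 8-character length are assumptions made for the example, and Iceberg's own implementation uses a different hash and encoding, so the directory names will not match. The linked Iceberg docs describe the feature as controlled by the table property `write.object-storage.enabled`; check the docs for your Iceberg version before changing it.

```python
import hashlib


def object_store_path(data_root: str, relative_file: str, hash_len: int = 8) -> str:
    """Insert a deterministic hash directory between the data root and the file path.

    Illustrative only: this is NOT Iceberg's hash function, so the directory
    names produced here will differ from what Iceberg writes.
    """
    digest = hashlib.sha256(relative_file.encode("utf-8")).hexdigest()[:hash_len]
    return f"{data_root.rstrip('/')}/{digest}/{relative_file}"


path = object_store_path(
    "s3://my_bucket/topics/some.topic/data",
    "partition_field=xyz/part_file_000.parquet",
)
print(path)
```

The same input always yields the same hash directory, which is why the extra level looks like sharding rather than random noise.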

marc0-p commented 1 year ago

@bryanck - I think I found something. So, in the object store file layout documentation they show a path like this: s3://my-table-data-bucket/2d3905f8/my_ns.db/my_table/category=orders/00000-0-5affc076-96a4-48f2-9cd2-d5efbc9f0c94-00001.parquet

Whereas my data path looks something like this: s3://my-bucket/datalake/topics/my_topic/data/2d3905f8/category=orders/00000-0-5affc076-96a4-48f2-9cd2-d5efbc9f0c94-00001.parquet

where my config would have the equivalent of:

    iceberg.catalog.warehouse = s3a://my-bucket/datalake
    topics.dir = topics

I think my deterministic hash being inserted after the table name might be a problem? Also, the Apache example is a bit odd, as the data is not under /data (the counterpart to /metadata) as in the standard Iceberg directory structure.
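
To make the comparison concrete, the two paths quoted above can be split around the hash directory. This is only a small sketch for inspecting the paths; `split_around` is a hypothetical helper and says nothing about which layout is correct.

```python
def split_around(path: str, hash_dir: str) -> tuple[list[str], list[str]]:
    """Return the path segments before and after the hash directory."""
    segments = path.split("://", 1)[1].split("/")
    i = segments.index(hash_dir)
    return segments[:i], segments[i + 1:]


# Both paths are copied from the comment above.
docs_path = (
    "s3://my-table-data-bucket/2d3905f8/my_ns.db/my_table/category=orders/"
    "00000-0-5affc076-96a4-48f2-9cd2-d5efbc9f0c94-00001.parquet"
)
my_path = (
    "s3://my-bucket/datalake/topics/my_topic/data/2d3905f8/category=orders/"
    "00000-0-5affc076-96a4-48f2-9cd2-d5efbc9f0c94-00001.parquet"
)

for label, p in (("docs", docs_path), ("mine", my_path)):
    before, after = split_around(p, "2d3905f8")
    # Drop the file name so only the directory segments are shown.
    print(label, "| before hash:", "/".join(before), "| after hash:", "/".join(after[:-1]))
```

In the docs example the hash comes right after the bucket and before the namespace and table name; in the reported path it comes after the table's data directory and directly before the partition folder.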

marc0-p commented 1 year ago

Another thing: I'm not getting any manifest lists or updates to the metadata, which is probably the true root cause of all this!

So the status is, I have data actively flowing into the data dir, but have no updates to the metadata.
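
One rough way to confirm this symptom is to list the object keys under the table location and count them by kind. The sketch below assumes Parquet data files and Iceberg's usual metadata naming (`*.metadata.json` for table metadata, `snap-*.avro` for manifest lists, other `.avro` files for manifests). The `classify` helper and the metadata key in the sample are hypothetical, and fetching the real key listing from S3 is left out.

```python
from collections import Counter


def classify(key: str) -> str:
    """Classify an object key by file name (assumes Parquet data files)."""
    name = key.rsplit("/", 1)[-1]
    if name.endswith(".metadata.json"):
        return "table-metadata"
    if name.startswith("snap-") and name.endswith(".avro"):
        return "manifest-list"
    if name.endswith(".avro"):
        return "manifest"
    if name.endswith(".parquet"):
        return "data"
    return "other"


# Hypothetical listing: data files are arriving, but only one metadata file exists.
keys = [
    "topics/some.topic/data/00u8bQ/partition_field=xyz/part_file_000.parquet",
    "topics/some.topic/data/00u8bR/partition_field=xyz/part_file_001.parquet",
    "topics/some.topic/metadata/00000-EXAMPLE.metadata.json",
]

counts = Counter(classify(k) for k in keys)
print(dict(counts))
if counts["data"] and not counts["manifest-list"]:
    print("Data files exist but no manifest lists: nothing has been committed.")
```

A table with data files but no manifest lists has never had a snapshot committed, so query engines will see it as empty.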

marc0-p commented 1 year ago

Note: while the connector is "running", I see this task error:

{ "name": "sink-s3-block-ingestion-block-all-iceberg-v02", "connector": { "state": "RUNNING", "worker_id": "XXXXXX:8083" }, "tasks": [ { "id": 0, "state": "FAILED", "worker_id": "XXXXXX:8083", "trace": "java.lang.RuntimeException: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server does not host this topic-partition.\n\tat io.tabular.iceberg.connect.channel.Coordinator.lambda$getTotalPartitionCount$0(Coordinator.java:129)\n\tat java.base/java.util.stream.ReferencePipeline$4$1.accept(ReferencePipeline.java:212)\n\tat java.base/java.util.HashMap$ValueSpliterator.forEachRemaining(HashMap.java:1693)\n\tat java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)\n\tat java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)\n\tat java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:913)\n\tat java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)\n\tat java.base/java.util.stream.IntPipeline.reduce(IntPipeline.java:491)\n\tat java.base/java.util.stream.IntPipeline.sum(IntPipeline.java:449)\n\tat io.tabular.iceberg.connect.channel.Coordinator.getTotalPartitionCount(Coordinator.java:132)\n\tat io.tabular.iceberg.connect.channel.Coordinator.(Coordinator.java:78)\n\tat io.tabular.iceberg.connect.IcebergSinkTask.open(IcebergSinkTask.java:66)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.openPartitions(WorkerSinkTask.java:637)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.access$1200(WorkerSinkTask.java:72)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask$HandleRebalance.onPartitionsAssigned(WorkerSinkTask.java:733)\n\tat org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.invokePartitionsAssigned(ConsumerCoordinator.java:317)\n\tat org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:466)\n\tat 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:474)\n\tat org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:385)\n\tat org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:552)\n\tat org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1276)\n\tat org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1240)\n\tat org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1220)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.pollConsumer(WorkerSinkTask.java:472)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:328)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:234)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:203)\n\tat org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:189)\n\tat org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:244)\n\tat java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)\n\tat java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)\n\tat java.base/java.lang.Thread.run(Thread.java:829)\nCaused by: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server does not host this topic-partition.\n\tat java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395)\n\tat java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1999)\n\tat org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:165)\n\tat 
io.tabular.iceberg.connect.channel.Coordinator.lambda$getTotalPartitionCount$0(Coordinator.java:127)\n\t... 33 more\nCaused by: org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server does not host this topic-partition.\n" }, { "id": 1, "state": "RUNNING", "worker_id": "XXXX:8083" } ], "type": "sink" }
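
The status above shows why a connector can report RUNNING while work is silently failing: the connector state and each task state are reported separately. Here is a minimal sketch for pulling failed tasks out of a status document like the one returned by the Kafka Connect REST API at `GET /connectors/<name>/status`. The `failed_tasks` helper is hypothetical, and the sample is an abbreviated copy of the output above with the stack trace cut to its first line.

```python
import json


def failed_tasks(status: dict) -> list[tuple[int, str]]:
    """Return (task id, first line of trace) for every task in the FAILED state."""
    result = []
    for task in status.get("tasks", []):
        if task.get("state") == "FAILED":
            first_line = (task.get("trace") or "").split("\n", 1)[0]
            result.append((task["id"], first_line))
    return result


# Abbreviated from the status posted above; the trace is truncated for brevity.
status = json.loads("""
{
  "name": "sink-s3-block-ingestion-block-all-iceberg-v02",
  "connector": {"state": "RUNNING", "worker_id": "XXXXXX:8083"},
  "tasks": [
    {"id": 0, "state": "FAILED", "worker_id": "XXXXXX:8083",
     "trace": "java.lang.RuntimeException: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server does not host this topic-partition."},
    {"id": 1, "state": "RUNNING", "worker_id": "XXXX:8083"}
  ],
  "type": "sink"
}
""")

print("connector state:", status["connector"]["state"])
for task_id, reason in failed_tasks(status):
    print(f"task {task_id} FAILED: {reason}")
```

Checking task states rather than only the connector state would have surfaced this failure earlier.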

bryanck commented 1 year ago

It looks like you may be running an older version of the sink. Can you try with the latest version? (Note that some property names changed between 0.4 and 0.5.) The task should fail if the coordinator throws an exception, so I'll make sure that's the case.

marcopalmeri commented 1 year ago

Hi @bryanck upgrading the version now results in a different error:


"org.apache.kafka.connect.errors.ConnectException: No partitions assigned, cannot determine leader\n\tat io.tabular.iceberg.connect.IcebergSinkTask.lambda$isLeader$1(IcebergSinkTask.java:109)\n\tat java.base/java.util.Optional.orElseThrow(Optional.java:408)\n\tat io.tabular.iceberg.connect.IcebergSinkTask.isLeader(IcebergSinkTask.java:108)\n\tat io.tabular.iceberg.connect.IcebergSinkTask.open(IcebergSinkTask.java:86)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.openPartitions(WorkerSinkTask.java:637)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.access$1200(WorkerSinkTask.java:72)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask$HandleRebalance.onPartitionsAssigned(WorkerSinkTask.java:733)\n\tat org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.invokePartitionsAssigned(ConsumerCoordinator.java:317)\n\tat org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:466)\n\tat org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:474)\n\tat org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:385)\n\tat org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:552)\n\tat org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1276)\n\tat org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1240)\n\tat org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1220)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.pollConsumer(WorkerSinkTask.java:472)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:328)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:234)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:203)\n\tat 
org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:189)\n\tat org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:244)\n\tat java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)\n\tat java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)\n\tat java.base/java.lang.Thread.run(Thread.java:829)\n"

Any ideas? I tried bumping the connector version already.

bryanck commented 1 year ago

As with the previous error, it looks like the source topic cannot be found. Can you post your full config and how you are running Kafka Connect?

marcopalmeri commented 1 year ago

Using Terraform, with the same repo/deployments we use for all of our different sinks (mostly Confluent S3 sinks, some JDBC).

resource "kafka-connect_connector" "sink-iceberg-block-ingestion" {
  name = "sink-s3-block-ingestion-block-all-iceberg-v02"

  config = merge({
    "name"            = "sink-s3-block-ingestion-block-all-iceberg-v02"
    "connector.class" = "io.tabular.iceberg.connect.IcebergSinkConnector"
    "timezone"        = "UTC"
    "locale"          = "en-GB"

    "topics"         = "prod.block-ingestion.block_data.block.all"
    "topics.dir"     = "iceberg-ingested/kafka-uat/topics"
    "iceberg.tables" = "uat_ingested_block.block_all"
    "tasks.max"      = "2"

    "iceberg.catalog.catalog-impl"                            = "org.apache.iceberg.aws.glue.GlueCatalog"
    "iceberg.catalog"                                         = "uat_ingested_block"
    "iceberg.catalog.warehouse"                               = "s3a://blah/iceberg-ingested"
    "iceberg.catalog.io-impl"                                 = "org.apache.iceberg.aws.s3.S3FileIO"
    "iceberg.catalog.client.region"                           = local.aws_region
    "iceberg.tables.evolve-schema-enabled"                    = "true"
    "iceberg.tables.upsert-mode-enabled"                      = "true"
    "iceberg.tables.auto-create-enabled"                      = "true"
    "iceberg.tables.default-id-columns"                       = "id"
    "iceberg.tables.default-partition-by"                     = "base_chain"
    # "iceberg.table.uat_ingested_block.block_all.id-columns"   = "id"
    # "iceberg.table.uat_ingested_block.block_all.partition-by" = "base_chain"

    "value.converter"                             = "io.confluent.connect.avro.AvroConverter"
    "value.converter.schema.registry.url"         = var.schema_registry_url
    "value.converter.value.subject.name.strategy" = "io.confluent.kafka.serializers.subject.RecordNameStrategy"
    "key.converter.schema.registry.url"           = var.schema_registry_url

    "schema.compatibility"                = "NONE"
    "rotate.schedule.interval.ms"         = "60000"
    "errors.retry.delay.max.ms"           = "5000"
    "errors.retry.timeout"                = "60000"
    "consumer.override.auto.offset.reset" = "earliest"
    "connect.meta.data"                   = "false"
    "enhanced.avro.schema.support"        = "true"
    "behavior.on.null.values"             = "ignore"

    "consumer.override.max.partition.fetch.bytes" = "120000000"
    "consumer.override.max.request.size"          = "120000000"
    "consumer.override.max.poll.records"          = "20"
    "consumer.override.fetch.min.bytes"           = "500000"
    "consumer.override.fetch.max.bytes"           = "120000000"
    "consumer.override.session.timeout.ms"        = "200000"
    "consumer.override.request.interval.ms"       = "180000"
    "consumer.override.max.poll.interval.ms"      = "180000"
    "consumer.override.max.message.bytes"         = "100000000"
    "consumer.override.fetch.max.message.bytes"   = "100000000"
    "consumer.override.receive.buffer.bytes"      = "100000000"
    "consumer.override.request.timeout.ms"        = "90000"

    "transforms"                           = "tombstoneHandler"
    "transforms.tombstoneHandler.type"     = "io.confluent.connect.transforms.TombstoneHandler"
    "transforms.tombstoneHandler.behavior" = "ignore"
  }, local.funding_conn_override) #These are the connection/ssl details.
}

bryanck commented 1 year ago

It could be that the sink is not picking up the correct Kafka settings. It will attempt to read them from the worker properties file. You can try setting Kafka properties for the sink's internal Kafka client explicitly by setting iceberg.kafka.* properties. The prefix for these properties will be stripped off and then they will be passed to the Kafka client during initialization.
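
The prefix handling described above can be sketched as follows. This is not the connector's code, only an illustration of the described behavior: keys starting with `iceberg.kafka.` have the prefix removed and the remainder is what the internal Kafka client receives. The `kafka_client_props` helper and the broker address and protocol values are hypothetical; `bootstrap.servers` and `security.protocol` are standard Kafka client properties.

```python
KAFKA_PREFIX = "iceberg.kafka."


def kafka_client_props(connector_config: dict[str, str]) -> dict[str, str]:
    """Select keys with the iceberg.kafka. prefix and strip the prefix off."""
    return {
        key[len(KAFKA_PREFIX):]: value
        for key, value in connector_config.items()
        if key.startswith(KAFKA_PREFIX)
    }


connector_config = {
    "topics": "prod.block-ingestion.block_data.block.all",
    "iceberg.tables": "uat_ingested_block.block_all",
    # Hypothetical values, for illustration only.
    "iceberg.kafka.bootstrap.servers": "broker-1:9092",
    "iceberg.kafka.security.protocol": "SSL",
}

print(kafka_client_props(connector_config))
```

Other `iceberg.*` keys such as `iceberg.tables` are left alone because they do not carry the full `iceberg.kafka.` prefix.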

bryanck commented 1 year ago

Another user reported a similar error where the leader can't be determined, so I'm taking a further look and will report back.

bryanck commented 1 year ago

I have a new build, 0.5.8, that hopefully addresses the leader election failure you're seeing.

marcopalmeri commented 1 year ago

@bryanck We have no errors now, the sink is writing data to S3, but still no metadata updates, no manifest list, no snapshots.

Is this because we manually created the table?

The auto-create did not work for us.

bryanck commented 1 year ago

Unfortunately, I’m not able to reproduce your issue; I’m able to auto-create and insert using a Glue catalog. If you can give me a way to reproduce it with a simplified example, I’d be happy to investigate.

bryanck commented 1 year ago

In case it helps, here's the config I used:

    topics: test-topic
    header.converter: org.apache.kafka.connect.storage.StringConverter
    key.converter: org.apache.kafka.connect.storage.StringConverter
    value.converter: org.apache.kafka.connect.json.JsonConverter
    value.converter.schemas.enable: false
    iceberg.tables: kc_glue_test.foobar
    iceberg.table.kc_glue_test.foobar.partition-by: id
    iceberg.tables.auto-create-enabled: true
    iceberg.control.commit.interval-ms: 30000
    iceberg.catalog.catalog-impl: org.apache.iceberg.aws.glue.GlueCatalog
    iceberg.catalog.io-impl: org.apache.iceberg.aws.s3.S3FileIO
    iceberg.catalog.warehouse: s3://kc-glue-test/iceberg
    iceberg.catalog.s3.staging-dir: /home/kafka/iceberg-staging

bryanck commented 1 year ago

I'm going to close this, feel free to open a new ticket if you have further issues and a method to reproduce.