tabular-io / iceberg-kafka-connect

Apache License 2.0

Iceberg tables on S3 have an extra directory level that was not configured #118

Closed marc0-p closed 10 months ago

marc0-p commented 10 months ago

Hi, I set up this sink connector and it's working but I have an issue.

I'm getting an extra directory level under the data directory, before the partition folders, almost like some kind of sharding. It looks something like this:

s3://my_bucket/topics/some.topic/data/00u8bQ/partition_field=xyz/part_file_000.parquet

Where the "00u8bQ" folder is one of many 6-character folders under the data directory.

bryanck commented 10 months ago

Yes, that's the object store file layout feature of Iceberg. See https://iceberg.apache.org/docs/latest/aws/#object-store-file-layout for more info. You can disable it but I'd leave it on for the reasons given in the docs.

marc0-p commented 10 months ago

@bryanck - I think I found something. The object store file layout documentation shows a path like this:

s3://my-table-data-bucket/2d3905f8/my_ns.db/my_table/category=orders/00000-0-5affc076-96a4-48f2-9cd2-d5efbc9f0c94-00001.parquet

whereas my data path looks something like this:

s3://my-bucket/datalake/topics/my_topic/data/2d3905f8/category=orders/00000-0-5affc076-96a4-48f2-9cd2-d5efbc9f0c94-00001.parquet

where my config would have the equivalent of:

iceberg.catalog.warehouse = s3a://my-bucket/datalake
topics.dir = topics

I think the deterministic hash being inserted after the table name might be a problem? Also, the Apache example is a bit odd, as the data is not under /data (the counterpart to /metadata) as it would be in the standard Iceberg directory structure.
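
To make the comparison between the two paths concrete, here is a minimal Python sketch of both layouts. It is an illustration only: `object_store_path` is a hypothetical helper, the hash is a SHA-256 stand-in rather than Iceberg's own hash function, and the idea that the namespace/table context only appears when the data location sits outside the table directory is an assumption inferred from the two paths above, not something confirmed in this thread.

```python
import hashlib


def object_store_path(data_location, file_name, partition="", context=""):
    """Build a data file path with a hash segment placed right after the data location.

    The hash is a stand-in (first 8 hex characters of the SHA-256 of the file name).
    Iceberg uses its own hash function, so real prefixes will differ.
    """
    prefix = hashlib.sha256(file_name.encode("utf-8")).hexdigest()[:8]
    parts = [data_location.rstrip("/"), prefix, context, partition, file_name]
    # Skip empty segments so an absent context or partition leaves no double slash.
    return "/".join(p for p in parts if p)


file_name = "00000-0-5affc076-96a4-48f2-9cd2-d5efbc9f0c94-00001.parquet"

# Data location inside the table directory (the layout reported in this issue).
in_table = object_store_path(
    "s3://my-bucket/datalake/topics/my_topic/data",
    file_name,
    partition="category=orders",
)

# Data location at a bucket root, with namespace/table as context (the docs example).
bucket_root = object_store_path(
    "s3://my-table-data-bucket",
    file_name,
    partition="category=orders",
    context="my_ns.db/my_table",
)

print(in_table)
print(bucket_root)
```

In both cases the hash segment comes immediately after the data location; the only difference in this sketch is whether that location already contains the table name.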

marc0-p commented 10 months ago

Another thing: I'm not getting any manifest lists or updates to the metadata, which is probably the true root cause of all this!

So the status is, I have data actively flowing into the data dir, but have no updates to the metadata.

marc0-p commented 10 months ago

Note that while the connector is "running", I see this task error:

{
  "name": "sink-s3-block-ingestion-block-all-iceberg-v02",
  "connector": { "state": "RUNNING", "worker_id": "XXXXXX:8083" },
  "tasks": [
    {
      "id": 0,
      "state": "FAILED",
      "worker_id": "XXXXXX:8083",
      "trace": "java.lang.RuntimeException: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server does not host this topic-partition.\n\tat io.tabular.iceberg.connect.channel.Coordinator.lambda$getTotalPartitionCount$0(Coordinator.java:129)\n\tat java.base/java.util.stream.ReferencePipeline$4$1.accept(ReferencePipeline.java:212)\n\tat java.base/java.util.HashMap$ValueSpliterator.forEachRemaining(HashMap.java:1693)\n\tat java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)\n\tat java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)\n\tat java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:913)\n\tat java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)\n\tat java.base/java.util.stream.IntPipeline.reduce(IntPipeline.java:491)\n\tat java.base/java.util.stream.IntPipeline.sum(IntPipeline.java:449)\n\tat io.tabular.iceberg.connect.channel.Coordinator.getTotalPartitionCount(Coordinator.java:132)\n\tat io.tabular.iceberg.connect.channel.Coordinator.<init>(Coordinator.java:78)\n\tat io.tabular.iceberg.connect.IcebergSinkTask.open(IcebergSinkTask.java:66)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.openPartitions(WorkerSinkTask.java:637)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.access$1200(WorkerSinkTask.java:72)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask$HandleRebalance.onPartitionsAssigned(WorkerSinkTask.java:733)\n\tat org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.invokePartitionsAssigned(ConsumerCoordinator.java:317)\n\tat org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:466)\n\tat org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:474)\n\tat org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:385)\n\tat org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:552)\n\tat org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1276)\n\tat org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1240)\n\tat org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1220)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.pollConsumer(WorkerSinkTask.java:472)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:328)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:234)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:203)\n\tat org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:189)\n\tat org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:244)\n\tat java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)\n\tat java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)\n\tat java.base/java.lang.Thread.run(Thread.java:829)\nCaused by: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server does not host this topic-partition.\n\tat java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395)\n\tat java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1999)\n\tat org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:165)\n\tat io.tabular.iceberg.connect.channel.Coordinator.lambda$getTotalPartitionCount$0(Coordinator.java:127)\n\t... 33 more\nCaused by: org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server does not host this topic-partition.\n"
    },
    { "id": 1, "state": "RUNNING", "worker_id": "XXXX:8083" }
  ],
  "type": "sink"
}

bryanck commented 10 months ago

It looks like you may be running an older version of the sink. Can you try with the latest version? (Note that some property names changed between 0.4 and 0.5.) The task should fail if the coordinator throws an exception, so I'll make sure that's the case.

marcopalmeri commented 10 months ago

Hi @bryanck, upgrading the version now results in a different error:

"org.apache.kafka.connect.errors.ConnectException: No partitions assigned, cannot determine leader\n\tat io.tabular.iceberg.connect.IcebergSinkTask.lambda$isLeader$1(IcebergSinkTask.java:109)\n\tat java.base/java.util.Optional.orElseThrow(Optional.java:408)\n\tat io.tabular.iceberg.connect.IcebergSinkTask.isLeader(IcebergSinkTask.java:108)\n\tat io.tabular.iceberg.connect.IcebergSinkTask.open(IcebergSinkTask.java:86)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.openPartitions(WorkerSinkTask.java:637)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.access$1200(WorkerSinkTask.java:72)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask$HandleRebalance.onPartitionsAssigned(WorkerSinkTask.java:733)\n\tat org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.invokePartitionsAssigned(ConsumerCoordinator.java:317)\n\tat org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:466)\n\tat org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:474)\n\tat org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:385)\n\tat org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:552)\n\tat org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1276)\n\tat org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1240)\n\tat org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1220)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.pollConsumer(WorkerSinkTask.java:472)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:328)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:234)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:203)\n\tat org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:189)\n\tat org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:244)\n\tat java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)\n\tat java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)\n\tat java.base/java.lang.Thread.run(Thread.java:829)\n"

Any ideas? I tried bumping the connector version already.

bryanck commented 10 months ago

As with the previous error, it looks like the source topic cannot be found. Can you post your full config and describe how you are running Kafka Connect?

marcopalmeri commented 10 months ago

We deploy using Terraform, with the same repo and deployments as all of our other sinks (mostly Confluent S3 sinks, plus some JDBC).

resource "kafka-connect_connector" "sink-iceberg-block-ingestion" {
  name = "sink-s3-block-ingestion-block-all-iceberg-v02"

  config = merge({
    "name"            = "sink-s3-block-ingestion-block-all-iceberg-v02"
    "connector.class" = "io.tabular.iceberg.connect.IcebergSinkConnector"
    "timezone"        = "UTC"
    "locale"          = "en-GB"

    "topics"         = "prod.block-ingestion.block_data.block.all"
    "topics.dir"     = "iceberg-ingested/kafka-uat/topics"
    "iceberg.tables" = "uat_ingested_block.block_all"
    "tasks.max"      = "2"

    "iceberg.catalog.catalog-impl"                            = "org.apache.iceberg.aws.glue.GlueCatalog"
    "iceberg.catalog"                                         = "uat_ingested_block"
    "iceberg.catalog.warehouse"                               = "s3a://blah/iceberg-ingested"
    "iceberg.catalog.io-impl"                                 = "org.apache.iceberg.aws.s3.S3FileIO"
    "iceberg.catalog.client.region"                           = local.aws_region
    "iceberg.tables.evolve-schema-enabled"                    = "true"
    "iceberg.tables.upsert-mode-enabled"                      = "true"
    "iceberg.tables.auto-create-enabled"                      = "true"
    "iceberg.tables.default-id-columns"                       = "id"
    "iceberg.tables.default-partition-by"                     = "base_chain"
    # "iceberg.table.uat_ingested_block.block_all.id-columns"   = "id"
    # "iceberg.table.uat_ingested_block.block_all.partition-by" = "base_chain"

    "value.converter"                             = "io.confluent.connect.avro.AvroConverter"
    "value.converter.schema.registry.url"         = var.schema_registry_url
    "value.converter.value.subject.name.strategy" = "io.confluent.kafka.serializers.subject.RecordNameStrategy"
    "key.converter.schema.registry.url"           = var.schema_registry_url

    "schema.compatibility"                = "NONE"
    "rotate.schedule.interval.ms"         = "60000"
    "errors.retry.delay.max.ms"           = "5000"
    "errors.retry.timeout"                = "60000"
    "consumer.override.auto.offset.reset" = "earliest"
    "connect.meta.data"                   = "false"
    "enhanced.avro.schema.support"        = "true"
    "behavior.on.null.values"             = "ignore"

    "consumer.override.max.partition.fetch.bytes" = "120000000"
    "consumer.override.max.request.size"          = "120000000"
    "consumer.override.max.poll.records"          = "20"
    "consumer.override.fetch.min.bytes"           = "500000"
    "consumer.override.fetch.max.bytes"           = "120000000"
    "consumer.override.session.timeout.ms"        = "200000"
    "consumer.override.request.interval.ms"       = "180000"
    "consumer.override.max.poll.interval.ms"      = "180000"
    "consumer.override.max.message.bytes"         = "100000000"
    "consumer.override.fetch.max.message.bytes"   = "100000000"
    "consumer.override.receive.buffer.bytes"      = "100000000"
    "consumer.override.request.timeout.ms"        = "90000"

    "transforms"                           = "tombstoneHandler"
    "transforms.tombstoneHandler.type"     = "io.confluent.connect.transforms.TombstoneHandler"
    "transforms.tombstoneHandler.behavior" = "ignore"
  }, local.funding_conn_override) #These are the connection/ssl details.
}

bryanck commented 10 months ago

It could be that the sink is not picking up the correct Kafka settings. It will attempt to read them from the worker properties file. You can try setting Kafka properties for the sink's internal Kafka client explicitly by setting iceberg.kafka.* properties. The prefix for these properties will be stripped off and then they will be passed to the Kafka client during initialization.
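
As a rough illustration of the prefix handling described above (this is not the connector's actual code, which is Java), the following Python sketch shows which entries would reach the internal Kafka client. The helper name `kafka_client_props` is hypothetical, and the broker address and security protocol are placeholder values that do not come from this thread.

```python
PREFIX = "iceberg.kafka."


def kafka_client_props(connector_config):
    """Return the entries under the iceberg.kafka. prefix, with the prefix removed."""
    return {
        key[len(PREFIX):]: value
        for key, value in connector_config.items()
        if key.startswith(PREFIX)
    }


connector_config = {
    "topics": "prod.block-ingestion.block_data.block.all",
    "iceberg.tables": "uat_ingested_block.block_all",
    # Placeholder values for illustration only.
    "iceberg.kafka.bootstrap.servers": "broker-1:9092",
    "iceberg.kafka.security.protocol": "SSL",
}

print(kafka_client_props(connector_config))
```

Only the two `iceberg.kafka.*` entries survive, under their plain Kafka client names; the other connector properties are left out.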

bryanck commented 10 months ago

Another user reported a similar error where the leader can't be determined, so I'm taking a further look and will report back.

bryanck commented 10 months ago

I have a new build, 0.5.8, that hopefully addresses the leader election failure you're seeing.

marcopalmeri commented 10 months ago

@bryanck We have no errors now and the sink is writing data to S3, but there are still no metadata updates, no manifest lists, and no snapshots.

Is this because we manually created the table?

The auto-create did not work for us.

bryanck commented 10 months ago

Unfortunately, I'm not able to reproduce your issue: I'm able to auto-create the table and insert into it using a Glue catalog. If you can give me a way to reproduce your issue with a simplified example, I'd be happy to investigate.

bryanck commented 10 months ago

In case it helps, here's the config I used:

    topics: test-topic
    header.converter: org.apache.kafka.connect.storage.StringConverter
    key.converter: org.apache.kafka.connect.storage.StringConverter
    value.converter: org.apache.kafka.connect.json.JsonConverter
    value.converter.schemas.enable: false
    iceberg.tables: kc_glue_test.foobar
    iceberg.table.kc_glue_test.foobar.partition-by: id
    iceberg.tables.auto-create-enabled: true
    iceberg.control.commit.interval-ms: 30000
    iceberg.catalog.catalog-impl: org.apache.iceberg.aws.glue.GlueCatalog
    iceberg.catalog.io-impl: org.apache.iceberg.aws.s3.S3FileIO
    iceberg.catalog.warehouse: s3://kc-glue-test/iceberg
    iceberg.catalog.s3.staging-dir: /home/kafka/iceberg-staging

bryanck commented 10 months ago

I'm going to close this. Feel free to open a new ticket if you have further issues and a way to reproduce them.