tabular-io / iceberg-kafka-connect

Working with other catalogs #85

Closed ajantha-bhat closed 11 months ago

ajantha-bhat commented 11 months ago

As the documentation suggests that this library works with all Iceberg catalogs, I wanted to try it out with the Nessie catalog.

Since I am a newbie to Kafka and the streaming side of things, I realized that setting everything up myself would take some time. So I decided to try a quick POC by extending the integration tests to use the Nessie catalog. However, TestContext is currently a singleton and can only hold one catalog at a time, so making it support multiple catalogs will be my next task; I didn't want to spend time on that now. Hence, I updated the test cases to use the Nessie catalog instead of the REST catalog to see whether it would work.

code changes: https://github.com/tabular-io/iceberg-kafka-connect/compare/main...ajantha-bhat:iceberg-kafka-connect:nessie
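
For context, a minimal sketch of what the swap amounts to, assuming placeholder URI and warehouse values and the hypothetical class name NessieCatalogSketch (the sink builds its Iceberg catalog from the connector's catalog properties, so switching to Nessie is mostly a matter of the catalog-impl, uri, ref, and warehouse properties):

import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.CatalogUtil;
import org.apache.iceberg.catalog.Catalog;

public class NessieCatalogSketch {
  public static void main(String[] args) {
    // Roughly what the sink does when it loads its catalog, but with Nessie
    // catalog properties instead of the REST catalog ones.
    Map<String, String> props = Map.of(
        "catalog-impl", "org.apache.iceberg.nessie.NessieCatalog",
        "uri", "http://localhost:19121/api/v1", // Nessie endpoint; the right host is discussed below
        "ref", "main",                          // Nessie branch to commit to
        "warehouse", "s3://bucket/warehouse");  // placeholder warehouse location
    Catalog catalog = CatalogUtil.buildIcebergCatalog("nessie", props, new Configuration());
    System.out.println("Loaded catalog: " + catalog.name());
  }
}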

But the integration tests are failing with that change.

Assertion condition defined as a lambda expression in io.tabular.iceberg.connect.IntegrationTest 
Expected size: 1 but was: 0 in:
[] within 30 seconds.
org.awaitility.core.ConditionTimeoutException: Assertion condition defined as a lambda expression in io.tabular.iceberg.connect.IntegrationTest 
Expected size: 1 but was: 0 in:
[] within 30 seconds.

IntegrationCdcTest > testIcebergSink() FAILED
    org.awaitility.core.ConditionTimeoutException at IntegrationCdcTest.java:181
        Caused by: java.util.concurrent.TimeoutException at IntegrationCdcTest.java:181

I set a breakpoint and confirmed that the table is created in the Nessie catalog, but there is no data from the streaming ingest. The table is empty, hence the test case failed.

There is no error log or exception related to functionality. I expected the code to invoke IcebergSinkTask.open(), but it didn't. Please let me know how I can debug further. @bryanck

P.S.: I also saw the timeout issue fix in https://github.com/tabular-io/iceberg-kafka-connect/pull/84 and have rebased my changes on it, but my issue still exists.

bryanck commented 11 months ago

The integration tests are somewhat flaky on some systems; I'm still investigating that. Also, the timeout occurs if the assertion never passes, so try checking the Kafka Connect container logs for any exceptions being logged.

ajantha-bhat commented 11 months ago

so try checking the Kafka Connect container logs for any exceptions being logged.

Thanks. I found the call stack below in the Kafka Connect container logs.

2023-09-25 15:51:52 org.projectnessie.client.http.HttpClientException: Failed to execute GET request against 'http://localhost:19121/api/v1/config'.
2023-09-25 15:51:52     at org.projectnessie.client.http.impl.jdk11.JavaRequest.executeRequest(JavaRequest.java:128)
2023-09-25 15:51:52     at org.projectnessie.client.http.HttpRequest.get(HttpRequest.java:80)
2023-09-25 15:51:52     at org.projectnessie.client.http.NessieApiCompatibility.check(NessieApiCompatibility.java:35)
2023-09-25 15:51:52     at org.projectnessie.client.http.HttpClientBuilder.build(HttpClientBuilder.java:341)
2023-09-25 15:51:52     at org.apache.iceberg.nessie.NessieCatalog.initialize(NessieCatalog.java:95)
2023-09-25 15:51:52     at org.apache.iceberg.CatalogUtil.loadCatalog(CatalogUtil.java:239)
2023-09-25 15:51:52     at org.apache.iceberg.CatalogUtil.buildIcebergCatalog(CatalogUtil.java:284)
2023-09-25 15:51:52     at io.tabular.iceberg.connect.data.Utilities.loadCatalog(Utilities.java:67)
2023-09-25 15:51:52     at io.tabular.iceberg.connect.IcebergSinkTask.open(IcebergSinkTask.java:61)
2023-09-25 15:51:52     at org.apache.kafka.connect.runtime.WorkerSinkTask.openPartitions(WorkerSinkTask.java:644)
2023-09-25 15:51:52     at org.apache.kafka.connect.runtime.WorkerSinkTask.access$1200(WorkerSinkTask.java:73)
2023-09-25 15:51:52     at org.apache.kafka.connect.runtime.WorkerSinkTask$HandleRebalance.onPartitionsAssigned(WorkerSinkTask.java:741)
2023-09-25 15:51:52     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.invokePartitionsAssigned(ConsumerCoordinator.java:322)
2023-09-25 15:51:52     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:471)
2023-09-25 15:51:52     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:474)
2023-09-25 15:51:52     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:385)
2023-09-25 15:51:52     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:557)
2023-09-25 15:51:52     at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1272)
2023-09-25 15:51:52     at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1236)
2023-09-25 15:51:52     at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1216)
2023-09-25 15:51:52     at org.apache.kafka.connect.runtime.WorkerSinkTask.pollConsumer(WorkerSinkTask.java:479)
2023-09-25 15:51:52     at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:331)
2023-09-25 15:51:52     at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:237)
2023-09-25 15:51:52     at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:206)
2023-09-25 15:51:52     at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:202)
2023-09-25 15:51:52     at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:257)
2023-09-25 15:51:52     at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:181)
2023-09-25 15:51:52     at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
2023-09-25 15:51:52     at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
2023-09-25 15:51:52     at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
2023-09-25 15:51:52     at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
2023-09-25 15:51:52     at java.base/java.lang.Thread.run(Thread.java:829)
2023-09-25 15:51:52 Caused by: java.net.ConnectException: Connection refused
2023-09-25 15:51:52     at java.net.http/jdk.internal.net.http.HttpClientImpl.

Let me dig further.

bryanck commented 11 months ago

In case this is relevant, Docker on Mac doesn't support host networking, so Kafka Connect won't be able to connect to a catalog running on your host system when running the integration tests.

bryanck commented 11 months ago

BTW the integration tests should be more stable now, I made a few updates.

ajantha-bhat commented 11 months ago

In case this is relevant, Docker on Mac doesn't support host networking, so Kafka Connect won't be able to connect to a catalog running on your host system when running the integration tests.

Can you please elaborate on this? The integration test runs Nessie in Docker, just like the test previously ran the REST catalog in Docker. I will figure out why the connection is refused. Initially I thought I needed to use the mapped port for Kafka Connect, but the REST catalog was not using it; I tried the mapped port anyway and still got the same error. I will curl it manually and see.

BTW the integration tests should be more stable now, I made a few updates.

Thanks, I will rebase after fixing my error.

bryanck commented 11 months ago

If you're running Nessie in Docker then you can ignore that reply, I wasn't clear on your setup.

bryanck commented 11 months ago

I don't think localhost as the catalog host will work though. That should be the network alias.

ajantha-bhat commented 11 months ago

I don't think localhost as the catalog host will work though. That should be the network alias.

You are right. Changing the URI from http://localhost:19121/api/v1 to http://iceberg:19121/api/v1 fixed my connection issue. I hadn't looked closely at how the hostname in the REST catalog URI was configured before.
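
To illustrate why the alias matters, here is a hypothetical Testcontainers-style sketch; the image tag, variable names, and the assumption that Nessie listens on port 19121 inside the container are mine, not taken from the actual test code. Containers on the same Docker network reach each other by network alias, while the host JVM uses the mapped port, so "localhost" is only valid from the host side.

import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.Network;

public class NessieNetworkSketch {
  public static void main(String[] args) {
    Network network = Network.newNetwork();

    // Hypothetical Nessie container; the alias "iceberg" matches the URI that worked above.
    GenericContainer<?> nessie =
        new GenericContainer<>("projectnessie/nessie:0.71.0") // image tag is an assumption
            .withNetwork(network)
            .withNetworkAliases("iceberg")
            .withEnv("QUARKUS_HTTP_PORT", "19121") // assumed: make Nessie listen on 19121 in-container
            .withExposedPorts(19121);
    nessie.start();

    // Reachable from other containers on the same network (e.g. Kafka Connect):
    String uriForConnect = "http://iceberg:19121/api/v1";
    // Reachable from the host JVM (e.g. the test code itself):
    String uriForHost = "http://localhost:" + nessie.getMappedPort(19121) + "/api/v1";

    System.out.println(uriForConnect + " vs " + uriForHost);
    nessie.stop();
  }
}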

ajantha-bhat commented 11 months ago

That error is resolved.

Since I have now configured the warehouse for Nessie, I have a new error. I will try to figure it out.

Caused by: org.apache.iceberg.exceptions.RuntimeIOException: Failed to get file system for path: s3://bucket/warehouse/test/foobar_4d4430dd-da8f-4ae5-b4f1-ff9d1fa8f5f7/metadata/00000-17199790-1334-49f0-bfc3-0b4ac0a9b544.metadata.json
2023-09-25 20:50:29     at org.apache.iceberg.hadoop.Util.getFs(Util.java:58)
2023-09-25 20:50:29     at org.apache.iceberg.hadoop.HadoopInputFile.fromLocation(HadoopInputFile.java:56)
2023-09-25 20:50:29     at org.apache.iceberg.hadoop.HadoopFileIO.newInputFile(HadoopFileIO.java:90)
2023-09-25 20:50:29     at org.apache.iceberg.TableMetadataParser.read(TableMetadataParser.java:266)
2023-09-25 20:50:29     at org.apache.iceberg.nessie.NessieTableOperations.loadTableMetadata(NessieTableOperations.java:86)

ajantha-bhat commented 11 months ago

FileIO was not configured for the REST catalog (it might be using ResolvingFileIO in the Tabular Iceberg fork that this project uses), but for Nessie I need to manually configure the FileIO as S3FileIO.
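
To show what I mean by configuring the FileIO manually, here is a rough sketch with placeholder endpoint and credential values (the property names are the standard Iceberg ones; the local S3-compatible endpoint shown is just an example, not the test's actual setup):

import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.CatalogUtil;
import org.apache.iceberg.catalog.Catalog;

public class NessieS3FileIoSketch {
  public static void main(String[] args) {
    // Same Nessie catalog properties as before, plus an explicit FileIO so the
    // catalog uses S3FileIO for s3:// paths instead of falling back to HadoopFileIO.
    Map<String, String> props = new HashMap<>();
    props.put("catalog-impl", "org.apache.iceberg.nessie.NessieCatalog");
    props.put("uri", "http://iceberg:19121/api/v1");
    props.put("ref", "main");
    props.put("warehouse", "s3://bucket/warehouse");
    props.put("io-impl", "org.apache.iceberg.aws.s3.S3FileIO");
    // Placeholder values for a local S3-compatible store used by the test:
    props.put("s3.endpoint", "http://minio:9000");
    props.put("s3.access-key-id", "admin");
    props.put("s3.secret-access-key", "password");
    props.put("s3.path-style-access", "true");

    Catalog catalog = CatalogUtil.buildIcebergCatalog("nessie", props, new Configuration());
    System.out.println("Loaded catalog: " + catalog.name());
  }
}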

bryanck commented 11 months ago

Yes, I believe the Nessie catalog uses HadoopFileIO by default; there is a PR open to change that: https://github.com/apache/iceberg/pull/8272

ajantha-bhat commented 11 months ago

Everything works now, but I can't raise a PR for the test case yet since TestContext is a singleton class.

I will come back with a refactoring and contribute the test cases. Thanks, closing this ticket.