databricks / iceberg-kafka-connect


Hive Metastore: Failed to get table info from metastore #215

Closed: jbolesjc closed this issue 2 months ago

jbolesjc commented 8 months ago

Hey all,

We are experiencing an issue where the sink connector cannot get table info for our table from the Hive metastore. We do have `iceberg.tables.auto-create-enabled` set to `true`, but since auto-create was failing I went ahead and created the table in the metastore myself. So the table exists, yet the sink connector is still unable to pull any info from it.

Could there be some permissions issue between the local `hive:4.0.0-beta-1` container and the local kafka-connect container?

The error we receive in the connector is: `Failed to get table info from metastore default.iceberg_events`

*(Screenshot of the connector error, 2024-03-18.)*

"org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception. \n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:610) \n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:330) \n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:232) \n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:201) \n\tat org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:188) \n\tat org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:237) \n\tat java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) \n\tat java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) \n\tat java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) \n\tat java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) \n\tat java.base/java.lang.Thread.run(Thread.java:829)\nCaused by: java.lang.RuntimeException: Failed to get table info from metastore default.iceberg_events \n\tat org.apache.iceberg.hive.HiveTableOperations.doRefresh(HiveTableOperations.java:171) \n\tat org.apache.iceberg.BaseMetastoreTableOperations.refresh(BaseMetastoreTableOperations.java:97) \n\tat org.apache.iceberg.BaseMetastoreTableOperations.current(BaseMetastoreTableOperations.java:80) \n\tat org.apache.iceberg.BaseMetastoreCatalog.loadTable(BaseMetastoreCatalog.java:47) \n\tat io.tabular.iceberg.connect.data.IcebergWriterFactory.createWriter(IcebergWriterFactory.java:54) \n\tat io.tabular.iceberg.connect.channel.Worker.lambda$writerForTable$8(Worker.java:242) \n\tat java.base/java.util.HashMap.computeIfAbsent(HashMap.java:1134) \n\tat io.tabular.iceberg.connect.channel.Worker.writerForTable(Worker.java:241) \n\tat io.tabular.iceberg.connect.channel.Worker.lambda$routeRecordStatically$5(Worker.java:197) \n\tat java.base/java.util.Arrays$ArrayList.forEach(Arrays.java:4390) \n\tat io.tabular.iceberg.connect.channel.Worker.routeRecordStatically(Worker.java:195) \n\tat io.tabular.iceberg.connect.channel.Worker.save(Worker.java:184) \n\tat java.base/java.util.ArrayList.forEach(ArrayList.java:1541) \n\tat io.tabular.iceberg.connect.channel.Worker.save(Worker.java:171) \n\tat io.tabular.iceberg.connect.IcebergSinkTask.put(IcebergSinkTask.java:150) \n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:582) \n\t... 
10 more\nCaused by: org.apache.thrift.transport.TTransportException \n\tat org.apache.thrift.transport.TIOStreamTransport.read(TIOStreamTransport.java:132) \n\tat org.apache.thrift.transport.TTransport.readAll(TTransport.java:86) \n\tat org.apache.thrift.protocol.TBinaryProtocol.readAll(TBinaryProtocol.java:429) \n\tat org.apache.thrift.protocol.TBinaryProtocol.readI32(TBinaryProtocol.java:318) \n\tat org.apache.thrift.protocol.TBinaryProtocol.readMessageBegin(TBinaryProtocol.java:219) \n\tat org.apache.thrift.TServiceClient.receiveBase(TServiceClient.java:77) \n\tat org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$Client.recv_get_table(ThriftHiveMetastore.java:1514) \n\tat org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$Client.get_table(ThriftHiveMetastore.java:1500) \n\tat org.apache.hadoop.hive.metastore.HiveMetaStoreClient.getTable(HiveMetaStoreClient.java:1346) \n\tat java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) \n\tat java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) \n\tat java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) \n\tat java.base/java.lang.reflect.Method.invoke(Method.java:566) \n\tat org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.invoke(RetryingMetaStoreClient.java:169) \n\tat com.sun.proxy.$Proxy58.getTable(Unknown Source) \n\tat org.apache.iceberg.hive.HiveTableOperations.lambda$doRefresh$0(HiveTableOperations.java:158) \n\tat org.apache.iceberg.ClientPoolImpl.run(ClientPoolImpl.java:58) \n\tat org.apache.iceberg.ClientPoolImpl.run(ClientPoolImpl.java:51) \n\tat org.apache.iceberg.hive.CachedClientPool.run(CachedClientPool.java:122) \n\tat org.apache.iceberg.hive.HiveTableOperations.doRefresh(HiveTableOperations.java:158) \n\t... 25 more\n
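For reference, the catalog-related keys in our connector config look roughly like this (trimmed to the relevant keys; the warehouse setting is omitted):

```json
"iceberg.tables": "default.iceberg_events",
"iceberg.tables.auto-create-enabled": "true",
"iceberg.catalog.type": "hive",
"iceberg.catalog.uri": "thrift://hive:10000/"
```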

jbolesjc commented 8 months ago

Wanted to add that we are using the v0.6.12 hive runtime release.

danielcweeks commented 8 months ago

It looks like that's an exception from the Hive metastore client (a Thrift transport exception). I would suspect the issue is connectivity to the metastore (host not resolvable, or a port issue?).
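One way to rule connectivity in or out is a bare TCP connect from inside the Connect container, before Thrift gets involved. A minimal sketch (the host and port here are assumptions for this setup; note the metastore's Thrift listener defaults to 9083, while 10000 is HiveServer2's default port):

```java
import java.net.InetSocketAddress;
import java.net.Socket;

// Minimal reachability probe for the metastore endpoint.
public class HmsPing {
    public static void main(String[] args) throws Exception {
        String host = args.length > 0 ? args[0] : "hive";              // assumed container name
        int port = args.length > 1 ? Integer.parseInt(args[1]) : 9083; // HMS Thrift default
        try (Socket socket = new Socket()) {
            // Fails fast if the host doesn't resolve or the port is closed.
            socket.connect(new InetSocketAddress(host, port), 5_000);
            System.out.println("TCP connect OK: " + host + ":" + port);
        }
    }
}
```

If the connect succeeds but the Iceberg client still throws `TTransportException`, the port is likely serving a different protocol than the metastore Thrift API.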

k0bayash1maru commented 8 months ago

Is your Hive metastore set up for SSL/SASL with PLAIN client auth? Are you running with MinIO?

jbolesjc commented 8 months ago

It does not appear that the hive-site.xml has any SASL set up currently:

```xml
<property>
    <name>hive.server2.enable.doAs</name>
    <value>false</value>
</property>
<property>
    <name>hive.tez.exec.inplace.progress</name>
    <value>false</value>
</property>
<property>
    <name>hive.exec.scratchdir</name>
    <value>/opt/hive/scratch_dir</value>
</property>
<property>
    <name>hive.user.install.directory</name>
    <value>/opt/hive/install_dir</value>
</property>
<property>
    <name>tez.runtime.optimize.local.fetch</name>
    <value>true</value>
</property>
<property>
    <name>hive.exec.submit.local.task.via.child</name>
    <value>false</value>
</property>
<property>
    <name>mapreduce.framework.name</name>
    <value>local</value>
</property>
<property>
    <name>tez.local.mode</name>
    <value>true</value>
</property>
<property>
    <name>hive.execution.engine</name>
    <value>tez</value>
</property>
<property>
    <name>metastore.warehouse.dir</name>
    <value>/opt/hive/data/warehouse</value>
</property>
<property>
    <name>metastore.metastore.event.db.notification.api.auth</name>
    <value>false</value>
</property>
```
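If SASL were enabled for the metastore, I would expect to see something like the following in hive-site.xml, which is not present (shown only as the key to look for):

```xml
<property>
    <name>hive.metastore.sasl.enabled</name>
    <value>true</value>
</property>
```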

jbolesjc commented 8 months ago

I am able to ping the IP of the Hive container from my kafka-connect container. However, I am not able to ping the URI I am passing to the connector config, thrift://hive:10000/
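Edit: worth noting that `ping` only resolves hostnames, not `thrift://` URIs, so that second check can't succeed as written. Also, 10000 is HiveServer2's default port; the metastore Thrift service usually listens on 9083, so if the defaults apply here the URI would presumably be:

```json
"iceberg.catalog.uri": "thrift://hive:9083"
```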

jbolesjc commented 8 months ago

I am not using MinIO; I am using LocalStack. If I go ahead and set up PLAIN client auth, what are the connector config keys for passing the username and password in order to connect?

tabmatfournier commented 8 months ago

@jbolesjc https://docs.confluent.io/platform/current/kafka/authentication_sasl/authentication_sasl_scram.html#kconnect-long, set your properties on the consumer-prefixed keys.
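Assuming the worker permits client config overrides (`connector.client.config.override.policy=All`), the consumer-prefixed keys go in the connector config like this sketch (the mechanism and credentials are placeholders):

```json
"consumer.override.security.protocol": "SASL_PLAINTEXT",
"consumer.override.sasl.mechanism": "SCRAM-SHA-256",
"consumer.override.sasl.jaas.config": "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"alice\" password=\"alice-secret\";"
```

Note these configure the Connect consumer's authentication to the Kafka brokers, not the connector's connection to the Hive metastore.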

jbolesjc commented 8 months ago

Since I don't see any out-of-the-box SASL set up with my HMS currently, I don't think adding a username and password to my kafka-connect config is the solution here.

jbolesjc commented 2 months ago

We ended up ditching Hive, using Nessie as the catalog and LocalStack for storage.

We used a connector config similar to this:

```json
"config": {
    "connector.class": "io.tabular.iceberg.connect.IcebergSinkConnector",
    "tasks.max": "3",
    "topics": "test_lake",
    "topics.dir": "\b",
    "iceberg.tables": "test_events",
    "iceberg.catalog": "test_catalog",
    "iceberg.catalog.catalog-impl": "org.apache.iceberg.nessie.NessieCatalog",
    "iceberg.catalog.warehouse": "s3a://local-lake-events/",
    "iceberg.catalog.uri": "http://nessie:19120/api/v1",
    "iceberg.catalog.s3.endpoint": "http://localstack:4566",
    "iceberg.catalog.ref": "main",
    "iceberg.catalog.io-impl": "org.apache.iceberg.aws.s3.S3FileIO",
    "iceberg.catalog.s3.path-style-access": "true",
    "iceberg.tables.auto-create-enabled": "true",
    "iceberg.tables.evolve-schema-enabled": "true",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false"
}
```