k0bayash1maru opened 8 months ago
@bryanck, can you point me in the right direction please?
FYI, I have the .jks file uploaded to the Connect container, with the truststore path pointing to it.
The sink distribution for Hive includes Hive v2.3. It looks like some of your properties are for Hive v3+.
Thanks for the reply.
Yes, my HMS version is 4.0.
Any ideas on how I can pass these SSL values through the old Hive 2.3 client?
Or can I pass a custom key-value pair to get past this?
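For context, my understanding is that the sink forwards any `iceberg.catalog.`-prefixed connector property to the underlying catalog, so the HMS SSL settings would be passed roughly like this (a sketch; the host, paths, and password are placeholders, and whether the bundled Hive 2.3 client honors these keys is exactly the open question):

```
"iceberg.catalog.type": "hive",
"iceberg.catalog.uri": "thrift://<hms-host>:9083",
"iceberg.catalog.hive.metastore.use.SSL": "true",
"iceberg.catalog.hive.metastore.truststore.path": "file:///<path-to>/truststore.jks",
"iceberg.catalog.hive.metastore.truststore.password": "<password>"
```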
Tagging @tabmatfournier in case you have any thoughts as well, based on your post here:
@jbolesjc https://docs.confluent.io/platform/current/kafka/authentication_sasl/authentication_sasl_scram.html#kconnect-long , set your properties on the `consumer.`-prefixed keys.
https://github.com/tabular-io/iceberg-kafka-connect/issues/215#issuecomment-2008267035
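The linked advice amounts to setting the worker's `consumer.`-prefixed keys (or, with `connector.client.config.override.policy=All` set on the worker, the connector-level `consumer.override.` variants). A sketch, with mechanism and credentials as placeholders:

```
consumer.security.protocol=SASL_SSL
consumer.sasl.mechanism=SCRAM-SHA-512
consumer.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
  username="<user>" password="<password>";
```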
@tabmatfournier / @bryanck
Hoping you have some workarounds/suggestions for my scenario.
Originally I had the sink use the Hive 3.x metastore client, but that wasn't working, so I switched back to the 2.x client in this PR. You could try creating a custom build with the Hive 3 client but YMMV.
I tried something over the weekend and updated the code with:
hive-ver = "4.0.0-alpha-2"
private static final String HIVE_METASTORE_USE_SSL = "hive.metastore.use.SSL";
rebuilt the package, and now when I run it I see that the new parameters are picked up and I can connect to the metastore, BUT:
2024-03-24 16:22:39,869 INFO || [Consumer clientId=connector-consumer-customers-sink-0, groupId=connect-customers-sink] Subscribed to topic(s): dbserver1.inventory.customers [org.apache.kafka.clients.consumer.KafkaConsumer]
2024-03-24 16:22:39,870 INFO || IcebergSinkConfig values:
hive.metastore.client.plain.password = 48280c1244f139183abcde
hive.metastore.client.plain.username = default
hive.metastore.truststore.password = <>
hive.metastore.truststore.path = <>
hive.metastore.use.SSL = true
iceberg.catalog = iceberg_data
iceberg.connect.group-id = null
iceberg.control.commit.interval-ms = 1000
iceberg.control.commit.threads = 2
iceberg.control.commit.timeout-ms = 30000
iceberg.control.group-id = null
iceberg.control.topic = control-iceberg
iceberg.hadoop-conf-dir = null
iceberg.tables = [kafka.customers]
iceberg.tables.auto-create-enabled = true
iceberg.tables.cdc-field = null
iceberg.tables.default-commit-branch = null
iceberg.tables.default-id-columns = null
iceberg.tables.default-partition-by = null
iceberg.tables.dynamic-enabled = false
iceberg.tables.evolve-schema-enabled = true
iceberg.tables.route-field = null
iceberg.tables.schema-case-insensitive = true
iceberg.tables.schema-force-optional = false
iceberg.tables.upsert-mode-enabled = false
[io.tabular.iceberg.connect.IcebergSinkConfig]
2024-03-24 15:47:48,675 INFO || Opened a connection to metastore, current connections: 1 [hive.metastore]
2024-03-24 15:47:48,678 INFO || Connected to metastore. [hive.metastore]
.. BUT it gets disconnected when it tries to do a get_table:
org.apache.thrift.transport.TTransportException: Socket is closed by peer.
at org.apache.thrift.transport.TIOStreamTransport.read(TIOStreamTransport.java:184)
at org.apache.thrift.transport.TTransport.readAll(TTransport.java:109)
at org.apache.thrift.protocol.TBinaryProtocol.readAll(TBinaryProtocol.java:464)
at org.apache.thrift.protocol.TBinaryProtocol.readI32(TBinaryProtocol.java:362)
at org.apache.thrift.protocol.TBinaryProtocol.readMessageBegin(TBinaryProtocol.java:245)
at org.apache.thrift.TServiceClient.receiveBase(TServiceClient.java:77)
at org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$Client.recv_get_table(ThriftHiveMetastore.java:1514)
at org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$Client.get_table(ThriftHiveMetastore.java:1500)
at org.apache.hadoop.hive.metastore.HiveMetaStoreClient.getTable(HiveMetaStoreClient.java:1346)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
...
java.lang.RuntimeException: Failed to get table info from metastore kafka.customers
at org.apache.iceberg.hive.HiveTableOperations.doRefresh(HiveTableOperations.java:171)
at org.apache.iceberg.BaseMetastoreTableOperations.refresh(BaseMetastoreTableOperations.java:97)
at org.apache.iceberg.BaseMetastoreTableOperations.current(BaseMetastoreTableOperations.java:80)
at org.apache.iceberg.BaseMetastoreCatalog.loadTable(BaseMetastoreCatalog.java:47)
at io.tabular.iceberg.connect.data.IcebergWriterFactory.createWriter(IcebergWriterFactory.java:54)
at io.tabular.iceberg.connect.channel.Worker.lambda$writerForTable$8(Worker.java:242)
at java.base/java.util.HashMap.computeIfAbsent(HashMap.java:1134)
at io.tabular.iceberg.connect.channel.Worker.writerForTable(Worker.java:241)
at io.tabular.iceberg.connect.channel.Worker.lambda$routeRecordStatically$5(Worker.java:197)
at java.base/java.util.Arrays$ArrayList.forEach(Arrays.java:4390)
at io.tabular.iceberg.connect.channel.Worker.routeRecordStatically(Worker.java:195)
at io.tabular.iceberg.connect.channel.Worker.save(Worker.java:184)
at java.base/java.util.ArrayList.forEach(ArrayList.java:1541)
at io.tabular.iceberg.connect.channel.Worker.save(Worker.java:171)
at io.tabular.iceberg.connect.IcebergSinkTask.put(IcebergSinkTask.java:156)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:601)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:350)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:250)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:219)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:204)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:259)
at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:236)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.thrift.transport.TTransportException: Socket is closed by peer.
at org.apache.thrift.transport.TIOStreamTransport.read(TIOStreamTransport.java:184)
at org.apache.thrift.transport.TTransport.readAll(TTransport.java:109)
at org.apache.thrift.protocol.TBinaryProtocol.readAll(TBinaryProtocol.java:464)
at org.apache.thrift.protocol.TBinaryProtocol.readI32(TBinaryProtocol.java:362)
at org.apache.thrift.protocol.TBinaryProtocol.readMessageBegin(TBinaryProtocol.java:245)
at org.apache.thrift.TServiceClient.receiveBase(TServiceClient.java:77)
at org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$Client.recv_get_table(ThriftHiveMetastore.java:1514)
at org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$Client.get_table(ThriftHiveMetastore.java:1500)
at org.apache.hadoop.hive.metastore.HiveMetaStoreClient.getTable(HiveMetaStoreClient.java:1346)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.invoke(RetryingMetaStoreClient.java:169)
at com.sun.proxy.$Proxy224.getTable(Unknown Source)
at org.apache.iceberg.hive.HiveTableOperations.lambda$doRefresh$0(HiveTableOperations.java:158)
at org.apache.iceberg.ClientPoolImpl.run(ClientPoolImpl.java:58)
at org.apache.iceberg.ClientPoolImpl.run(ClientPoolImpl.java:51)
at org.apache.iceberg.hive.CachedClientPool.run(CachedClientPool.java:122)
at org.apache.iceberg.hive.HiveTableOperations.doRefresh(HiveTableOperations.java:158)
... 26 more
Where does the logic for the actual connection live, say when we are trying to do the get_table call? I think I'm missing something in IcebergSinkTask.java but not sure what.
Iceberg's HiveCatalog.
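To expand on that: the Thrift connection is opened inside Iceberg's HiveCatalog (via its metastore client pool), not in IcebergSinkTask; the sink only assembles catalog properties and hands them to the catalog. A self-contained sketch of that hand-off (the class and helper here are illustrative, not the connector's actual code):

```java
import java.util.HashMap;
import java.util.Map;

public class CatalogProps {

    // Illustrative helper: strip the "iceberg.catalog." prefix so the remaining
    // keys (uri, hive.metastore.use.SSL, ...) can be passed to the catalog's
    // initialize(name, props) call, which is where the Thrift connection to the
    // metastore is actually opened.
    static Map<String, String> catalogProps(Map<String, String> connectorProps) {
        final String prefix = "iceberg.catalog.";
        Map<String, String> out = new HashMap<>();
        for (Map.Entry<String, String> e : connectorProps.entrySet()) {
            if (e.getKey().startsWith(prefix)) {
                out.put(e.getKey().substring(prefix.length()), e.getValue());
            }
        }
        return out;
    }

    public static void main(String[] args) {
        Map<String, String> props = new HashMap<>();
        props.put("iceberg.catalog.uri", "thrift://abc-metastore:9083");
        props.put("iceberg.catalog.hive.metastore.use.SSL", "true");
        props.put("tasks.max", "1"); // not a catalog property, filtered out
        System.out.println(catalogProps(props));
    }
}
```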
Finally got the SSL connectivity working!
2024-03-25 22:25:56,266 INFO || Trying to connect to metastore with URI thrift://abc-metastore:9083 [hive.metastore]
2024-03-25 22:25:56,291 INFO || Opened an SSL connection to metastore, current connections: 1 [hive.metastore]
2024-03-25 22:25:56,344 INFO || Connected to metastore. [hive.metastore]
2024-03-25 22:25:56,365 INFO || Table properties set at catalog level through catalog properties: {} [org.apache.iceberg.BaseMetastoreCatalog]
2024-03-25 22:25:56,376 INFO || Table properties enforced at catalog level through catalog properties: {} [org.apache.iceberg.BaseMetastoreCatalog]
2024-03-25 22:25:56,529 INFO || Committed to table iceberg.kafka.t2 with the new metadata location s3a://bucket1/kafka/t2/metadata/00000-049c2bf3-1afa-41a4-a61d-21b1f5796621.metadata.json [org.apache.iceberg.hive.HiveTableOperations]
2024-03-25 22:25:56,529 INFO || Successfully committed to table iceberg.kafka.t2 in 152 ms [org.apache.iceberg.BaseMetastoreTableOperations]
2024-03-25 22:25:56,555 INFO || Refreshing table metadata from new version: s3a://bucket1/kafka/t2/metadata/00000-049c2bf3-1afa-41a4-a61d-21b1f5796621.metadata.json [org.apache.iceberg.BaseMetastoreTableOperations]
2024-03-25 22:25:56,576 INFO || Got brand-new compressor [.zstd] [org.apache.hadoop.io.compress.CodecPool]
2024-03-25 22:25:57,137 INFO || Sending event of type: COMMIT_REQUEST [io.tabular.iceberg.connect.channel.Channel]
2024-03-25 22:25:57,144 INFO || Started new commit with commit-id=0cea0e73-5540-4ed9-bfa8-dc0f53309d6b [io.tabular.iceberg.connect.channel.Coordinator]
>><<
2024-03-25 23:29:50,036 INFO || Commit 33baf198-56ef-45d0-af65-14c885996bdd not ready, received responses for 0 of 1 partitions, waiting for more [io.tabular.iceberg.connect.channel.CommitState]
2024-03-25 23:29:50,036 INFO || Handled event of type: COMMIT_READY [io.tabular.iceberg.connect.channel.Channel]
2024-03-25 23:29:50,038 INFO || Commit 33baf198-56ef-45d0-af65-14c885996bdd ready, received responses for all 1 partitions [io.tabular.iceberg.connect.channel.CommitState]
2024-03-25 23:29:50,069 INFO || Refreshing table metadata from new version: s3a://bucket1/kafka/t2/metadata/00002-32b57e34-13a1-4a32-9da7-5ef80259deec.metadata.json [org.apache.iceberg.BaseMetastoreTableOperations]
2024-03-25 23:29:50,078 INFO || Table loaded by catalog: iceberg.kafka.t2 [org.apache.iceberg.BaseMetastoreCatalog]
2024-03-25 23:29:50,917 INFO || Committed to table iceberg.kafka.t2 with the new metadata location s3a://bucket1/kafka/t2/metadata/00003-999f63e5-14d8-4fc3-aed5-aab69734638c.metadata.json [org.apache.iceberg.hive.HiveTableOperations]
2024-03-25 23:29:50,917 INFO || Successfully committed to table iceberg.kafka.t2 in 496 ms [org.apache.iceberg.BaseMetastoreTableOperations]
2024-03-25 23:29:50,917 INFO || Committed snapshot 1199420577274795395 (MergeAppend) [org.apache.iceberg.SnapshotProducer]
2024-03-25 23:29:50,943 INFO || Refreshing table metadata from new version: s3a://bucket1/kafka/t2/metadata/00003-999f63e5-14d8-4fc3-aed5-aab69734638c.metadata.json [org.apache.iceberg.BaseMetastoreTableOperations]
2024-03-25 23:29:51,014 INFO || Received metrics report: CommitReport{tableName=iceberg.kafka.t2, snapshotId=1199420577274795395, sequenceNumber=3, operation=append, commitMetrics=CommitMetricsResult{totalDuration=TimerResult{timeUnit=NANOSECONDS, totalDuration=PT0.854994462S, count=1}, attempts=CounterResult{unit=COUNT, value=1}, addedDataFiles=CounterResult{unit=COUNT, value=1}, removedDataFiles=null, totalDataFiles=CounterResult{unit=COUNT, value=3}, addedDeleteFiles=null, addedEqualityDeleteFiles=null, addedPositionalDeleteFiles=null, removedDeleteFiles=null, removedEqualityDeleteFiles=null, removedPositionalDeleteFiles=null, totalDeleteFiles=CounterResult{unit=COUNT, value=0}, addedRecords=CounterResult{unit=COUNT, value=1}, removedRecords=null, totalRecords=CounterResult{unit=COUNT, value=6}, addedFilesSizeInBytes=CounterResult{unit=BYTES, value=1931}, removedFilesSizeInBytes=null, totalFilesSizeInBytes=CounterResult{unit=BYTES, value=6041}, addedPositionalDeletes=null, removedPositionalDeletes=null, totalPositionalDeletes=CounterResult{unit=COUNT, value=0}, addedEqualityDeletes=null, removedEqualityDeletes=null, totalEqualityDeletes=CounterResult{unit=COUNT, value=0}}, metadata={iceberg-version=Apache Iceberg 1.4.2 (commit f6bb9173b13424d77e7ad8439b5ef9627e530cb2)}} [org.apache.iceberg.metrics.LoggingMetricsReporter]
2024-03-25 23:29:51,015 INFO || Sending event of type: COMMIT_TABLE [io.tabular.iceberg.connect.channel.Channel]
2024-03-25 23:29:51,021 INFO || Commit complete to table kafka.t2, snapshot 1199420577274795395, commit ID 33baf198-56ef-45d0-af65-14c885996bdd, vtts null [io.tabular.iceberg.connect.channel.Coordinator]
2024-03-25 23:29:51,024 INFO || Sending event of type: COMMIT_COMPLETE [io.tabular.iceberg.connect.channel.Channel]
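For anyone reproducing this: the server side of the handshake lives in the metastore's hive-site.xml, roughly as below (a sketch; exact property names can differ between Hive releases, so verify them against your HMS version, and the path is a placeholder):

```xml
<property><name>hive.metastore.use.SSL</name><value>true</value></property>
<property><name>hive.metastore.keystore.path</name><value>/opt/hive/conf/keystore.jks</value></property>
<!-- keystore password, or better, a reference to a credential provider -->
<property><name>hive.metastore.keystore.password</name><value>***</value></property>
```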
@bryanck What's the best way to contribute here?
You can open a PR.
Hello,
My HMS requires me to pass it SSL parameters such as truststore.path, truststore.password, etc.
These are the parameters I'm trying to pass:
When I try to pass these parameters with the Iceberg connector as:
I get an error saying:
Additional Kafka Connect log:
What am I missing?!
HMS docker log: