databricks / iceberg-kafka-connect


Sink worker failing to authenticate to Hive metastore (intermittent issue) #299

Open · julien-alpaca opened this issue 1 month ago

julien-alpaca commented 1 month ago

Iceberg sink tasks are failing due to the following error:

          "trace": "org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:635)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:344)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:246)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:215)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:225)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:280)
at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:237)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:572)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:317)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
at java.base/java.lang.Thread.run(Thread.java:1583)
Caused by: java.lang.RuntimeException: Unable to determine current user
at org.apache.hadoop.conf.Configuration$Resource.getRestrictParserDefault(Configuration.java:294)
at org.apache.hadoop.conf.Configuration$Resource.<init>(Configuration.java:260)
at org.apache.hadoop.conf.Configuration$Resource.<init>(Configuration.java:252)
at org.apache.hadoop.conf.Configuration.addResource(Configuration.java:979)
at org.apache.hadoop.hive.conf.HiveConf.initialize(HiveConf.java:4057)
at org.apache.hadoop.hive.conf.HiveConf.<init>(HiveConf.java:4019)
at org.apache.iceberg.hive.HiveClientPool.<init>(HiveClientPool.java:55)
at org.apache.iceberg.hive.CachedClientPool.lambda$clientPool$0(CachedClientPool.java:96)
at com.github.benmanes.caffeine.cache.BoundedLocalCache.lambda$doComputeIfAbsent$14(BoundedLocalCache.java:2406)
at java.base/java.util.concurrent.ConcurrentHashMap.compute(ConcurrentHashMap.java:1916)
at com.github.benmanes.caffeine.cache.BoundedLocalCache.doComputeIfAbsent(BoundedLocalCache.java:2404)
at com.github.benmanes.caffeine.cache.BoundedLocalCache.computeIfAbsent(BoundedLocalCache.java:2387)
at com.github.benmanes.caffeine.cache.LocalCache.computeIfAbsent(LocalCache.java:108)
at com.github.benmanes.caffeine.cache.LocalManualCache.get(LocalManualCache.java:62)
at org.apache.iceberg.hive.CachedClientPool.clientPool(CachedClientPool.java:96)
at org.apache.iceberg.hive.CachedClientPool.run(CachedClientPool.java:122)
at org.apache.iceberg.hive.HiveTableOperations.doRefresh(HiveTableOperations.java:146)
at org.apache.iceberg.BaseMetastoreTableOperations.refresh(BaseMetastoreTableOperations.java:97)
at org.apache.iceberg.BaseMetastoreTableOperations.current(BaseMetastoreTableOperations.java:80)
at org.apache.iceberg.BaseMetastoreCatalog.loadTable(BaseMetastoreCatalog.java:49)
at io.tabular.iceberg.connect.data.IcebergWriterFactory.createWriter(IcebergWriterFactory.java:54)
at io.tabular.iceberg.connect.channel.Worker.lambda$writerForTable$4(Worker.java:157)
at java.base/java.util.HashMap.computeIfAbsent(HashMap.java:1228)
at io.tabular.iceberg.connect.channel.Worker.writerForTable(Worker.java:156)
at io.tabular.iceberg.connect.channel.Worker.lambda$routeRecordStatically$1(Worker.java:112)
at java.base/java.util.Arrays$ArrayList.forEach(Arrays.java:4305)
at io.tabular.iceberg.connect.channel.Worker.routeRecordStatically(Worker.java:110)
at io.tabular.iceberg.connect.channel.Worker.save(Worker.java:99)
at java.base/java.util.ArrayList.forEach(ArrayList.java:1596)
at io.tabular.iceberg.connect.channel.Worker.write(Worker.java:85)
at io.tabular.iceberg.connect.channel.TaskImpl.put(TaskImpl.java:42)
at io.tabular.iceberg.connect.IcebergSinkTask.put(IcebergSinkTask.java:76)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:605)
... 11 more
Caused by: org.apache.hadoop.security.KerberosAuthException: failure to login: javax.security.auth.login.LoginException: java.lang.NullPointerException: invalid null input: name
at jdk.security.auth/com.sun.security.auth.UnixPrincipal.<init>(UnixPrincipal.java:71)
at jdk.security.auth/com.sun.security.auth.module.UnixLoginModule.login(UnixLoginModule.java:134)
at java.base/javax.security.auth.login.LoginContext.invoke(LoginContext.java:754)
at java.base/javax.security.auth.login.LoginContext$4.run(LoginContext.java:678)
at java.base/javax.security.auth.login.LoginContext$4.run(LoginContext.java:676)
at java.base/java.security.AccessController.doPrivileged(AccessController.java:714)
at java.base/javax.security.auth.login.LoginContext.invokePriv(LoginContext.java:676)
at java.base/javax.security.auth.login.LoginContext.login(LoginContext.java:587)
at org.apache.hadoop.security.UserGroupInformation$HadoopLoginContext.login(UserGroupInformation.java:2091)
at org.apache.hadoop.security.UserGroupInformation.doSubjectLogin(UserGroupInformation.java:2001)
at org.apache.hadoop.security.UserGroupInformation.createLoginUser(UserGroupInformation.java:722)
at org.apache.hadoop.security.UserGroupInformation.getLoginUser(UserGroupInformation.java:672)
at org.apache.hadoop.security.UserGroupInformation.getCurrentUser(UserGroupInformation.java:579)
at org.apache.hadoop.conf.Configuration$Resource.getRestrictParserDefault(Configuration.java:292)
at org.apache.hadoop.conf.Configuration$Resource.<init>(Configuration.java:260)
at org.apache.hadoop.conf.Configuration$Resource.<init>(Configuration.java:252)
at org.apache.hadoop.conf.Configuration.addResource(Configuration.java:979)
at org.apache.hadoop.hive.conf.HiveConf.initialize(HiveConf.java:4057)
at org.apache.hadoop.hive.conf.HiveConf.<init>(HiveConf.java:4019)
at org.apache.iceberg.hive.HiveClientPool.<init>(HiveClientPool.java:55)
at org.apache.iceberg.hive.CachedClientPool.lambda$clientPool$0(CachedClientPool.java:96)
at com.github.benmanes.caffeine.cache.BoundedLocalCache.lambda$doComputeIfAbsent$14(BoundedLocalCache.java:2406)
at java.base/java.util.concurrent.ConcurrentHashMap.compute(ConcurrentHashMap.java:1916)
at com.github.benmanes.caffeine.cache.BoundedLocalCache.doComputeIfAbsent(BoundedLocalCache.java:2404)
at com.github.benmanes.caffeine.cache.BoundedLocalCache.computeIfAbsent(BoundedLocalCache.java:2387)
at com.github.benmanes.caffeine.cache.LocalCache.computeIfAbsent(LocalCache.java:108)
at com.github.benmanes.caffeine.cache.LocalManualCache.get(LocalManualCache.java:62)
at org.apache.iceberg.hive.CachedClientPool.clientPool(CachedClientPool.java:96)
at org.apache.iceberg.hive.CachedClientPool.run(CachedClientPool.java:122)
at org.apache.iceberg.hive.HiveTableOperations.doRefresh(HiveTableOperations.java:146)
at org.apache.iceberg.BaseMetastoreTableOperations.refresh(BaseMetastoreTableOperations.java:97)
at org.apache.iceberg.BaseMetastoreTableOperations.current(BaseMetastoreTableOperations.java:80)
at org.apache.iceberg.BaseMetastoreCatalog.loadTable(BaseMetastoreCatalog.java:49)
at io.tabular.iceberg.connect.data.IcebergWriterFactory.createWriter(IcebergWriterFactory.java:54)
at io.tabular.iceberg.connect.channel.Worker.lambda$writerForTable$4(Worker.java:157)
at java.base/java.util.HashMap.computeIfAbsent(HashMap.java:1228)
at io.tabular.iceberg.connect.channel.Worker.writerForTable(Worker.java:156)
at io.tabular.iceberg.connect.channel.Worker.lambda$routeRecordStatically$1(Worker.java:112)
at java.base/java.util.Arrays$ArrayList.forEach(Arrays.java:4305)
at io.tabular.iceberg.connect.channel.Worker.routeRecordStatically(Worker.java:110)
at io.tabular.iceberg.connect.channel.Worker.save(Worker.java:99)
at java.base/java.util.ArrayList.forEach(ArrayList.java:1596)
at io.tabular.iceberg.connect.channel.Worker.write(Worker.java:85)
at io.tabular.iceberg.connect.channel.TaskImpl.put(TaskImpl.java:42)
at io.tabular.iceberg.connect.IcebergSinkTask.put(IcebergSinkTask.java:76)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:605)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:344)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:246)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:215)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:225)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:280)
at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:237)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:572)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:317)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
at java.base/java.lang.Thread.run(Thread.java:1583)

at org.apache.hadoop.security.UserGroupInformation.doSubjectLogin(UserGroupInformation.java:2012)
at org.apache.hadoop.security.UserGroupInformation.createLoginUser(UserGroupInformation.java:722)
at org.apache.hadoop.security.UserGroupInformation.getLoginUser(UserGroupInformation.java:672)
at org.apache.hadoop.security.UserGroupInformation.getCurrentUser(UserGroupInformation.java:579)
at org.apache.hadoop.conf.Configuration$Resource.getRestrictParserDefault(Configuration.java:292)
... 43 more
Caused by: javax.security.auth.login.LoginException: java.lang.NullPointerException: invalid null input: name
at jdk.security.auth/com.sun.security.auth.UnixPrincipal.<init>(UnixPrincipal.java:71)
at jdk.security.auth/com.sun.security.auth.module.UnixLoginModule.login(UnixLoginModule.java:134)
at java.base/javax.security.auth.login.LoginContext.invoke(LoginContext.java:754)
at java.base/javax.security.auth.login.LoginContext$4.run(LoginContext.java:678)
at java.base/javax.security.auth.login.LoginContext$4.run(LoginContext.java:676)
at java.base/java.security.AccessController.doPrivileged(AccessController.java:714)
at java.base/javax.security.auth.login.LoginContext.invokePriv(LoginContext.java:676)
at java.base/javax.security.auth.login.LoginContext.login(LoginContext.java:587)
at org.apache.hadoop.security.UserGroupInformation$HadoopLoginContext.login(UserGroupInformation.java:2091)
at org.apache.hadoop.security.UserGroupInformation.doSubjectLogin(UserGroupInformation.java:2001)
at org.apache.hadoop.security.UserGroupInformation.createLoginUser(UserGroupInformation.java:722)
at org.apache.hadoop.security.UserGroupInformation.getLoginUser(UserGroupInformation.java:672)
at org.apache.hadoop.security.UserGroupInformation.getCurrentUser(UserGroupInformation.java:579)
at org.apache.hadoop.conf.Configuration$Resource.getRestrictParserDefault(Configuration.java:292)
at org.apache.hadoop.conf.Configuration$Resource.<init>(Configuration.java:260)
at org.apache.hadoop.conf.Configuration$Resource.<init>(Configuration.java:252)
at org.apache.hadoop.conf.Configuration.addResource(Configuration.java:979)
at org.apache.hadoop.hive.conf.HiveConf.initialize(HiveConf.java:4057)
at org.apache.hadoop.hive.conf.HiveConf.<init>(HiveConf.java:4019)
at org.apache.iceberg.hive.HiveClientPool.<init>(HiveClientPool.java:55)
at org.apache.iceberg.hive.CachedClientPool.lambda$clientPool$0(CachedClientPool.java:96)
at com.github.benmanes.caffeine.cache.BoundedLocalCache.lambda$doComputeIfAbsent$14(BoundedLocalCache.java:2406)
at java.base/java.util.concurrent.ConcurrentHashMap.compute(ConcurrentHashMap.java:1916)
at com.github.benmanes.caffeine.cache.BoundedLocalCache.doComputeIfAbsent(BoundedLocalCache.java:2404)
at com.github.benmanes.caffeine.cache.BoundedLocalCache.computeIfAbsent(BoundedLocalCache.java:2387)
at com.github.benmanes.caffeine.cache.LocalCache.computeIfAbsent(LocalCache.java:108)
at com.github.benmanes.caffeine.cache.LocalManualCache.get(LocalManualCache.java:62)
at org.apache.iceberg.hive.CachedClientPool.clientPool(CachedClientPool.java:96)
at org.apache.iceberg.hive.CachedClientPool.run(CachedClientPool.java:122)
at org.apache.iceberg.hive.HiveTableOperations.doRefresh(HiveTableOperations.java:146)
at org.apache.iceberg.BaseMetastoreTableOperations.refresh(BaseMetastoreTableOperations.java:97)
at org.apache.iceberg.BaseMetastoreTableOperations.current(BaseMetastoreTableOperations.java:80)
at org.apache.iceberg.BaseMetastoreCatalog.loadTable(BaseMetastoreCatalog.java:49)
at io.tabular.iceberg.connect.data.IcebergWriterFactory.createWriter(IcebergWriterFactory.java:54)
at io.tabular.iceberg.connect.channel.Worker.lambda$writerForTable$4(Worker.java:157)
at java.base/java.util.HashMap.computeIfAbsent(HashMap.java:1228)
at io.tabular.iceberg.connect.channel.Worker.writerForTable(Worker.java:156)
at io.tabular.iceberg.connect.channel.Worker.lambda$routeRecordStatically$1(Worker.java:112)
at java.base/java.util.Arrays$ArrayList.forEach(Arrays.java:4305)
at io.tabular.iceberg.connect.channel.Worker.routeRecordStatically(Worker.java:110)
at io.tabular.iceberg.connect.channel.Worker.save(Worker.java:99)
at java.base/java.util.ArrayList.forEach(ArrayList.java:1596)
at io.tabular.iceberg.connect.channel.Worker.write(Worker.java:85)
at io.tabular.iceberg.connect.channel.TaskImpl.put(TaskImpl.java:42)
at io.tabular.iceberg.connect.IcebergSinkTask.put(IcebergSinkTask.java:76)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:605)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:344)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:246)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:215)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:225)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:280)
at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:237)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:572)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:317)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
at java.base/java.lang.Thread.run(Thread.java:1583)

at java.base/javax.security.auth.login.LoginContext.invoke(LoginContext.java:849)
at java.base/javax.security.auth.login.LoginContext$4.run(LoginContext.java:678)
at java.base/javax.security.auth.login.LoginContext$4.run(LoginContext.java:676)
at java.base/java.security.AccessController.doPrivileged(AccessController.java:714)
at java.base/javax.security.auth.login.LoginContext.invokePriv(LoginContext.java:676)
at java.base/javax.security.auth.login.LoginContext.login(LoginContext.java:587)
at org.apache.hadoop.security.UserGroupInformation$HadoopLoginContext.login(UserGroupInformation.java:2091)
at org.apache.hadoop.security.UserGroupInformation.doSubjectLogin(UserGroupInformation.java:2001)
... 47 more
"

Note that the connector can create Parquet data files and metadata files in GCS, but after some time this error occurs. Kerberos authentication is disabled on both the Hive metastore server and the client.
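
The root cause is not Kerberos itself: constructing a HiveConf triggers Hadoop's OS-user lookup via UserGroupInformation, and the JAAS UnixLoginModule throws "invalid null input: name" when the process UID has no passwd entry in the container. A minimal sketch that reproduces the lookup outside the connector (assuming only hadoop-common on the classpath; the class name is hypothetical):

    import org.apache.hadoop.security.UserGroupInformation;

    public class LoginCheck {
      public static void main(String[] args) throws Exception {
        // Same call chain as in the trace above:
        // Configuration$Resource.getRestrictParserDefault ->
        // UserGroupInformation.getCurrentUser -> JAAS login (UnixLoginModule).
        // Under a UID with no passwd entry this throws
        // KerberosAuthException: failure to login ... invalid null input: name
        UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
        System.out.println("Resolved Hadoop user: " + ugi.getUserName());
      }
    }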

Sink connector configuration:

    "info": {
      "name": "iceberg-sink",
      "config": {
        "connector.class": "io.tabular.iceberg.connect.IcebergSinkConnector",
        "iceberg.tables.cdc-field": "op",
        "tasks.max": "3",
        "topics": "XXX.public.orders",
        "iceberg.catalog.io-impl": "org.apache.iceberg.gcp.gcs.GCSFileIO",
        "iceberg.catalog.uri": "thrift://XXX.XXXsvc.cluster.local:9083",
        "iceberg.tables.auto-create-enabled": "true",
        "iceberg.tables": "default.orders",
        "name": "iceberg-sink",
        "iceberg.catalog.warehouse": "gs://XXX/datawarehouse",
        "iceberg.tables.default-id-columns": "id",
        "iceberg.catalog.type": "hive"
      }
    }
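
To exercise the same metastore code path outside Kafka Connect, you can load the table directly with Iceberg's HiveCatalog. A hedged sketch (URI, warehouse, and table are placeholders for the redacted values above; assumes iceberg-hive-metastore, iceberg-gcp, and hadoop-common on the classpath):

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.iceberg.CatalogProperties;
    import org.apache.iceberg.catalog.TableIdentifier;
    import org.apache.iceberg.hive.HiveCatalog;

    public class CatalogCheck {
      public static void main(String[] args) {
        Map<String, String> props = new HashMap<>();
        props.put(CatalogProperties.URI, "thrift://hms.example.svc.cluster.local:9083"); // placeholder
        props.put(CatalogProperties.WAREHOUSE_LOCATION, "gs://example-bucket/datawarehouse"); // placeholder
        props.put(CatalogProperties.FILE_IO_IMPL, "org.apache.iceberg.gcp.gcs.GCSFileIO");

        HiveCatalog catalog = new HiveCatalog();
        catalog.initialize("hive", props);

        // loadTable builds a HiveClientPool (new HiveConf()), which is exactly
        // where UserGroupInformation.getCurrentUser() fails in the trace above.
        System.out.println(catalog.loadTable(TableIdentifier.of("default", "orders")).location());
      }
    }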
zy2014810 commented 1 month ago

I also encountered the same problem. Were you able to solve it?

julien-alpaca commented 1 month ago

> I also encountered the same problem. Were you able to solve it?

You need to run the container as a user that actually exists in the Docker image:

securityContext:
  runAsUser: 0    # any UID with a passwd entry in the image avoids the UnixLoginModule NPE
  runAsGroup: 0

In my case I am running as root. By default, Kubernetes runs containers as a non-root user, and that UID often has no passwd entry inside the image. The failure was also intermittent because the HMS client was able to write the data files and only failed when writing the snapshot metadata.
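
For reference, this is where that snippet sits in a plain Deployment manifest; a hypothetical fragment (resource names and image are placeholders). Any UID that has a passwd entry in the image works too, root is just the simplest choice:

    spec:
      template:
        spec:
          securityContext:        # pod-level, applies to all containers
            runAsUser: 0
            runAsGroup: 0
          containers:
            - name: connect                      # placeholder
              image: my-connect-image:latest     # placeholder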