apache / seatunnel

SeaTunnel is a next-generation super high-performance, distributed, massive data integration tool.
https://seatunnel.apache.org/
Apache License 2.0

[Bug] [connector-hive] hive sink error org.apache.thrift.transport.TTransportException #6749

Open zhaoli2333 opened 5 months ago

zhaoli2333 commented 5 months ago


What happened

I got an error when syncing data from MySQL to Hive.

SeaTunnel Version

SeaTunnel version: 2.3.3
Hive version: 2.3.7
HDFS version: 2.6.x
Flink version: 1.14

SeaTunnel Config

{
        "env":{
            "execution.parallelism":2,
            "job.mode":"STREAMING"
        },
        "source":{
            "Jdbc":{
                "result_table_name": "kafka_topic_usage",
                "url":"jdbc:mysql://localhost:3306/bigdata_component",
                "driver":"com.mysql.cj.jdbc.Driver",
                "user":"xxxxxx",
                "password":"xxxxxx",
                "query":"select * from kafka_topic_usage",
                "partition_column":"id",
                "partition_num":4
            }
        },
        "sink":{
            "Hive": {
                "source_table_name": "kafka_topic_usage",
                "table_name": "hive_tmp.kafka_topic_usage",
                "metastore_uri": "thrift://test-002.test-hadoop.com:9083"
            }
        }
}
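
A quick sanity check of a JSON-style job config like the one above can rule out two simple problems before submitting: unbalanced braces, and a sink whose `source_table_name` does not match any source's `result_table_name`. The following is only a minimal sketch under that assumption. The helper name `unmatched_sinks` is hypothetical and is not part of SeaTunnel, and the sample config is a trimmed copy of the one in this report.

```python
import json


def unmatched_sinks(config_text):
    """Return sink plugin names whose source_table_name matches no source.

    json.loads raises json.JSONDecodeError if the text is not valid JSON,
    for example when a closing brace is missing.
    """
    config = json.loads(config_text)
    # Table names registered by the source plugins.
    produced = {
        options.get("result_table_name")
        for options in config.get("source", {}).values()
    }
    # Sinks that ask for a table no source registers.
    return [
        name
        for name, options in config.get("sink", {}).items()
        if "source_table_name" in options
        and options["source_table_name"] not in produced
    ]


if __name__ == "__main__":
    sample = """
    {
        "source": {"Jdbc": {"result_table_name": "kafka_topic_usage"}},
        "sink": {"Hive": {"source_table_name": "kafka_topic_usage",
                          "table_name": "hive_tmp.kafka_topic_usage"}}
    }
    """
    print(unmatched_sinks(sample))
```

This only checks the wiring between plugins. It does not contact the metastore and says nothing about whether the Hive table itself exists.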

Running Command

./bin/start-seatunnel-flink-13-connector-v2.sh --config ./config/v2.streaming.conf.template

Error Exception

2024-04-24 16:59:31.610 [flink-akka.actor.default-dispatcher-16] INFO org.apache.seatunnel.plugin.discovery.AbstractPluginDiscovery [] - Load SeaTunnelSink Plugin from /data/yarn/nm/usercache/hive/appcache/application_1713409518776_0165/filecache/connectors/seatunnel
2024-04-24 16:59:31.614 [flink-akka.actor.default-dispatcher-16] INFO org.apache.seatunnel.plugin.discovery.AbstractPluginDiscovery [] - Load plugin: PluginIdentifier{engineType='seatunnel', pluginType='sink', pluginName='Hive'} from classpath
2024-04-24 16:59:31.635 [flink-akka.actor.default-dispatcher-16] INFO org.apache.hadoop.hive.conf.HiveConf                         [] - Found configuration file null
2024-04-24 16:59:31.848 [flink-akka.actor.default-dispatcher-16] INFO hive.metastore                                               [] - Trying to connect to metastore with URI thrift://test-002.test-hadoop.com:9083
2024-04-24 16:59:31.867 [flink-akka.actor.default-dispatcher-16] INFO hive.metastore                                               [] - Opened a connection to metastore, current connections: 1
2024-04-24 16:59:31.911 [flink-akka.actor.default-dispatcher-16] WARN hive.metastore                                               [] - set_ugi() not successful, Likely cause: new client talking to old server. Continuing without it.
org.apache.thrift.transport.TTransportException: null
    at org.apache.thrift.transport.TIOStreamTransport.read(TIOStreamTransport.java:132) ~[connector-hive-2.3.3.jar:2.3.3]
    at org.apache.thrift.transport.TTransport.readAll(TTransport.java:86) ~[connector-hive-2.3.3.jar:2.3.3]
    at org.apache.thrift.protocol.TBinaryProtocol.readStringBody(TBinaryProtocol.java:380) ~[connector-hive-2.3.3.jar:2.3.3]
    at org.apache.thrift.protocol.TBinaryProtocol.readMessageBegin(TBinaryProtocol.java:230) ~[connector-hive-2.3.3.jar:2.3.3]
    at org.apache.thrift.TServiceClient.receiveBase(TServiceClient.java:77) ~[connector-hive-2.3.3.jar:2.3.3]
    at org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$Client.recv_set_ugi(ThriftHiveMetastore.java:4247) ~[connector-hive-2.3.3.jar:2.3.3]
    at org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$Client.set_ugi(ThriftHiveMetastore.java:4233) ~[connector-hive-2.3.3.jar:2.3.3]
    at org.apache.hadoop.hive.metastore.HiveMetaStoreClient.open(HiveMetaStoreClient.java:496) ~[connector-hive-2.3.3.jar:2.3.3]
    at org.apache.hadoop.hive.metastore.HiveMetaStoreClient.<init>(HiveMetaStoreClient.java:245) ~[connector-hive-2.3.3.jar:2.3.3]
    at org.apache.hadoop.hive.metastore.HiveMetaStoreClient.<init>(HiveMetaStoreClient.java:128) ~[connector-hive-2.3.3.jar:2.3.3]
    at org.apache.seatunnel.connectors.seatunnel.hive.utils.HiveMetaStoreProxy.<init>(HiveMetaStoreProxy.java:61) ~[connector-hive-2.3.3.jar:2.3.3]
    at org.apache.seatunnel.connectors.seatunnel.hive.utils.HiveMetaStoreProxy.getInstance(HiveMetaStoreProxy.java:77) ~[connector-hive-2.3.3.jar:2.3.3]
    at org.apache.seatunnel.connectors.seatunnel.hive.config.HiveConfig.getTableInfo(HiveConfig.java:73) ~[connector-hive-2.3.3.jar:2.3.3]
    at org.apache.seatunnel.connectors.seatunnel.hive.sink.HiveSink.prepare(HiveSink.java:123) ~[connector-hive-2.3.3.jar:2.3.3]
    at org.apache.seatunnel.core.starter.flink.execution.SinkExecuteProcessor.lambda$initializePlugins$0(SinkExecuteProcessor.java:85) ~[seatunnel-flink-13-starter.jar:2.3.3]
    at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193) ~[?:1.8.0_141]
    at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1374) ~[?:1.8.0_141]
    at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481) ~[?:1.8.0_141]
    at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471) ~[?:1.8.0_141]
    at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708) ~[?:1.8.0_141]
    at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) ~[?:1.8.0_141]
    at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499) ~[?:1.8.0_141]
    at org.apache.seatunnel.core.starter.flink.execution.SinkExecuteProcessor.initializePlugins(SinkExecuteProcessor.java:90) ~[seatunnel-flink-13-starter.jar:2.3.3]
    at org.apache.seatunnel.core.starter.flink.execution.FlinkAbstractPluginExecuteProcessor.<init>(FlinkAbstractPluginExecuteProcessor.java:67) ~[seatunnel-flink-13-starter.jar:2.3.3]
    at org.apache.seatunnel.core.starter.flink.execution.SinkExecuteProcessor.<init>(SinkExecuteProcessor.java:56) ~[seatunnel-flink-13-starter.jar:2.3.3]
    at org.apache.seatunnel.core.starter.flink.execution.FlinkExecution.<init>(FlinkExecution.java:96) ~[seatunnel-flink-13-starter.jar:2.3.3]
    at org.apache.seatunnel.core.starter.flink.command.FlinkTaskExecuteCommand.execute(FlinkTaskExecuteCommand.java:69) ~[seatunnel-flink-13-starter.jar:2.3.3]
    at org.apache.seatunnel.core.starter.SeaTunnel.run(SeaTunnel.java:40) ~[seatunnel-flink-13-starter.jar:2.3.3]
    at org.apache.seatunnel.core.starter.flink.SeaTunnelFlink.main(SeaTunnelFlink.java:34) ~[seatunnel-flink-13-starter.jar:2.3.3]
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_141]
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_141]
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_141]
    at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_141]
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355) ~[flink-dist_2.11-1.14.4.jar:1.14.4]
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222) ~[flink-dist_2.11-1.14.4.jar:1.14.4]
    at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114) ~[flink-dist_2.11-1.14.4.jar:1.14.4]
    at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:261) ~[flink-dist_2.11-1.14.4.jar:1.14.4]
    at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.lambda$runApplicationAsync$2(ApplicationDispatcherBootstrap.java:224) ~[flink-dist_2.11-1.14.4.jar:1.14.4]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[?:1.8.0_141]
    at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[?:1.8.0_141]
    at org.apache.flink.runtime.concurrent.akka.ActorSystemScheduledExecutorAdapter$ScheduledFutureTask.run(ActorSystemScheduledExecutorAdapter.java:171) ~[flink-rpc-akka_29024eb4-87b7-43a7-b4cc-c0280e88fd55.jar:1.14.4]
    at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68) ~[flink-rpc-akka_29024eb4-87b7-43a7-b4cc-c0280e88fd55.jar:1.14.4]
    at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.lambda$withContextClassLoader$0(ClassLoadingUtils.java:41) ~[flink-rpc-akka_29024eb4-87b7-43a7-b4cc-c0280e88fd55.jar:1.14.4]
    at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:49) [flink-rpc-akka_29024eb4-87b7-43a7-b4cc-c0280e88fd55.jar:1.14.4]
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:48) [flink-rpc-akka_29024eb4-87b7-43a7-b4cc-c0280e88fd55.jar:1.14.4]
    at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289) [?:1.8.0_141]
    at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056) [?:1.8.0_141]
    at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692) [?:1.8.0_141]
    at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157) [?:1.8.0_141]
2024-04-24 16:59:31.922 [flink-akka.actor.default-dispatcher-16] INFO hive.metastore                                               [] - Connected to metastore.
2024-04-24 16:59:31.932 [flink-akka.actor.default-dispatcher-16] ERROR org.apache.seatunnel.core.starter.SeaTunnel                  [] - 

===============================================================================

2024-04-24 16:59:31.932 [flink-akka.actor.default-dispatcher-16] ERROR org.apache.seatunnel.core.starter.SeaTunnel                  [] - Fatal Error, 

2024-04-24 16:59:31.932 [flink-akka.actor.default-dispatcher-16] ERROR org.apache.seatunnel.core.starter.SeaTunnel                  [] - Please submit bug report in https://github.com/apache/seatunnel/issues

2024-04-24 16:59:31.932 [flink-akka.actor.default-dispatcher-16] ERROR org.apache.seatunnel.core.starter.SeaTunnel                  [] - Reason:ErrorCode:[HIVE-03], ErrorDescription:[Get hive table information from hive metastore service failed] - Get table [hive_tmp.kafka_topic_usage] information failed 

2024-04-24 16:59:31.935 [flink-akka.actor.default-dispatcher-16] ERROR org.apache.seatunnel.core.starter.SeaTunnel                  [] - Exception StackTrace:org.apache.seatunnel.connectors.seatunnel.hive.exception.HiveConnectorException: ErrorCode:[HIVE-03], ErrorDescription:[Get hive table information from hive metastore service failed] - Get table [hive_tmp.kafka_topic_usage] information failed
    at org.apache.seatunnel.connectors.seatunnel.hive.utils.HiveMetaStoreProxy.getTable(HiveMetaStoreProxy.java:90)
    at org.apache.seatunnel.connectors.seatunnel.hive.config.HiveConfig.getTableInfo(HiveConfig.java:74)
    at org.apache.seatunnel.connectors.seatunnel.hive.sink.HiveSink.prepare(HiveSink.java:123)
    at org.apache.seatunnel.core.starter.flink.execution.SinkExecuteProcessor.lambda$initializePlugins$0(SinkExecuteProcessor.java:85)
    at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
    at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1374)
    at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
    at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
    at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
    at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
    at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
    at org.apache.seatunnel.core.starter.flink.execution.SinkExecuteProcessor.initializePlugins(SinkExecuteProcessor.java:90)
    at org.apache.seatunnel.core.starter.flink.execution.FlinkAbstractPluginExecuteProcessor.<init>(FlinkAbstractPluginExecuteProcessor.java:67)
    at org.apache.seatunnel.core.starter.flink.execution.SinkExecuteProcessor.<init>(SinkExecuteProcessor.java:56)
    at org.apache.seatunnel.core.starter.flink.execution.FlinkExecution.<init>(FlinkExecution.java:96)
    at org.apache.seatunnel.core.starter.flink.command.FlinkTaskExecuteCommand.execute(FlinkTaskExecuteCommand.java:69)
    at org.apache.seatunnel.core.starter.SeaTunnel.run(SeaTunnel.java:40)
    at org.apache.seatunnel.core.starter.flink.SeaTunnelFlink.main(SeaTunnelFlink.java:34)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
    at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
    at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:261)
    at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.lambda$runApplicationAsync$2(ApplicationDispatcherBootstrap.java:224)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at org.apache.flink.runtime.concurrent.akka.ActorSystemScheduledExecutorAdapter$ScheduledFutureTask.run(ActorSystemScheduledExecutorAdapter.java:171)
    at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68)
    at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.lambda$withContextClassLoader$0(ClassLoadingUtils.java:41)
    at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:49)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:48)
    at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
    at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
    at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
    at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
Caused by: org.apache.thrift.transport.TTransportException
    at org.apache.thrift.transport.TIOStreamTransport.read(TIOStreamTransport.java:132)
    at org.apache.thrift.transport.TTransport.readAll(TTransport.java:86)
    at org.apache.thrift.protocol.TBinaryProtocol.readAll(TBinaryProtocol.java:429)
    at org.apache.thrift.protocol.TBinaryProtocol.readI32(TBinaryProtocol.java:318)
    at org.apache.thrift.protocol.TBinaryProtocol.readMessageBegin(TBinaryProtocol.java:219)
    at org.apache.thrift.TServiceClient.receiveBase(TServiceClient.java:77)
    at org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$Client.recv_get_table(ThriftHiveMetastore.java:1514)
    at org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$Client.get_table(ThriftHiveMetastore.java:1500)
    at org.apache.hadoop.hive.metastore.HiveMetaStoreClient.getTable(HiveMetaStoreClient.java:1346)
    at org.apache.seatunnel.connectors.seatunnel.hive.utils.HiveMetaStoreProxy.getTable(HiveMetaStoreProxy.java:86)
    ... 37 more


Zeta or Flink or Spark Version

Flink version: 1.14

Java or Scala Version

Java 1.8

Screenshots

No response

Are you willing to submit PR?

- [ ] Yes I am willing to submit a PR!

Code of Conduct

- [X] I agree to follow this project's [Code of Conduct](https://www.apache.org/foundation/policies/conduct)

liunaijie commented 4 months ago


Looks like your table does not exist.
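
One way to test this suggestion is to take the table name from the `HIVE-03` error line and describe that table by hand in beeline or the Hive CLI. The sketch below is a rough helper for that, not anything shipped with SeaTunnel. The function names `failed_table` and `describe_statement` are hypothetical, and the regular expression assumes the error line has exactly the shape quoted in this report.

```python
import re

# Matches the SeaTunnel error line quoted in this report, e.g.
# "ErrorCode:[HIVE-03], ... - Get table [db.table] information failed".
_FAILED_TABLE = re.compile(
    r"ErrorCode:\[(?P<code>[^\]]+)\].*?"
    r"Get table \[(?P<table>[^\]]+)\] information failed"
)


def failed_table(log_line):
    """Return (error_code, qualified_table_name), or None if the line does not match."""
    match = _FAILED_TABLE.search(log_line)
    if match is None:
        return None
    return match.group("code"), match.group("table")


def describe_statement(qualified_name):
    """Build a HiveQL statement to run manually against the same metastore."""
    database, _, table = qualified_name.partition(".")
    if not database or not table:
        raise ValueError("expected a name of the form database.table")
    return f"DESCRIBE FORMATTED {database}.{table};"


if __name__ == "__main__":
    line = (
        "Reason:ErrorCode:[HIVE-03], ErrorDescription:[Get hive table "
        "information from hive metastore service failed] - Get table "
        "[hive_tmp.kafka_topic_usage] information failed"
    )
    code, table = failed_table(line)
    print(code, describe_statement(table))
```

If the generated statement fails in Hive with a table-not-found error, the table is missing. If it succeeds, the cause is more likely elsewhere, for example in the `set_ugi()` warning logged just before the failure.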