DataLinkDC / dinky

Dinky is a real-time data development platform based on Apache Flink, enabling agile data development, deployment and operation.
http://www.dinky.org.cn
Apache License 2.0

[Bug] MySQL whole-database sync to Hudi cannot create Hive tables #1687

Closed javaht closed 1 year ago

javaht commented 1 year ago

Search before asking

What happened

Dinky 0.7.1, Flink CDC 2.3.0, Hudi 0.11.1, Kerberos, Ranger

The whole-database sync SQL is as follows (deployed in yarn-per-job mode):

```sql
EXECUTE CDCSOURCE USER_CDAS WITH (
  'connector' = 'mysql-cdc',
  'hostname' = 'xxxx',
  'port' = '3306',
  'username' = 'root',
  'password' = 'xxxx',
  'useSSl'='false',
  'source.server-time-zone' = 'Asia/Shanghai',
  'checkpoint'='600000',
  'scan.startup.mode'='initial',
  'parallelism'='1',
  'database-name'='xxxx',
  'table-name'='xxx\.*',
  'sink.connector'='hudi',
  'sink.path'='hdfs:///datas/xxxx/${tableName}',
  'sink.hoodie.datasource.write.recordkey.field'='${pkList}',
  'sink.hoodie.parquet.max.file.size'='268435456',
  -- 'sink.write.precombine.field'='ts',
  'sink.write.tasks'='1',
  'sink.write.bucket_assign.tasks'='2',
  'sink.write.precombine'='true',
  'sink.compaction.async.enabled'='true',
  'sink.write.task.max.size'='10240',
  'sink.write.rate.limit'='30000',
  'sink.write.operation'='upsert',
  'sink.table.type'='COPY_ON_WRITE',
  'sink.compaction.tasks'='1',
  'sink.compaction.delta_seconds'='20',
  'sink.compaction.async.enabled'='true',
  'sink.read.streaming.skip_compaction'='true',
  'sink.compaction.delta_commits'='20',
  'sink.compaction.trigger.strategy'='num_or_time',
  'sink.compaction.max_memory'='500',
  'sink.changelog.enabled'='true',
  'sink.read.streaming.enabled'='true',
  'sink.read.streaming.check.interval'='3',
  'sink.hive_sync.enable'='true',
  'sink.hive_sync.mode'='hms',
  'sink.hive_sync.db'='hudi_ods_hbz',
  'sink.hive_sync.table'='${tableName}',
  'sink.table.prefix.schema'='true',
  'sink.hive_sync.metastore.uris'='thrift://xxxx:9083',
  'hive_sync.jdbc_url' = 'jdbc:hive2://xxxx:10000',
  'sink.hive_sync.username'='hive',
  'hive_sync.password' = '123456',
  'sink.table.prefix'='odshbz',
  'sink.table.lower' = 'true'
);
```

Data syncs to HDFS normally, but the Hive tables are not created. The error in the logs is as follows:

```
java.lang.RuntimeException: data outPutTag is not exists!table name is xxxx.zx_tmp_crawl_news
    at com.dlink.cdc.sql.SQLSinkBuilder$3.lambda$processElement$0(SQLSinkBuilder.java:261) ~[dlink-app-1.14-0.7.1-jar-with-dependencies.jar:?]
    at java.util.Optional.orElseThrow(Optional.java:290) ~[?:1.8.0_241]
    at com.dlink.cdc.sql.SQLSinkBuilder$3.processElement(SQLSinkBuilder.java:260) ~[dlink-app-1.14-0.7.1-jar-with-dependencies.jar:?]
    at com.dlink.cdc.sql.SQLSinkBuilder$3.processElement(SQLSinkBuilder.java:249) ~[dlink-app-1.14-0.7.1-jar-with-dependencies.jar:?]
    at org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66) ~[flink-dist_2.12-1.14.5.jar:1.14.5]
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:82) ~[flink-dist_2.12-1.14.5.jar:1.14.5]
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:57) ~[flink-dist_2.12-1.14.5.jar:1.14.5]
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29) ~[flink-dist_2.12-1.14.5.jar:1.14.5]
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:56) ~[flink-dist_2.12-1.14.5.jar:1.14.5]
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:29) ~[flink-dist_2.12-1.14.5.jar:1.14.5]
    at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:38) ~[flink-dist_2.12-1.14.5.jar:1.14.5]
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:82) ~[flink-dist_2.12-1.14.5.jar:1.14.5]
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:57) ~[flink-dist_2.12-1.14.5.jar:1.14.5]
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29) ~[flink-dist_2.12-1.14.5.jar:1.14.5]
    at org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask$AsyncDataOutputToOutput.emitRecord(SourceOperatorStreamTask.java:196) ~[flink-dist_2.12-1.14.5.jar:1.14.5]
    at org.apache.flink.streaming.api.operators.source.SourceOutputWithWatermarks.collect(SourceOutputWithWatermarks.java:110) ~[flink-dist_2.12-1.14.5.jar:1.14.5]
    at org.apache.flink.streaming.api.operators.source.SourceOutputWithWatermarks.collect(SourceOutputWithWatermarks.java:101) ~[flink-dist_2.12-1.14.5.jar:1.14.5]
    at com.ververica.cdc.connectors.mysql.source.reader.MySqlRecordEmitter$OutputCollector.collect(MySqlRecordEmitter.java:154) ~[flink-sql-connector-mysql-cdc-2.3.0.jar:2.3.0]
    at com.dlink.cdc.mysql.MysqlJsonDebeziumDeserializationSchema.deserialize(MysqlJsonDebeziumDeserializationSchema.java:65) ~[dlink-app-1.14-0.7.1-jar-with-dependencies.jar:?]
    at com.ververica.cdc.connectors.mysql.source.reader.MySqlRecordEmitter.emitElement(MySqlRecordEmitter.java:129) ~[flink-sql-connector-mysql-cdc-2.3.0.jar:2.3.0]
    at com.ververica.cdc.connectors.mysql.source.reader.MySqlRecordEmitter.processElement(MySqlRecordEmitter.java:111) ~[flink-sql-connector-mysql-cdc-2.3.0.jar:2.3.0]
    at com.ververica.cdc.connectors.mysql.source.reader.MySqlRecordEmitter.emitRecord(MySqlRecordEmitter.java:83) ~[flink-sql-connector-mysql-cdc-2.3.0.jar:2.3.0]
    at com.ververica.cdc.connectors.mysql.source.reader.MySqlRecordEmitter.emitRecord(MySqlRecordEmitter.java:55) ~[flink-sql-connector-mysql-cdc-2.3.0.jar:2.3.0]
    at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:143) ~[flink-table_2.12-1.14.5.jar:1.14.5]
    at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:354) ~[flink-dist_2.12-1.14.5.jar:1.14.5]
    at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68) ~[flink-dist_2.12-1.14.5.jar:1.14.5]
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65) ~[flink-dist_2.12-1.14.5.jar:1.14.5]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:496) ~[flink-dist_2.12-1.14.5.jar:1.14.5]
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203) ~[flink-dist_2.12-1.14.5.jar:1.14.5]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:809) ~[flink-dist_2.12-1.14.5.jar:1.14.5]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:761) ~[flink-dist_2.12-1.14.5.jar:1.14.5]
    at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958) [flink-dist_2.12-1.14.5.jar:1.14.5]
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937) [flink-dist_2.12-1.14.5.jar:1.14.5]
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766) [flink-dist_2.12-1.14.5.jar:1.14.5]
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575) [flink-dist_2.12-1.14.5.jar:1.14.5]
    at java.lang.Thread.run(Thread.java:748) [?:1.8.0_241]
```

The Flink dependencies are as follows:

[image: screenshot of the Flink dependency jars]
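For context on where that exception comes from: the stack trace suggests that Dinky's CDCSOURCE pipeline creates one side-output tag per table captured by the 'table-name' pattern, and SQLSinkBuilder looks the tag up by the incoming record's schema.table name; a record from a table the pattern did not capture has no tag, so the lookup throws. Below is a minimal, simplified sketch of that routing step. The class and field names are illustrative, not Dinky's actual code.

```java
// Simplified sketch of the routing logic behind "data outPutTag is not exists!":
// one OutputTag per table captured by the 'table-name' pattern; records from any
// other table have nowhere to go, and the lookup throws.
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

public class RouteByTableSketch extends ProcessFunction<Map<String, Object>, Void> {
    // Populated up front from the tables matched by the 'table-name' regex (illustrative field).
    private final Map<String, OutputTag<String>> tagMap = new HashMap<>();

    @Override
    public void processElement(Map<String, Object> record, Context ctx, Collector<Void> out) {
        String tableName = (String) record.get("table");  // e.g. "xxxx.zx_tmp_crawl_news"
        OutputTag<String> tag = Optional.ofNullable(tagMap.get(tableName))
                .orElseThrow(() -> new RuntimeException(
                        "data outPutTag is not exists!table name is " + tableName));
        ctx.output(tag, record.toString());
    }
}
```

This reading is consistent with the maintainer's suggestion below to change the 'table-name' pattern so that it actually matches the schema.table strings.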

What you expected to happen

Is it not possible to sync all tables using the pattern `database\.*`? That is my guess.

How to reproduce

null

Anything else

No response

Version

dev

Are you willing to submit PR?

Code of Conduct

aiwenmo commented 1 year ago

'table-name'='xxx\..*',

aiwenmo commented 1 year ago

To match all tables, use two dots (i.e. `xxx\..*`).
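A small runnable illustration of the difference, assuming the pattern is applied as a Java regular expression against the schema.table string (the table name below is the one from the stack trace; the schema name is the masked placeholder used in this issue):

```java
import java.util.regex.Pattern;

public class TableNamePatternDemo {
    public static void main(String[] args) {
        String tableName = "xxxx.zx_tmp_crawl_news";  // schema.table, as reported in the error

        // 'table-name'='xxxx\.*': "xxxx" followed by zero or more literal dots,
        // so it never reaches the table part of the name.
        System.out.println(Pattern.matches("xxxx\\.*", tableName));   // false

        // 'table-name'='xxxx\..*': "xxxx", one literal dot, then anything,
        // which matches every table in the schema.
        System.out.println(Pattern.matches("xxxx\\..*", tableName));  // true
    }
}
```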

javaht commented 1 year ago

I looked into it and it seems to be a problem with the Hive metastore, but I do not know how to solve it. What could be causing it? Additional note: in standalone mode there is no problem and the sync works; in yarn mode it does not.

Kerberos is configured on all three HDFS machines. The Hive principal is hive/hadoop102@ZHT.COM, and the Hadoop principal is hadoop/hadoop@102@ZHT.COM.

The Hive metastore error is as follows:

```
2023-03-05 21:26:13,327 ERROR [pool-6-thread-77] server.TThreadPoolServer (TThreadPoolServer.java:run(297)) - Error occurred during processing of message.
java.lang.RuntimeException: org.apache.thrift.transport.TTransportException: Invalid status -128
    at org.apache.thrift.transport.TSaslServerTransport$Factory.getTransport(TSaslServerTransport.java:219)
    at org.apache.hadoop.hive.metastore.security.HadoopThriftAuthBridge$Server$TUGIAssumingTransportFactory$1.run(HadoopThriftAuthBridge.java:694)
    at org.apache.hadoop.hive.metastore.security.HadoopThriftAuthBridge$Server$TUGIAssumingTransportFactory$1.run(HadoopThriftAuthBridge.java:691)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:360)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1709)
    at org.apache.hadoop.hive.metastore.security.HadoopThriftAuthBridge$Server$TUGIAssumingTransportFactory.getTransport(HadoopThriftAuthBridge.java:691)
    at org.apache.thrift.server.TThreadPoolServer$WorkerProcess.run(TThreadPoolServer.java:269)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.thrift.transport.TTransportException: Invalid status -128
    at org.apache.thrift.transport.TSaslTransport.sendAndThrowMessage(TSaslTransport.java:232)
    at org.apache.thrift.transport.TSaslTransport.receiveSaslMessage(TSaslTransport.java:184)
    at org.apache.thrift.transport.TSaslServerTransport.handleSaslStartMessage(TSaslServerTransport.java:125)
    at org.apache.thrift.transport.TSaslTransport.open(TSaslTransport.java:271)
    at org.apache.thrift.transport.TSaslServerTransport.open(TSaslServerTransport.java:41)
    at org.apache.thrift.transport.TSaslServerTransport$Factory.getTransport(TSaslServerTransport.java:216)
```

The Flink error is as follows:

```
2023-03-05 20:53:14,097 INFO org.apache.hadoop.hive.metastore.HiveMetaStoreClient [] - Closed a connection to metastore, current connections: 5
2023-03-05 20:53:14,097 INFO org.apache.hadoop.hive.metastore.HiveMetaStoreClient [] - Trying to connect to metastore with URI thrift://hadoop102:9083
2023-03-05 20:53:14,097 INFO org.apache.hadoop.hive.metastore.HiveMetaStoreClient [] - Opened a connection to metastore, current connections: 6
2023-03-05 20:53:14,107 WARN org.apache.hadoop.hive.metastore.HiveMetaStoreClient [] - set_ugi() not successful, Likely cause: new client talking to old server. Continuing without it.
org.apache.thrift.transport.TTransportException: null
    at org.apache.thrift.transport.TIOStreamTransport.read(TIOStreamTransport.java:132) ~[flink-sql-connector-hive-3.1.2_2.12-1.14.5.jar:1.14.5]
    at org.apache.thrift.transport.TTransport.readAll(TTransport.java:86) ~[flink-sql-connector-hive-3.1.2_2.12-1.14.5.jar:1.14.5]
    at org.apache.thrift.protocol.TBinaryProtocol.readStringBody(TBinaryProtocol.java:380) ~[flink-sql-connector-hive-3.1.2_2.12-1.14.5.jar:1.14.5]
    at org.apache.thrift.protocol.TBinaryProtocol.readMessageBegin(TBinaryProtocol.java:230) ~[flink-sql-connector-hive-3.1.2_2.12-1.14.5.jar:1.14.5]
    at org.apache.thrift.TServiceClient.receiveBase(TServiceClient.java:77) ~[flink-sql-connector-hive-3.1.2_2.12-1.14.5.jar:1.14.5]
    at org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$Client.recv_set_ugi(ThriftHiveMetastore.java:4787) ~[flink-sql-connector-hive-3.1.2_2.12-1.14.5.jar:1.14.5]
    at org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$Client.set_ugi(ThriftHiveMetastore.java:4773) ~[flink-sql-connector-hive-3.1.2_2.12-1.14.5.jar:1.14.5]
    at org.apache.hadoop.hive.metastore.HiveMetaStoreClient.open(HiveMetaStoreClient.java:534) ~[flink-sql-connector-hive-3.1.2_2.12-1.14.5.jar:1.14.5]
    at org.apache.hadoop.hive.metastore.HiveMetaStoreClient.reconnect(HiveMetaStoreClient.java:379) ~[flink-sql-connector-hive-3.1.2_2.12-1.14.5.jar:1.14.5]
    at org.apache.hadoop.hive.metastore.RetryingMetaStoreClient$1.run(RetryingMetaStoreClient.java:187) ~[flink-sql-connector-hive-3.1.2_2.12-1.14.5.jar:1.14.5]
    at java.security.AccessController.doPrivileged(Native Method) ~[?:1.8.0_212]
    at javax.security.auth.Subject.doAs(Subject.java:422) ~[?:1.8.0_212]
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1898) ~[flink-shaded-hadoop-3-uber-3.1.1.7.2.9.0-173-9.0.jar:3.1.1.7.2.9.0-173-9.0]
    at org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.invoke(RetryingMetaStoreClient.java:183) ~[flink-sql-connector-hive-3.1.2_2.12-1.14.5.jar:1.14.5]
    at com.sun.proxy.$Proxy72.getAllFunctions(Unknown Source) ~[?:?]
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_212]
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_212]
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_212]
    at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_212]
    at org.apache.hadoop.hive.metastore.HiveMetaStoreClient$SynchronizedHandler.invoke(HiveMetaStoreClient.java:2773) ~[flink-sql-connector-hive-3.1.2_2.12-1.14.5.jar:1.14.5]
    at com.sun.proxy.$Proxy72.getAllFunctions(Unknown Source) ~[?:?]
    at org.apache.hadoop.hive.ql.metadata.Hive.getAllFunctions(Hive.java:4603) ~[flink-sql-connector-hive-3.1.2_2.12-1.14.5.jar:1.14.5]
    at org.apache.hadoop.hive.ql.metadata.Hive.reloadFunctions(Hive.java:291) ~[flink-sql-connector-hive-3.1.2_2.12-1.14.5.jar:1.14.5]
    at org.apache.hadoop.hive.ql.metadata.Hive.registerAllFunctionsOnce(Hive.java:274) ~[flink-sql-connector-hive-3.1.2_2.12-1.14.5.jar:1.14.5]
    at org.apache.hadoop.hive.ql.metadata.Hive.<init>(Hive.java:435) ~[flink-sql-connector-hive-3.1.2_2.12-1.14.5.jar:1.14.5]
    at org.apache.hadoop.hive.ql.metadata.Hive.create(Hive.java:375) ~[flink-sql-connector-hive-3.1.2_2.12-1.14.5.jar:1.14.5]
    at org.apache.hadoop.hive.ql.metadata.Hive.getInternal(Hive.java:355) ~[flink-sql-connector-hive-3.1.2_2.12-1.14.5.jar:1.14.5]
    at org.apache.hadoop.hive.ql.metadata.Hive.get(Hive.java:331) ~[flink-sql-connector-hive-3.1.2_2.12-1.14.5.jar:1.14.5]
    at org.apache.hudi.hive.ddl.HMSDDLExecutor.<init>(HMSDDLExecutor.java:78) ~[hudi-flink1.14-bundle-0.12.0.jar:0.12.0]
    at org.apache.hudi.hive.HoodieHiveSyncClient.<init>(HoodieHiveSyncClient.java:79) ~[hudi-flink1.14-bundle-0.12.0.jar:0.12.0]
    at org.apache.hudi.hive.HiveSyncTool.initSyncClient(HiveSyncTool.java:101) ~[hudi-flink1.14-bundle-0.12.0.jar:0.12.0]
    at org.apache.hudi.hive.HiveSyncTool.<init>(HiveSyncTool.java:95) ~[hudi-flink1.14-bundle-0.12.0.jar:0.12.0]
    at org.apache.hudi.sink.utils.HiveSyncContext.hiveSyncTool(HiveSyncContext.java:79) ~[hudi-flink1.14-bundle-0.12.0.jar:0.12.0]
    at org.apache.hudi.sink.StreamWriteOperatorCoordinator.doSyncHive(StreamWriteOperatorCoordinator.java:335) ~[hudi-flink1.14-bundle-0.12.0.jar:0.12.0]
    at org.apache.hudi.sink.utils.NonThrownExecutor.lambda$wrapAction$0(NonThrownExecutor.java:130) ~[hudi-flink1.14-bundle-0.12.0.jar:0.12.0]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_212]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_212]
    at java.lang.Thread.run(Thread.java:748) [?:1.8.0_212]
```

javaht commented 1 year ago

Putting hive-site.xml into the Hadoop configuration directory solved it.
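That fix lines up with the two logs above: the metastore-side "Invalid status -128" and the client-side "set_ugi() not successful" both point to a client that is not attempting the SASL/Kerberos handshake the metastore requires, which is what happens when the job's containers cannot see a hive-site.xml with the Kerberos-related metastore settings. Placing the file in the Hadoop configuration directory, which is typically on the classpath shipped to the YARN containers, makes those settings visible to the Hudi hive-sync client. A minimal diagnostic sketch, assuming the Hive classes are on the classpath (HiveSiteCheck is an illustrative name):

```java
// Minimal diagnostic sketch: HiveConf loads hive-site.xml from the classpath, so if the
// YARN containers cannot see the file, SASL stays disabled and the handshake against a
// Kerberized metastore fails.
import org.apache.hadoop.hive.conf.HiveConf;

public class HiveSiteCheck {
    public static void main(String[] args) {
        // Where HiveConf found hive-site.xml (null means it is not on the classpath).
        System.out.println("hive-site.xml location      : " + HiveConf.getHiveSiteLocation());

        HiveConf conf = new HiveConf();
        System.out.println("hive.metastore.uris         : "
                + conf.getVar(HiveConf.ConfVars.METASTOREURIS));
        System.out.println("hive.metastore.sasl.enabled : "
                + conf.getBoolVar(HiveConf.ConfVars.METASTORE_USE_THRIFT_SASL));
        System.out.println("hive.metastore.kerberos.principal : "
                + conf.getVar(HiveConf.ConfVars.METASTORE_KERBEROS_PRINCIPAL));
    }
}
```

Running this with the same classpath as the Flink containers should show a non-null hive-site.xml location, hive.metastore.sasl.enabled=true, and the metastore principal once the file is in place.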