DTStack / chunjun

A data integration framework
https://dtstack.github.io/chunjun/
Apache License 2.0

[Bug] [sync] Error when syncing data between two Kerberos-authenticated CDH-6.3.2 clusters #1659

Open wangxiaoming-wxm opened 1 year ago

wangxiaoming-wxm commented 1 year ago

Search before asking

What happened

[screenshot attachment: 6bd49a4b28ec26a55291d214dd216c0] The screenshot above shows my error.

------------------------ (divider) ---------------------

{
  "job": {
    "content": [
      {
        "reader" : {
          "parameter" : {
            "path" : "hdfs://ns1/user/hive/warehouse/payment_i17_ods_phq.db/city_info/*",
            "hadoopConfig" : {
              "hadoop.user.name": "dataarchitecture_reader",
              "dfs.ha.namenodes.ns1": "nn1,nn2",
              "fs.defaultFS": "hdfs://ns1",
              "dfs.namenode.rpc-address.ns1.nn2": "node104:8020",
              "dfs.client.failover.proxy.provider.ns1": "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider",
              "dfs.namenode.rpc-address.ns1.nn1": "node105:8020",
              "dfs.nameservices": "ns1",
              "fs.hdfs.impl.disable.cache": "true",
              "fs.hdfs.impl": "org.apache.hadoop.hdfs.DistributedFileSystem",
              "hadoop.security.authentication": "Kerberos",
              "dfs.namenode.kerberos.principal": "hdfs/_HOST@TEST.COM",
              "dfs.datanode.kerberos.principal": "hdfs/_HOST@TEST.COM",
              "yarn.resourcemanager.principal": "rm/_HOST@TEST.COM",
              "dfs.namenode.kerberos.internal.spnego.principal": "HTTP/_HOST@TEST.COM",
              "hadoop.security.authorization": "true",
              "dfs.namenode.keytab.file": "/app/chunjun/kerberos/dataarchitecture_reader.keytab",
              "java.security.krb5.conf": "/app/chunjun/krb5/krb5_cdh1.conf",
              "useLocalFile": "true",
              "principalFile": "/app/chunjun/kerberos/dataarchitecture_reader.keytab",
              "principal": "dataarchitecture_reader/node105@TEST.COM",
              "dfs.client.use.datanode.hostname":true,
              "dfs.client.use.namenode.hostname":true,
              "sun.security.krb5.disableReferrals":true
            },
            "column": [
               {
               "name":"city_id",
               "type":"string"
               },
               {
               "name":"city_name",
               "type":"string"
               }
            ],
            "defaultFS" : "hdfs://ns1",
            "fieldDelimiter" : "\u0001",
            "encoding" : "utf-8",
            "fileType" : "text"
          },
          "name" : "hdfsreader"
        },
        "writer": {
          "name": "hdfswriter",
          "parameter": {
            "path": "hdfs://nameservice1/user/hive/warehouse/pxb_test.db/prpjpayrefrec_oracle",
            "defaultFS": "hdfs://nameservice1",
            "column": [
              {
                "name":"city_id",
                "type":"string"
                },
                {
                "name":"city_name",
                "type":"string"
                }
            ],
            "fileType": "text",
            "maxFileSize": 10485760,
            "nextCheckRows": 20000,
            "fieldDelimiter": "\u0001",
            "encoding": "utf-8",
            "fileName": "dt=2023050813",
            "writeMode": "overwrite",
            "hadoopConfig": {
              "dfs.ha.namenodes.nameservice1": "nn10,nn20",
              "hadoop.user.name": "data_dev_pxb",
              "fs.defaultFS": "hdfs://nameservice1",
              "dfs.namenode.rpc-address.nameservice1.nn20": "twobigdata01:8020",
              "dfs.client.failover.proxy.provider.nameservice1": "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider",
              "dfs.namenode.rpc-address.nameservice1.nn10": "twobigdata02:8020",
              "dfs.nameservices": "nameservice1",
              "fs.hdfs.impl.disable.cache": "true",
              "fs.hdfs.impl": "org.apache.hadoop.hdfs.DistributedFileSystem",           
              "hadoop.security.authentication": "Kerberos",
              "dfs.namenode.kerberos.principal": "hdfs/_HOST@TEST.COM",
              "dfs.datanode.kerberos.principal": "hdfs/_HOST@TEST.COM",
              "yarn.resourcemanager.principal": "rm/_HOST@TEST.COM",
              "dfs.namenode.kerberos.internal.spnego.principal": "HTTP/_HOST@TEST.COM",
              "hadoop.security.authorization": "true",
              "dfs.namenode.keytab.file": "/app/chunjun/kerberos/data_dev_pxb.keytab",
              "java.security.krb5.conf": "/app/chunjun/krb5/krb5_cdh2.conf",
              "useLocalFile": "true",
              "principalFile": "/app/chunjun/kerberos//data_dev_pxb.keytab",
              "principal": "data_dev_pxb/twobigdata02@TEST.COM",
              "dfs.client.use.datanode.hostname":"true",
              "dfs.client.use.namenode.hostname":"true"
            }   
          }
        }
      }
    ],
    "setting": {
      "speed": {
        "channel": 1,
        "bytes": 0
      }
    }
  }
}

-------------- (divider) ------------- The above is my JSON job file.

What you expected to happen

I don't understand why Kerberos authentication fails when syncing data between the two CDH clusters. Running sync mode with the stream plugin, ChunJun can read from and write to the upstream and downstream CDH clusters individually, but reading from one cluster and writing to the other in the same job never works and always fails with a Kerberos error. (I have confirmed that the Kerberos accounts and the keytab and krb5.conf files are correct, because each CDH cluster passes read/write verification on its own with the stream plugin.)
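For context, my understanding of the per-cluster login is roughly the standalone sketch below (TwoClusterKerberosCheck is just a hypothetical test class; it assumes the HDFS connector goes through Hadoop's UserGroupInformation, and the principals, keytabs and paths are the ones from my JSON above). Each login works on its own, which matches what I see with the stream plugin, but UserGroupInformation.setConfiguration and java.security.krb5.conf are JVM-global, so reader and writer end up sharing them inside one sync job:

```java
import java.net.URI;
import java.security.PrivilegedExceptionAction;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.UserGroupInformation;

public class TwoClusterKerberosCheck {

    /** Log in with one cluster's keytab and list a path as that principal. */
    static void listAs(String principal, String keytab, String defaultFs, String dir,
                       Configuration conf) throws Exception {
        conf.set("fs.defaultFS", defaultFs);
        conf.set("hadoop.security.authentication", "kerberos");
        // Note: setConfiguration is static, i.e. JVM-wide, so the second cluster's
        // login overwrites whatever the first cluster's login installed.
        UserGroupInformation.setConfiguration(conf);
        UserGroupInformation ugi =
                UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal, keytab);
        ugi.doAs((PrivilegedExceptionAction<Void>) () -> {
            try (FileSystem fs = FileSystem.get(URI.create(defaultFs), conf)) {
                for (FileStatus st : fs.listStatus(new Path(dir))) {
                    System.out.println(st.getPath());
                }
            }
            return null;
        });
    }

    public static void main(String[] args) throws Exception {
        // java.security.krb5.conf is also JVM-wide: only one krb5.conf is in effect
        // per process, so when reader and writer run in the same job both clusters'
        // realms/KDCs have to be resolvable from that single file.
        System.setProperty("java.security.krb5.conf", "/app/chunjun/krb5/krb5_cdh1.conf");

        // Reader cluster (ns1); the dfs.nameservices / dfs.ha.* entries from the
        // reader hadoopConfig would also be set on this Configuration.
        listAs("dataarchitecture_reader/node105@TEST.COM",
               "/app/chunjun/kerberos/dataarchitecture_reader.keytab",
               "hdfs://ns1",
               "/user/hive/warehouse/payment_i17_ods_phq.db/city_info/",
               new Configuration());

        // Writer cluster (nameservice1); likewise, its hadoopConfig entries go here.
        listAs("data_dev_pxb/twobigdata02@TEST.COM",
               "/app/chunjun/kerberos/data_dev_pxb.keytab",
               "hdfs://nameservice1",
               "/user/hive/warehouse/pxb_test.db/",
               new Configuration());
    }
}
```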

How to reproduce

Run a sync job with the JSON above and the problem can be reproduced.

Anything else

No response

Version

master

Are you willing to submit PR?

Code of Conduct

laixueyong commented 1 week ago

Hi, may I ask why, after Kerberos authentication, reading from HDFS reports that the file does not exist?

Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
    at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)
    at org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:811)
    at com.dtstack.chunjun.environment.MyLocalStreamEnvironment.execute(MyLocalStreamEnvironment.java:194)
    at com.dtstack.chunjun.Main.exeSyncJob(Main.java:227)
    at com.dtstack.chunjun.Main.main(Main.java:118)
    at com.dtstack.chunjun.client.local.LocalClusterClientHelper.submit(LocalClusterClientHelper.java:35)
    at com.dtstack.chunjun.client.Launcher.main(Launcher.java:119)
Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:118)
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:80)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:233)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:224)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:215)
    at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:666)
    at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:89)
    at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:446)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:305)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:212)
    at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
    at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
    at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
    at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
    at akka.actor.Actor.aroundReceive(Actor.scala:517)
    at akka.actor.Actor.aroundReceive$(Actor.scala:515)
    at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
    at akka.actor.ActorCell.invoke(ActorCell.scala:561)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
    at akka.dispatch.Mailbox.run(Mailbox.scala:225)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
    at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.RuntimeException: com.dtstack.chunjun.throwable.ChunJunRuntimeException: error to create hdfs splits
    at com.dtstack.chunjun.connector.hdfs.source.BaseHdfsInputFormat.lambda$createInputSplitsInternal$0(BaseHdfsInputFormat.java:81)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:360)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1709)
    at com.dtstack.chunjun.connector.hdfs.source.BaseHdfsInputFormat.createInputSplitsInternal(BaseHdfsInputFormat.java:75)
    at com.dtstack.chunjun.source.format.BaseRichInputFormat.createInputSplits(BaseRichInputFormat.java:128)
    at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.<init>(ExecutionJobVertex.java:247)
    at org.apache.flink.runtime.executiongraph.ExecutionGraph.attachJobGraph(ExecutionGraph.java:866)
    at org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:311)
    at org.apache.flink.runtime.scheduler.SchedulerBase.createExecutionGraph(SchedulerBase.java:322)
    at org.apache.flink.runtime.scheduler.SchedulerBase.createAndRestoreExecutionGraph(SchedulerBase.java:276)
    at org.apache.flink.runtime.scheduler.SchedulerBase.<init>(SchedulerBase.java:249)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.<init>(DefaultScheduler.java:133)
    at org.apache.flink.runtime.scheduler.DefaultSchedulerFactory.createInstance(DefaultSchedulerFactory.java:111)
    at org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:342)
    at org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:327)
    at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:95)
    at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:39)
    at org.apache.flink.runtime.jobmaster.JobManagerRunnerImpl.<init>(JobManagerRunnerImpl.java:163)
    at org.apache.flink.runtime.dispatcher.DefaultJobManagerRunnerFactory.createJobManagerRunner(DefaultJobManagerRunnerFactory.java:86)
    at org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$5(Dispatcher.java:472)
    at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: com.dtstack.chunjun.throwable.ChunJunRuntimeException: java.io.FileNotFoundException: File hdfs://nameservice1/user/hive/warehouse/pec.db/px/ does not exist.
    at com.dtstack.chunjun.connector.hdfs.source.HdfsParquetInputFormat.createHdfsSplit(HdfsParquetInputFormat.java:110)
    at com.dtstack.chunjun.connector.hdfs.source.BaseHdfsInputFormat.lambda$createInputSplitsInternal$0(BaseHdfsInputFormat.java:79)
    ... 24 more
Caused by: java.io.FileNotFoundException: File hdfs://nameservice1/user/hive/warehouse/pec.db/px/ does not exist.
    at org.apache.hadoop.hdfs.DistributedFileSystem.listStatusInternal(DistributedFileSystem.java:1058)
    at org.apache.hadoop.hdfs.DistributedFileSystem.access$1000(DistributedFileSystem.java:131)
    at org.apache.hadoop.hdfs.DistributedFileSystem$24.doCall(DistributedFileSystem.java:1118)
    at org.apache.hadoop.hdfs.DistributedFileSystem$24.doCall(DistributedFileSystem.java:1115)
    at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
    at org.apache.hadoop.hdfs.DistributedFileSystem.listStatus(DistributedFileSystem.java:1125)
    at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1868)
    at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1910)
    at com.dtstack.chunjun.connector.hdfs.source.HdfsParquetInputFormat.getAllPartitionPath(HdfsParquetInputFormat.java:90)
    at com.dtstack.chunjun.connector.hdfs.source.HdfsParquetInputFormat.createHdfsSplit(HdfsParquetInputFormat.java:108)
    ... 25 more

    at com.dtstack.chunjun.source.format.BaseRichInputFormat.open(BaseRichInputFormat.java:150)
    at com.dtstack.chunjun.source.DtInputFormatSourceFunction.run(DtInputFormatSourceFunction.java:126)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66)
    at org.apache.flink.streaming.run
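
In case it helps narrow this down, here is a minimal sketch of the check the connector is effectively doing according to the stack trace (PathVisibilityCheck is a hypothetical test class; the principal and keytab are placeholders to be replaced with the ones used by the failing job, while the fs.defaultFS and path are taken from the trace above):

```java
import java.net.URI;
import java.security.PrivilegedExceptionAction;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.UserGroupInformation;

public class PathVisibilityCheck {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://nameservice1");
        conf.set("hadoop.security.authentication", "kerberos");
        UserGroupInformation.setConfiguration(conf);

        // Placeholder principal/keytab -- substitute the ones used by the failing job.
        UserGroupInformation ugi = UserGroupInformation.loginUserFromKeytabAndReturnUGI(
                "your_principal@YOUR.REALM", "/path/to/your.keytab");

        // Per the trace, createInputSplits boils down to a listStatus on the reader
        // path; if this existence check fails for the same principal, the job will too.
        Path dir = new Path("hdfs://nameservice1/user/hive/warehouse/pec.db/px/");
        ugi.doAs((PrivilegedExceptionAction<Void>) () -> {
            try (FileSystem fs = FileSystem.get(URI.create("hdfs://nameservice1"), conf)) {
                System.out.println("exists = " + fs.exists(dir));
            }
            return null;
        });
    }
}
```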