apache / doris

Apache Doris is an easy-to-use, high performance and unified analytics database.
https://doris.apache.org
Apache License 2.0

flink Connect to Doris BE{host='172.16.50.4', port=9060}failed #10495

Open EnglishVillage opened 2 years ago

EnglishVillage commented 2 years ago

110.42.205.125 and 172.16.50.4 are the same server: 110.42.205.125 is the external network address and 172.16.50.4 is the internal network address. I tried to add a `benodes` parameter, but that parameter is not supported! What should I do?

Flink code:

```java
tableEnv.executeSql(
    "CREATE TABLE flink_doris_source (\n"
        + "  username STRING,\n"
        + "  siteid INT,\n"
        + "  citycode SMALLINT,\n"
        + "  pv BIGINT\n"
        + ") WITH (\n"
        + "  'connector' = 'doris',\n"
        + "  'fenodes' = '110.42.205.125:8082',\n"
        + "  'table.identifier' = 'test_db.table1',\n"
        + "  'username' = 'test',\n"
        + "  'password' = 'test'\n"
        + ")");
tableEnv.sqlQuery("select * from flink_doris_source").execute().print();
```

Error:

```
Exception in thread "main" java.lang.RuntimeException: Failed to fetch next result
	at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:109)
	at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.hasNext(CollectResultIterator.java:80)
	at org.apache.flink.table.api.internal.TableResultImpl$CloseableRowIteratorWrapper.hasNext(TableResultImpl.java:370)
	at org.apache.flink.table.utils.PrintUtils.printAsTableauForm(PrintUtils.java:152)
	at org.apache.flink.table.api.internal.TableResultImpl.print(TableResultImpl.java:160)
	at bjh.bigdata.flink.test.dorisTest.main(dorisTest.java:45)
Caused by: java.io.IOException: Failed to fetch job execution result
	at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.getAccumulatorResults(CollectResultFetcher.java:177)
	at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.next(CollectResultFetcher.java:120)
	at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:106)
	... 5 more
Caused by: java.util.concurrent.ExecutionException: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
	at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
	at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915)
	at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.getAccumulatorResults(CollectResultFetcher.java:175)
	... 7 more
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
	at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)
	at org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$3(MiniClusterJobClient.java:137)
	at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:602)
	at java.util.concurrent.CompletableFuture.uniApplyStage(CompletableFuture.java:614)
	at java.util.concurrent.CompletableFuture.thenApply(CompletableFuture.java:1983)
	at org.apache.flink.runtime.minicluster.MiniClusterJobClient.getJobExecutionResult(MiniClusterJobClient.java:134)
	at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.getAccumulatorResults(CollectResultFetcher.java:174)
	... 7 more
Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
	at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:138)
	at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:82)
	at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:207)
	at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:197)
	at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:188)
	at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:677)
	at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:79)
	at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:435)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:305)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:212)
	at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158)
	at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
	at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
	at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
	at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
	at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
	at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
	at akka.actor.ActorCell.invoke(ActorCell.scala:561)
	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
	at akka.dispatch.Mailbox.run(Mailbox.scala:225)
	at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
	at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
	at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
	at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
	at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: org.apache.doris.flink.exception.ConnectedFailedException: Connect to Doris BE{host='172.16.50.4', port=9060}failed.
	at org.apache.doris.flink.backend.BackendClient.open(BackendClient.java:97)
	at org.apache.doris.flink.backend.BackendClient.<init>(BackendClient.java:67)
	at org.apache.doris.flink.datastream.ScalaValueReader.<init>(ScalaValueReader.scala:46)
	at org.apache.doris.flink.table.DorisRowDataInputFormat.open(DorisRowDataInputFormat.java:96)
	at org.apache.doris.flink.table.DorisRowDataInputFormat.open(DorisRowDataInputFormat.java:45)
	at org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction.run(InputFormatSourceFunction.java:84)
	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66)
	at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:269)
Caused by: org.apache.doris.shaded.org.apache.thrift.transport.TTransportException: java.net.ConnectException: Connection timed out: connect
	at org.apache.doris.shaded.org.apache.thrift.transport.TSocket.open(TSocket.java:226)
	at org.apache.doris.flink.backend.BackendClient.open(BackendClient.java:86)
	... 8 more
Caused by: java.net.ConnectException: Connection timed out: connect
	at java.net.DualStackPlainSocketImpl.waitForConnect(Native Method)
	at java.net.DualStackPlainSocketImpl.socketConnect(DualStackPlainSocketImpl.java:85)
	at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
	at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
	at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
	at java.net.PlainSocketImpl.connect(PlainSocketImpl.java:172)
	at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
	at java.net.Socket.connect(Socket.java:589)
	at org.apache.doris.shaded.org.apache.thrift.transport.TSocket.open(TSocket.java:221)
	... 9 more
```

stalary commented 2 years ago

Can your Flink service connect to the BE node?

EnglishVillage commented 2 years ago

> Can your Flink service connect to the BE node?

Yes: the BE is reachable through its external IP, but not through its internal IP. Right now the connector's WITH options only accept `fenodes`, not `benodes`, so the connector ends up dialing the BE on its intranet IP. I can't work around this.
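To pin down which of the two addresses is actually reachable from the machine running the Flink client, a minimal standalone probe can help (a sketch, not part of the connector; the host and port below are the ones from this issue):

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Probes a Doris BE's thrift port (9060 by default) the same way the
// connector's BackendClient does: a plain TCP connect with a timeout.
public class BeConnectivityCheck {
    public static boolean canConnect(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // The BE address the FE reported in this issue (intranet IP).
        System.out.println(canConnect("172.16.50.4", 9060, 3000)
                ? "BE reachable" : "BE not reachable");
    }
}
```

If this prints "BE not reachable" for the intranet IP but succeeds for the public one, the failure is purely a routing problem, matching the `Connection timed out` at the bottom of the stack trace.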

stalary commented 2 years ago

> Can your Flink service connect to the BE node?
>
> Yes: the BE is reachable through its external IP, but not through its internal IP. Right now the connector's WITH options only accept `fenodes`, not `benodes`, so the connector ends up dialing the BE on its intranet IP. I can't work around this.

It is recommended that you configure `priority_networks` for the BE.

EnglishVillage commented 2 years ago

> Can your Flink service connect to the BE node?
>
> Yes: the BE is reachable through its external IP, but not through its internal IP. Right now the connector's WITH options only accept `fenodes`, not `benodes`, so the connector ends up dialing the BE on its intranet IP. I can't work around this.
>
> It is recommended that you configure `priority_networks` for the BE.

Yes, I have configured both the intranet and extranet IPs, as follows: `priority_networks = 172.16.50.4/24;110.42.205.125/24`. But it still gives an error!
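Note that `priority_networks` is a priority list, not a multi-homing setting: the BE matches its local interface IPs against the semicolon-separated CIDR entries and registers a single winning IP with the FE. If the public address is actually bound on a local interface, a `be.conf` fragment that forces it might look like this (a sketch; the `/32` pinning is an assumption about this deployment):

```
# be.conf
# Semicolon-separated CIDR list; the BE registers the first local IP
# that matches an entry. /32 pins it to exactly this address.
priority_networks = 110.42.205.125/32
```

If 110.42.205.125 is a NATed cloud address that is never bound on a local interface, `priority_networks` cannot select it, which would explain why the error persists.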

JNSimba commented 2 years ago

> Can your Flink service connect to the BE node?
>
> Yes: the BE is reachable through its external IP, but not through its internal IP. Right now the connector's WITH options only accept `fenodes`, not `benodes`, so the connector ends up dialing the BE on its intranet IP. I can't work around this.
>
> It is recommended that you configure `priority_networks` for the BE.
>
> Yes, I have configured both the intranet and extranet IPs, as follows: `priority_networks = 172.16.50.4/24;110.42.205.125/24`. But it still gives an error!

The IP used here is taken from the return value of `show frontends`; make sure the Flink client can reach that IP address.
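To see which addresses the cluster has actually registered, these statements can be run from any MySQL client connected to the FE (query port 9030 by default); the host/IP column in each result is what the Flink client must be able to reach:

```sql
SHOW FRONTENDS; -- the address 'fenodes' should point at (HTTP port 8030/8082)
SHOW BACKENDS;  -- the address the connector will dial on the BE thrift port (9060)
```

In this issue, `SHOW BACKENDS` presumably returns 172.16.50.4, which is why the connector times out from outside the intranet.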

stalary commented 2 years ago

> Can your Flink service connect to the BE node?
>
> Yes: the BE is reachable through its external IP, but not through its internal IP. Right now the connector's WITH options only accept `fenodes`, not `benodes`, so the connector ends up dialing the BE on its intranet IP. I can't work around this.
>
> It is recommended that you configure `priority_networks` for the BE.
>
> Yes, I have configured both the intranet and extranet IPs, as follows: `priority_networks = 172.16.50.4/24;110.42.205.125/24`. But it still gives an error!

Which IP was used when adding the BE node?

zzzzzzzs commented 2 years ago

I also ran into this problem. I read data from a Doris table on a remote server from my local machine: I set the remote IP in the local program, but the log prints the intranet IP.

meetbigdata commented 1 year ago

I ran into this problem too. Could you help analyze it?

ShoJinto commented 7 months ago

Hello! Is there a solution to this problem yet? It has been a year and a half; is there really no workable approach? I saw that the documentation mentions `auto-redirect`, but Flink SQL rejects it outright: the option is not supported.

```
org.apache.flink.table.api.ValidationException: Unsupported options found for 'doris'.

Unsupported options:

auto-redirect

Supported options:

default-database
doris.batch.size
doris.deserialize.arrow.async
doris.deserialize.queue.size
doris.exec.mem.limit
doris.filter.query
doris.read.field
....
```
JNSimba commented 7 months ago

[screenshot] You can use this parameter and fill in the BE's external network IP here. (connector 1.5.x+)
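Assuming the parameter in the screenshot is the `benodes` source option exposed by flink-doris-connector 1.5.x+ (an assumption; check the connector docs for your version), the table definition would look roughly like this:

```sql
CREATE TABLE flink_doris_source (
  username STRING,
  siteid INT,
  citycode SMALLINT,
  pv BIGINT
) WITH (
  'connector' = 'doris',
  'fenodes' = '110.42.205.125:8082',
  -- assumed option: override the BE address reported by the FE with an
  -- externally reachable one (requires flink-doris-connector 1.5.x+)
  'benodes' = '110.42.205.125:9060',
  'table.identifier' = 'test_db.table1',
  'username' = 'test',
  'password' = 'test'
);
```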

shengbinxu commented 4 months ago

> [screenshot] You can use this parameter and fill in the BE's external network IP here. (connector 1.5.x+)

Thank you very much! https://doris.apache.org/docs/dev/ecosystem/flink-doris-connector/