apache / rocketmq-clients

RocketMQ Clients - Collection of Client Bindings for Apache RocketMQ
https://rocketmq.apache.org/
Apache License 2.0

[Bug] Java client (both producer and consumer) can't start if one of the proxies (in local mode) has crashed. #818

Open Yelijah opened 3 weeks ago

Yelijah commented 3 weeks ago

Before Creating the Bug Report

Programming Language of the Client

Java

Runtime Platform Environment

macOS

RocketMQ Version of the Client/Server

Server: 5.1.4. Client: all versions were tested.

Run or Compiler Version

No response

Describe the Bug

RocketMQ server: 2 masters and 2 slaves, with the proxy in local mode.

When the RocketMQ server has multiple proxy nodes and the client is configured with all of their addresses, the client (both producer and consumer) fails to start if any one of those proxies has crashed!

The RocketMQ client error log is:

2024-08-17 16:48:02.654 INFO  [60776] [SimpleConsumerImpl-0 STARTING] [o.a.r.c.j.i.ClientImpl#?:?] - Begin to fetch topic(s) route data from remote during client startup, clientId=ElijahMBP19.local@60776@0@2odcwjmfyo, topics=[fifoSimpleConsumerTopic]
2024-08-17 16:48:03.653 INFO  [60776] [RocketmqClientScheduler-0-2] [o.a.r.c.j.i.ClientManagerImpl#?:?] - Start to log statistics, clientVersion=5.0.7, clientWrapperVersion=null, clientEndpoints=ipv4:192.168.101.128:18081,192.168.101.219:18081, os description=[Mac OS X 10.16], java description=[AdoptOpenJDK OpenJDK 64-Bit Server VM 25.265-b01], clientId=ElijahMBP19.local@60776@0@2odcwjmfyo
2024-08-17 16:48:04.256 ERROR [60776] [RocketmqClientAsyncWorker-0-14] [o.a.r.c.j.i.ClientSessionImpl#?:?] - Exception raised from stream response observer, clientId=ElijahMBP19.local@60776@0@2odcwjmfyo, endpoints=ipv4:192.168.101.128:18081
org.apache.rocketmq.shaded.io.grpc.StatusRuntimeException: UNAVAILABLE: io exception
    at org.apache.rocketmq.shaded.io.grpc.Status.asRuntimeException(Status.java:539)
    at org.apache.rocketmq.shaded.io.grpc.stub.ClientCalls$StreamObserverToCallListenerAdapter.onClose(ClientCalls.java:487)
    at org.apache.rocketmq.shaded.io.grpc.PartialForwardingClientCallListener.onClose(PartialForwardingClientCallListener.java:39)
    at org.apache.rocketmq.shaded.io.grpc.ForwardingClientCallListener.onClose(ForwardingClientCallListener.java:23)
    at org.apache.rocketmq.shaded.io.grpc.ForwardingClientCallListener$SimpleForwardingClientCallListener.onClose(ForwardingClientCallListener.java:40)
    at org.apache.rocketmq.shaded.io.grpc.internal.ClientCallImpl.closeObserver(ClientCallImpl.java:563)
    at org.apache.rocketmq.shaded.io.grpc.internal.ClientCallImpl.access$300(ClientCallImpl.java:70)
    at org.apache.rocketmq.shaded.io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInternal(ClientCallImpl.java:744)
    at org.apache.rocketmq.shaded.io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInContext(ClientCallImpl.java:723)
    at org.apache.rocketmq.shaded.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
    at org.apache.rocketmq.shaded.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:133)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.rocketmq.shaded.io.grpc.netty.shaded.io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: /192.168.101.128:18081
Caused by: java.net.ConnectException: Connection refused
    at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
    at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:714)
    at org.apache.rocketmq.shaded.io.grpc.netty.shaded.io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:337)
    at org.apache.rocketmq.shaded.io.grpc.netty.shaded.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:334)
    at org.apache.rocketmq.shaded.io.grpc.netty.shaded.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:710)
    at org.apache.rocketmq.shaded.io.grpc.netty.shaded.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:658)
    at org.apache.rocketmq.shaded.io.grpc.netty.shaded.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:584)
    at org.apache.rocketmq.shaded.io.grpc.netty.shaded.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:496)
    at org.apache.rocketmq.shaded.io.grpc.netty.shaded.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
    at org.apache.rocketmq.shaded.io.grpc.netty.shaded.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    at org.apache.rocketmq.shaded.io.grpc.netty.shaded.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    at java.lang.Thread.run(Thread.java:748)
java.lang.IllegalStateException: Expected the service SimpleConsumerImpl-0 [FAILED] to be RUNNING, but the service has FAILED

    at org.apache.rocketmq.shaded.com.google.common.util.concurrent.AbstractService.checkCurrentState(AbstractService.java:381)
    at org.apache.rocketmq.shaded.com.google.common.util.concurrent.AbstractService.awaitRunning(AbstractService.java:305)
    at org.apache.rocketmq.shaded.com.google.common.util.concurrent.AbstractIdleService.awaitRunning(AbstractIdleService.java:165)
    at org.apache.rocketmq.client.java.impl.consumer.SimpleConsumerBuilderImpl.build(SimpleConsumerBuilderImpl.java:82)
    at cn.ymr.rocketmq5.order.FiFoMsgTest.buildSimpleConsumer(FiFoMsgTest.java:227)
    at cn.ymr.rocketmq5.order.FiFoMsgTest.simpleConsumeFiFoMsgAsync(FiFoMsgTest.java:186)
    at cn.ymr.rocketmq5.order.FiFoMsgTest.simpleConsumeFiFoMsgRetry1(FiFoMsgTest.java:148)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
    at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
    at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
    at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
    at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
    at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
    at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
    at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
    at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
    at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
    at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
    at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
    at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:69)
    at com.intellij.rt.junit.IdeaTestRunner$Repeater$1.execute(IdeaTestRunner.java:38)
    at com.intellij.rt.execution.junit.TestsRepeater.repeat(TestsRepeater.java:11)
    at com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:35)
    at com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:232)
    at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:55)
Caused by: java.util.concurrent.CancellationException: Task was cancelled.
    at org.apache.rocketmq.shaded.com.google.common.util.concurrent.AbstractFuture.cancellationExceptionWithCause(AbstractFuture.java:1543)
    at org.apache.rocketmq.shaded.com.google.common.util.concurrent.AbstractFuture.getDoneValue(AbstractFuture.java:586)
    at org.apache.rocketmq.shaded.com.google.common.util.concurrent.AbstractFuture.get(AbstractFuture.java:567)
    at org.apache.rocketmq.shaded.com.google.common.util.concurrent.FluentFuture$TrustedFuture.get(FluentFuture.java:91)
    at org.apache.rocketmq.client.java.impl.ClientImpl.startUp(ClientImpl.java:188)
    at org.apache.rocketmq.client.java.impl.consumer.SimpleConsumerImpl.startUp(SimpleConsumerImpl.java:93)
    at org.apache.rocketmq.shaded.com.google.common.util.concurrent.AbstractIdleService$DelegateService$1.run(AbstractIdleService.java:62)
    at org.apache.rocketmq.shaded.com.google.common.util.concurrent.Callables.lambda$threadRenaming$3(Callables.java:103)
    at java.lang.Thread.run(Thread.java:748)

Does this mean the gRPC client is not designed to be highly available? That is very dangerous!

Steps to Reproduce

Just as described above.

What Did You Expect to See?

The client starts successfully as long as at least one proxy is alive.
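The expected behavior can be illustrated with a small, self-contained sketch of "start if any proxy is reachable" semantics. This is not RocketMQ's implementation: `AnyAliveStartup`, `connect`, `firstSuccessful`, and `start` are hypothetical names, and connecting is simulated by checking a set of "down" endpoints instead of opening real gRPC sessions.

```java
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.atomic.AtomicInteger;

class AnyAliveStartup {
    // Hypothetical stand-in for opening a session to one proxy endpoint.
    static CompletableFuture<String> connect(String endpoint, Set<String> down) {
        return down.contains(endpoint)
                ? CompletableFuture.failedFuture(new IllegalStateException("Connection refused: " + endpoint))
                : CompletableFuture.completedFuture(endpoint);
    }

    // Completes with the first endpoint that connects; fails only if every endpoint fails.
    static CompletableFuture<String> firstSuccessful(List<String> endpoints, Set<String> down) {
        CompletableFuture<String> result = new CompletableFuture<>();
        if (endpoints.isEmpty()) {
            result.completeExceptionally(new IllegalArgumentException("no endpoints configured"));
            return result;
        }
        AtomicInteger failures = new AtomicInteger();
        for (String endpoint : endpoints) {
            connect(endpoint, down).whenComplete((ok, err) -> {
                if (err == null) {
                    result.complete(ok); // first success wins; later calls are no-ops
                } else if (failures.incrementAndGet() == endpoints.size()) {
                    result.completeExceptionally(err); // every endpoint failed
                }
            });
        }
        return result;
    }

    // Returns true if at least one endpoint was reachable.
    static boolean start(List<String> endpoints, Set<String> down) {
        try {
            System.out.println("connected to " + firstSuccessful(endpoints, down).join());
            return true;
        } catch (CompletionException ex) {
            System.out.println("startup failed: " + ex.getCause().getMessage());
            return false;
        }
    }

    public static void main(String[] args) {
        List<String> endpoints = List.of("192.168.101.128:18081", "192.168.101.219:18081");
        System.out.println(start(endpoints, Set.of("192.168.101.128:18081"))); // true
        System.out.println(start(endpoints, Set.of(endpoints.get(0), endpoints.get(1)))); // false
    }
}
```

With one of the two proxies down, startup still succeeds via the surviving endpoint; it fails only when both are unreachable.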

What Did You See Instead?

The client fails to start if any one of the proxies is dead.

Additional Context

No response

Yelijah commented 2 weeks ago

When the proxy is in cluster mode, everything works fine.

Yelijah commented 2 weeks ago

I added some breakpoints and logging to the Java client and found some problems:

2024/08/26 23:16:52 rocketmq-queue-async-rpc-3-worker-3 INFO [basic]  onTopicRouteDataFetched routeEndpoints:[ipv4:192.168.101.128:18181, ipv4:192.168.101.219:18181] existRouteEndpoints:[]
2024/08/26 23:16:52 rocketmq-queue-async-rpc-3-worker-3 INFO [basic]  getClientSession endpoints=ipv4:192.168.101.128:18181
2024/08/26 23:16:52 rocketmq-queue-async-rpc-3-worker-3 INFO [basic]  getClientSession endpoints=ipv4:192.168.101.219:18181

The broker endpoints returned by ClientManager#queryRoute are two separate addresses, rather than a single combined address like "ipv4:192.168.101.128:18181;ipv4:192.168.101.219:18181".
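Why the shape of the route data matters can be sketched with a session cache keyed by the endpoint string, which matches the one-`getClientSession`-call-per-endpoint pattern in the log above. This is a simplified model, not the client's code: `SessionCache`, `recordRoute`, and `sessionCount` are hypothetical, and a "session" here is just a string.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class SessionCache {
    // Hypothetical cache: one session per distinct endpoint string found in the route data.
    private final Map<String, String> sessions = new ConcurrentHashMap<>();

    // One lookup per route endpoint; a new key means a new session (and a new connection).
    void recordRoute(List<String> routeEndpoints) {
        for (String endpoints : routeEndpoints) {
            sessions.computeIfAbsent(endpoints, e -> "session(" + e + ")");
        }
    }

    int sessionCount() {
        return sessions.size();
    }

    public static void main(String[] args) {
        // Local mode, as observed: two separate endpoints, so two independent sessions.
        SessionCache localMode = new SessionCache();
        localMode.recordRoute(List.of("ipv4:192.168.101.128:18181", "ipv4:192.168.101.219:18181"));
        System.out.println(localMode.sessionCount()); // 2

        // Cluster mode, as observed: one combined endpoint, so a single session.
        SessionCache clusterMode = new SessionCache();
        clusterMode.recordRoute(List.of("ipv4:192.168.101.128:18181;ipv4:192.168.101.219:18181"));
        System.out.println(clusterMode.sessionCount()); // 1
    }
}
```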


ClientImpl#getClientSession then tries to connect to each endpoint address. If one of the two proxies (brokers) has crashed, the connection fails and the client shuts down!
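The failure mode described here behaves like an all-or-nothing wait: every per-endpoint session must connect before startup completes. The sketch below models that with `CompletableFuture.allOf`; it is an illustration of the observed behavior, not the client's actual startup code, and `AllOrNothingStartup`, `connect`, and `start` are hypothetical names with simulated connections.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

class AllOrNothingStartup {
    // Hypothetical stand-in for opening a session: fails for endpoints in the "down" set.
    static CompletableFuture<String> connect(String endpoint, Set<String> down) {
        CompletableFuture<String> future = new CompletableFuture<>();
        if (down.contains(endpoint)) {
            future.completeExceptionally(new IllegalStateException("Connection refused: " + endpoint));
        } else {
            future.complete(endpoint);
        }
        return future;
    }

    // Startup succeeds only if every per-endpoint session connects.
    static boolean start(List<String> endpoints, Set<String> down) {
        List<CompletableFuture<String>> sessions = new ArrayList<>();
        for (String endpoint : endpoints) {
            sessions.add(connect(endpoint, down));
        }
        try {
            // allOf fails as soon as any input future fails.
            CompletableFuture.allOf(sessions.toArray(new CompletableFuture<?>[0])).join();
            return true;
        } catch (CompletionException ex) {
            System.out.println("startup failed: " + ex.getCause().getMessage());
            return false;
        }
    }

    public static void main(String[] args) {
        List<String> endpoints = List.of("192.168.101.128:18181", "192.168.101.219:18181");
        System.out.println(start(endpoints, Set.of())); // true
        System.out.println(start(endpoints, Set.of("192.168.101.128:18181"))); // false
    }
}
```

A single unreachable proxy is enough to fail the whole startup, which matches the `Connection refused` followed by `IllegalStateException ... FAILED` sequence in the log.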

But when the proxy nodes are in cluster mode (deployed separately from the brokers), the broker endpoint returned by ClientManager#queryRoute is a single combined address like "ipv4:192.168.101.128:18181;ipv4:192.168.101.219:18181".
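A combined endpoint tolerates a dead proxy because a single logical session can fall back to another address. The sketch below assumes the combined string format quoted above ("ipv4:" prefix, ";" separator) and uses simple first-reachable failover; real gRPC channels use name resolution and a load-balancing policy instead, so this is a simplified model. `CombinedEndpointFailover`, `parse`, and `connect` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.Set;

class CombinedEndpointFailover {
    // Splits a combined endpoint into "host:port" addresses.
    // Assumes the "ipv4:" prefix and ";" separator seen in the report, not the client's real parser.
    static List<String> parse(String combined) {
        List<String> addresses = new ArrayList<>();
        for (String part : combined.split(";")) {
            String trimmed = part.trim();
            if (trimmed.isEmpty()) {
                continue;
            }
            addresses.add(trimmed.startsWith("ipv4:") ? trimmed.substring("ipv4:".length()) : trimmed);
        }
        return addresses;
    }

    // One logical session: returns the first reachable address, or empty if none is reachable.
    static Optional<String> connect(String combined, Set<String> down) {
        for (String address : parse(combined)) {
            if (!down.contains(address)) {
                return Optional.of(address);
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        String combined = "ipv4:192.168.101.128:18181;ipv4:192.168.101.219:18181";
        System.out.println(parse(combined)); // [192.168.101.128:18181, 192.168.101.219:18181]
        System.out.println(connect(combined, Set.of("192.168.101.128:18181"))); // Optional[192.168.101.219:18181]
    }
}
```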