Closed yunexuan closed 1 year ago
Thanks for your feedback, could you provide more info from client's log?
Here is the path: ${home}/logs/rocketmq/rocketmq-client.log
jdk1.8 rocketmq-client-java 5.0.4 rocketmq 5.0 What should I do?
方便贴下生产者初始化和发送的代码嘛?
jdk1.8 rocketmq-client-java 5.0.4 rocketmq 5.0 What should I do?
方便贴下生产者初始化和发送的代码嘛?
我也遇到类似的问题,用的是官方文档的代码(https://rocketmq.apache.org/zh/docs/quickStart/01quickstart) import java.io.IOException; import java.util.Collections; import org.apache.rocketmq.client.apis.ClientConfiguration; import org.apache.rocketmq.client.apis.ClientException; import org.apache.rocketmq.client.apis.ClientServiceProvider; import org.apache.rocketmq.client.apis.consumer.ConsumeResult; import org.apache.rocketmq.client.apis.consumer.FilterExpression; import org.apache.rocketmq.client.apis.consumer.FilterExpressionType; import org.apache.rocketmq.client.apis.consumer.PushConsumer; import org.slf4j.Logger; import org.slf4j.LoggerFactory;
public class PushConsumerExample { private static final Logger logger = LoggerFactory.getLogger(PushConsumerExample.class);
private PushConsumerExample() {
}
public static void main(String[] args) throws ClientException, IOException, InterruptedException {
final ClientServiceProvider provider = ClientServiceProvider.loadService();
// 接入点地址,需要设置成Proxy的地址和端口列表,一般是xxx:8081;xxx:8081。
String endpoints = "localhost:9876";
ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder()
.setEndpoints(endpoints)
.build();
// 订阅消息的过滤规则,表示订阅所有Tag的消息。
String tag = "*";
FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG);
// 为消费者指定所属的消费者分组,Group需要提前创建。
String consumerGroup = "YourConsumerGroup";
// 指定需要订阅哪个目标Topic,Topic需要提前创建。
String topic = "TestTopic";
// 初始化PushConsumer,需要绑定消费者分组ConsumerGroup、通信参数以及订阅关系。
PushConsumer pushConsumer = provider.newPushConsumerBuilder()
.setClientConfiguration(clientConfiguration)
// 设置消费者分组。
.setConsumerGroup(consumerGroup)
// 设置预绑定的订阅关系。
.setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))
// 设置消费监听器。
.setMessageListener(messageView -> {
// 处理消息并返回消费结果。
logger.info("Consume message successfully, messageId={}", messageView.getMessageId());
return ConsumeResult.SUCCESS;
})
.build();
Thread.sleep(Long.MAX_VALUE);
// 如果不需要再使用 PushConsumer,可关闭该实例。
// pushConsumer.close();
}
}
我这边的报错日志如下: Exception in thread "main" java.lang.IllegalStateException: Expected the service PushConsumerImpl-0 [FAILED] to be RUNNING, but the service has FAILED at org.apache.rocketmq.shaded.com.google.common.util.concurrent.AbstractService.checkCurrentState(AbstractService.java:381) at org.apache.rocketmq.shaded.com.google.common.util.concurrent.AbstractService.awaitRunning(AbstractService.java:305) at org.apache.rocketmq.shaded.com.google.common.util.concurrent.AbstractIdleService.awaitRunning(AbstractIdleService.java:165) at org.apache.rocketmq.client.java.impl.consumer.PushConsumerBuilderImpl.build(PushConsumerBuilderImpl.java:128) at com.nik.rocketmq.test.receive.ReceiveTest1.main(ReceiveTest1.java:45) Caused by: java.util.concurrent.ExecutionException: org.apache.rocketmq.shaded.io.grpc.StatusRuntimeException: UNAVAILABLE: Failed ALPN negotiation: Unable to find compatible protocol Channel Pipeline: [SslHandler#0, ProtocolNegotiators$ClientTlsHandler#0, WriteBufferingAndExceptionHandler#0, DefaultChannelPipeline$TailContext#0] at org.apache.rocketmq.shaded.com.google.common.util.concurrent.AbstractFuture.getDoneValue(AbstractFuture.java:588) at org.apache.rocketmq.shaded.com.google.common.util.concurrent.AbstractFuture.get(AbstractFuture.java:567) at org.apache.rocketmq.shaded.com.google.common.util.concurrent.FluentFuture$TrustedFuture.get(FluentFuture.java:91) at org.apache.rocketmq.client.java.impl.ClientImpl.startUp(ClientImpl.java:188) at org.apache.rocketmq.client.java.impl.consumer.PushConsumerImpl.startUp(PushConsumerImpl.java:161) at org.apache.rocketmq.shaded.com.google.common.util.concurrent.AbstractIdleService$DelegateService$1.run(AbstractIdleService.java:62) at org.apache.rocketmq.shaded.com.google.common.util.concurrent.Callables.lambda$threadRenaming$3(Callables.java:103) at java.lang.Thread.run(Thread.java:750) Caused by: org.apache.rocketmq.shaded.io.grpc.StatusRuntimeException: UNAVAILABLE: Failed ALPN negotiation: Unable to find compatible protocol Channel Pipeline: [SslHandler#0, ProtocolNegotiators$ClientTlsHandler#0, WriteBufferingAndExceptionHandler#0, DefaultChannelPipeline$TailContext#0] at org.apache.rocketmq.shaded.io.grpc.Status.asRuntimeException(Status.java:539) at org.apache.rocketmq.shaded.io.grpc.stub.ClientCalls$UnaryStreamToFuture.onClose(ClientCalls.java:544) at org.apache.rocketmq.shaded.io.grpc.PartialForwardingClientCallListener.onClose(PartialForwardingClientCallListener.java:39) at org.apache.rocketmq.shaded.io.grpc.ForwardingClientCallListener.onClose(ForwardingClientCallListener.java:23) at org.apache.rocketmq.shaded.io.grpc.ForwardingClientCallListener$SimpleForwardingClientCallListener.onClose(ForwardingClientCallListener.java:40) at org.apache.rocketmq.shaded.io.grpc.internal.ClientCallImpl.closeObserver(ClientCallImpl.java:563) at org.apache.rocketmq.shaded.io.grpc.internal.ClientCallImpl.access$300(ClientCallImpl.java:70) at org.apache.rocketmq.shaded.io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInternal(ClientCallImpl.java:744) at org.apache.rocketmq.shaded.io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInContext(ClientCallImpl.java:723) at org.apache.rocketmq.shaded.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37) at org.apache.rocketmq.shaded.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:133) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ... 1 more
@nikhe Hi nikhe! I met the same problem as you, and I solved it successfully. According to you log, you can try to change the string endpoints into "localhost:8081" in both producer and consumer.
This issue is stale because it has been open for 30 days with no activity. It will be closed in 3 days if no further activity occurs.
This issue was closed because it has been inactive for 3 days since being marked as stale.
jdk1.8 rocketmq-client-java 5.0.4 rocketmq 5.0 What should I do?