dyrnq / springcloud-rocketmq-example

Caused by: java.util.concurrent.ExecutionException: java.lang.IllegalArgumentException: Message type is not specified #11

Open dyrnq opened 5 hours ago

dyrnq commented 5 hours ago
Exception in thread "main" java.lang.IllegalStateException: Expected the service ProducerImpl-0 [FAILED] to be RUNNING, but the service has FAILED
    at com.google.common.util.concurrent.AbstractService.checkCurrentState(AbstractService.java:384)
    at com.google.common.util.concurrent.AbstractService.awaitRunning(AbstractService.java:308)
    at com.google.common.util.concurrent.AbstractIdleService.awaitRunning(AbstractIdleService.java:160)
    at org.apache.rocketmq.client.java.impl.producer.ProducerBuilderImpl.build(ProducerBuilderImpl.java:93)
    at org.apache.rocketmq.client.java.example.ProducerSingleton.buildProducer(ProducerSingleton.java:67)
    at org.apache.rocketmq.client.java.example.ProducerSingleton.getInstance(ProducerSingleton.java:74)
    at org.apache.rocketmq.client.java.example.ProducerNormalMessageExample.main(ProducerNormalMessageExample.java:39)
Caused by: java.util.concurrent.ExecutionException: java.lang.IllegalArgumentException: Message type is not specified
    at com.google.common.util.concurrent.AbstractFuture.getDoneValue(AbstractFuture.java:592)
    at com.google.common.util.concurrent.AbstractFuture.get(AbstractFuture.java:571)
    at com.google.common.util.concurrent.FluentFuture$TrustedFuture.get(FluentFuture.java:91)
    at org.apache.rocketmq.client.java.impl.ClientImpl.startUp(ClientImpl.java:188)
    at org.apache.rocketmq.client.java.impl.producer.ProducerImpl.startUp(ProducerImpl.java:114)
    at com.google.common.util.concurrent.AbstractIdleService$DelegateService.lambda$doStart$0(AbstractIdleService.java:64)
    at com.google.common.util.concurrent.Callables.lambda$threadRenaming$3(Callables.java:105)
    at java.base/java.lang.Thread.run(Thread.java:1583)
Caused by: java.lang.IllegalArgumentException: Message type is not specified
    at org.apache.rocketmq.client.java.message.MessageType.fromProtobuf(MessageType.java:38)
    at org.apache.rocketmq.client.java.route.MessageQueueImpl.<init>(MessageQueueImpl.java:43)
    at org.apache.rocketmq.client.java.route.TopicRouteData.<init>(TopicRouteData.java:48)
    at org.apache.rocketmq.client.java.impl.ClientImpl.lambda$fetchTopicRoute0$6(ClientImpl.java:623)
    at com.google.common.util.concurrent.AbstractTransformFuture$AsyncTransformFuture.doTransform(AbstractTransformFuture.java:223)
    at com.google.common.util.concurrent.AbstractTransformFuture$AsyncTransformFuture.doTransform(AbstractTransformFuture.java:210)
    at com.google.common.util.concurrent.AbstractTransformFuture.run(AbstractTransformFuture.java:123)
    at com.google.common.util.concurrent.DirectExecutor.execute(DirectExecutor.java:31)
    at com.google.common.util.concurrent.AbstractFuture.executeListener(AbstractFuture.java:1286)
    at com.google.common.util.concurrent.AbstractFuture.complete(AbstractFuture.java:1055)
    at com.google.common.util.concurrent.AbstractFuture.set(AbstractFuture.java:782)
    at io.grpc.stub.ClientCalls$GrpcFuture.set(ClientCalls.java:569)
    at io.grpc.stub.ClientCalls$UnaryStreamToFuture.onClose(ClientCalls.java:542)
    at io.grpc.PartialForwardingClientCallListener.onClose(PartialForwardingClientCallListener.java:39)
    at io.grpc.ForwardingClientCallListener.onClose(ForwardingClientCallListener.java:23)
    at io.grpc.ForwardingClientCallListener$SimpleForwardingClientCallListener.onClose(ForwardingClientCallListener.java:40)
    at io.grpc.internal.ClientCallImpl.closeObserver(ClientCallImpl.java:563)
    at io.grpc.internal.ClientCallImpl.access$300(ClientCallImpl.java:70)
    at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInternal(ClientCallImpl.java:744)
    at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInContext(ClientCallImpl.java:723)
    at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
    at io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:133)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
    ... 1 more
dyrnq commented 4 hours ago
org.springframework.messaging.MessageHandlingException: error occurred in message handler [org.springframework.cloud.stream.function.FunctionConfiguration$FunctionToDestinationBinder$1@60cabb0b]
    at org.springframework.integration.support.utils.IntegrationUtils.wrapInHandlingExceptionIfNecessary(IntegrationUtils.java:191) ~[spring-integration-core-6.3.4.jar:6.3.4]
    at org.springframework.integration.handler.AbstractMessageHandler.doHandleMessage(AbstractMessageHandler.java:108) ~[spring-integration-core-6.3.4.jar:6.3.4]
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:73) ~[spring-integration-core-6.3.4.jar:6.3.4]
    at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:132) ~[spring-integration-core-6.3.4.jar:6.3.4]
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:148) ~[spring-integration-core-6.3.4.jar:6.3.4]
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:121) ~[spring-integration-core-6.3.4.jar:6.3.4]
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72) ~[spring-integration-core-6.3.4.jar:6.3.4]
    at org.springframework.integration.channel.AbstractMessageChannel.sendInternal(AbstractMessageChannel.java:390) ~[spring-integration-core-6.3.4.jar:6.3.4]
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:334) ~[spring-integration-core-6.3.4.jar:6.3.4]
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:304) ~[spring-integration-core-6.3.4.jar:6.3.4]
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187) ~[spring-messaging-6.1.13.jar:6.1.13]
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166) ~[spring-messaging-6.1.13.jar:6.1.13]
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47) ~[spring-messaging-6.1.13.jar:6.1.13]
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109) ~[spring-messaging-6.1.13.jar:6.1.13]
    at org.springframework.integration.endpoint.MessageProducerSupport.lambda$sendMessage$1(MessageProducerSupport.java:262) ~[spring-integration-core-6.3.4.jar:6.3.4]
    at io.micrometer.observation.Observation.observe(Observation.java:499) ~[micrometer-observation-1.13.4.jar:1.13.4]
    at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:262) ~[spring-integration-core-6.3.4.jar:6.3.4]
    at com.alibaba.cloud.stream.binder.rocketmq.integration.inbound.RocketMQInboundChannelAdapter.lambda$consumeMessage$6(RocketMQInboundChannelAdapter.java:163) ~[spring-cloud-starter-stream-rocketmq-2023.0.1.2.jar:2023.0.1.2]
    at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:344) ~[spring-retry-2.0.9.jar:na]
    at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:233) ~[spring-retry-2.0.9.jar:na]
    at com.alibaba.cloud.stream.binder.rocketmq.integration.inbound.RocketMQInboundChannelAdapter.consumeMessage(RocketMQInboundChannelAdapter.java:162) ~[spring-cloud-starter-stream-rocketmq-2023.0.1.2.jar:2023.0.1.2]
    at com.alibaba.cloud.stream.binder.rocketmq.integration.inbound.RocketMQInboundChannelAdapter.lambda$onInit$5(RocketMQInboundChannelAdapter.java:124) ~[spring-cloud-starter-stream-rocketmq-2023.0.1.2.jar:2023.0.1.2]
    at org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService$ConsumeRequest.run(ConsumeMessageConcurrentlyService.java:411) ~[rocketmq-client-5.1.4.jar:5.1.4]
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:572) ~[na:na]
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:317) ~[na:na]
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144) ~[na:na]
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642) ~[na:na]
    at java.base/java.lang.Thread.run(Thread.java:1583) ~[na:na]
Caused by: java.lang.ClassCastException: class [B cannot be cast to class com.dyrnq.sca.rocketmq.MessageInfo ([B is in module java.base of loader 'bootstrap'; com.dyrnq.sca.rocketmq.MessageInfo is in unnamed module of loader 'app')
    at org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry$FunctionInvocationWrapper.invokeConsumer(SimpleFunctionRegistry.java:1063) ~[spring-cloud-function-context-4.1.3.jar:4.1.3]
    at org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry$FunctionInvocationWrapper.doApply(SimpleFunctionRegistry.java:761) ~[spring-cloud-function-context-4.1.3.jar:4.1.3]
    at org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry$FunctionInvocationWrapper.apply(SimpleFunctionRegistry.java:592) ~[spring-cloud-function-context-4.1.3.jar:4.1.3]
    at org.springframework.cloud.stream.function.PartitionAwareFunctionWrapper.apply(PartitionAwareFunctionWrapper.java:92) ~[spring-cloud-stream-4.1.3.jar:4.1.3]
    at org.springframework.cloud.stream.function.FunctionConfiguration$FunctionWrapper.apply(FunctionConfiguration.java:823) ~[spring-cloud-stream-4.1.3.jar:4.1.3]
    at org.springframework.cloud.stream.function.FunctionConfiguration$FunctionToDestinationBinder$1.handleMessageInternal(FunctionConfiguration.java:654) ~[spring-cloud-stream-4.1.3.jar:4.1.3]
    at org.springframework.integration.handler.AbstractMessageHandler.doHandleMessage(AbstractMessageHandler.java:105) ~[spring-integration-core-6.3.4.jar:6.3.4]
    ... 26 common frames omitted
dyrnq commented 4 hours ago
      bindings:
        consumerEvent-out-0:
          destination: demo
          content-type: application/json
          group: demo-group

        consumerEvent-in-0:
          destination: demo
          content-type: application/json
          group: demo-group
          consumer:
            concurrency: 3

Because the bindings set content-type: application/json, the producer must send JSON messages.
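
The ClassCastException in the trace above can be reproduced without Spring. The sketch below is a minimal, stdlib-only illustration and not Spring Cloud Function's actual code. It assumes the payload reached the typed consumer still as a raw byte[] (for example, because it could not be converted from JSON). MessageInfo here is a hypothetical stand-in for com.dyrnq.sca.rocketmq.MessageInfo, whose fields are not shown in this thread, and PayloadCastDemo and deliver are names invented for the example.

```java
import java.nio.charset.StandardCharsets;
import java.util.function.Consumer;

// Hypothetical stand-in for com.dyrnq.sca.rocketmq.MessageInfo.
class MessageInfo {
    final String text;

    MessageInfo(String text) {
        this.text = text;
    }
}

class PayloadCastDemo {
    // Invokes the typed consumer through its raw type, as a generic
    // dispatcher effectively does once generics are erased.
    @SuppressWarnings({"unchecked", "rawtypes"})
    static String deliver(Consumer<MessageInfo> consumer, Object payload) {
        try {
            ((Consumer) consumer).accept(payload);
            return "consumed";
        } catch (ClassCastException e) {
            // Same failure mode as "class [B cannot be cast to class ...MessageInfo".
            return "ClassCastException";
        }
    }

    public static void main(String[] args) {
        Consumer<MessageInfo> consumerEvent = info -> System.out.println("got " + info.text);

        // A payload that was never converted: still raw bytes.
        byte[] raw = "not json".getBytes(StandardCharsets.UTF_8);
        System.out.println(deliver(consumerEvent, raw)); // prints "ClassCastException"

        // A payload that was converted to the expected type.
        System.out.println(deliver(consumerEvent, new MessageInfo("hello"))); // prints "got hello", then "consumed"
    }
}
```

The point of the sketch is only that the cast happens at the consumer boundary, so the fix belongs on the producer side: send a payload that can be converted to the consumer's type.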

dyrnq commented 2 hours ago

When running mqadmin updateTopic, pass -a +message.type=<message type> to set the topic's message type.

For example:

bin/mqadmin updatetopic -n {NameSrvAddr} -t {YourTopic} -c {YourCluster} -a +message.type=TRANSACTION

message.type accepts TRANSACTION, FIFO, DELAY, or NORMAL. If the attribute is not specified, it defaults to NORMAL.
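
If the topic is created from a script or a test harness, the command line can be assembled programmatically. The following is a minimal sketch under stated assumptions: UpdateTopicCommand and its build method are hypothetical helpers, not part of RocketMQ, and the name server address, topic, and cluster values in main are placeholders. It only builds the argument list shown above and does not run mqadmin.

```java
import java.util.List;

class UpdateTopicCommand {
    // The four values listed above for the message.type attribute.
    enum MessageType { NORMAL, FIFO, DELAY, TRANSACTION }

    // Builds the mqadmin argument list; a null type falls back to NORMAL,
    // mirroring the documented default.
    static List<String> build(String nameSrvAddr, String topic, String cluster, MessageType type) {
        MessageType effective = (type == null) ? MessageType.NORMAL : type;
        return List.of(
                "bin/mqadmin", "updatetopic",
                "-n", nameSrvAddr,
                "-t", topic,
                "-c", cluster,
                "-a", "+message.type=" + effective.name());
    }

    public static void main(String[] args) {
        // Placeholder values; replace with your own name server, topic, and cluster.
        List<String> cmd = build("localhost:9876", "demo", "DefaultCluster", MessageType.FIFO);
        System.out.println(String.join(" ", cmd));
        // prints "bin/mqadmin updatetopic -n localhost:9876 -t demo -c DefaultCluster -a +message.type=FIFO"
    }
}
```

The resulting list could be handed to java.lang.ProcessBuilder if the command should actually be executed from Java.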

dyrnq commented 2 hours ago

https://github.com/apache/rocketmq-clients/issues/813