smallrye / smallrye-reactive-messaging

SmallRye Reactive Messaging
http://www.smallrye.io/smallrye-reactive-messaging/
Apache License 2.0

Error Receiving Message from STOMP #336

Open garrettahines1 opened 4 years ago

garrettahines1 commented 4 years ago

Describe the bug: I am using smallrye-amqp to listen to messages over Artemis MQ. From a Node.js service I am sending a text message using the STOMP protocol, and I get the following error when receiving the message in my Java application. Receiving the message as a String fails, but receiving it as a byte[] works.

ERROR [com.ServiceRequestResultListener#receiveRequest] [{}] (vert.x-eventloop-thread-0) The method com.ServiceRequestResultListener#receiveRequest has thrown an exception: java.lang.ClassCastException: class [B cannot be cast to class java.lang.String ([B and java.lang.String are in module java.base of loader 'bootstrap')
        at com.ServiceRequestResultListener_SmallryeMessagingInvoker_receiveRequest_f873ae93a96fc221154ff1ca86863df8150ec1c7.invoke(ServiceRequestResultListener_SmallryeMessagingInvoker_receiveRequest_f873ae93a96fc221154ff1ca86863df8150ec1c7.zig:46)
        at io.smallrye.reactive.messaging.AbstractMediator.invoke(AbstractMediator.java:61)
        at io.smallrye.reactive.messaging.SubscriberMediator.lambda$processMethodReturningVoid$0(SubscriberMediator.java:128)
        at io.reactivex.internal.operators.flowable.FlowableMap$MapSubscriber.onNext(FlowableMap.java:63)
        at io.smallrye.context.propagators.rxjava2.ContextPropagatorOnFlowableCreateAction$ContextCapturerFlowable.lambda$onNext$2(ContextPropagatorOnFlowableCreateAction.java:50)
        at io.smallrye.context.SmallRyeThreadContext.lambda$withContext$0(SmallRyeThreadContext.java:215)
        at io.smallrye.context.propagators.rxjava2.ContextPropagatorOnFlowableCreateAction$ContextCapturerFlowable.onNext(ContextPropagatorOnFlowableCreateAction.java:50)
        at io.reactivex.internal.util.HalfSerializer.onNext(HalfSerializer.java:45)
        at io.reactivex.internal.subscribers.StrictSubscriber.onNext(StrictSubscriber.java:97)
        at io.smallrye.context.propagators.rxjava2.ContextPropagatorOnFlowableCreateAction$ContextCapturerFlowable.lambda$onNext$2(ContextPropagatorOnFlowableCreateAction.java:50)
        at io.smallrye.context.SmallRyeThreadContext.lambda$withContext$0(SmallRyeThreadContext.java:215)
        at io.smallrye.context.propagators.rxjava2.ContextPropagatorOnFlowableCreateAction$ContextCapturerFlowable.onNext(ContextPropagatorOnFlowableCreateAction.java:50)
        at io.reactivex.internal.operators.flowable.FlowableFlatMap$MergeSubscriber.drainLoop(FlowableFlatMap.java:501)
        at io.reactivex.internal.operators.flowable.FlowableFlatMap$MergeSubscriber.drain(FlowableFlatMap.java:366)
        at io.reactivex.internal.operators.flowable.FlowableFlatMap$InnerSubscriber.onNext(FlowableFlatMap.java:665)
        at io.smallrye.context.propagators.rxjava2.ContextPropagatorOnFlowableCreateAction$ContextCapturerFlowable.lambda$onNext$2(ContextPropagatorOnFlowableCreateAction.java:50)
        at io.smallrye.context.SmallRyeThreadContext.lambda$withContext$0(SmallRyeThreadContext.java:215)
        at io.smallrye.context.propagators.rxjava2.ContextPropagatorOnFlowableCreateAction$ContextCapturerFlowable.onNext(ContextPropagatorOnFlowableCreateAction.java:50)
        at io.reactivex.internal.subscriptions.DeferredScalarSubscription.complete(DeferredScalarSubscription.java:117)
        at io.reactivex.processors.AsyncProcessor.subscribeActual(AsyncProcessor.java:242)
        at io.reactivex.Flowable.subscribe(Flowable.java:14918)
        at io.reactivex.Flowable.subscribe(Flowable.java:14865)
        at io.reactivex.internal.operators.flowable.FlowableFlatMap$MergeSubscriber.onNext(FlowableFlatMap.java:163)
        at io.smallrye.context.propagators.rxjava2.ContextPropagatorOnFlowableCreateAction$ContextCapturerFlowable.lambda$onNext$2(ContextPropagatorOnFlowableCreateAction.java:50)
        at io.smallrye.context.SmallRyeThreadContext.lambda$withContext$0(SmallRyeThreadContext.java:215)
        at io.smallrye.context.propagators.rxjava2.ContextPropagatorOnFlowableCreateAction$ContextCapturerFlowable.onNext(ContextPropagatorOnFlowableCreateAction.java:50)
        at io.reactivex.internal.util.HalfSerializer.onNext(HalfSerializer.java:45)
        at io.reactivex.internal.subscribers.StrictSubscriber.onNext(StrictSubscriber.java:97)

Here is my method:

@Incoming("request")
@Acknowledgment(Acknowledgment.Strategy.PRE_PROCESSING)
public void receiveRequest(final String requestJson) {
    System.out.println(requestJson);
}

To Reproduce: steps to reproduce the behavior:

  1. Start the Java service using smallrye-amqp
  2. Send a text message to the Artemis MQ broker using the STOMP protocol

Environment (please complete the following information):

cescoffier commented 4 years ago

Can you paste your application.properties? I'm trying to reproduce this, but so far I am not able to receive STOMP messages.

garrettahines1 commented 4 years ago

# amqp
amqp-username=admin
amqp-password=admin

# Incoming comm request notification
mp.messaging.incoming.channel-name.connector=smallrye-amqp
mp.messaging.incoming.channel-name.address=address-name
mp.messaging.incoming.channel-name.durable=true

cescoffier commented 4 years ago

So, the connector is still connecting to port 5672, right?

If I send STOMP messages on port 61613 to the destination "/q", should I receive them on the address /q, or is there anything fancy to do on the Artemis side?

cescoffier commented 4 years ago

Ok, got it to "work" somehow....

At least I receive stuff...

cescoffier commented 4 years ago

So the issue is that the STOMP message body is passed to AMQP as binary content. It does not include content-type or content-encoding that would give a hint about the type of content.

You can use the following as a workaround:

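    // Needs: import java.nio.charset.StandardCharsets;
    @Incoming("stomp")
    public void consume(byte[] payload) {
      // The STOMP body arrives as AMQP binary content, so decode it explicitly.
      String s = new String(payload, StandardCharsets.UTF_8);
      System.out.println(">> " + s);
    }

garrettahines1 commented 4 years ago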

I am setting 'amq-msg-type': 'text' in the header of the STOMP message, is there something else I should be setting?
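
For reference, here is a minimal sketch of what a STOMP SEND frame carrying that header looks like on the wire, assuming STOMP 1.2 framing (command line, `name:value` header lines, a blank line, the body, then a NUL byte). The class name `StompFrames`, the destination `/q`, and the body are placeholders for illustration and are not taken from the Node.js service; header escaping is deliberately left out.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class StompFrames {

    // Builds a STOMP SEND frame: command, headers, blank line, body, NUL terminator.
    // Sketch only: header values are not escaped as the spec requires.
    public static String sendFrame(String destination, Map<String, String> headers, String body) {
        StringBuilder frame = new StringBuilder("SEND\n");
        frame.append("destination:").append(destination).append('\n');
        for (Map.Entry<String, String> h : headers.entrySet()) {
            frame.append(h.getKey()).append(':').append(h.getValue()).append('\n');
        }
        frame.append('\n').append(body).append('\0');
        return frame.toString();
    }

    public static void main(String[] args) {
        Map<String, String> headers = new LinkedHashMap<>();
        headers.put("amq-msg-type", "text"); // the header mentioned above
        String frame = sendFrame("/q", headers, "{\"id\":1}");
        // Print the frame with the NUL terminator made visible.
        System.out.println(frame.replace("\0", "^@"));
    }
}
```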

cescoffier commented 4 years ago

Is it something "standard"? If so, we can check for this property.

garrettahines1 commented 4 years ago

I've found others who use it, for example here: https://stackoverflow.com/questions/31559039/activemq-php-stomp-use-textmessage-instead-of-bytesmessage But I don't see anything in the official STOMP documentation, so I'm not sure if it's a standard. The byte[] should suffice.

cescoffier commented 4 years ago

STOMP spec is very minimal, so that does not surprise me, but we can add this heuristic.

If an AMQP message has the amq-msg-type: text property set (BTW, which group of properties are you using?), we can safely read the body as a String even if it's a binary value.

cescoffier commented 4 years ago

BTW, there is another answer on SO that suggests using the content-type header instead; it might be better. WDYT?

garrettahines1 commented 4 years ago

content-type works fine.

cescoffier commented 4 years ago

Ok, so let's go with the content-type header.
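
A rough sketch of what such a content-type heuristic could look like, written as plain Java so it stands alone. This is not the connector's actual code: the class `ContentTypeHeuristic`, the rule that `text/*` and `application/json` count as text, and the choice of UTF-8 (ignoring any `charset` parameter) are all assumptions made for illustration.

```java
import java.nio.charset.StandardCharsets;
import java.util.Locale;

public class ContentTypeHeuristic {

    // Assumed rule: treat text/* and application/json as textual content.
    public static boolean isText(String contentType) {
        if (contentType == null) {
            return false;
        }
        String ct = contentType.trim().toLowerCase(Locale.ROOT);
        return ct.startsWith("text/") || ct.startsWith("application/json");
    }

    // Returns a String when the content-type says "text", otherwise the bytes unchanged.
    // Assumption: textual bodies are UTF-8; a charset parameter is not parsed.
    public static Object decode(String contentType, byte[] body) {
        return isText(contentType) ? new String(body, StandardCharsets.UTF_8) : body;
    }
}
```

With this rule, a binary body without a content-type stays a byte[], so existing byte[] consumers would keep working.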

garrettahines1 commented 4 years ago

This is actually more important than I thought. Using byte array works fine STOMP->AMQP but we get an error that string cannot be cast to byte array when we do AMQP->AMQP. Ideally we would not care which protocol sends the message.
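
To illustrate the "not caring which protocol sends the message" idea, here is a small helper sketch that normalizes either payload shape to text. The class `PayloadText` is hypothetical, UTF-8 is an assumption, and how a loosely typed payload would reach this helper from an @Incoming method is left open.

```java
import java.nio.charset.StandardCharsets;

public class PayloadText {

    // Accepts the two payload shapes seen in this thread and returns text.
    public static String asText(Object payload) {
        if (payload instanceof String) {
            return (String) payload; // AMQP -> AMQP case: already a String
        }
        if (payload instanceof byte[]) {
            // STOMP -> AMQP case: binary body, assumed to be UTF-8 text
            return new String((byte[]) payload, StandardCharsets.UTF_8);
        }
        String type = payload == null ? "null" : payload.getClass().getName();
        throw new IllegalArgumentException("Unsupported payload type: " + type);
    }
}
```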

cescoffier commented 4 years ago

Do you have the stack trace or a reproducer?

tsfullman commented 4 years ago

I'm getting this same error. It doesn't occur with ActiveMQ, but after upgrading from ActiveMQ to Artemis, I need to change the incoming stream to bytes, for some reason, for messages coming from STOMP.