Azure / azure-sdk-for-java

This repository is for active development of the Azure SDK for Java. For consumers of the SDK we recommend visiting our public developer docs at https://docs.microsoft.com/java/azure/ or our versioned developer docs at https://azure.github.io/azure-sdk-for-java.

[BUG] "This method can be called if AMQP Message body type is 'VALUE'. The actual type is [DATA]" error is logged. #41636

Open Vladcorjuc opened 2 weeks ago

Vladcorjuc commented 2 weeks ago

Describe the bug An error is logged when sending a message to a topic if the payload is a ServiceBusMessage created from a String. The error logged is "This method can be called if AMQP Message body type is 'VALUE'. The actual type is [DATA]." The message appears to be sent even though the error is logged.

Exception or Stack Trace The stack trace is available at debug log level:

java.lang.IllegalArgumentException: This method can be called if AMQP Message body type is 'VALUE'. The actual type is [DATA].
    at com.azure.core.amqp.models.AmqpMessageBody.getValue(AmqpMessageBody.java:346)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:568)
    at com.fasterxml.jackson.databind.ser.BeanPropertyWriter.serializeAsField(BeanPropertyWriter.java:688)
    at com.fasterxml.jackson.databind.ser.std.BeanSerializerBase.serializeFields(BeanSerializerBase.java:770)
    at com.fasterxml.jackson.databind.ser.BeanSerializer.serialize(BeanSerializer.java:183)
    at com.fasterxml.jackson.databind.ser.BeanPropertyWriter.serializeAsField(BeanPropertyWriter.java:732)
    at com.fasterxml.jackson.databind.ser.std.BeanSerializerBase.serializeFields(BeanSerializerBase.java:770)
    at com.fasterxml.jackson.databind.ser.BeanSerializer.serialize(BeanSerializer.java:183)
    at com.fasterxml.jackson.databind.ser.BeanPropertyWriter.serializeAsField(BeanPropertyWriter.java:732)
    at com.fasterxml.jackson.databind.ser.std.BeanSerializerBase.serializeFields(BeanSerializerBase.java:770)
    at com.fasterxml.jackson.databind.ser.BeanSerializer.serialize(BeanSerializer.java:183)
    at com.fasterxml.jackson.databind.ser.DefaultSerializerProvider._serialize(DefaultSerializerProvider.java:502)
    at com.fasterxml.jackson.databind.ser.DefaultSerializerProvider.serializeValue(DefaultSerializerProvider.java:341)
    at com.fasterxml.jackson.databind.ObjectMapper._writeValueAndClose(ObjectMapper.java:4799)
    at com.fasterxml.jackson.databind.ObjectMapper.writeValueAsBytes(ObjectMapper.java:4065)
    at org.springframework.cloud.function.json.JacksonMapper.toJson(JacksonMapper.java:75)
    at org.springframework.cloud.function.context.config.JsonMessageConverter.convertToInternal(JsonMessageConverter.java:131)
    at org.springframework.cloud.stream.converter.CompositeMessageConverterFactory$2.convertToInternal(CompositeMessageConverterFactory.java:117)
    at org.springframework.messaging.converter.AbstractMessageConverter.toMessage(AbstractMessageConverter.java:201)
    at org.springframework.messaging.converter.AbstractMessageConverter.toMessage(AbstractMessageConverter.java:189)
    at org.springframework.cloud.function.context.config.SmartCompositeMessageConverter.toMessage(SmartCompositeMessageConverter.java:158)
    at org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry$FunctionInvocationWrapper.convertOutputMessageIfNecessary(SimpleFunctionRegistry.java:1451)
    at org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry$FunctionInvocationWrapper.convertOutputIfNecessary(SimpleFunctionRegistry.java:1263)
    at org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry$FunctionInvocationWrapper.lambda$convertOutputPublisherIfNecessary$27(SimpleFunctionRegistry.java:1525)
    at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:113)
    at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onNext(FluxPeekFuseable.java:210)
    at reactor.core.publisher.SinkManyUnicast.drainRegular(SinkManyUnicast.java:284)
    at reactor.core.publisher.SinkManyUnicast.drain(SinkManyUnicast.java:365)
    at reactor.core.publisher.SinkManyUnicast.tryEmitNext(SinkManyUnicast.java:239)
    at reactor.core.publisher.SinkManySerialized.tryEmitNext(SinkManySerialized.java:100)
    at reactor.core.publisher.InternalManySink.emitNext(InternalManySink.java:27)

To Reproduce Utilize the code snippet below to send a message to a topic.

Code Snippet

  @Bean
  public Supplier<Flux<Message<ServiceBusMessage>>> publishTopicMessage() {
    return () -> publisher.asFlux()
        .doOnError(t -> log.error("An error occurred while sending a message. Cause: %s.".formatted(t.getLocalizedMessage())));
  }

  public synchronized void publishMessage(final String message, final String messageType) {
    var tenantId = "id";
    var principalId = "id";

    var azureMessage = new ServiceBusMessage(message);
    azureMessage.setCorrelationId(MDC.get(MDC_CORRELATION_ID_KEY));
    azureMessage.getApplicationProperties().put(EVENT_HEADER, messageType);
    azureMessage.getApplicationProperties().put(TENANT_ID_HEADER, tenantId);
    azureMessage.getApplicationProperties().put(PRINCIPAL_ID_HEADER, principalId);

    publisher.emitNext(
        MessageBuilder.withPayload(azureMessage).build(),
        Sinks.EmitFailureHandler.FAIL_FAST
    );
  }

Expected behavior No error should be logged if the message was sent successfully.

Additional context The error seems to be logged twice.

github-actions[bot] commented 2 weeks ago

@anuchandy @conniey @lmolkova

github-actions[bot] commented 2 weeks ago

Thank you for your feedback. Tagging and routing to the team member best able to assist.

anuchandy commented 2 weeks ago

Hi @Netyyy, could you or the Spring team please take a look? It seems the Spring layer sends the message with the default encoding (DATA) but later attempts to access the message content as VALUE. The getValue API should be invoked only if the message was encoded as VALUE.
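
For anyone following along, here is a minimal sketch of the mechanism the stack trace suggests: a bean-style serializer (Jackson's BeanPropertyWriter in the trace) reflectively invokes every public getter on the payload, including getValue, which throws when the body type is DATA. This is an illustration under stated assumptions, not the SDK's or Spring's actual code. GetterProbe, FakeBody and BodyType are hypothetical stand-ins rather than the real AmqpMessageBody, and java.beans.Introspector is used in place of Jackson so the example needs only the JDK.

```java
import java.beans.BeanInfo;
import java.beans.IntrospectionException;
import java.beans.Introspector;
import java.beans.PropertyDescriptor;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.TreeMap;

public class GetterProbe {

    /** Hypothetical stand-in for the two body types named in the error message. */
    public enum BodyType { DATA, VALUE }

    /** Hypothetical stand-in for a message body; NOT the real AmqpMessageBody. */
    public static final class FakeBody {
        private final BodyType bodyType;
        private final Object content;

        private FakeBody(BodyType bodyType, Object content) {
            this.bodyType = bodyType;
            this.content = content;
        }

        public static FakeBody fromData(byte[] data) {
            return new FakeBody(BodyType.DATA, data);
        }

        public static FakeBody fromValue(Object value) {
            return new FakeBody(BodyType.VALUE, value);
        }

        public BodyType getBodyType() {
            return bodyType;
        }

        /** Mirrors the reported behavior: only legal when the body type is VALUE. */
        public Object getValue() {
            if (bodyType != BodyType.VALUE) {
                throw new IllegalArgumentException(
                    "This method can be called if AMQP Message body type is 'VALUE'."
                        + " The actual type is [" + bodyType + "].");
            }
            return content;
        }
    }

    /**
     * Invokes every bean getter reflectively, the way a bean serializer would,
     * and records per property whether the call succeeded or threw.
     */
    public static Map<String, String> probeGetters(Object bean) {
        Map<String, String> results = new TreeMap<>();
        try {
            // Stop at Object so that getClass() is not treated as a property.
            BeanInfo info = Introspector.getBeanInfo(bean.getClass(), Object.class);
            for (PropertyDescriptor property : info.getPropertyDescriptors()) {
                Method getter = property.getReadMethod();
                if (getter == null) {
                    continue;
                }
                try {
                    results.put(property.getName(), "ok: " + getter.invoke(bean));
                } catch (InvocationTargetException e) {
                    // The getter itself threw; keep only the exception type.
                    results.put(property.getName(),
                        "threw " + e.getCause().getClass().getSimpleName());
                } catch (IllegalAccessException e) {
                    results.put(property.getName(), "inaccessible");
                }
            }
        } catch (IntrospectionException e) {
            throw new IllegalStateException(e);
        }
        return results;
    }

    public static void main(String[] args) {
        FakeBody body = FakeBody.fromData("hello".getBytes(StandardCharsets.UTF_8));
        System.out.println(probeGetters(body));
    }
}
```

In this sketch the DATA-encoded body reports its body type without trouble, while the blind call to getValue fails. That matches the observation that the send itself succeeds and only the conversion step logs an error. A caller that needs the content would check getBodyType() first and read the value only when the type is VALUE.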