Azure / azure-sdk-for-java

This repository is for active development of the Azure SDK for Java. For consumers of the SDK we recommend visiting our public developer docs at https://docs.microsoft.com/java/azure/ or our versioned developer docs at https://azure.github.io/azure-sdk-for-java.

[BUG] - Resend the message from Service Bus is not working while consuming the message from Function code ( Error - This body type not is supported: VALUE) #41071

Closed rajam-git closed 3 weeks ago

rajam-git commented 2 months ago

Describe the bug: Resending a message from Service Bus does not work when the message is consumed from Function code.

Exception or Stack Trace

[2024-06-21T20:51:09.319Z] 2024-06-21 16:51:09 ERROR ServiceBusReceivedMessage - This body type not is supported: VALUE
[2024-06-21T20:51:09.319Z] Before Log
[2024-06-21T20:51:09.319Z] Inside ServiceBusReceivedMessage stream loop - Received msg with sequence #: 13. contents: 1099
[2024-06-21T20:51:09.321Z] **Outbound-Consumer-SecondVendor TimerTrigger Completed:**
[2024-06-21T20:51:09.321Z] ERROR -> failed to receive messages: This body type not is supported: VALUE
[2024-06-21T20:51:09.327Z] Function "Outbound-Consumer-SecondVendor" (Id: e8a192b2-0c83-4099-9b52-36c65e4fd54f) invoked by Java Worker

[2024-06-21T20:51:09.329Z] java.lang.UnsupportedOperationException: This body type not is supported: VALUE
[2024-06-21T20:51:09.330Z] at com.azure.messaging.servicebus.ServiceBusReceivedMessage.getBody(ServiceBusReceivedMessage.java:120)
[2024-06-21T20:51:09.330Z] at com.function.sub.UscsDispatcher.lambda$receiveMessagesSync$0(UscsDispatcher.java:84)
[2024-06-21T20:51:09.331Z] at java.base/java.util.Iterator.forEachRemaining(Iterator.java:133)
[2024-06-21T20:51:09.331Z] at java.base/java.util.Spliterators$IteratorSpliterator.forEachRemaining(Spliterators.java:1845)
[2024-06-21T20:51:09.332Z] at java.base/java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:762)
[2024-06-21T20:51:09.332Z] at com.function.sub.UscsDispatcher.receiveMessagesSync(UscsDispatcher.java:77)
[2024-06-21T20:51:09.333Z] at com.function.sub.AzureFnTimerTrigger.outboundConsumerSecondVendor(AzureFnTimerTrigger.java:63)
[2024-06-21T20:51:09.334Z] at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
[2024-06-21T20:51:09.334Z] at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
[2024-06-21T20:51:09.335Z] at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
[2024-06-21T20:51:09.336Z] at java.base/java.lang.reflect.Method.invoke(Method.java:568)
[2024-06-21T20:51:09.337Z] at com.microsoft.azure.functions.worker.broker.JavaMethodInvokeInfo.invoke(JavaMethodInvokeInfo.java:22)
[2024-06-21T20:51:09.337Z] at com.microsoft.azure.functions.worker.broker.EnhancedJavaMethodExecutorImpl.execute(EnhancedJavaMethodExecutorImpl.java:22)
[2024-06-21T20:51:09.338Z] at com.microsoft.azure.functions.worker.chain.FunctionExecutionMiddleware.invoke(FunctionExecutionMiddleware.java:19)
[2024-06-21T20:51:09.339Z] at com.microsoft.azure.functions.worker.chain.InvocationChain.doNext(InvocationChain.java:21)
[2024-06-21T20:51:09.340Z] at com.microsoft.azure.functions.worker.broker.JavaFunctionBroker.invokeMethod(JavaFunctionBroker.java:125)
[2024-06-21T20:51:09.341Z] at com.microsoft.azure.functions.worker.handler.InvocationRequestHandler.execute(InvocationRequestHandler.java:34)
[2024-06-21T20:51:09.342Z] at com.microsoft.azure.functions.worker.handler.InvocationRequestHandler.execute(InvocationRequestHandler.java:10)
[2024-06-21T20:51:09.342Z] at com.microsoft.azure.functions.worker.handler.MessageHandler.handle(MessageHandler.java:44)
[2024-06-21T20:51:09.343Z] at com.microsoft.azure.functions.worker.JavaWorkerClient$StreamingMessagePeer.lambda$onNext$0(JavaWorkerClient.java:94)
[2024-06-21T20:51:09.344Z] at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
[2024-06-21T20:51:09.345Z] at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
[2024-06-21T20:51:09.345Z] at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
[2024-06-21T20:51:09.346Z] at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
[2024-06-21T20:51:09.347Z] at java.base/java.lang.Thread.run(Thread.java:833)
[2024-06-21T20:51:09.347Z]

To Reproduce: Resend a message from Service Bus to the consumer function code.

Code Snippet: See the recent comments for sample code.

Expected behavior: Resending a message from Service Bus should work.


anuchandy commented 1 month ago

Hello @rajam-git, ServiceBusReceivedMessage#getBody() can only return a body of type AMQP DATA. If the sender sends a message to Service Bus using any other body type (e.g. AMQP VALUE here), then ServiceBusReceivedMessage#getBody() will fail.

The higher-level SDKs across languages support only DATA bodies, and that's not going to change. In your case, the application can obtain the raw value from the message and decode it appropriately, for example:

// Types come from com.azure.core.amqp.models: AmqpAnnotatedMessage, AmqpMessageBody, AmqpMessageBodyType.
// "message" is the ServiceBusReceivedMessage handed to the receive callback.
final AmqpAnnotatedMessage rawMessage = message.getRawAmqpMessage();
final AmqpMessageBody rBody = rawMessage.getBody();
if (rBody.getBodyType() == AmqpMessageBodyType.VALUE) {
    final Object value = rBody.getValue();
    System.out.println(value); // {"address": {"lane": "123", "city": "redmond"}}
    if (value instanceof String) {
      // Decode the value appropriately. For example, if you're expecting JSON
      // then use a JSON parser (E.g. Jackson, Gson, Azure-Json) to parse and
      // deserialize to a POJO.
    }
}
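
As a rough illustration of the same idea, a consumer can branch on the body type and fall back to the raw value when the body is not DATA. The sketch below is self-contained and uses local stand-in types (`BodyType`, `RawBody`, `BodyDecoder`) that are hypothetical and are not the Azure SDK classes; in a real consumer the equivalent checks would go through `getRawAmqpMessage()` as in the snippet above.

```java
import java.nio.charset.StandardCharsets;

// Local stand-ins for illustration only; these are NOT the Azure SDK types.
enum BodyType { DATA, VALUE, SEQUENCE }

final class RawBody {
    final BodyType type;
    final byte[] data;   // populated only when type == DATA
    final Object value;  // populated only when type == VALUE

    private RawBody(BodyType type, byte[] data, Object value) {
        this.type = type;
        this.data = data;
        this.value = value;
    }

    static RawBody data(byte[] bytes) {
        return new RawBody(BodyType.DATA, bytes, null);
    }

    static RawBody value(Object value) {
        return new RawBody(BodyType.VALUE, null, value);
    }
}

final class BodyDecoder {
    private BodyDecoder() { }

    // Returns the payload as text, accepting both DATA and VALUE bodies.
    static String decodeToString(RawBody body) {
        switch (body.type) {
            case DATA:
                return new String(body.data, StandardCharsets.UTF_8);
            case VALUE:
                if (body.value instanceof String) {
                    return (String) body.value;
                }
                if (body.value instanceof byte[]) {
                    return new String((byte[]) body.value, StandardCharsets.UTF_8);
                }
                String kind = body.value == null ? "null" : body.value.getClass().getName();
                throw new UnsupportedOperationException("Unexpected VALUE payload: " + kind);
            default:
                throw new UnsupportedOperationException("Unsupported body type: " + body.type);
        }
    }
}
```

With this shape, a message that arrives as VALUE with a String payload decodes to the same text as the original DATA message, while anything unexpected still fails loudly instead of being silently dropped.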
rajam-git commented 1 month ago

@anuchandy

In my code, the initial behavior works correctly. Here's how you can replicate the issue:

My Java Function App puts a message into Service Bus, and the consumer code successfully pulls it. Next, I go to Service Bus and resend the message, either from the Dead-Letter Queue (DLQ) or from a different subscriber to the same topic. After doing this, I encounter the error.

anuchandy commented 1 month ago

Hi @rajam-git, in the last step, what exact client are we using to resend the message? Is it Java SDK, Service Bus explorer (https://github.com/paolosalvatori/ServiceBusExplorer), Azure Portal or something else? Can you expand (code/screenshot/steps) on how exactly the resend is initiated?

rajam-git commented 1 month ago

AzurePortal-SeriveBusExplorer

@anuchandy, in the Azure portal I am using the Service Bus Explorer screen (screenshot attached).

anuchandy commented 1 month ago

@rajam-git, I'm unable to reproduce this. Here are the setup, code, and steps I used:

final String queueName = "queue0";

final ServiceBusClientBuilder builder = new ServiceBusClientBuilder().connectionString(CONNECTION_STRING);
// STEP_1: Send one message.
//
final ServiceBusSenderClient sender = builder
        .sender()
        .queueName(queueName)
        .buildClient();

sender.sendMessage(new ServiceBusMessage("Sample-Message"));
TimeUnit.SECONDS.sleep(5);

// STEP_2: Receive the message.
//
System.out.println("Press any key to receive the message.");
waitKeyPress();
System.out.println("Receiving message...");

final ServiceBusReceiverClient receiver1 = builder
        .receiver()
        .queueName(queueName)
        .buildClient();

receiver1.receiveMessages(1).stream().collect(Collectors.toList()).forEach(message -> {
    System.out.println("Received:" + message.getBody());
});
TimeUnit.SECONDS.sleep(2);
receiver1.close();

// STEP_3: Wait for message to go to DLQ.
//
// The queue has max delivery count of 1 and a lock duration of 10 seconds. Wait for 30 seconds for the message to go to DLQ.
TimeUnit.SECONDS.sleep(30);

// STEP_4: Go to portal and Resend the message from DLQ to the queue.
//
System.out.println("Press any key after resending the from DLQ to back to the queue.");
waitKeyPress();
System.out.println("Receiving message again...");

// STEP_5: Now receive the message again.
//
final ServiceBusReceiverClient receiver2 = builder
        .receiver()
        .queueName(queueName)
        .buildClient();

receiver2.receiveMessages(1).stream().collect(Collectors.toList()).forEach(message -> {
    System.out.println(message.getBody());
});

sender.close();
receiver2.close();

private static void waitKeyPress() {
    Scanner input = new Scanner(System.in);
    input.nextLine();
}

After step-1, the queue looks like:

[Screenshot 1]

After step-3, the DLQ looks like:

[Screenshot 2]

In step-4, resend from the DLQ to the queue:

[Screenshot 3]

It doesn't matter where the application is hosted (function app, VM, local, etc.) as long as the SDK is used. I'm not sure if it's something specific to your message format, content type, etc. Can you try to create a minimal repro (a simple console app) using one sample message that matches your use case (along with any associated message properties)?

rajam-git commented 1 month ago

@anuchandy, to replicate this issue you have to publish a JSON-formatted string to a topic. Also, I am using the pub/sub model (topic and subscription), not a queue. I have modified your Java program; please use it for your testing to replicate the issue.

Code `package com.function.testsupport;

import java.util.Scanner;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import com.azure.messaging.servicebus.ServiceBusClientBuilder;
import com.azure.messaging.servicebus.ServiceBusMessage;
import com.azure.messaging.servicebus.ServiceBusReceiverClient;
import com.azure.messaging.servicebus.ServiceBusSenderClient;
import com.azure.messaging.servicebus.models.DeadLetterOptions;
import com.function.model.ServiceBusMsgOptions;

public class AzureResentIssueReplication {

public static void main(String[] args) throws InterruptedException {

    String CONNECTION_STRING = "CONNECTION_STRING";
    String TOPIC_NAME = "TOPIC_NAME";
    String SUBSCRIBER_NAME = "SUBSCRIBER_NAME";

    final ServiceBusClientBuilder builder =
            new ServiceBusClientBuilder().connectionString(CONNECTION_STRING);
    // STEP_1: Send one message.
    //
    final ServiceBusSenderClient sender = builder.connectionString(CONNECTION_STRING).sender().queueName(TOPIC_NAME).buildClient();

    String jsonRequest = "{\"uri\":\"null\",\"queryParams\":{\"msgType\":\"AzureServiceBus\",\"warehouseId\":\"0000000\"},\"body\":\"{\\r\\n" + //
                    "    \\\"messageHeader\\\": {\\r\\n" + //
                    "        \\\"msgId\\\": 14317,\\r\\n" + //
                    "        \\\"msgTime\\\": \\\"2023-04-18T14:11:31.5515919-04:00\\\",\\r\\n" + //
                    "        \\\"msgType\\\": \\\"AzureServiceBus\\\",\\r\\n" + //
                    "        \\\"sender\\\": \\\"Raja\\\",\\r\\n" + //
                    "        \\\"receiver\\\": \\\"Microsoft\\\",\\r\\n" + //
                    "        \\\"version\\\": \\\"1.0\\\",\\r\\n" + //
                    "    }\\r\\n" + //
                    "}\",\"headers\":{\"accept\":\"*/*\",\"connection\":\"keep-alive\",\"host\":\"localhost:7071\",\"user-agent\":\"PostmanRuntime/7.40.0\",\"accept-encoding\":\"gzip, deflate, br\",\"content-type\":\"application/json\",\"content-length\":\"502\",\"msgid\":\"14317\",\"postman-token\":\"eaba8efc-d0ff-43df-a70d-7cc96f56d115\"},\"destination\":\"Microsoft\",\"destinationEndPointUrl\":null,\"responseMsgTopicName\":null,\"publishMsgTopicName\":null,\"blobContainerEndpointDetails\":null,\"blobNamePrefixIncludeFlag\":null,\"serviceBusSubjectValue\":null}";

    ServiceBusMessage serviceBusMessage = new ServiceBusMessage(jsonRequest).setMessageId("14317")
    .setSubject("AzureServiceBus").setCorrelationId("14317")
    .setSessionId("serialize-me").setTo("Microsoft");

    // how to add custom properties to the message:
    serviceBusMessage.getApplicationProperties().put("property", "custom");

    sender.sendMessage(serviceBusMessage);
    TimeUnit.SECONDS.sleep(5);

    // STEP_2: Receive the message.
    //
    System.out.println("Press any key to receive the message.");
    waitKeyPress();
    System.out.println("Receiving message...");

    final ServiceBusReceiverClient receiver1 =
            builder.receiver().topicName(TOPIC_NAME).subscriptionName(SUBSCRIBER_NAME).buildClient();

    receiver1.receiveMessages(1).stream().collect(Collectors.toList()).forEach(message -> {
        System.out.println("Received:" + message.getBody());

        DeadLetterOptions deadLetterOptions = new DeadLetterOptions();
        deadLetterOptions.setDeadLetterReason("TESTREASON");
        deadLetterOptions.setDeadLetterErrorDescription("TESTREASONDESC");

        ServiceBusMsgOptions serviceBusMsgOptions = new ServiceBusMsgOptions();
        serviceBusMsgOptions.setDeadLetterOptions(deadLetterOptions);
        serviceBusMsgOptions.setReadyForDeadLetter(true);

        receiver1.deadLetter(message, serviceBusMsgOptions.getDeadLetterOptions());

        System.out.println("Sent to DLQ");

    });
    TimeUnit.SECONDS.sleep(2);
    receiver1.close();

    // STEP_3: Wait for message to go to DLQ.
    //
    // The queue has max delivery count of 1 and a lock duration of 10 seconds. Wait for 30
    // seconds for the message to go to DLQ.
    TimeUnit.SECONDS.sleep(30);

    // STEP_4: Go to portal and Resend the message from DLQ to the queue.
    //
    System.out.println("Press any key after resending the from DLQ to back to the queue.");
    waitKeyPress();
    System.out.println("Receiving message again...");

    // STEP_5: Now receive the message again.
    //
    final ServiceBusReceiverClient receiver2 =
            builder.receiver().topicName(TOPIC_NAME)
            .subscriptionName(SUBSCRIBER_NAME).buildClient();

    receiver2.receiveMessages(1).stream().collect(Collectors.toList()).forEach(message -> {
        System.out.println("Recevied it from DLQ" + message.getBody());
        //receiver2.complete(message);
    });

    sender.close();

    receiver2.close();
}

private static void waitKeyPress() {
    Scanner input = new Scanner(System.in);
    input.nextLine();
}

} `

Log

Press any key after resending the from DLQ to back to the queue.

Receiving message again...
2024-07-25 13:34:21 ERROR ServiceBusReceivedMessage - This body type not is supported: VALUE
Exception in thread "main" java.lang.UnsupportedOperationException: This body type not is supported: VALUE
    at com.azure.messaging.servicebus.ServiceBusReceivedMessage.getBody(ServiceBusReceivedMessage.java:120)
    at com.function.testsupport.AzureResentIssueReplication.lambda$1(AzureResentIssueReplication.java:96)
    at java.base/java.util.ArrayList.forEach(ArrayList.java:1511)
    at com.function.testsupport.AzureResentIssueReplication.main(AzureResentIssueReplication.java:95)

anuchandy commented 1 month ago

Hello @rajam-git, thanks for the response. Unfortunately, I'm still unable to reproduce the reported behavior using the code you shared above.

To make it easier, I created an executable standalone project with the above code and pushed it here: https://github.com/anuchandy/repro-41071. The project creates a topic and subscription first, then runs through your send-receive-DLQ scenario. Can you clone and execute the project to see if the behavior is reproducible? If it isn't, could you adjust the code to better fit your specific use case? You can push the modified executable project to your own Git repo and post the link here to continue our discussion.

rajam-git commented 1 month ago

@anuchandy, thanks. I am getting a 404 error when accessing this page (https://github.com/anuchandy/repro-41071).

image

anuchandy commented 1 month ago

@rajam-git, ah.. repo was private by default, just changed it to public.

rajam-git commented 1 month ago

@anuchandy It seems the issue is not related to whether we are storing a simple String or JSON. Your project setup works for me without any issues (see Screenshot#1). In your sample project, you create the topic and subscription on the fly, which also works for me.

However, I am using an existing topic and subscription that my team created through the Azure portal; I don't create the topic and subscription on the fly. Could you please refer to the RajaApp.java file (I just created the PR) and fill in your details below?

private static final String connectionString = "MyConnectionString";
private static final String TOPIC_NAME = "MyTopicName";
private static final String SUBSCRIBER_NAME = "MySubscriberName";

After that, run RajaApp.java to replicate the issue. I am able to replicate it (see Screenshot#2). If you are unable to replicate it, then I am guessing it has something to do with my Azure Service Bus environment.

Screenshot#1 image

Screenshot#2 image

anuchandy commented 1 month ago

Hi @rajam-git, before creating the topic and subscription on the fly, I did try with ones pre-created via the Azure portal.

I agree with you: this is likely some configuration local to your existing topic and subscription. I don't think it's a namespace-level configuration, given that the issue does not happen when you create a new topic and subscription.

In the Azure portal, could you compare the properties of the impacted (existing) topic and subscription with those of the brand-new ones created on the fly? If you find a discrepancy, could you discuss it with the team that created them and see if we can resolve it by aligning the properties?

If you can screenshot and share all the properties of the impacted topic and subscription, I can also check and post any findings. It will help anyone who comes across a similar issue.

rajam-git commented 1 month ago

@anuchandy, it seems this is the root cause. Please update the TopicProperties as shown here; after that you can replicate the issue.

// Retrieve the topic properties
TopicProperties topicProperties = adminClient.getTopic(topicName);
// Update the max message size
topicProperties.setMaxMessageSizeInKilobytes(102400); // Set the desired max size in kilobytes
// Apply the update
adminClient.updateTopic(topicProperties);

PFB updated source code. `package com.repro41071;

import com.azure.messaging.servicebus.ServiceBusClientBuilder;
import com.azure.messaging.servicebus.ServiceBusMessage;
import com.azure.messaging.servicebus.ServiceBusReceivedMessage;
import com.azure.messaging.servicebus.ServiceBusReceiverClient;
import com.azure.messaging.servicebus.ServiceBusSenderClient;
import com.azure.messaging.servicebus.administration.ServiceBusAdministrationClient;
import com.azure.messaging.servicebus.administration.ServiceBusAdministrationClientBuilder;
import com.azure.messaging.servicebus.administration.models.TopicProperties;
import com.azure.messaging.servicebus.models.DeadLetterOptions;

import java.time.Instant;
import java.util.Scanner;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

public class App {

private static final String connectionString = System.getenv("AZURE_SERVICEBUS_NAMESPACE_CONNECTION_STRING");

public static void main( String[] args ) throws InterruptedException {
    final TopicSubscription resource = createTopicAndSubscription();
    final ServiceBusMessage messageToSend = createMessage();

    final ServiceBusClientBuilder builder =
            new ServiceBusClientBuilder().connectionString(connectionString);

    // STEP_1: Send the message.
    //
    System.out.println("Sending a message.");
    final ServiceBusSenderClient topicSender = builder.connectionString(connectionString)
            .sender()
            .topicName(resource.getTopicName())
            .buildClient();

    topicSender.sendMessage(messageToSend);
    TimeUnit.SECONDS.sleep(5);

    // STEP_2: Receive the message.
    //
    System.out.println("Press any key to receive the message and send it to DLQ.");
    waitKeyPress();
    System.out.println("Receiving message...");

    final ServiceBusReceiverClient receiver1 =
            builder.receiver()
                    .topicName(resource.getTopicName())
                    .subscriptionName(resource.getSubscriptionName())
                    .buildClient();

    receiver1.receiveMessages(1).stream().collect(Collectors.toList()).forEach(message -> {
        System.out.println("Message received:" + message.getBody());
        deadLetter(receiver1, message);
        System.out.println("Sent to DLQ");
    });
    TimeUnit.SECONDS.sleep(5);
    receiver1.close();

    // STEP_3: Go to portal and Resend the message from DLQ to the queue.
    //
    System.out.println("Press any key after resending the from DLQ to back to the topic.");
    waitKeyPress();

    // STEP_4: Receive the message again.
    //
    System.out.println("Receiving message again...");

    final ServiceBusReceiverClient receiver2 = builder.receiver()
            .topicName(resource.getTopicName())
            .subscriptionName(resource.getSubscriptionName())
            .buildClient();

    receiver2.receiveMessages(1).stream().collect(Collectors.toList()).forEach(message -> {
        System.out.println("Recevied it from DLQ" + message.getBody());
    });

    topicSender.close();
    receiver2.close();
}

private static TopicSubscription createTopicAndSubscription() {
    final ServiceBusAdministrationClient adminClient = new ServiceBusAdministrationClientBuilder()
            .connectionString(connectionString)
            .buildClient();

    final long millis = Instant.now().toEpochMilli();
    final String topicName = "topic-" + millis;
    final String subscriptionName = "sub0";

    adminClient.createTopic(topicName);
    // Retrieve the topic properties
    TopicProperties topicProperties = adminClient.getTopic(topicName);
    // Update the max message size
    topicProperties.setMaxMessageSizeInKilobytes(102400); // Set the desired max size in Kilobytes
    // Apply the update
    adminClient.updateTopic(topicProperties);

    adminClient.createSubscription(topicName, subscriptionName);
    System.out.println("Created topic: " + topicName + " and subscription: " + subscriptionName);
    return new TopicSubscription(topicName, subscriptionName);
}

private static ServiceBusMessage createMessage() {
    final String jsonRequest = "{\"uri\":\"null\",\"queryParams\":{\"msgType\":\"AzureServiceBus\",\"warehouseId\":\"0000000\"},\"body\":\"{\\r\\n" + //
            "    \\\"messageHeader\\\": {\\r\\n" + //
            "        \\\"msgId\\\": 14317,\\r\\n" + //
            "        \\\"msgTime\\\": \\\"2023-04-18T14:11:31.5515919-04:00\\\",\\r\\n" + //
            "        \\\"msgType\\\": \\\"AzureServiceBus\\\",\\r\\n" + //
            "        \\\"sender\\\": \\\"Raja\\\",\\r\\n" + //
            "        \\\"receiver\\\": \\\"Microsoft\\\",\\r\\n" + //
            "        \\\"version\\\": \\\"1.0\\\",\\r\\n" + //
            "    }\\r\\n" + //
            "}\",\"headers\":{\"accept\":\"*/*\",\"connection\":\"keep-alive\",\"host\":\"localhost:7071\",\"user-agent\":\"PostmanRuntime/7.40.0\",\"accept-encoding\":\"gzip, deflate, br\",\"content-type\":\"application/json\",\"content-length\":\"502\",\"msgid\":\"14317\",\"postman-token\":\"eaba8efc-d0ff-43df-a70d-7cc96f56d115\"},\"destination\":\"Microsoft\",\"destinationEndPointUrl\":null,\"responseMsgTopicName\":null,\"publishMsgTopicName\":null,\"blobContainerEndpointDetails\":null,\"blobNamePrefixIncludeFlag\":null,\"serviceBusSubjectValue\":null}";

    final ServiceBusMessage serviceBusMessage = new ServiceBusMessage(jsonRequest)
            .setMessageId("14317")
            .setContentType("application/json")
            .setSubject("AzureServiceBus")
            .setCorrelationId("14317")
            .setSessionId("serialize-me")
            .setTo("Microsoft");

    serviceBusMessage.getApplicationProperties().put("property", "custom");
    return serviceBusMessage;
}

private static void deadLetter(ServiceBusReceiverClient receiver, ServiceBusReceivedMessage message) {
    final DeadLetterOptions deadLetterOptions = new DeadLetterOptions();
    deadLetterOptions.setDeadLetterReason("TESTREASON");
    deadLetterOptions.setDeadLetterErrorDescription("TESTREASONDESC");
    receiver.deadLetter(message, deadLetterOptions);
}

private static void waitKeyPress() {
    Scanner input = new Scanner(System.in);
    input.nextLine();
}

private static class TopicSubscription {
    private final String topicName;
    private final String subscriptionName;

    public TopicSubscription(String topicName, String subscriptionName) {
        this.topicName = topicName;
        this.subscriptionName = subscriptionName;
    }

    public String getTopicName() {
        return topicName;
    }

    public String getSubscriptionName() {
        return subscriptionName;
    }
}

} `

anuchandy commented 1 month ago

Hi @rajam-git, interesting... are you able to change the max message size? I applied your suggested change to our repro code, but updating the size resulted in a bad request error.

If I understand correctly, the service doesn't permit changes to the maximum message size for a topic after it's been created.

Here is our repro project updated with your suggested change: https://github.com/anuchandy/repro-41071. Could you run it, verify it, and modify it to reproduce the issue?

By the way, I was able to set the max message size at topic creation time with the size you were trying to use, and the code ran without any issue.

anuchandy commented 1 month ago

Hello @rajam-git, after digging a bit more, I was able to reproduce this using a premium namespace. Earlier, I was trying with a standard namespace, which doesn't allow changing the max message size.

github-actions[bot] commented 1 month ago

Thanks for the feedback! We are routing this to the appropriate team for follow-up. cc @EldertGrootenboer.

anuchandy commented 1 month ago

Hi @EldertGrootenboer, this needs your help and service attention. Here are the steps to reproduce:

  1. Create a topic and subscription in a premium namespace. By default, the created topic will have a maximum message size of 1024 KB.
  2. Update the topic's max message size to the maximum the premium tier supports, i.e., 100 MB.
  3. Send a message A to the topic (by default it will be sent with AMQP body type DATA).
  4. Receive message A from the topic subscription. Service Bus will return the message with the expected body type DATA. Now dead-letter the message.
  5. From the Azure portal Service Bus Explorer, peek message A from the dead-letter queue, select that message, and resend it to the topic.
  6. Receive message A from the topic subscription again. This time Service Bus will return the message with an unexpected body type VALUE.

This seems like an issue in either Service Bus Explorer or the broker. Can you please take a look?

By the way, if we don't update the max message size (i.e., step 2), this issue won't happen.

EldertGrootenboer commented 1 month ago

In Service Bus Explorer, when an entity supports large messages, we don't load the message body unless the customer explicitly loads it.

image

If you resend the message without first loading the body, the body type will change because there is no body. Hence, make sure to load the full message body in Service Bus Explorer before resending the message.

rajam-git commented 1 month ago

That is not the root cause. I always select "Load complete message body" before I resend the message. If you don't load the complete message body, you will encounter a different error when you run our sample.

The log details for both cases are attached for your reference.

I have also walked through this with your support team before.

Receiving message again...
Exception in thread "main" java.lang.RuntimeException: The receiver client is terminated. Re-create the client to continue receive attempt. (Reason: upstream-error)
    at com.azure.core.amqp.implementation.WindowedSubscriber.hookOnError(WindowedSubscriber.java:189)
    at reactor.core.publisher.BaseSubscriber.onError(BaseSubscriber.java:180)
    at reactor.core.publisher.FluxPeek$PeekSubscriber.onError(FluxPeek.java:222)
    at com.azure.messaging.servicebus.TracingFluxOperator.hookOnError(TracingFluxOperator.java:74)
    at reactor.core.publisher.BaseSubscriber.onError(BaseSubscriber.java:180)
    at reactor.core.publisher.FluxMap$MapSubscriber.onError(FluxMap.java:134)
    at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:114)
    at com.azure.core.amqp.implementation.MessageFlux$RecoverableReactorReceiver.drainLoop(MessageFlux.java:476)
    at com.azure.core.amqp.implementation.MessageFlux$RecoverableReactorReceiver.drain(MessageFlux.java:405)
    at com.azure.core.amqp.implementation.MessageFlux$ReactorReceiverMediator.onNext(MessageFlux.java:878)
    at com.azure.core.amqp.implementation.MessageFlux$ReactorReceiverMediator.onNext(MessageFlux.java:722)
    at reactor.core.publisher.FluxPublishOn$PublishOnSubscriber.runAsync(FluxPublishOn.java:440)
    at reactor.core.publisher.FluxPublishOn$PublishOnSubscriber.run(FluxPublishOn.java:527)
    at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:84)
    at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:37)
    at java.base/java.util.concurrent.FutureTask.run(Unknown Source)
    at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown Source)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.base/java.lang.Thread.run(Unknown Source)
Caused by: java.lang.NullPointerException: 'value' cannot be null.
    at java.base/java.util.Objects.requireNonNull(Unknown Source)
    at com.azure.core.amqp.models.AmqpMessageBody.fromValue(AmqpMessageBody.java:126)
    at com.azure.messaging.servicebus.ServiceBusMessageSerializer.deserializeMessage(ServiceBusMessageSerializer.java:413)
    at com.azure.messaging.servicebus.ServiceBusMessageSerializer.deserialize(ServiceBusMessageSerializer.java:279)
    at com.azure.messaging.servicebus.ServiceBusAsyncConsumer.lambda$new$1(ServiceBusAsyncConsumer.java:54)
    at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:106)
    ... 13 more

Receiving message again...
Exception in thread "main" java.lang.UnsupportedOperationException: This body type not is supported: VALUE
    at com.azure.messaging.servicebus.ServiceBusReceivedMessage.getBody(ServiceBusReceivedMessage.java:120)
    at com.repro41071.App.lambda$1(App.java:74)
    at java.base/java.util.ArrayList.forEach(Unknown Source)
    at com.repro41071.App.main(App.java:73)

EldertGrootenboer commented 1 month ago

Thank you for the additional feedback, we will investigate further and come back on this.

EldertGrootenboer commented 1 month ago

@rajam-git Can you share the ticket ID you opened previously with our support team?

rajam-git commented 1 month ago

2406060040005460



EldertGrootenboer commented 3 weeks ago

We worked together on this offline and identified and fixed the issue, which was related to bodyType. When an entity has large message support (LMS) enabled, we first omit the message body and load only the message's metadata. In that case the message body is not returned (undefined) and bodyType is returned as "value". When the user clicks "Load complete message body", we load the message body while keeping all the other metadata from the previous operation, so bodyType was still being kept as "value". The fix was to replace bodyType along with the body so that the correct bodyType ("data") is set.
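
To make the described failure mode concrete, here is a minimal, self-contained model of it. This is not the actual Service Bus Explorer code: `PeekedMessage`, `BodyLoader`, and all method names are hypothetical, and the only behavior taken from the explanation above is that a metadata-only load reports bodyType "value" and that the old merge kept that stale bodyType when the body was loaded later.

```java
// Hypothetical model for illustration; not the Service Bus Explorer implementation.
final class PeekedMessage {
    final String messageId;
    final String bodyType; // "data" or "value"
    final String body;     // null while only metadata has been loaded

    PeekedMessage(String messageId, String bodyType, String body) {
        this.messageId = messageId;
        this.bodyType = bodyType;
        this.body = body;
    }
}

final class BodyLoader {
    private BodyLoader() { }

    // Metadata-only peek: the body is omitted and bodyType comes back as "value".
    static PeekedMessage peekMetadataOnly(PeekedMessage stored) {
        return new PeekedMessage(stored.messageId, "value", null);
    }

    // Old behavior: load the body but keep every other field from the earlier peek,
    // including the stale bodyType.
    static PeekedMessage loadBodyBuggy(PeekedMessage metadata, PeekedMessage stored) {
        return new PeekedMessage(metadata.messageId, metadata.bodyType, stored.body);
    }

    // Fixed behavior: replace bodyType together with the body.
    static PeekedMessage loadBodyFixed(PeekedMessage metadata, PeekedMessage stored) {
        return new PeekedMessage(metadata.messageId, stored.bodyType, stored.body);
    }
}
```

In this model, resending the result of `loadBodyBuggy` would publish a message tagged "value" even though the original was "data", which matches the symptom the Java receiver reported; `loadBodyFixed` preserves the original type.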