Thanks for reporting this @gio-frigione and apologies for not responding sooner.
Have you run a comparison and seen any difference between subscriptions for partitioned and un-partitioned topics?
Hi @ramya-rao-a
thank you for getting back to me. I've performed some tests and I'm no longer sure that the slowness has anything to do with partitioned entities. What I did measure, though, is that the Azure SDK is much slower than the old (and now deprecated) amqp10 library.
What I did was set up a topic with a subscription (I tried both partitioned and non-partitioned topics without measuring big differences) and then send 1000 messages to the topic. After that I connected a receiver implemented with the Azure SDK to the subscription, and it took 43190.691ms to read all the messages. I then repeated the same test with a receiver implemented using amqp10, and it took 969.710ms to read all the messages.
Here is the code I used:
```javascript
// sender.js
const { Namespace, delay } = require("@azure/service-bus");

const connectionString = "YOUR_CONNECTION_STRING";

async function main() {
  const ns = Namespace.createFromConnectionString(connectionString);
  const topicClient = ns.createTopicClient("performance-test-topic");
  try {
    const sender = topicClient.getSender();
    for (let index = 1; index <= 1000; index++) {
      const message = {
        body: `${index}`,
        label: "test"
      };
      await sender.send(message);
    }
    console.log("done");
    await sender.close();
    await topicClient.close();
  } finally {
    await ns.close();
  }
}

main().catch(err => {
  console.log("Error occurred: ", err);
});
```
```javascript
// Azure SDK receiver
const { Namespace, delay } = require("@azure/service-bus");

const connectionString = "YOUR_CONNECTION_STRING";
const topic = "performance-test-topic";
const subscription = "performance-test";

const sleep = (waitTimeInMs) => new Promise(resolve => setTimeout(resolve, waitTimeInMs));

async function main() {
  let messageCounter = 0;
  const ns = Namespace.createFromConnectionString(connectionString);
  const subscriptionClient = ns.createSubscriptionClient(topic, subscription);
  const receiver = subscriptionClient.getReceiver();

  const onMessageHandler = async brokeredMessage => {
    messageCounter++;
    console.log(`Received message: ${brokeredMessage.body}`);
    if (messageCounter === 1000) {
      console.timeEnd("receive");
    }
  };
  const onErrorHandler = err => {
    console.log("Error occurred: ", err);
  };

  try {
    console.time("receive");
    receiver.receive(onMessageHandler, onErrorHandler, { autoComplete: true });
    await sleep(500000);
    await receiver.close();
    await subscriptionClient.close();
  } finally {
    await ns.close();
  }
}

main().catch(err => {
  console.log("Error occurred: ", err);
});
```
```javascript
// AMQP10 receiver
const azure = require("azure");
const amqp10 = require("amqp10");
const AMQPClient = amqp10.Client;
const Policy = amqp10.Policy;

const amqpUri = "YOUR_AMQP_URI";
const topic = "performance-test-topic";
const subscription = "performance-test";

const sleep = (waitTimeInMs) => new Promise(resolve => setTimeout(resolve, waitTimeInMs));

async function main() {
  let client;
  try {
    client = new AMQPClient(Policy.ServiceBusTopic);
    await client.connect(amqpUri);
    const receiver = await client.createReceiver(`${topic}/Subscriptions/${subscription}`);
    console.time("receive");
    let messageCounter = 0;
    receiver.on("message", brokeredMessage => {
      messageCounter++;
      const body = brokeredMessage.body;
      console.log("Received message: ", body);
      if (messageCounter === 1000) {
        console.timeEnd("receive");
      }
    });
    receiver.on("errorReceived", err => {
      console.log("Error occurred: ", err);
    });
    await sleep(500000);
  } finally {
    console.log("Disconnecting...");
    await client.disconnect();
  }
}

main().catch(err => {
  console.log("Error occurred: ", err);
});
```
Thanks for the details @gio-frigione, that was very helpful.
Can you try passing, say, `maxConcurrentCalls: 100` or a higher value in the 3rd parameter to the `receive` call?
By default, the value of `maxConcurrentCalls` is 1, so the receiver calls the message handler for one message at a time. Only after the handler has finished for that message do we request the next one.
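For reference, a minimal sketch of how that option would be passed, based on the receiver code above and assuming the option is available in the SDK build in use:

```javascript
// Sketch: passing maxConcurrentCalls in the 3rd parameter of receive().
// Assumes the option is exposed in the SDK build being used.
const receiver = subscriptionClient.getReceiver();
receiver.receive(onMessageHandler, onErrorHandler, {
  autoComplete: true,
  maxConcurrentCalls: 100 // let the receiver invoke the handler for up to 100 messages concurrently
});
```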
Ignore my previous comment. The `maxConcurrentCalls` option has not been exposed in preview 1, so you will not be able to use it.
We will dig deeper and see what we can do to improve the performance here.
@ramya-rao-a I noticed it wasn't present in preview 1, so I cloned the SDK and npm-linked it to my test application. Setting `maxConcurrentCalls` to 100 seemed to solve the problem in the test application, but not in our e2e test suite. This is very weird: if I revert to the old amqp10 implementation, it works as expected... I probably need to incrementally complicate the test code and see if I can spot the problem. So far I'm clueless, since the test application I provided here is exactly how we use the SDK inside our e2e test suite.
Hi @ramya-rao-a, I performed additional testing and noticed that in our e2e test suite we use `receiveMode = 2` (receiveAndDelete) instead of the default value for the streaming receiver. I set this value in the test code I shared above, and now receiving the same number of messages is almost ten times slower. This may be the culprit; is this expected behavior?
Relevant code change in receiver.js:

```javascript
const receiverOptions = {
  receiveMode: 2
};
const receiver = subscriptionClient.getReceiver(receiverOptions);
```
I'm using the latest version of the SDK pulled from master. I'm also setting `maxConcurrentCalls` to 100.
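For clarity, this is the full receiver configuration being discussed: ReceiveAndDelete mode plus `maxConcurrentCalls: 100`. The `ReceiveMode` enum usage is an assumption about the build pulled from master; the numeric value 2 from the snippet above works the same way.

```javascript
// Sketch of the receiver setup used in these measurements.
// ReceiveMode.receiveAndDelete (value 2) is assumed to be exported by the build from master;
// the default, ReceiveMode.peekLock (value 1), is what the earlier tests used.
const { ReceiveMode } = require("@azure/service-bus");

const receiver = subscriptionClient.getReceiver({ receiveMode: ReceiveMode.receiveAndDelete });
receiver.receive(onMessageHandler, onErrorHandler, {
  maxConcurrentCalls: 100 // autoComplete is only relevant in PeekLock mode, so it is omitted here
});
```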
That is an interesting find about the receiveMode @gio-frigione.
We have recently made another fix regarding the ReceiveAndDelete mode, so can you get the latest from our master branch and try again to see if there is any difference?
In your comment from https://github.com/Azure/azure-sdk-for-js/issues/1389#issuecomment-473930151, you mention the following:

> Setting maxConcurrentCalls to 100 seemed to solve the problem in the test application but not on our e2e test suite
Does this mean that you no longer saw significant difference in the time taken in the test application vs amqp10?
Hi @ramya-rao-a, unfortunately the fix regarding ReceiveAndDelete mode doesn't seem to help.

> Does this mean that you no longer saw significant difference in the time taken in the test application vs amqp10?
I'll briefly recap the current status of the tests (based on the sample code I provided above):

- receiveMode: peekLock and maxConcurrentCalls: 100 -> all messages are received in around 1 second, same as amqp10
- receiveMode: receiveAndDelete and maxConcurrentCalls: 100 -> all messages are received in around 8 seconds

This seems strange to me, but maybe it is the expected behaviour.
Thank you for taking the time to investigate this problem!
Thanks for the details @gio-frigione. We are currently in the stress-testing phase for the Service Bus library, to be followed by performance tests. When we get to the performance tests, we will keep your scenario in mind and dig deeper into why ReceiveAndDelete mode takes longer than PeekLock mode.
Thank you @ramya-rao-a, that's great to know! Let me know if I can contribute in any way!
@gio-frigione I have updated the issue title to reflect that the problem is in ReceiveAndDelete mode rather than with partitioned entities. We will investigate this further soon.
@gio-frigione A follow-up question for you: when you ran the experiments you described in https://github.com/Azure/azure-sdk-for-js/issues/1389#issuecomment-476783383, did you set `autoComplete` to `true` or `false` when running in PeekLock mode?
I could observe some differences with sample code similar to the one posted in the above comments. The time taken to receive the 1000 messages is as follows:
- When `maxConcurrentCalls` is not set
  - receive: 78031.349ms for peekLock (autoComplete: true) - Range 70-80s
  - receive: 25065.772ms for peekLock (autoComplete: false) - Range 24-28s
  - receive: 171870.294ms for receiveAndDelete - Range 170-180s
- When `maxConcurrentCalls = 100`
  - receive: 2245.992ms for peekLock (autoComplete: true) - Range 1.6-2.8s
  - receive: 2238.906ms for peekLock (autoComplete: false) - Range 2.1-3.5s
  - receive: 6572.747ms for receiveAndDelete - Range 6.4-7s
`receiveAndDelete` takes more time than `peekLock` mode; I'm not sure why. The effect of `autoComplete` in peekLock mode can be observed when `maxConcurrentCalls` is not set.
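For anyone reproducing the autoComplete comparison, the difference in PeekLock mode is whether the SDK or the handler settles each message. A minimal sketch, assuming the received message exposes a complete() method as in this preview:

```javascript
// PeekLock with autoComplete: false and default maxConcurrentCalls (1):
// the handler settles each message explicitly before the next one is requested.
// (message.complete() is assumed to exist on the received message in this preview.)
const onMessageHandler = async message => {
  console.log(`Received message: ${message.body}`);
  await message.complete(); // with autoComplete: true, the SDK completes the message after the handler resolves
};
receiver.receive(onMessageHandler, onErrorHandler, { autoComplete: false });
```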
Tried rhea-promise with `peekLock` and `receiveAndDelete` (receiving 1000 messages):

- receiveAndDelete mode - 42557.193ms
- peekLock mode - 1666.800ms

`peekLock` is much faster than the `receiveAndDelete` mode.
Code

```typescript
import {
  Connection, Receiver, EventContext, ConnectionOptions, ReceiverOptions, delay, ReceiverEvents, types
} from "rhea-promise";

const host = "";
const username = "RootManageSharedAccessKey";
const password = "";
const port = 5671;
const receiverAddress = "partitioned-queue";

async function main(): Promise<void> {
  const connectionOptions: ConnectionOptions = {
    transport: "tls",
    host: host,
    hostname: host,
    username: username,
    password: password,
    port: port,
    reconnect: false
  };
  const connection: Connection = new Connection(connectionOptions);
  const receiverName = "receiver-1";
  // receive messages from the past one hour
  const filterClause = `amqp.annotation.x-opt-enqueued-time > '${Date.now() - 3600 * 1000}'`;
  const receiverOptions: ReceiverOptions = {
    name: receiverName,
    // receiveAndDelete
    // autoaccept: true,
    // rcv_settle_mode: 0,
    // snd_settle_mode: 1,
    // peekLock
    autoaccept: false,
    rcv_settle_mode: 1,
    snd_settle_mode: 0,
    source: {
      address: receiverAddress,
      filter: {
        "apache.org:selector-filter:string": types.wrap_described(filterClause, 0x468C00000004)
      }
    },
    onSessionError: (context: EventContext) => {
      const sessionError = context.session && context.session.error;
      if (sessionError) {
        console.log(">>>>> [%s] An error occurred for session of receiver '%s': %O.",
          connection.id, receiverName, sessionError);
      }
    }
  };

  await connection.open();
  const receiver: Receiver = await connection.createReceiver(receiverOptions);
  console.time("receiving");
  let count = 0;
  receiver.on(ReceiverEvents.message, (context: EventContext) => {
    console.log("Received message: %O", context.message.message_id);
    count++;
    if (count == 1000) {
      console.timeEnd("receiving");
    }
  });
  receiver.on(ReceiverEvents.receiverError, (context: EventContext) => {
    const receiverError = context.receiver && context.receiver.error;
    if (receiverError) {
      console.log(">>>>> [%s] An error occurred for receiver '%s': %O.",
        connection.id, receiverName, receiverError);
    }
  });

  // sleeping for 2 mins to let the receiver receive messages and then closing it.
  await delay(120000);
  await receiver.close();
  await connection.close();
}

main().catch((err) => console.log(err));
```
Python SDK results for comparison (From @yunhaoling)
In Python, the performance is almost the same:

- link-credit set to 1000:
  - PeekLock: received 1000 msg in 0.36406946182250977s
  - ReceiveAndDelete: received 1000 msg in 0.2190256118774414s
- link-credit set to 1 (receive one msg per call):
  - PeekLock: received 1000 msg in 55.45851230621338s
  - ReceiveAndDelete: received 1000 msg in 55.25486183166504s
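The link-credit numbers above are essentially a prefetch window: with a credit of 1000, the service can keep sending messages without waiting for each one to be settled, while a credit of 1 forces a round trip per message. Below is a rough sketch of the two configurations using rhea-promise, under the assumption that it forwards rhea's credit_window option and exposes an addCredit() helper on the receiver:

```javascript
// Sketch: controlling link credit (prefetch) with rhea-promise.
// Assumes credit_window is forwarded to rhea and that the Receiver exposes addCredit().
const { Connection } = require("rhea-promise");

async function createReceivers(connection) {
  // Large credit window: up to 1000 deliveries can be in flight before any are settled,
  // roughly matching the "link-credit to 1000" measurement above.
  const fastReceiver = await connection.createReceiver({
    source: { address: "partitioned-queue" },
    credit_window: 1000
  });

  // credit_window: 0 disables automatic flow, so credit is granted manually,
  // one message at a time, roughly the "link-credit to 1" case above.
  const slowReceiver = await connection.createReceiver({
    source: { address: "partitioned-queue" },
    credit_window: 0
  });
  slowReceiver.addCredit(1); // request a single message
  slowReceiver.on("message", () => slowReceiver.addCredit(1)); // request the next one only after each delivery

  return { fastReceiver, slowReceiver };
}
```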
@ramya-rao-a - Can we please get someone assigned to this one? It is currently 536 days old as of today.
Since we are seeing the slowness when working directly with rhea, this is most likely a problem either with the underlying library we are using or with the service itself. Regardless, since there is nothing we can do at the client level to resolve this at the moment, we will be closing this issue.
We will be carrying out perf and stress tests for this package for the new 7.0.0 version, which will result in follow-up tasks where we will look into this in more detail.
Thank you for your patience
Describe the bug
It seems that receiving messages from a subscription bound to a partitioned topic is extremely slow.
It appears to be the same bug reported against the old azure-sdk-for-nodejs and never fixed. For more info, see Service Bus methods do not work properly on a partitioned queue.
We are experiencing the very same problem: when using amqp10, our e2e test suite is executed in around 1h and all tests pass; when using @azure/service-bus, it takes about 5h and many tests fail because messages are not received within a timeout window of 180 seconds.
To Reproduce
Steps to reproduce the behavior: see the linked issue above.
Expected behavior
We expect to have better performance on partitioned entities.