Closed shlomiassaf closed 4 years ago
@shlomiassaf - one change we've been talking about (to solve a slightly different but related issue) is to provide a way for the user to do an "init connection" without having to do a send/receive operation. So this would occur via an explicit call (where that method would live is under discussion).
Would something like that also work for your needs?
@richardpark-msft yes. It will now be a 2 step operation?
What happens if there is no connection when registering?
What happens on disconnect?
Thanks
@shlomiassaf - basically we'd give the user a choice of when the initialization happens - either explicitly (new functionality we could add) or implicitly (what we do today).
So no change to normal user's workflow but, for people that want more control, you'd have a way of triggering the connection initialization at an explicit time rather than just on first send/receive.
(As I said, just an idea we're kicking around. Any feedback is welcome!)
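To make the explicit-versus-implicit idea concrete, here is a minimal sketch. `LazyLink`, `init()` and the `connect` function are hypothetical stand-ins for illustration only, not the @azure/service-bus API. The first operation still opens the link on demand, while a caller who wants control can call `init()` up front and handle a connection failure at that point.

```typescript
// Hypothetical stand-in for a receiver; not the @azure/service-bus API.
class LazyLink {
  private opening?: Promise<void>;
  // `connect` is an assumed function that establishes the underlying link.
  private readonly connect: () => Promise<void>;

  constructor(connect: () => Promise<void>) {
    this.connect = connect;
  }

  // Explicit path: the caller decides when the link is opened and can
  // handle a failure right here. Repeated calls share one attempt.
  init(): Promise<void> {
    let attempt = this.opening;
    if (attempt === undefined) {
      attempt = this.connect();
      // Forget a failed attempt so that a later call can try again.
      attempt.catch(() => {
        this.opening = undefined;
      });
      this.opening = attempt;
    }
    return attempt;
  }

  // Implicit path: the first operation opens the link if nobody has yet.
  async receive(): Promise<string> {
    await this.init();
    return "message";
  }
}
```

With this shape, existing callers that only ever call `receive()` see no change, which is the non-breaking property discussed above.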
Another option we are considering is to make the createReceiver() method return a promise. Whether the promise gets resolved or rejected depends on whether the receiver link was successfully established or not.
Regarding disconnect, we already have code in place to recover broken links when faced with retryable errors. On exhausting all retries or on facing a non-retryable error, the onError callback gets called. Though we do need a sample that shows best practice for when that happens.
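The recovery behaviour described here can be pictured roughly as follows. This is only a sketch under stated assumptions: `recoverLink`, `openLink`, the `retryable` flag, and the retry count and delay are all illustrative names and values, not the library's actual internals.

```typescript
// Hypothetical error shape; `retryable` is an assumed flag for this sketch.
interface LinkError extends Error {
  retryable?: boolean;
}

// Tries to (re)open a link. Calls onError once, either when a non-retryable
// error is seen or when all retries are used up. Returns true on success.
async function recoverLink(
  openLink: () => Promise<void>,
  onError: (err: Error) => void,
  maxRetries = 3,
  delayMs = 1000
): Promise<boolean> {
  for (let attempt = 0; attempt <= maxRetries; attempt++) {
    try {
      await openLink();
      return true;
    } catch (err) {
      const error = err as LinkError;
      const lastAttempt = attempt === maxRetries;
      if (!error.retryable || lastAttempt) {
        onError(error);
        return false;
      }
      // Wait before the next attempt.
      await new Promise<void>((resolve) => setTimeout(resolve, delayMs));
    }
  }
  return false;
}
```

In this model, an `onError` call means recovery has already been given up, so the handler is the place to log, alert, or recreate the receiver.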
@ramya-rao-a I think that returning a promise from createReceiver() might do some damage, for several reasons.
1) It's a breaking API, createReceiver is currently lazy and this might break stuff due to unhandled promise rejections.
2) createReceiver should not connect automatically on creation unless explicitly instructed to, as it connects to the bus in various ways (receiveMessages, registerMessageHandler, renewMessageLock, etc...) and some of them allow a lazy strategy which will no longer be available.
3) I think that createSender and createReceiver should behave the same, both sync.
If we look at the API, all of the relevant methods in Receiver return a Promise except for registerMessageHandler, which does not return anything (void).
This is the issue in the design of the API: that method should return a Promise, and onError should only fire for connection issues arising after the initial connection.
Since this would be a huge breaking change, I think that @richardpark-msft's suggestion to add the initialization method is perfect, since it does not break the API anywhere and does allow handling initial connection errors.
await receiver.init()
receiver.registerMessageHandler(onMessage, onError, options)
// onError will only throw on handling issues or if the connection has issues after the initial connection
@richardpark-msft @ramya-rao-a
Here's another example for why this is critical, the following code describes the bug:
import { ServiceBusClient, ReceiveMode } from '@azure/service-bus';

async function run() {
  const serviceBusClient = ServiceBusClient.createFromConnectionString('YOUR KEY HERE');
  const queueClient = serviceBusClient.createQueueClient('YOUR_ENTITY_ID_HERE');
  const receiver = queueClient.createReceiver(ReceiveMode.peekLock);
  receiver.registerMessageHandler(async () => {}, e => console.error(e));

  // uncomment for it to work
  // await new Promise( r => { setTimeout(r, 2000); })

  await queueClient.close();
  await receiver.close();
  await serviceBusClient.close();
}

run();
The code above will never exit, leaving us with a memory leak because the connection never closes.
This is because it happens too fast: when close is called, the library thinks it is not connected...
The actual state of the connection at that point is not opened but also not closed (actually, local is opened, remote is closed).
Because there's no way for the user to tell when the connection is established, an artificial "setTimeout" is required, which of course is not something one can count on.
I would expect to have a promise to know when it's done, as well as a cancellation token to cancel the operation if I would like to stop before it finishes.
Note that this was done in 1.1.6
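The "promise plus cancellation token" shape asked for here could look something like the sketch below, using the standard `AbortSignal` as the token. `withCancellation` is a hypothetical helper, not part of any library. Note that it only stops the caller from waiting; it does not cancel the underlying work, which would need support inside the library itself.

```typescript
// Hypothetical helper: settles with the operation's result, or rejects as
// soon as the signal is aborted, whichever happens first.
function withCancellation<T>(operation: Promise<T>, signal: AbortSignal): Promise<T> {
  return new Promise<T>((resolve, reject) => {
    const onAbort = () => reject(new Error("Operation cancelled"));
    if (signal.aborted) {
      onAbort();
    } else {
      signal.addEventListener("abort", onAbort, { once: true });
    }
    // Always observe the operation so a late failure is never left unhandled,
    // and detach the abort listener once the operation settles.
    operation
      .then(resolve, reject)
      .finally(() => signal.removeEventListener("abort", onAbort));
  });
}
```

A caller would create an `AbortController`, pass `controller.signal` in, and call `controller.abort()` to stop waiting early.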
As a side note, the ugly way I currently solve this is by polling receiver.isReceivingMessages() over and over, wrapped in a promise with a safety timeout... ugly, but for now I don't see any other choice.
import { OnMessage, OnError, MessageHandlerOptions, MessagingError, Receiver, SessionReceiver } from '@azure/service-bus';

/**
 * The async version for `Receiver.registerMessageHandler` with feedback on the connection status.
 *
 * Remove when https://github.com/Azure/azure-sdk-for-js/issues/7986 is resolved
 */
export async function registerMessageHandler(receiver: Receiver | SessionReceiver, onMessage: OnMessage, onError: OnError, options?: MessageHandlerOptions) {
  return new Promise<void>((resolve, reject) => {
    let done = false;

    // Forward MessagingError instances, and any error seen after the promise
    // has settled, to the caller's onError; reject on anything else.
    const onErrorRouter: OnError = err => {
      if (done || err instanceof MessagingError) {
        onError(err);
      }
      else {
        done = true;
        reject(err);
      }
    };

    // Check every 10 ms until the receiver reports that it is receiving.
    const poll = () => {
      setTimeout(() => {
        if (receiver.isReceivingMessages()) {
          done = true;
          resolve();
        } else if (!done) {
          poll();
        }
      }, 10);
    };

    // Pass the caller's options through instead of dropping them.
    receiver.registerMessageHandler(onMessage, onErrorRouter, options);
    poll();
  });
}
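The safety timeout mentioned above does not appear in the snippet itself. One way to add it is a small generic polling helper like the sketch below; `pollUntil` and its parameters are hypothetical names chosen for illustration, and the interval and timeout values are arbitrary.

```typescript
// Hypothetical helper: resolves once `condition` returns true, or rejects
// when `timeoutMs` has elapsed without that happening.
function pollUntil(
  condition: () => boolean,
  timeoutMs: number,
  intervalMs = 10
): Promise<void> {
  return new Promise<void>((resolve, reject) => {
    const deadline = Date.now() + timeoutMs;
    const check = () => {
      if (condition()) {
        resolve();
      } else if (Date.now() >= deadline) {
        reject(new Error(`Condition not met within ${timeoutMs} ms`));
      } else {
        setTimeout(check, intervalMs);
      }
    };
    check();
  });
}

// Assumed usage with a receiver from the thread:
//   await pollUntil(() => receiver.isReceivingMessages(), 5000);
```

This is still a workaround: it bounds how long the caller waits, but it cannot tell a slow connection from one that will never come up.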
First, thank you for keeping the conversation going with this @shlomiassaf - you've been our reference customer as we talk about how to implement this!
What you've written above looks like just an outright bug in our logic, ignoring the method that we use to determine if the connection is open or not. The main issue here is that we also have background reconnect logic as well. So all of that needs to properly obey close().
We will be looking into this this month - I'll post updates as we have them.
@richardpark-msft On another subject, I would appreciate it if you guys took a look at a new library I built that allows using @azure/service-bus with NestJS.
This can significantly increase the exposure of service bus to a lot of JS developers.
Thanks!
@shlomiassaf, I've got a tentative solution that looks good to our team that will fix the issue you posted above. The fix is pretty simple - it just makes sure that if we're closing the receiver that we wait until initialization has completed before returning.
The init process isn't easily cancellable at the moment so this is a simpler way of ensuring that you don't end up with an object that appears closed but does have a receiver link open. I'll be posting a PR for this sometime today after some more testing.
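The idea of having close wait for initialization can be sketched as below. This is not the actual fix, only an illustration of the approach; `ClosableReceiver`, `openLink` and `isOpen` are hypothetical names, and the real receiver does far more than track a flag.

```typescript
// Hypothetical stand-in for a receiver; names are illustrative only.
class ClosableReceiver {
  private initializing?: Promise<void>;
  private linkOpen = false;
  private readonly openLink: () => Promise<void>;
  private readonly onError: (err: Error) => void;

  constructor(openLink: () => Promise<void>, onError: (err: Error) => void) {
    this.openLink = openLink;
    this.onError = onError;
  }

  // Starts opening the link in the background and returns immediately.
  registerMessageHandler(): void {
    this.initializing = this.openLink().then(
      () => { this.linkOpen = true; },
      (err: Error) => { this.onError(err); }
    );
  }

  get isOpen(): boolean {
    return this.linkOpen;
  }

  // Waits for any in-flight initialization before tearing the link down,
  // so the link cannot come up after close() has returned.
  async close(): Promise<void> {
    if (this.initializing !== undefined) {
      await this.initializing;
    }
    this.linkOpen = false;
  }
}
```

The trade-off is that close can take as long as the pending initialization, which is the price of not having a cancellable init.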
On re-reading the issue, I did have a question about something you wrote earlier about making createReceiver() async:
- It's a breaking API, createReceiver is currently lazy and this might break stuff due to unhandled promise rejections.
Can you elaborate more on the "unhandled promise rejections" part? Is the concern that users might not realize they're getting back a Promise? Or something else?
Just for some information - in the newer version of the library (7.0.0-preview.2) our plan is to make createReceiver() async (this is already done for createSender() and createSessionReceiver()). This means the link will be active and ready immediately rather than waiting for first operation. Our goal is to move initialization higher in the stack so the usage model will be simpler - if you have a Receiver (or Sender) the object is live and ready to be used.
BTW, implementing a lazy strategy will still be possible - ServiceBusClient itself will remain a lazily initialized object.
Yes, since createReceiver() returns void and changing it to return a promise will not break anything in the user applications.
I guess that with such a huge change moving from 1 to 7, this should pass without much noise but not sure that all of the consumers will refactor their code so it will handle that promise...
Update:
Thanks for your time & patience!
Is your feature request related to a problem? Please describe.
The call to registerMessageHandler accepts an onError callback which throws an Error for any connection-related issue or MessageError for any issue thrown while handling an incoming message.
This creates a problem when trying to determine the result of the connect operation.
onError is invoked with the error
If the flow requires several operations running in sequence, with one being registerMessageHandler and the other running after a successful connection, this is not easy to achieve.
One can use the onError callback to catch such issues, but what if no error occurs?
Describe the solution you'd like
registerMessageHandler should return a promise that resolves when the connection is up and it's ready to receive messages.
The promise will reject if it fails, i.e. if the receiver is already in a state of receiving messages or if there was a problem connecting to service bus.
The onError will be used for MessageError and for connection errors that occur AFTER the initial connection resolves.
In addition, the flow will now be clearer, as registerMessageHandler .
I guess there is an API problem here, as this is a breaking change... but I think this one is required.