AymDev / MessengerAzureBundle

A Symfony 4/5/6 bundle providing a Messenger transport for Azure Service Bus using the Azure REST API.
MIT License
10 stars 7 forks source link

Max delivery count and dead-lettering #11

Open HUG-socloz opened 1 year ago

HUG-socloz commented 1 year ago

Context

In order to use the dead-lettering feature after Max delivery count. We want the message to stay on the queue with the lock. So the lock will expire. And the then the message could be retry.

Then the goal is to use Max delivery count, and Message lock duration for peek-lock

Messenger workflow

In error case : 1 -> messenger call 'reject' method on transport - (the stamp RedeliveryStamp is added) -> the message is deleted
2 -> messenger call 'send' method on transport to recreate the message

How to do

First option

Add a bundle option called for example : keep-locked-on-error: true (used only with peek-lock) When a message is 'reject' and/or 'send' then check if RedeliveryStamp exists and keep-locked-on-error is true to prevent deletion and avoid re-send.

Second option

Implement a middleware which catch errors. When there is an error the middleware catch it and can add a custom stamp for example KeepLockedStamp.

But now we have 2 options :

I prefer the first option because it is relying more on messenger workflow.

AymDev commented 1 year ago

Hello and sorry again for the late reply. I don't know much about this feature: the current workflow recreates the message, preventing the max delivery count being exceeded and the lock to expire, did I understand right ?

I'm ok with your 1st solution. A custom stamp would be nice too, to avoid duplicating the logic on multiple methods but I don't see where we should add it right now.

Feel free to work on a PR, I'd be happy to make a new release.

HUG-socloz commented 1 year ago

I don't know much about this feature: the current workflow recreates the message, preventing the max delivery count being exceeded and the lock to expire, did I understand right ?

Yes, exactly.

Feel free to work on a PR, I'd be happy to make a new release.

I'll do that as soon as I can.

mixllet commented 11 months ago

Hi guys, thanks for a great repo! I've been struggling with this and maybe you could help me out with a few pointers. When/if my serializer throws a SerializerDecodingException i would think that a service bus message in Azure would increment DeliverCount but i can't make that happen. If my serializer validates incoming message and validation doesn't pass, i want to increment DeliveryCount so that the message finally (after max counts) ends up in DLQ. If that is by design, any pointers on how to move a message from main queue to DLQ?

Thank you :-)

AymDev commented 10 months ago

Hello @mixllet , I did not dive into this topic yet (I'm not using ASB anymore), maybe you could check @HUG-socloz's PR on his fork: https://github.com/HUG-socloz/MessengerAzureBundle/pull/1

PRs here are always welcome :slightly_smiling_face:

HUG-socloz commented 10 months ago

Hi ! Sorry I did take the time to answer you.

As @AymDev said I already played with the bundle and use it in production. We use something similar to the PR https://github.com/HUG-socloz/MessengerAzureBundle/pull/1 to manage to use lock expiration and retry automatic provided on service bus. -> the job will go in dead letter after max retry count. The delevery count is here https://github.com/AymDev/MessengerAzureBundle/blob/195c429de0c7b4b9d4453ed33db79c38f00d7a04/src/Messenger/Stamp/AzureBrokerPropertiesStamp.php#L27C14-L27C27

For the delivery count using MessengerAzureBundle current workflow : a job is recreate again and again on errors and then on a azure queue the delivery count is not incremented. I added manually our own counter on serializer.

When the max (implemented) is reached I added a redirection through middlewares to other queues or drop message, depending on functionality.


/**
 * Json serializing job message for azure service bus.
 */
class JobServiceBusSerializer implements SerializerInterface
{
    public function encode(Envelope $envelope): array
    {
       // message serialization
       ....

        $headers = [];

        /** @var null|RedeliveryStamp $redeliveryStamp */
        $resubmitCount = 0;
        $redeliveryStamp = $envelope->last(RedeliveryStamp::class);
        if (null !== $redeliveryStamp) {
            $resubmitCount = $redeliveryStamp->getRetryCount();
        }

        $headers['resubmit-count'] = $resubmitCount;
        return [
            'body' => $body,
            'headers' => $headers,
        ];
    }

    public function decode(array $encodedEnvelope): Envelope
    {
       // message deserialization
      ....

        $stamps = [];
        if (key_exists('resubmit-count', $encodedEnvelope['headers'])) {
            $stamps[] = new RedeliveryStamp((int)$encodedEnvelope['headers']['resubmit-count'][0]);
        }

        return new Envelope($message, $stamps);
    }
}