paolosalvatori / ServiceBusExplorer

The Service Bus Explorer allows users to connect to a Service Bus namespace and administer messaging entities easily. The tool provides advanced features such as import/export functionality and the ability to test topics, queues, subscriptions, relay services, notification hubs, and event hubs.
MIT License
2.01k stars 585 forks

Move messages from one queue to another in a safe transactional manner #725

Open AndersMalmgren opened 1 year ago

AndersMalmgren commented 1 year ago

Hi. We have built an application using MassTransit and Azure Service Bus. If something goes wrong, MassTransit moves the message to an error queue after a few retries. We need a transactional way of moving the message from the error queue back to the original queue. It needs to be transactional so that we can be sure the message is not removed from the error queue until it has been successfully published to the destination queue.

SeanFeldman commented 1 year ago

@AndersMalmgren is your error queue a centralized error queue? I.e., when moving messages back, would there be more than one destination?

AndersMalmgren commented 1 year ago

MassTransit adds an error queue with the suffix `_error`, and all errors for the queue go there. For now I created a small tool that does the job in a transactional way:


    public async Task<IEnumerable<Message>> PeekMessages()
    {
        if (_source != null) await _source.DisposeAsync();

        _source = _client.CreateReceiver($"{Path}_error");
        return (await _source.PeekMessagesAsync(100))
            .Select(m => new Message(m));
    }

    public async Task ResendMessages(IEnumerable<ResendableMessage> messages)
    {
        // PeekMessages() must run first: it creates the receiver on the error queue.
        if (_source == null) throw new InvalidOperationException("Call PeekMessages() before ResendMessages().");

        // The complete, abandon, and send calls below enlist in this ambient transaction.
        using var ts = new TransactionScope(TransactionScopeAsyncFlowOption.Enabled);
        await using var destination = _client.CreateSender(Path);

        // Index the messages selected for resend by sequence number.
        var lookup = messages.ToDictionary(m => m.Seq);

        var lockedMessages = await _source.ReceiveMessagesAsync(1000);

        foreach (var lockedMessage in lockedMessages)
        {
            if (lookup.TryGetValue(lockedMessage.SequenceNumber, out var originalMessage))
            {
                // Guard against the peeked and the locked message having drifted apart.
                if (originalMessage.Message.MessageId != lockedMessage.MessageId) throw new ApplicationException("MessageId desync");
                var send = new ServiceBusMessage(originalMessage.Message);
                if (originalMessage.Content != null)
                    send.Body = new BinaryData(originalMessage.Content);

                await _source.CompleteMessageAsync(lockedMessage);
                await destination.SendMessageAsync(send);
            }
            else
                // Not selected for resend: release the lock so it stays on the error queue.
                await _source.AbandonMessageAsync(lockedMessage);
        }

        ts.Complete();
    }
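
The snippet above does not show how `_client` is created. Because the transaction touches two entities (the error queue and the original queue), the client most likely needs cross-entity transactions switched on. A minimal sketch, assuming the Azure.Messaging.ServiceBus SDK and a connection string supplied by the caller; the `TransactionalClientFactory` class is a hypothetical name used only for illustration:

```csharp
using Azure.Messaging.ServiceBus;

public static class TransactionalClientFactory
{
    // Hypothetical factory; connectionString is supplied by the caller.
    public static ServiceBusClient Create(string connectionString) =>
        new ServiceBusClient(
            connectionString,
            new ServiceBusClientOptions
            {
                // Lets a single transaction span more than one entity,
                // e.g. the error queue and the original queue.
                EnableCrossEntityTransactions = true
            });
}
```

With this option set, the first entity used inside the transaction acts as the entity the sends are routed through, which is consistent with completing on the error queue before sending to the destination.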

flower7434 commented 1 year ago

I believe I solved this by temporarily enabling auto-forwarding to a temp queue and then back to the original queue.
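
A rough sketch of how auto-forwarding could be switched on and off from code, assuming the `ServiceBusAdministrationClient` from the Azure.Messaging.ServiceBus SDK. The `ErrorQueueForwarding` class and `SetForwardToAsync` helper are hypothetical names, and clearing the setting by assigning `null` is an assumption rather than something confirmed in this thread:

```csharp
using System.Threading.Tasks;
using Azure.Messaging.ServiceBus.Administration;

public static class ErrorQueueForwarding
{
    // Hypothetical helper: points sourceQueue's ForwardTo at targetQueue.
    // Passing null as targetQueue is assumed to turn forwarding off again.
    public static async Task SetForwardToAsync(
        ServiceBusAdministrationClient admin, string sourceQueue, string targetQueue)
    {
        // Fetch the current queue description, change one property, write it back.
        QueueProperties queue = await admin.GetQueueAsync(sourceQueue);
        queue.ForwardTo = targetQueue;
        await admin.UpdateQueueAsync(queue);
    }
}
```

For example, `await ErrorQueueForwarding.SetForwardToAsync(admin, "orders_error", "orders_temp");` would start forwarding from a hypothetical `orders_error` queue, and a second call with `null` would stop it once the queue has drained. Unlike the transactional tool above, forwarding moves every message in the queue, so it does not allow picking individual messages to resend.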