rebus-org / Rebus.AzureServiceBus

:bus: Azure Service Bus transport for Rebus
https://mookid.dk/category/rebus

Support for Session-aware entities #84

Closed georgechond94 closed 1 year ago

georgechond94 commented 1 year ago

This pull request sets the SessionId property on the ServiceBusMessage based on a SessionId header.

This is useful for FIFO scenarios (similar to AWS SQS), as described here: https://docs.microsoft.com/en-us/azure/service-bus-messaging/message-sessions
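
As a rough illustration of the idea (not the code from this pull request), the sketch below copies a set of string headers onto a ServiceBusMessage and sets SessionId when a session header is present. The header key "example-session-id" and the SessionMapping helper are hypothetical names chosen for this example; the key actually used by the pull request is not shown in this thread.

```csharp
using System;
using System.Collections.Generic;
using Azure.Messaging.ServiceBus;

static class SessionMapping
{
    // Hypothetical header key, for illustration only.
    const string SessionIdHeader = "example-session-id";

    public static ServiceBusMessage ToServiceBusMessage(
        byte[] body, IReadOnlyDictionary<string, string> headers)
    {
        var message = new ServiceBusMessage(BinaryData.FromBytes(body));

        // Carry all headers along as application properties.
        foreach (var header in headers)
        {
            message.ApplicationProperties[header.Key] = header.Value;
        }

        // Only set SessionId when the sender supplied a non-empty session header.
        if (headers.TryGetValue(SessionIdHeader, out var sessionId)
            && !string.IsNullOrEmpty(sessionId))
        {
            message.SessionId = sessionId;
        }

        return message;
    }
}
```

Messages that share a SessionId are delivered in order within that session, which is what makes the FIFO scenario possible.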


Rebus is MIT-licensed. The code submitted in this pull request needs to carry the MIT license too. By leaving this text in, I hereby acknowledge that the code submitted in the pull request has the MIT license and can be merged with the Rebus codebase.

CLAassistant commented 1 year ago

CLA assistant check
All committers have signed the CLA.

sabbadino commented 1 year ago

Hi, I've been looking at this:

https://learn.microsoft.com/en-us/azure/service-bus-messaging/message-sessions

and this:

https://github.com/Azure/azure-sdk-for-net/blob/main/sdk/servicebus/Azure.Messaging.ServiceBus/samples/Sample03_SendReceiveSessions.md

It seems to me that 1) queues must be created with the session-aware flag turned on, and 2) the API for receiving messages from session-aware queues is different from the one for non-session-aware queues.

It's not clear to me whether it has already been considered how to surface this through the Rebus API.
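
For reference, the two points above look roughly like this when using the Azure SDK directly. This is a minimal sketch, assuming a valid connection string and a queue name supplied by the caller; the SessionQueueExample class is a made-up name, and nothing here reflects how Rebus wires things up internally.

```csharp
using System;
using System.Threading.Tasks;
using Azure.Messaging.ServiceBus;
using Azure.Messaging.ServiceBus.Administration;

static class SessionQueueExample
{
    public static async Task RunAsync(string connectionString, string queueName)
    {
        // 1) The queue has to be created with sessions required.
        var admin = new ServiceBusAdministrationClient(connectionString);
        bool exists = await admin.QueueExistsAsync(queueName);
        if (!exists)
        {
            await admin.CreateQueueAsync(
                new CreateQueueOptions(queueName) { RequiresSession = true });
        }

        await using var client = new ServiceBusClient(connectionString);

        // 2) A session-aware queue is read through a session receiver,
        //    obtained by accepting a session, rather than client.CreateReceiver(queueName).
        await using ServiceBusSessionReceiver receiver =
            await client.AcceptNextSessionAsync(queueName);

        ServiceBusReceivedMessage message =
            await receiver.ReceiveMessageAsync(TimeSpan.FromSeconds(5));

        if (message != null)
        {
            Console.WriteLine($"{receiver.SessionId}: {message.Body}");
            await receiver.CompleteMessageAsync(message);
        }
    }
}
```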

sabbadino commented 1 year ago

Maybe the idea is to have just a single, permanent SessionId for the queue? But in that case, what difference would it make compared to a normal queue with a single consumer?

mookid8000 commented 1 year ago

@georgechond94 How do sessions work with dead-lettering?

Rebus dead-letters messages by copying them to another queue ("error" by default). Will the next message(s) in the session just wait until the dead-lettered message is redelivered?

sabbadino commented 1 year ago

I think this was already answered here: https://github.com/rebus-org/Rebus.AzureServiceBus/issues/59#issuecomment-1249148132

I threw together a linqpad file to test this. The test sends 5 messages in a session, then the receiver throws an exception on message 3.

It looks like it retries message 3 ten times, dead-letters it, then moves on to message 4. It doesn't seem to wait for a message to be completed successfully before moving on.
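
The behaviour described above can be modelled without Azure at all. The following is a simplified in-memory simulation, not the LINQPad test itself: each message in a session is attempted up to a maximum delivery count (10 here, matching the ten retries observed), and a message that never succeeds is set aside while processing continues with the next one. The SessionDeadLetterSimulation class and its members are names invented for this sketch.

```csharp
using System;
using System.Collections.Generic;

static class SessionDeadLetterSimulation
{
    // Assumed delivery limit, matching the ten attempts observed above.
    const int MaxDeliveryCount = 10;

    public static (List<int> Handled, List<int> DeadLettered) Run(
        IEnumerable<int> sessionMessages, Func<int, bool> handler)
    {
        var handled = new List<int>();
        var deadLettered = new List<int>();

        foreach (var message in sessionMessages)
        {
            var succeeded = false;

            // Retry the same message until it succeeds or the limit is reached.
            for (var attempt = 1; attempt <= MaxDeliveryCount && !succeeded; attempt++)
            {
                succeeded = handler(message);
            }

            // A message that keeps failing is set aside; the session moves on.
            if (succeeded) handled.Add(message);
            else deadLettered.Add(message);
        }

        return (handled, deadLettered);
    }
}
```

Calling `SessionDeadLetterSimulation.Run(new[] { 1, 2, 3, 4, 5 }, m => m != 3)` leaves messages 1, 2, 4 and 5 in the handled list and message 3 in the dead-lettered list, so the consumer sees a gap in the sequence rather than a stalled session.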

georgechond94 commented 1 year ago

@sabbadino you are right, this is how it works in ASB.

I just pushed some changes that enable receiving session-aware messages. When there are no messages waiting to be received, the Service Bus client tries to accept the next available session. That way, multiple different SessionIds can be used. The throughput is definitely reduced, but ordering works quite well based on my tests.

PRODUCER (the order the messages were sent):
session 1 - A
session 2 - A
session 1 - B
session 2 - B
session 1 - C
session 3 - A
session 3 - B
session 3 - C
session 2 - C

CONSUMER (the order the messages were consumed):
session 3 - A
session 3 - B
session 3 - C
session 1 - A
session 1 - B
session 1 - C
session 2 - A
session 2 - B
session 2 - C

Any comments/feedback/improvements are definitely welcome. @mookid8000

mookid8000 commented 1 year ago

It should be noted that everyone who wants to use sessions should make themselves aware of their shortcomings, specifically that messages can end up reordered when one of them is dead-lettered: once moved to the error queue, it counts as "correctly handled" from the POV of the ASB session, but it is missing from the sequence from the POV of the consumer.

With that said: Cool! Thanks for contributing! 🙂

It's available on NuGet.org as Rebus.AzureServiceBus 9.1.0 now!

mookid8000 commented 1 year ago

Hi @georgechond94, I just wanted to play around with the new sessions feature, but I couldn't make it work.

My experiment is here: https://github.com/rebus-org/Rebus.AzureServiceBus/blob/master/Rebus.AzureServiceBus.Tests/Checks/PlayAroundWithSessions.cs

I get a timeout when starting the bus, apparently coming from calling

                _messageReceiver = await _client.AcceptNextSessionAsync(Address, receiverSessionOptions, _cancellationToken).ConfigureAwait(false);

during initialization.

Could you maybe tell me what I am doing wrong?


Here's the full stack trace:

Rebus.Injection.ResolutionException : Could not resolve Rebus.Bus.IBus with decorator depth 0 - registrations: Rebus.Injection.Injectionist+Handler
  ----> Azure.Messaging.ServiceBus.ServiceBusException : The operation did not complete within the allocated time 00:01:00 for object receiver26. (ServiceTimeout)
  ----> System.TimeoutException : The operation did not complete within the allocated time 00:01:00 for object receiver26.
   at Rebus.Injection.Injectionist.ResolutionContext.Get[TService]()
   at Rebus.Injection.Injectionist.Get[TService]()
   at Rebus.Config.RebusConfigurer.Start()
   at Rebus.AzureServiceBus.Tests.Checks.PlayAroundWithSessions.CreateBus(String inputQueueName, Action`1 routing, Action`1 handlers) in C:\projects-rebus\Rebus.AzureServiceBus\Rebus.AzureServiceBus.Tests\Checks\PlayAroundWithSessions.cs:line 122
   at Rebus.AzureServiceBus.Tests.Checks.PlayAroundWithSessions.CanReceiveOrderedMessages(Int32 sessionCount, Int32 messageCount) in C:\projects-rebus\Rebus.AzureServiceBus\Rebus.AzureServiceBus.Tests\Checks\PlayAroundWithSessions.cs:line 57
   at NUnit.Framework.Internal.TaskAwaitAdapter.GenericAdapter`1.GetResult()
   at NUnit.Framework.Internal.AsyncToSyncAdapter.Await(Func`1 invoke)
   at NUnit.Framework.Internal.Commands.TestMethodCommand.RunTestMethod(TestExecutionContext context)
   at NUnit.Framework.Internal.Commands.TestMethodCommand.Execute(TestExecutionContext context)
   at NUnit.Framework.Internal.Commands.BeforeAndAfterTestCommand.<>c__DisplayClass1_0.<Execute>b__0()
   at NUnit.Framework.Internal.Commands.DelegatingTestCommand.RunTestMethodInThreadAbortSafeZone(TestExecutionContext context, Action action)
--ServiceBusException
   at Azure.Messaging.ServiceBus.ServiceBusRetryPolicy.RunOperation[T1,TResult](Func`4 operation, T1 t1, TransportConnectionScope scope, CancellationToken cancellationToken, Boolean logRetriesAsVerbose)
   at Azure.Messaging.ServiceBus.Amqp.AmqpReceiver.OpenLinkAsync(CancellationToken cancellationToken)
   at Azure.Messaging.ServiceBus.ServiceBusReceiver.OpenLinkAsync(CancellationToken cancellationToken)
   at Azure.Messaging.ServiceBus.ServiceBusSessionReceiver.CreateSessionReceiverAsync(String entityPath, ServiceBusConnection connection, ServiceBusSessionReceiverOptions options, String sessionId, CancellationToken cancellationToken, Boolean isProcessor)
   at Azure.Messaging.ServiceBus.ServiceBusClient.AcceptNextSessionAsync(String queueName, ServiceBusSessionReceiverOptions options, CancellationToken cancellationToken)
   at Rebus.AzureServiceBus.AzureServiceBusTransport.<Initialize>b__44_0() in C:\projects-rebus\Rebus.AzureServiceBus\Rebus.AzureServiceBus\AzureServiceBus\AzureServiceBusTransport.cs:line 838
   at Rebus.Internals.AsyncHelpers.CustomSynchronizationContext.<Run>b__7_0(Object _) in C:\projects-rebus\Rebus.AzureServiceBus\Rebus.AzureServiceBus\Internals\AsyncHelpers.cs:line 71
   at Rebus.Internals.AsyncHelpers.CustomSynchronizationContext.Run() in C:\projects-rebus\Rebus.AzureServiceBus\Rebus.AzureServiceBus\Internals\AsyncHelpers.cs:line 92
   at Rebus.Internals.AsyncHelpers.RunSync(Func`1 task) in C:\projects-rebus\Rebus.AzureServiceBus\Rebus.AzureServiceBus\Internals\AsyncHelpers.cs:line 30
   at Rebus.AzureServiceBus.AzureServiceBusTransport.Initialize() in C:\projects-rebus\Rebus.AzureServiceBus\Rebus.AzureServiceBus\AzureServiceBus\AzureServiceBusTransport.cs:line 820
   at Rebus.Config.RebusConfigurer.<>c__DisplayClass13_0.<Start>b__28(IResolutionContext c)
   at Rebus.Injection.Injectionist.Resolver`1.InvokeResolver(IResolutionContext context)
   at Rebus.Injection.Injectionist.ResolutionContext.Get[TService]()
--TimeoutException
   at Microsoft.Azure.Amqp.AsyncResult.End[TAsyncResult](IAsyncResult result)
   at Microsoft.Azure.Amqp.AmqpObject.OpenAsyncResult.End(IAsyncResult result)
   at Microsoft.Azure.Amqp.AmqpObject.EndOpen(IAsyncResult result)
   at Microsoft.Azure.Amqp.AmqpObject.<>c.<OpenAsync>b__53_1(IAsyncResult r)
   at System.Threading.Tasks.TaskFactory`1.FromAsyncCoreLogic(IAsyncResult iar, Func`2 endFunction, Action`1 endAction, Task`1 promise, Boolean requiresSynchronization)
--- End of stack trace from previous location ---
   at Azure.Messaging.ServiceBus.Amqp.AmqpConnectionScope.OpenAmqpObjectCoreAsync(AmqpObject target, String entityPath, Nullable`1 timeout, Nullable`1 cancellationToken)
   at Azure.Messaging.ServiceBus.Amqp.AmqpConnectionScope.OpenAmqpLinkAsync(AmqpLink link, String entityPath, CancellationToken cancellationToken)
   at Azure.Messaging.ServiceBus.Amqp.AmqpConnectionScope.OpenReceiverLinkAsync(String identifier, String entityPath, TimeSpan timeout, UInt32 prefetchCount, ServiceBusReceiveMode receiveMode, String sessionId, Boolean isSessionReceiver, CancellationToken cancellationToken)
   at Azure.Messaging.ServiceBus.Amqp.AmqpReceiver.OpenReceiverLinkAsync(TimeSpan timeout, UInt32 prefetchCount, ServiceBusReceiveMode receiveMode, String identifier, CancellationToken cancellationToken)
   at Microsoft.Azure.Amqp.FaultTolerantAmqpObject`1.OnCreateAsync(TimeSpan timeout, CancellationToken cancellationToken)
   at Microsoft.Azure.Amqp.Singleton`1.GetOrCreateAsync(TimeSpan timeout, CancellationToken cancellationToken)
   at Microsoft.Azure.Amqp.Singleton`1.GetOrCreateAsync(TimeSpan timeout, CancellationToken cancellationToken)
   at Azure.Messaging.ServiceBus.Amqp.AmqpReceiver.<>c.<<OpenLinkAsync>b__74_0>d.MoveNext()
--- End of stack trace from previous location ---
   at Azure.Messaging.ServiceBus.ServiceBusRetryPolicy.RunOperation[T1,TResult](Func`4 operation, T1 t1, TransportConnectionScope scope, CancellationToken cancellationToken, Boolean logRetriesAsVerbose)

georgechond94 commented 1 year ago

@mookid8000 Hmm, interesting. I just tried it out again with the console app I was playing with (I let it run for several minutes) and I didn't get that exception.

This is how I set it up:

const string QueueName = "sessionqueue";

const string ConnectionString = "connectionstring";

using var activator = new BuiltinHandlerActivator();

activator.Register(() => new SessionHandler());

Configure.With(activator)
    .Logging(l => l.None())
    .Transport(t =>
        {
            t.UseAzureServiceBus(ConnectionString, QueueName)
                .EnableSessions();
        }
    )
    .Routing(r => r.TypeBased()
        .Map<TestMessage>(QueueName)
    )
    .Start();

I will try your tests a bit later today.

mookid8000 commented 1 year ago

Did you send messages to it before starting it?

Because it seems like it's calling the AcceptNextSessionAsync method in Initialize, which I believe would complete immediately if the queue contained a message at that point in time, but it seems to block until timing out after a minute when the queue is empty.
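
One way to make that empty-queue case explicit, when calling the SDK directly, is to treat the timeout as "no session available right now" instead of letting it fail start-up. The sketch below assumes the SDK reports this case as a ServiceBusException whose Reason is ServiceTimeout, as in the stack trace above; the SessionAcceptExample helper is a hypothetical name and is not part of Rebus.

```csharp
using System.Threading;
using System.Threading.Tasks;
using Azure.Messaging.ServiceBus;

static class SessionAcceptExample
{
    // Returns a session receiver, or null when no session became available
    // before the service call timed out.
    public static async Task<ServiceBusSessionReceiver> TryAcceptNextSessionAsync(
        ServiceBusClient client, string queueName, CancellationToken cancellationToken)
    {
        try
        {
            return await client.AcceptNextSessionAsync(
                queueName, cancellationToken: cancellationToken);
        }
        catch (ServiceBusException exception)
            when (exception.Reason == ServiceBusFailureReason.ServiceTimeout)
        {
            // Nothing to receive yet; the caller can simply try again later.
            return null;
        }
    }
}
```

A receive loop could call this helper repeatedly and back off whenever it returns null, so that an empty queue never blocks initialization.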

georgechond94 commented 1 year ago

@mookid8000 No, the queue was empty.

But you are right, I just noticed that calling AcceptNextSessionAsync in Initialize, unlike CreateReceiver, blocks (at .Start();) until a message is received. Once a message is received, it no longer blocks and works "as expected".

Still no timeout on my side, which is a bit weird, as the remarks for AcceptNextSessionAsync say:

Remarks:
Because this is establishing a session lock, this method performs a service call. If there are no available messages in the queue, this will throw a ServiceBusException with Reason of ServiceTimeout.

Any idea if this can be called somewhere else instead of in the Initialize()?

Also, do we have to use the ServiceBus(Session)Receiver way of receiving messages? There are also the ServiceBus(Session)Processor wrappers, which are the recommended/"friendlier"/event-based way of receiving messages. I will take a look and see if I can make that work.

georgechond94 commented 1 year ago

@mookid8000 In #85, I managed to replace the receiver with the processor wrappers, which also fixes the issue mentioned above. (The throughput is also much better now when sessions are enabled.)

Note that MessageLockRenewer was also removed as lock renewal is now handled by the processors.

It definitely needs some more testing, but so far it works great!
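
For readers unfamiliar with the processor wrappers, here is a minimal sketch of the SDK's session processor used on its own. It is not the code from #85; the SessionProcessorExample class and the option values (four concurrent sessions, five minutes of automatic lock renewal) are arbitrary choices for illustration.

```csharp
using System;
using System.Threading.Tasks;
using Azure.Messaging.ServiceBus;

static class SessionProcessorExample
{
    public static async Task<ServiceBusSessionProcessor> StartAsync(
        ServiceBusClient client, string queueName)
    {
        var options = new ServiceBusSessionProcessorOptions
        {
            // Complete messages explicitly in the handler.
            AutoCompleteMessages = false,
            // Illustrative values, not recommendations.
            MaxConcurrentSessions = 4,
            // The processor renews locks itself for up to this long.
            MaxAutoLockRenewalDuration = TimeSpan.FromMinutes(5)
        };

        ServiceBusSessionProcessor processor =
            client.CreateSessionProcessor(queueName, options);

        processor.ProcessMessageAsync += async args =>
        {
            Console.WriteLine($"{args.SessionId}: {args.Message.Body}");
            await args.CompleteMessageAsync(args.Message);
        };

        processor.ProcessErrorAsync += args =>
        {
            Console.Error.WriteLine($"{args.ErrorSource}: {args.Exception.Message}");
            return Task.CompletedTask;
        };

        // Returns once processing has started; it does not wait for a message to arrive.
        await processor.StartProcessingAsync();
        return processor;
    }
}
```

Because the processor accepts sessions in the background and renews locks on its own, start-up does not depend on a message already being in the queue, and a separate lock renewer is not needed.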