rebus-org / Rebus.AzureServiceBus

:bus: Azure Service Bus transport for Rebus
https://mookid.dk/category/rebus

Azure Service Bus provider appears to be broken #62

Closed sfmskywalker closed 3 years ago

sfmskywalker commented 3 years ago

To see how the Azure Service Bus provider works, I cloned the Rebus Samples repository and updated the RabbitTopics project to use the Azure Service Bus provider. Before the change, the program worked as advertised. With my changes, it fails with the following error:

An error occurred when attempting to receive the next message: System.InvalidOperationException: Operation is not valid due to the current state of the object.
   at Microsoft.Azure.ServiceBus.Core.MessageReceiver.OnReceiveAsync(Int32 maxMessageCount, TimeSpan serverWaitTime)
   at Microsoft.Azure.ServiceBus.Core.MessageReceiver.<>c__DisplayClass64_0.<<ReceiveAsync>b__0>d.MoveNext()
--- End of stack trace from previous location ---
   at Microsoft.Azure.ServiceBus.Core.MessageReceiver.<>c__DisplayClass64_0.<<ReceiveAsync>b__0>d.MoveNext()
--- End of stack trace from previous location ---
   at Microsoft.Azure.ServiceBus.RetryPolicy.RunOperation(Func`1 operation, TimeSpan operationTimeout)
   at Microsoft.Azure.ServiceBus.RetryPolicy.RunOperation(Func`1 operation, TimeSpan operationTimeout)
   at Microsoft.Azure.ServiceBus.RetryPolicy.RunOperation(Func`1 operation, TimeSpan operationTimeout)
   at Microsoft.Azure.ServiceBus.RetryPolicy.RunOperation(Func`1 operation, TimeSpan operationTimeout)
   at Microsoft.Azure.ServiceBus.Core.MessageReceiver.ReceiveAsync(Int32 maxMessageCount, TimeSpan operationTimeout)
   at Microsoft.Azure.ServiceBus.Core.MessageReceiver.ReceiveAsync(Int32 maxMessageCount, TimeSpan operationTimeout)
   at Microsoft.Azure.ServiceBus.Core.MessageReceiver.ReceiveAsync(TimeSpan operationTimeout)
   at Microsoft.Azure.ServiceBus.Core.MessageReceiver.ReceiveAsync(TimeSpan operationTimeout)
   at Rebus.AzureServiceBus.AzureServiceBusTransport.ReceiveInternal()
   at Rebus.AzureServiceBus.AzureServiceBusTransport.ReceiveInternal()
   at Rebus.AzureServiceBus.AzureServiceBusTransport.ReceiveInternal()
   at Rebus.AzureServiceBus.AzureServiceBusTransport.ReceiveInternal()
   at Rebus.AzureServiceBus.AzureServiceBusTransport.Receive(ITransactionContext context, CancellationToken cancellationToken)
   at Rebus.AzureServiceBus.AzureServiceBusTransport.Receive(ITransactionContext context, CancellationToken cancellationToken)
   at Rebus.Workers.ThreadPoolBased.ThreadPoolWorker.ReceiveTransportMessage(CancellationToken token, ITransactionContext context)
   at Rebus.Workers.ThreadPoolBased.ThreadPoolWorker.ReceiveTransportMessage(CancellationToken token, ITransactionContext context)
   at Rebus.Workers.ThreadPoolBased.ThreadPoolWorker.ReceiveTransportMessage(CancellationToken token, ITransactionContext context)
   at Rebus.Workers.ThreadPoolBased.ThreadPoolWorker.ReceiveTransportMessage(CancellationToken token, ITransactionContext context)

Here's my complete Program.cs file:

using System;
using Rebus.Activation;
using Rebus.Config;
using Rebus.Logging;
using Rebus.RabbitMq;
#pragma warning disable 1998

namespace RabbitTopics
{
    class Program
    {
        const LogLevel MinimumLogLevel = LogLevel.Warn;
        //const string ConnectionString = "amqp://localhost";
        const string ConnectionString = "(too secret for you, replace with your own Azure service Bus endpoint :))";

        static void Main()
        {
            using (var publisher = new BuiltinHandlerActivator())
            using (var subscriber1 = new BuiltinHandlerActivator())
            using (var subscriber2 = new BuiltinHandlerActivator())
            using (var subscriber3 = new BuiltinHandlerActivator())
            {
                ConfigureSubscriber(subscriber1, "endpoint1");
                ConfigureSubscriber(subscriber2, "endpoint2");
                ConfigureSubscriber(subscriber3, "endpoint3");

                subscriber1.Bus.Advanced.Topics.Subscribe("mercedes.#").Wait();
                subscriber2.Bus.Advanced.Topics.Subscribe("mercedes.bmw.#").Wait();
                subscriber3.Bus.Advanced.Topics.Subscribe("mercedes.bmw.vw").Wait();

                var publisherBus = Configure.With(publisher)
                    .Logging(l => l.ColoredConsole(MinimumLogLevel))
                    //.Transport(t => t.UseRabbitMqAsOneWayClient(ConnectionString))
                    .Transport(t => t.UseAzureServiceBusAsOneWayClient(ConnectionString))
                    .Start();

                var topicsApi = publisherBus.Advanced.Topics;

                topicsApi.Publish("mercedes.bmw.vw", "This one should be received by all!").Wait();
                topicsApi.Publish("mercedes.bmw.mazda", "This one should be received by 1 & 2").Wait();
                topicsApi.Publish("mercedes.honda", "This one should be received by 1").Wait();

                Console.WriteLine("Press ENTER to quit");
                Console.ReadLine();
            }
        }

        static void ConfigureSubscriber(BuiltinHandlerActivator activator, string inputQueueName)
        {
            activator.Handle<string>(async str =>
            {
                Console.WriteLine("{0} => '{1}'", str, inputQueueName);
            });

            Configure.With(activator)
                .Logging(l => l.ColoredConsole(MinimumLogLevel))
                //.Transport(t => t.UseRabbitMq(ConnectionString, inputQueueName))
                .Transport(t => t.UseAzureServiceBus(ConnectionString, inputQueueName))
                .Start();
        }
    }
}

I also updated the .csproj file as follows in order to make things build on my local machine:

<Project Sdk="Microsoft.NET.Sdk">
    <PropertyGroup>
        <TargetFramework>net5.0</TargetFramework>
        <OutputType>Exe</OutputType>
    </PropertyGroup>
    <ItemGroup>
        <PackageReference Include="Microsoft.Extensions.DependencyInjection" Version="5.0.0" />
        <PackageReference Include="Newtonsoft.Json" Version="12.0.3" />
        <PackageReference Include="RabbitMQ.Client" Version="6.2.1" />
        <PackageReference Include="Rebus" Version="6.4.1" />
        <PackageReference Include="Rebus.AzureServiceBus" Version="7.1.6" />
        <PackageReference Include="Rebus.RabbitMq" Version="7.3.0" />
    </ItemGroup>
</Project>

To rule out a mistake in my changes, I verified that the RabbitMQ provider still works.

sfmskywalker commented 3 years ago

I just realized that I should probably have filed this issue under the Rebus Azure Service Bus project. Let me know if I should re-create it there.

sfmskywalker commented 3 years ago

Ah, I just came across rebus-org/Rebus#60 and rebus-org/Rebus#61 and tried Rebus.AzureServiceBus 8.0.0-a1. It works a bit better: the exception I mentioned is no longer thrown. However, something still doesn't seem right, because only "endpoint3" receives anything, and only the "This one should be received by all!" message. Console output:

This one should be received by all! => 'endpoint3'
Press ENTER to quit

mookid8000 commented 3 years ago

Sounds weird with the InvalidOperationException... but, could I get you to remove the wildcards from the subscribers' topics?

Azure Service Bus doesn't support wildcards in topics like RabbitMQ does, so a topic like mercedes.# doesn't "catch" events published to any of the used topics. I would expect Rebus to "sanitize" the topic though, so it probably subscribes without an error to a topic like mercedes__, or something like that. But it obviously won't receive anything in this particular sample.
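
To illustrate the difference, here is a small self-contained sketch that does not use Rebus or Azure at all. It compares RabbitMQ-style wildcard matching (`*` for exactly one word, `#` for zero or more words) with plain literal matching, using the subscriptions and topics from the sample above. The class and method names (`TopicMatchingSketch`, `WildcardMatch`, `ExactMatch`) are made up for this illustration, and `ExactMatch` is only an assumed model of "no wildcard support", not how Rebus or Azure Service Bus actually resolve topics internally.

```csharp
using System;
using System.Linq;

static class TopicMatchingSketch
{
    // RabbitMQ-style topic matching: words are separated by '.',
    // '*' matches exactly one word and '#' matches zero or more words.
    public static bool WildcardMatch(string pattern, string topic) =>
        Match(pattern.Split('.'), 0, topic.Split('.'), 0);

    static bool Match(string[] pattern, int p, string[] topic, int t)
    {
        if (p == pattern.Length) return t == topic.Length;

        if (pattern[p] == "#")
        {
            // Try letting '#' consume 0, 1, 2, ... of the remaining words.
            for (var next = t; next <= topic.Length; next++)
            {
                if (Match(pattern, p + 1, topic, next)) return true;
            }
            return false;
        }

        if (t == topic.Length) return false;

        return (pattern[p] == "*" || pattern[p] == topic[t])
               && Match(pattern, p + 1, topic, t + 1);
    }

    // Assumed model of a transport without wildcards:
    // the subscription string is treated as a literal topic name.
    public static bool ExactMatch(string subscription, string topic) =>
        string.Equals(subscription, topic, StringComparison.Ordinal);

    static void Main()
    {
        // Subscriptions and published topics taken from the sample program.
        var subscriptions = new[]
        {
            (Endpoint: "endpoint1", Topic: "mercedes.#"),
            (Endpoint: "endpoint2", Topic: "mercedes.bmw.#"),
            (Endpoint: "endpoint3", Topic: "mercedes.bmw.vw"),
        };
        var published = new[] { "mercedes.bmw.vw", "mercedes.bmw.mazda", "mercedes.honda" };

        foreach (var topic in published)
        {
            var wildcard = subscriptions.Where(s => WildcardMatch(s.Topic, topic)).Select(s => s.Endpoint);
            var exact = subscriptions.Where(s => ExactMatch(s.Topic, topic)).Select(s => s.Endpoint);

            Console.WriteLine($"{topic}: wildcard -> [{string.Join(", ", wildcard)}], exact -> [{string.Join(", ", exact)}]");
        }
    }
}
```

Under the literal-matching model only endpoint3 matches `mercedes.bmw.vw` and nothing matches the other two topics, which is consistent with the console output reported above. With wildcard matching, all three endpoints would match the first topic.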

sfmskywalker commented 3 years ago

Ah, good thinking! Indeed that change made things work again 😁

Thanks!