Particular / NServiceBus.AzureFunctions.InProcess.ServiceBus

Process messages in AzureFunctions using the Azure Service Bus trigger and the NServiceBus message pipeline.
Other
11 stars 5 forks source link

Azure Service Bus CreateRule Exception #748

Open Lars-Kolsaker opened 7 months ago

Lars-Kolsaker commented 7 months ago

Describe the bug

Description

We have starting to receive the following exception in our logs. Not sure if this is a bug or is some misuse. Had a discussion on ParticularDiscussion (https://discuss.particular.net/t/createrule-exception/3774) and was recommend to raise an issue here.

sbt-adp-adapter-incoming/Subscriptions/nsb-sub-avonovaAdapter-xxxxxxxx: CreateRule Exception: Azure.Messaging.ServiceBus.ServiceBusException: The messaging entity 'sb-adp-westeurope-prod:Topic:sbt-adp-adapter-incoming|nsb-sub-avonovaadapter|Adp.Common.Messages.Mds.ReceiveMessageMdsCompany' already exists. To know more visit https://aka.ms/sbResourceMgrExceptions.  TrackingId:7e7ac61f-3e7f-4920-bbf7-004242148300_B14, SystemTracker:NoSystemTracker, Timestamp:2023-11-29T12:37:27 Reference:e23d7c27-199c-495f-92ad-ad49cec2d65c, TrackingId:2cc0489a-0556-4d37-982f-41170123d009_G22, SystemTracker:NoSystemTracker, Timestamp:2023-11-29T12:37:27 (MessagingEntityAlreadyExists). For troubleshooting information, see https://aka.ms/azsdk/net/servicebus/exceptions/troubleshoot.
   at Azure.Messaging.ServiceBus.Amqp.AmqpRuleManager.AddRuleInternalAsync(RuleProperties description, TimeSpan timeout)
   at Azure.Messaging.ServiceBus.Amqp.AmqpRuleManager.<>c.<<CreateRuleAsync>b__8_0>d.MoveNext()
--- End of stack trace from previous location ---
   at Azure.Messaging.ServiceBus.ServiceBusRetryPolicy.<>c__22`1.<<RunOperation>b__22_0>d.MoveNext()
--- End of stack trace from previous location ---
   at Azure.Messaging.ServiceBus.ServiceBusRetryPolicy.RunOperation[T1,TResult](Func`4 operation, T1 t1, TransportConnectionScope scope, CancellationToken cancellationToken, Boolean logTimeoutRetriesAsVerbose)
   at Azure.Messaging.ServiceBus.ServiceBusRetryPolicy.RunOperation[T1,TResult](Func`4 operation, T1 t1, TransportConnectionScope scope, CancellationToken cancellationToken, Boolean logTimeoutRetriesAsVerbose)
   at Azure.Messaging.ServiceBus.ServiceBusRetryPolicy.RunOperation[T1](Func`4 operation, T1 t1, TransportConnectionScope scope, CancellationToken cancellationToken)
   at Azure.Messaging.ServiceBus.Amqp.AmqpRuleManager.CreateRuleAsync(RuleProperties properties, CancellationToken cancellationToken)
   at Azure.Messaging.ServiceBus.ServiceBusRuleManager.CreateRuleAsync(CreateRuleO

This exception occurs when our function app receives a Service bus message in our implementation of the IMessageHandler (see start of code below).

Does NServicebus try to add a filter rule to the topic ?

Here is some traces from application insight that hints me that this initiated by NServiceBus


11/29/2023, 1:37:26 PM
-
Trace
Executing 'NServiceBusFunctionEndpointTrigger-nsb-sub-avonovaAdapter' (Reason='(null)', Id=cf36c430-1dcd-4a6d-bfb8-88b729b42056)
Severity level: InformationParent Id: c855c800fa83fe57
11/29/2023, 1:37:26 PM
-
Trace
Trigger Details: MessageId: 3a36a7d7-c285-48e6-9a3f-0b4bd3410ac3, SequenceNumber: 1013424, DeliveryCount: 1, EnqueuedTimeUtc: 2023-11-29T12:37:26.4700000+00:00, LockedUntilUtc: 2023-11-29T12:38:26.4700000+00:00, SessionId: (null)
Severity level: InformationParent Id: c855c800fa83fe57
11/29/2023, 1:37:26 PM
-
Trace
No valid license could be found. Falling back to trial license with start date '2023-11-29'.
Severity level: InformationParent Id: c855c800fa83fe57
11/29/2023, 1:37:26 PM
-
Trace
Auditing processed messages to 'nsb-sbq-audit'
Severity level: InformationParent Id: c855c800fa83fe57
11/29/2023, 1:37:27 PM
-
Trace
sbt-adp-adapter-incoming/Subscriptions/nsb-sub-avonovaAdapter-35acba06-1835-4dea-89a5-88d4107419f1: CreateRule Exception: Azure.Messaging.ServiceBus.ServiceBusException: The messaging entity 'sb-adp-westeurope-prod:Topic:sbt-adp-adapter-incoming|nsb-sub-avonovaadapter|Adp.Common.Messages.Mds.ReceiveMessageMdsPerson' already exists. To know more visit https://aka.ms/sbResourceMgrExceptions. TrackingId:d2af9b10-7b3a-45e4-8e88-632d83e77670_B14, SystemTracker:NoSystemTracker, Timestamp:2023-11-29T12:37:27 Reference:917a4bd9-223c-4cf7-84af-ffa4275e454e, TrackingId:cdea97b2-03a4-4b0d-a8a4-cfe1c4d50cd4_G22, SystemTracker:NoSystemTracker, Timestamp:2023-11-29T12:37:27 (MessagingEntityAlreadyExists). For troubleshooting information, see https://aka.ms/azsdk/net/servicebus/exceptions/troubleshoot. at Azure.Messaging.ServiceBus.Amqp.AmqpRuleManager.AddRuleInternalAsync(RuleProperties description, TimeSpan timeout) at Azure.Messaging.ServiceBus.Amqp.AmqpRuleManager.<>c.<<CreateRuleAsync>b__8_0>d.MoveNext() --- End of stack trace from previous location --- at Azure.Messaging.ServiceBus.ServiceBusRetryPolicy.<>c__22`1.<<RunOperation>b__22_0>d.MoveNext() --- End of stack trace from previous location --- at Azure.Messaging.ServiceBus.ServiceBusRetryPolicy.RunOperation[T1,TResult](Func`4 operation, T1 t1, TransportConnectionScope scope, CancellationToken cancellationToken, Boolean logTimeoutRetriesAsVerbose) at Azure.Messaging.ServiceBus.ServiceBusRetryPolicy.RunOperation[T1,TResult](Func`4 operation, T1 t1, TransportConnectionScope scope, CancellationToken cancellationToken, Boolean logTimeoutRetriesAsVerbose) at Azure.Messaging.ServiceBus.ServiceBusRetryPolicy.RunOperation[T1](Func`4 operation, T1 t1, TransportConnectionScope scope, CancellationToken cancellationToken) at Azure.Messaging.ServiceBus.Amqp.AmqpRuleManager.CreateRuleAsync(RuleProperties properties, CancellationToken cancellationToken) at Azure.Messaging.ServiceBus.ServiceBusRuleManager.CreateRuleAsync(CreateRuleOptions options, CancellationToken cancellationToken). RuleName = Adp.Common.Messages.Mds.ReceiveMessageMdsPerson
Severity level: ErrorParent Id: c855c800fa83fe57
11/29/2023, 1:37:27 PM
-
Trace
sbt-adp-adapter-incoming/Subscriptions/nsb-sub-avonovaAdapter-35acba06-1835-4dea-89a5-88d4107419f1: CreateRule Exception: Azure.Messaging.ServiceBus.ServiceBusException: The messaging entity 'sb-adp-westeurope-prod:Topic:sbt-adp-adapter-incoming|nsb-sub-avonovaadapter|Adp.Common.Messages.Mds.ReceiveMessageMdsCompany' already exists. To know more visit https://aka.ms/sbResourceMgrExceptions. TrackingId:7e7ac61f-3e7f-4920-bbf7-004242148300_B14, SystemTracker:NoSystemTracker, Timestamp:2023-11-29T12:37:27 Reference:e23d7c27-199c-495f-92ad-ad49cec2d65c, TrackingId:2cc0489a-0556-4d37-982f-41170123d009_G22, SystemTracker:NoSystemTracker, Timestamp:2023-11-29T12:37:27 (MessagingEntityAlreadyExists). For troubleshooting information, see https://aka.ms/azsdk/net/servicebus/exceptions/troubleshoot. at Azure.Messaging.ServiceBus.Amqp.AmqpRuleManager.AddRuleInternalAsync(RuleProperties description, TimeSpan timeout) at Azure.Messaging.ServiceBus.Amqp.AmqpRuleManager.<>c.<<CreateRuleAsync>b__8_0>d.MoveNext() --- End of stack trace from previous location --- at Azure.Messaging.ServiceBus.ServiceBusRetryPolicy.<>c__22`1.<<RunOperation>b__22_0>d.MoveNext() --- End of stack trace from previous location --- at Azure.Messaging.ServiceBus.ServiceBusRetryPolicy.RunOperation[T1,TResult](Func`4 operation, T1 t1, TransportConnectionScope scope, CancellationToken cancellationToken, Boolean logTimeoutRetriesAsVerbose) at Azure.Messaging.ServiceBus.ServiceBusRetryPolicy.RunOperation[T1,TResult](Func`4 operation, T1 t1, TransportConnectionScope scope, CancellationToken cancellationToken, Boolean logTimeoutRetriesAsVerbose) at Azure.Messaging.ServiceBus.ServiceBusRetryPolicy.RunOperation[T1](Func`4 operation, T1 t1, TransportConnectionScope scope, CancellationToken cancellationToken) at Azure.Messaging.ServiceBus.Amqp.AmqpRuleManager.CreateRuleAsync(RuleProperties properties, CancellationToken cancellationToken) at Azure.Messaging.ServiceBus.ServiceBusRuleManager.CreateRuleAsync(CreateRuleOptions options, CancellationToken cancellationToken). RuleName = Adp.Common.Messages.Mds.ReceiveMessageMdsCompany
Severity level: ErrorParent Id: c855c800fa83fe57
11/29/2023, 1:37:27 PM
-
Trace
Handling ReceiveMessageMdsPerson, MessageId: 65be7af420014f3d846b1943b6ab8a02, contextId: 21b329b3-9bef-41dd-add0-b0c900d0097f, HandleMessage, F:\Agents\_work\12\s\Src\Adp.Adapter.AvonovaDigital\MessageHandlers\ReceiveMessageMdsHandler.cs, 54
Severity level: InformationParent Id: c855c800fa83fe57
11/29/2023, 1:37:27 PM
-
Trace
OriginalSource is AvonovaDigital, HandleMessage, F:\Agents\_work\12\s\Src\Adp.Adapter.AvonovaDigital\MessageHandlers\ReceiveMessageMdsHandler.cs, 66
Severity level: InformationParent Id: c855c800fa83fe57
11/29/2023, 1:37:28 PM
-
Trace
Executed 'NServiceBusFunctionEndpointTrigger-nsb-sub-avonovaAdapter' (Succeeded, Id=cf36c430-1dcd-4a6d-bfb8-88b729b42056, Duration=1470ms)
Severity level: InformationParent Id: c855c800fa83fe57

And here is part of our IMessageHandler implementation

public class ReceiveMessageMdsHandler :
    IHandleMessages<ReceiveMessageMdsCompany>,
    IHandleMessages<ReceiveMessageMdsPerson>
{
    private readonly IMediator _mediator;
    private readonly ILogger _logger;
    private readonly IFeatureFlags _featureFlags;

    public ReceiveMessageMdsHandler(IMediator mediator, ILogger<ReceiveMessageMdsHandler> logger, IFeatureFlags featureFlags)
    {
        _mediator = mediator;
        _logger = logger;
        _featureFlags = featureFlags;
    }

    public async Task Handle(ReceiveMessageMdsCompany message, IMessageHandlerContext context)
    {
        await HandleMessage<Company>(message, context);
        await context.Reply(new PublishSuccessAvonovaDigitalCompany());
    }

    public async Task Handle(ReceiveMessageMdsPerson message, IMessageHandlerContext context)
    {
        await HandleMessage<Person>(message, context);
        await context.Reply(new PublishSuccessAvonovaDigitalPerson());
    }

    private async Task HandleMessage<T>(IReceiveMdsMessage<T> message, IMessageHandlerContext context)
        where T : BaseDto
    {
        _logger.LogExtInfo($"Handling {message.GetType().Name}, MessageId: {message.MessageId}, contextId: {context.MessageId}");

        var adpMessage = message.AdpMessage;

And here is our startup with NserviceBus configuration

  public const string EndpointName = "nsb-sub-avonovaAdapter";

   private const string ErrorQueueName = "nsb-sbq-error";
   private const string AuditQueueName = "nsb-sbq-audit";
   private const string TopicName = "sbt-adp-adapter-incoming";

   public override void Configure(IFunctionsHostBuilder builder)
   {
       builder.Services.AddSingleton<IAvonovaDigitalCommonService, AvonovaDigitalCommonService>();
       builder.Services.AddSingleton<IAvonovaDigitalAdpService, AvonovaDigitalAdpService>();
       builder.Services.AddSingleton<IValidator, Validator>();
       builder.Services.AddSingleton<IRequestProcessor, RequestProcessor>();
       builder.Services.AddScoped<IUpserter, Upserter>();

       builder.Services.AddAutoMapper(typeof(DtoToAdProfile));

       builder.UseNServiceBus(nsbConfiguration =>
       {
           nsbConfiguration.Transport.Topology = TopicTopology.Single(TopicName);

           nsbConfiguration.AdvancedConfiguration.Recoverability()
               .Immediate(immediate =>
               {
                   immediate.NumberOfRetries(0);
               })
               .Delayed(delayed =>
               {
                   delayed.NumberOfRetries(5);
                   delayed.TimeIncrease(TimeSpan.FromSeconds(10));
               });

           nsbConfiguration.AdvancedConfiguration.SendFailedMessagesTo(ErrorQueueName);
           nsbConfiguration.AdvancedConfiguration.AuditProcessedMessagesTo(AuditQueueName);
           nsbConfiguration.AdvancedConfiguration.Recoverability().AddUnrecoverableException<UnrecoverableException>();
       });

Expected behavior

Actual behavior

Versions

We are using the following NServiceBus components NServiceBus.AzureFunctions.InProcess.ServiceBus 4.2.0 NServiceBus.Transport.AzureServiceBus 3.2.1

Steps to reproduce

See description above

Relevant log output

No response

Additional Information

Workarounds

Possible solutions

Additional information

mauroservienti commented 6 months ago

Thanks for raising this issue, @Lars-Kolsaker.

In talking with a colleague, he made me notice that we explicitly guard against this from happening in the transport code.

I'll try to reproduce the issue on my side to better understand what's going on.

mauroservienti commented 6 months ago

@Lars-Kolsaker, I did the following:

<PackageReference Include="NServiceBus.AzureFunctions.InProcess.ServiceBus" Version="4.2.0" />
<PackageReference Include="NServiceBus.Transport.AzureServiceBus" Version="3.2.1" />
public class SomethingHappened : IEvent
{
    public string What { get; set; }
}
public async Task Handle(TriggerMessage message, IMessageHandlerContext context)
{
    Log.Warn($"Handling {nameof(TriggerMessage)} in {nameof(TriggerMessageHandler)}");
    Log.Warn($"Custom component returned: {customComponent.GetValue()}");

    await context.SendLocal(new FollowupMessage());
    await context.Publish(new SomethingHappened() { What = "Wow!"});
}
public class SomethingHappenedHandler : IHandleMessages<SomethingHappened>
{
    static readonly ILog Log = LogManager.GetLogger<SomethingHappenedHandler>();
    public Task Handle(SomethingHappened message, IMessageHandlerContext context)
    {
        Log.Warn($"Handling {nameof(SomethingHappened)} in {nameof(SomethingHappenedHandler)}: {message.What}.");

        return Task.CompletedTask;
    }
}

Running the sample works as expected. I can inspect the bundle-1 and see the newly added rule:

image

Running the sample more than once, which retries the rule creation, doesn't throw.

Can you please try to do the same and see if you manage to make it fail similarly to what happens in your environment?

mauroservienti commented 6 months ago

One more thing, can you show me the definitions for the ReceiveMessageMdsCompany and ReceiveMessageMdsPerson event types?

Lars-Kolsaker commented 6 months ago

@mauroservienti sorry for late reply, but I did not got any notifications about your reply

Have not any problem with using the sample application. Our application has been working for a while, but are now throwing this exception.

Could be due to our configuration of the transport topology ref Sean Feldman comment in the discussion group

You’re configuring the transport topology. This might be an issue if the transport assembly is picked up.

Lars-Kolsaker commented 6 months ago

And here is the definitions

using Adp.Common.Models.DTO;

namespace Adp.Common.Messages.Mds;

// WARNING! Changed name will affect the NServiceBus subscription
public class ReceiveMessageMdsCompany : IReceiveMdsMessage<Company>
{
    public string? MessageId { get; set; }
    public AdpMessage<Company>? AdpMessage { get; set; }
}
using Adp.Common.Models.DTO;

namespace Adp.Common.Messages.Mds;

// WARNING! Changed name will affect the NServiceBus subscription
public class ReceiveMessageMdsPerson : IReceiveMdsMessage<Person>
{
    public string? MessageId { get; set; }
    public AdpMessage<Person>? AdpMessage { get; set; }
}
`using Adp.Common.Models.Enums;
using Newtonsoft.Json;
using Newtonsoft.Json.Converters;

namespace Adp.Common.Messages;

public class AdpMessage<T> where T : class
{
    [JsonConverter(typeof(StringEnumConverter))]
    public EntityType EntityType { get; set; }
    [JsonConverter(typeof(StringEnumConverter))]
    public ActionType Action { get; set; }
    [JsonConverter(typeof(StringEnumConverter))]
    public Source Source { get; set; }
    [JsonConverter(typeof(StringEnumConverter))]
    public Source OriginalSource { get; set; }
    public T Data { get; set; }
}

public enum ActionType
{
    Upsert = 0,
    Delete = 2
}

public enum EntityType
{
    Unknown = -1,
    User = 0,
    Company = 1,
    Person = 2,
    Unit = 3,
    Customer = 4,
    Location = 5
}
Lars-Kolsaker commented 5 months ago

@mauroservienti Any news on this?

mauroservienti commented 5 months ago

I made no progress. I'm still not able to reproduce the issue you're facing. And sorry for getting back to you late, I should have provided an update earlier.

mauroservienti commented 5 months ago

I forgot about adding that it has nothing to do with the custom topology. I ran the same sample using a custom topic name, which worked as expected. I suspect it might be related to inheritance and generics in the messages' definitions. I'll try to factor that in my example.

mauroservienti commented 5 months ago

@Lars-Kolsaker I added two messages with a polymorphic structure similar to yours to my example. It still works as expected without throwing any exceptions. I published the example on my GitHub account.

Could you look at it and see if you can spot any difference in your environment?