dotnetcore / CAP

Distributed transaction solution in micro-service base on eventually consistency, also an eventbus with Outbox pattern
http://cap.dotnetcore.xyz
MIT License
6.61k stars 1.28k forks source link

Azure Service Bus Custom Consumer #1571

Open KuchaD opened 1 month ago

KuchaD commented 1 month ago

Current Situation: Currently, our Azure Service Bus provider lacks support for multiple namespaces and topics for consumers. We are utilizing a single topic within Azure Service Bus for messaging, and a single namespace for the application.

Solution Attempt: I have attempted to implement a solution that addresses this limitation, and it works as expected. However, I am currently unable to submit a pull request (PR) for this update.

Register in program files.

builder.Services.AddCap(c =>
{
    c.Version = "v1";
    c.UseInMemoryStorage();
    c.UseAzureServiceBus(asb =>
    {
        asb.ConnectionString =
            "Endpoint=xxx";
        asb.CustomHeadersBuilder = (message, serviceProvider) =>
        {
            var snowFlakeId = serviceProvider.GetRequiredService<ISnowflakeId>();

            return new List<KeyValuePair<string, string>>()
                        {

                            new(DotNetCore.CAP.Messages.Headers.MessageId,
                                snowFlakeId.NextId().ToString()),
                            new(DotNetCore.CAP.Messages.Headers.MessageName, message.Subject),
                            new("IsFromSampleProject", "'true'")
                        };
        };
        asb.SQLFilters = new List<KeyValuePair<string, string>>() {
            new("IsFromSampleProjectFilter","IsFromSampleProject = 'true'")
        };

        asb.ConfigureCustomProducer<EntityCreatedForIntegration>(cfg => cfg.UseTopic("entity-created").WithSubscription());
        asb.ConfigureCustomProducer<EntityDeletedForIntegration>(cfg => cfg.UseTopic("entity-deleted").WithSubscription());
        asb.ConfigureCustomConsumer(cfg =>
        {
            cfg.UseGroupName($"test.{c.Version}");
            cfg.UseTopic("entity-created");
        });
    });

    c.UseDashboard();
});

Consumer

    [CapSubscribe(nameof(EntityCreatedForIntegration), Group = "test")]
    public void Handle(EntityCreatedForIntegration message)
    {
        Console.WriteLine($"Message {message.Id} received");
    }

CustomConsumer

public class ServiceBusConsumerDescriptorBuilder
{
    private string GroupName { get; set; } = null!;
    private string? Namespace { get; set; } = null;
    private string TopicPath { get; set; } = null!;

    public ServiceBusConsumerDescriptorBuilder UseGroupName(string groupName)
    {
        GroupName = groupName;
        return this;
    }

    public ServiceBusConsumerDescriptorBuilder UseTopic(string topicPath)
    {
        TopicPath = topicPath;
        return this;
    }

    public ServiceBusConsumerDescriptorBuilder UseNamespace(string @namespace)
    {
        Namespace = @namespace;
        return this;
    }

    public KeyValuePair<string, IServiceBusConsumerDescriptor> Build()
    {
        return new KeyValuePair<string, IServiceBusConsumerDescriptor> (GroupName, new ServiceBusConsumerDescriptor(TopicPath, Namespace));
    }
}

public interface IServiceBusConsumerDescriptor
{
    string TopicPath { get; }
    string? Namespace { get; }
}

public class ServiceBusConsumerDescriptor : IServiceBusConsumerDescriptor
{
    public ServiceBusConsumerDescriptor(string topicPath, string? @namespace)
    {
        TopicPath = topicPath;
        Namespace = @namespace;
    }

    public string TopicPath { get; }
    public string? Namespace { get; }
}

Client factory

internal sealed class AzureServiceBusConsumerClientFactory : IConsumerClientFactory
{
    private readonly IOptions<AzureServiceBusOptions> _asbOptions;
    private readonly ILoggerFactory _loggerFactory;
    private readonly IServiceProvider _serviceProvider;

    public AzureServiceBusConsumerClientFactory(
        ILoggerFactory loggerFactory,
        IOptions<AzureServiceBusOptions> asbOptions,
        IServiceProvider serviceProvider)
    {
        _loggerFactory = loggerFactory;
        _asbOptions = asbOptions;
        _serviceProvider = serviceProvider;
    }

    public IConsumerClient Create(string groupName, byte groupConcurrent)
    {
        try
        {
            var logger = _loggerFactory.CreateLogger(typeof(AzureServiceBusConsumerClient));
            if (_asbOptions.Value.CustomConsumers.TryGetValue(groupName, out var customConsumer))
            {
                var customClient = new AzureServiceBusConsumerClient(logger, groupName, groupConcurrent, _asbOptions, _serviceProvider, customConsumer);
                customClient.ConnectAsync().GetAwaiter().GetResult();
                return customClient;
            }

            var client = new AzureServiceBusConsumerClient(logger, groupName, groupConcurrent, _asbOptions, _serviceProvider);
            client.ConnectAsync().GetAwaiter().GetResult();
            return client;
        }
        catch (Exception e)
        {
            throw new BrokerConnectionException(e);
        }
    }
}

Ctor

    public AzureServiceBusConsumerClient(
        ILogger logger,
        string subscriptionName,
        byte groupConcurrent,
        IOptions<AzureServiceBusOptions> options,
        IServiceProvider serviceProvider,
        IServiceBusConsumerDescriptor? consumerDescriptor = null
        )
    {
        _logger = logger;
        _subscriptionName = subscriptionName;
        _groupConcurrent = groupConcurrent;
        _semaphore = new SemaphoreSlim(groupConcurrent);
        _serviceProvider = serviceProvider;
        _asbOptions = options.Value ?? throw new ArgumentNullException(nameof(options));

        if (consumerDescriptor is not null)
        {
            _asbOptions.TopicPath = consumerDescriptor.TopicPath;
            _asbOptions.Namespace = consumerDescriptor.Namespace ?? _asbOptions.Namespace;
        }

        CheckValidSubscriptionName(subscriptionName);
    }
Admi99 commented 3 weeks ago

@yang-xiaodong This would be really usefull for us too. Would it be problem to add it ?

yang-xiaodong commented 3 weeks ago

@mviegas What do you think about this feature?

mviegas commented 3 weeks ago

It makes sense to me. I always envisioned such a feature. It provides more flexibility in the pub/sub configuration in similar ways to other libraries, such as MassTransit.

A couple of things that come to my mind are:

KuchaD commented 3 weeks ago

@mviegas Thanks for the quick response.

c.UseAzureServiceBus(asb =>
{
    asb.ConnectionString = ...
           asb.ConfigureCustomGroupConsumer("test", cfg =>
        {
            cfg.UseTopic("entity-created");
            cfg.UseConnectionString("external connection string");
            cfg.UseDefaultOptions(); // Use default options from default consumer
        });

        asb.ConfigureCustomGroupConsumer("test2", cfg =>
        {
            cfg.UseTopic("entity-deleted");
            //Set custom options for this consumer
            cfg.Configuration(c =>
            {
                c.EnableSessions = true;
            });
        });
});

I hope that is what did you expected

mviegas commented 3 weeks ago

Thanks for your prompt reply @KuchaD. I think I might have misread what your suggestion was. So we're not going deep on a type level, but rather on a topic (group) level. I have to think a bit more just to make sure we deliver a coherent and long-lived API for this, if you could give me a couple of days to reflect on it before a final decision it'd be great.

Meanwhile, I will start the code review in the PR for what's currently there, it's a great kickstart.

mviegas commented 3 weeks ago

Another question that comes to my mind: are you proposing to support multiple topics within a single namespace? Or multiple topics in multiple namespaces? If the latter is the answer, what is the motivation behind it?

KuchaD commented 3 weeks ago

@mviegas I already processed review comments.

We have realy lots of microservice and where microservisea has topic for single type message. motivation is that we i suppost (I didnt in star of application ) is we see in portal who subscibe whitch type of message without we need to check every subcriber filter and when somebody didnt process messages correcly we know were is problem in first look.

Example:

Service A with namespace NA Producete Message MA1 to topic A-namaspace Producete Message MA2 to topic A-namaspace Consumer Message MC1 from NC

Service B with namespace NB Producete Message MB1 to topic B1 Producete Message MB2 to topic B2 Consumer Message MA1 from NA Message MC1 from NC

Service C with namespace NC Producete Message MC1 to topic C1 Producete Message MC2 to topic C2 Consumer Message MB2 from NB

mviegas commented 3 weeks ago

Hi @KuchaD! Thanks for your feedback.

After some more thorough thoughts, I have no issues with the approach of producing/consuming multiple topics within the same namespace. However supporting multiple namespaces would mean supporting multiple broker addresses, which is something that goes against current CAP contracts and would also introduce a lot of background work not only in the transport layer but also in the framework itself to ensure proper connection management (multiple topologies, resiliency, etc.). cc @yang-xiaodong

That way, I think that we can move forward with the approach of adding custom topic consumers for the same namespace, but not for multiple namespaces. In practice, this would mean that every ConsumerDescriptor as proposed in #1572 should only have Entity-related properties (like EnableSessions, message TTL), not Namespace-related properties (like Connection String/Credentials).

KuchaD commented 3 weeks ago

@mviegas Hi I understand your problems because you use BrokerAdress in ConsumerRegister. But if you use TokenCredential dont you need BrokerAdress ? What do you think when we could supported multiple namespace if we use TokenCredential ? I just quick checked imlementation. Maybe i wrong.

mviegas commented 3 weeks ago

@KuchaD you're right that there's a bug in the BrokerAddress setter for the ASB transport. It should be settled as either the ConnectionString or the Namespace when the authentication is done via TokenCredential. Nowadays it doesn't consider the latter.

That said, the rule for connecting to a single namespace is still valid. We should keep it that way to keep the library standards with other transports as well.

KuchaD commented 3 weeks ago

@mviegas thx for explained. Do you think that in future you will support multiple azure service bus or (namespacing) as Mass Transit. For company were i am working is it really necessary for using cap in our project. Now we have one project with cap and we really satify with them but this is big blocker for us for migrate from mass transint to cap

KuchaD commented 3 weeks ago

I am open to contribute with this situation if you are open to help me make solution and figure out how to implement this to cap.

For start i fix solution for one namespace :)

thx

mviegas commented 2 weeks ago

@KuchaD, yes I'm aware that MassTransit allows you to configure multiple buses with some adjustments using .NET DI, where each bus has its own configuration, including the broker address/namespace. In CAP, configuring multiple buses isn't currently possible. If this changes in the future, we can adapt the ASB transport to support it. But it should be something coming from the framework first.