robinrodricks / FluentStorage

A polycloud .NET cloud storage abstraction layer. Provides Blob storage (AWS S3, GCP, FTP, SFTP, Azure Blob/File/Event Hub/Data Lake) and Messaging (AWS SQS, Azure Queue/ServiceBus). Supports .NET 5+ and .NET Standard 2.0+. Pure C#.
MIT License

Azure Servicebus: Migrate to new SDK and implement new utility methods #46

Closed GiampaoloGabba closed 10 months ago

GiampaoloGabba commented 10 months ago

This PR updates the Azure ServiceBus plugin for FluentStorage to use the new Azure.Messaging.ServiceBus package, which is the one currently supported by Microsoft.

The old plugin was missing some methods, and I'm not sure it worked with subscriptions (see issue #44 for more details).

IMessageReceiver

The updated package introduces the following factory methods for creating an IMessageReceiver:

/// <summary>
/// Creates an Azure Service Bus receiver for a topic subscription
/// </summary>
public static IMessageReceiver AzureServiceBusTopicReceiver(this IMessagingFactory factory,
   string connectionString,
   string topicName,
   string subscriptionName,
   bool autocompleteMessages = false,
   ServiceBusClientOptions serviceBusClientOptions = null,
   ServiceBusProcessorOptions messageProcessorOptions = null)

/// <summary>
/// Creates an Azure Service Bus receiver for a queue
/// </summary>
public static IMessageReceiver AzureServiceBusQueueReceiver(this IMessagingFactory factory,
   string connectionString,
   string queueName,
   bool autocompleteMessages = false,
   ServiceBusClientOptions serviceBusClientOptions = null,
   ServiceBusProcessorOptions messageProcessorOptions = null)

ServiceBusClientOptions and ServiceBusProcessorOptions are optional parameters that let you fine-tune the underlying ServiceBusClient and ServiceBusProcessor.
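
As a rough sketch of how the queue receiver could be created with both option objects: the parameter names come from the signature above, and the option properties are regular Azure.Messaging.ServiceBus settings. The `using` namespaces for FluentStorage, the `ReceiverSketch` class, and the queue name "orders" are assumptions made for illustration only.

```csharp
using Azure.Messaging.ServiceBus;
using FluentStorage;            // assumption: namespace that exposes the extension methods
using FluentStorage.Messaging;  // assumption: namespace of IMessagingFactory / IMessageReceiver

public static class ReceiverSketch
{
    // Hypothetical helper: processor settings for the underlying ServiceBusProcessor.
    public static ServiceBusProcessorOptions BuildProcessorOptions() =>
        new ServiceBusProcessorOptions
        {
            MaxConcurrentCalls = 4, // handle up to four messages in parallel
            PrefetchCount = 20      // let the SDK buffer messages ahead of processing
        };

    // Creates a receiver for the hypothetical "orders" queue.
    public static IMessageReceiver CreateQueueReceiver(
        IMessagingFactory factory, string connectionString)
    {
        // Client-level settings, here AMQP over WebSockets (port 443).
        var clientOptions = new ServiceBusClientOptions
        {
            TransportType = ServiceBusTransportType.AmqpWebSockets
        };

        return factory.AzureServiceBusQueueReceiver(
            connectionString,
            "orders",
            autocompleteMessages: true,
            serviceBusClientOptions: clientOptions,
            messageProcessorOptions: BuildProcessorOptions());
    }
}
```

Both option arguments can be left out, in which case they default to null as shown in the signatures.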

IMessenger

The Wiki still documents the IMessagePublisher interface, but that interface no longer exists in FluentStorage (it was part of the old Storage.NET interface and was removed at some point).

The interface used to send messages is IMessenger, but it is somewhat limited because it has no notion of queues, topics, and subscriptions.

So, if you want to use the IMessenger interface, when passing a channel you have to use this format:

Every public IMessenger method that takes a channel (or a collection of channels) as input must follow this naming convention.

Because of this, I created a new interface, IAzureServiceBusMessenger, which extends IMessenger. If you cast your AzureServiceBusMessenger instance to IAzureServiceBusMessenger, you get access to these utility methods, which directly target queues, topics, and subscriptions:

/// <summary>
/// Sends a collection of messages to the specified queue asynchronously.
/// </summary>
/// <param name="queue">The name of the queue.</param>
/// <param name="messages">The collection of messages to send.</param>
/// <param name="cancellationToken">The cancellation token.</param>
/// <returns>A task representing the asynchronous operation.</returns>
Task SendToQueueAsync(string queue, IEnumerable<QueueMessage> messages, CancellationToken cancellationToken = default);
/// <summary>
/// Sends a single message to the specified queue asynchronously.
/// </summary>
/// <param name="queue">The name of the queue.</param>
/// <param name="message">The message to send.</param>
/// <param name="cancellationToken">The cancellation token.</param>
/// <returns>A task representing the asynchronous operation.</returns>
Task SendToQueueAsync(string queue, QueueMessage message, CancellationToken cancellationToken = default);
/// <summary>
/// Sends a collection of messages to the specified topic asynchronously.
/// </summary>
/// <param name="topic">The name of the topic.</param>
/// <param name="messages">The collection of messages to send.</param>
/// <param name="cancellationToken">The cancellation token.</param>
/// <returns>A task representing the asynchronous operation.</returns>
Task SendToTopicAsync(string topic, IEnumerable<QueueMessage> messages, CancellationToken cancellationToken = default);
/// <summary>
/// Sends a single message to the specified topic asynchronously.
/// </summary>
/// <param name="topic">The name of the topic.</param>
/// <param name="message">The message to send.</param>
/// <param name="cancellationToken">The cancellation token.</param>
/// <returns>A task representing the asynchronous operation.</returns>
Task SendToTopicAsync(string topic, QueueMessage message, CancellationToken cancellationToken = default);
/// <summary>
/// Sends a collection of messages to the specified topic subscription asynchronously.
/// </summary>
/// <param name="topic">The name of the topic.</param>
/// <param name="subscription">The name of the subscription.</param>
/// <param name="messages">The collection of messages to send.</param>
/// <param name="cancellationToken">The cancellation token.</param>
/// <returns>A task representing the asynchronous operation.</returns>
Task SendToSubscriptionAsync(string topic, string subscription, IEnumerable<QueueMessage> messages, CancellationToken cancellationToken = default);
/// <summary>
/// Sends a single message to the specified topic subscription asynchronously.
/// </summary>
/// <param name="topic">The name of the topic.</param>
/// <param name="subscription">The name of the subscription.</param>
/// <param name="message">The message to send.</param>
/// <param name="cancellationToken">The cancellation token.</param>
/// <returns>A task representing the asynchronous operation.</returns>
Task SendToSubscriptionAsync(string topic, string subscription, QueueMessage message, CancellationToken cancellationToken = default);
/// <summary>
/// Create a new Queue in Azure ServiceBus
/// </summary>
/// <param name="name">Queue name</param>
/// <param name="cancellationToken">Cancellation token</param>
/// <returns>A task representing the asynchronous operation.</returns>
Task CreateQueueAsync(string name, CancellationToken cancellationToken = default);
/// <summary>
/// Create a new Topic in Azure ServiceBus
/// </summary>
/// <param name="topic">Topic name</param>
/// <param name="cancellationToken">Cancellation token</param>
/// <returns>A task representing the asynchronous operation.</returns>
Task CreateTopicAsync(string topic, CancellationToken cancellationToken = default);
/// <summary>
/// Create a new Subscription in Azure ServiceBus
/// </summary>
/// <param name="topic">Topic name</param>
/// <param name="subscription">Subscription name</param>
/// <param name="cancellationToken">Cancellation token</param>
/// <returns>A task representing the asynchronous operation.</returns>
Task CreateSubScriptionAsync(string topic, string subscription, CancellationToken cancellationToken = default);
/// <summary>
/// Deletes the specified queue asynchronously.
/// </summary>
/// <param name="queue">The name of the queue.</param>
/// <param name="cancellationToken">The cancellation token.</param>
/// <returns>A task representing the asynchronous operation.</returns>
Task DeleteQueueAsync(string queue, CancellationToken cancellationToken = default);
/// <summary>
/// Deletes the specified topic subscription asynchronously.
/// </summary>
/// <param name="topic">The name of the topic.</param>
/// <param name="subscription">The name of the subscription.</param>
/// <param name="cancellationToken">The cancellation token.</param>
/// <returns>A task representing the asynchronous operation.</returns>
Task DeleteSubScriptionAsync(string topic, string subscription, CancellationToken cancellationToken = default);
/// <summary>
/// Deletes the specified topic asynchronously.
/// </summary>
/// <param name="topic">The name of the topic.</param>
/// <param name="cancellationToken">The cancellation token.</param>
/// <returns>A task representing the asynchronous operation.</returns>
Task DeleteTopicAsync(string topic, CancellationToken cancellationToken = default);
/// <summary>
/// Counts the number of messages in the specified queue asynchronously.
/// </summary>
/// <param name="queue">The name of the queue.</param>
/// <param name="cancellationToken">The cancellation token.</param>
/// <returns>The number of messages in the queue.</returns>
Task<long> CountQueueAsync(string queue, CancellationToken cancellationToken = default);
/// <summary>
/// Counts the number of messages in the specified topic subscription asynchronously.
/// </summary>
/// <param name="topic">The name of the topic.</param>
/// <param name="subscription">The name of the subscription.</param>
/// <param name="cancellationToken">The cancellation token.</param>
/// <returns>The number of messages in the topic subscription.</returns>
Task<long> CountSubScriptionAsync(string topic, string subscription, CancellationToken cancellationToken = default);
/// <summary>
/// Counts the number of messages in the specified topic asynchronously.
/// </summary>
/// <param name="topic">The name of the topic.</param>
/// <param name="cancellationToken">The cancellation token.</param>
/// <returns>The number of messages in the topic.</returns>
Task<long> CountTopicAsync(string topic, CancellationToken cancellationToken = default);
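
To illustrate the cast described above, here is a minimal, self-contained sketch. The `IMessenger`, `QueueMessage`, and `IAzureServiceBusMessenger` declarations are trimmed stand-ins so the snippet compiles on its own (in a real project they come from the FluentStorage packages), and `InMemoryServiceBusMessenger` and `MessengerSketch` are hypothetical names used only to exercise the calling code. Only the three method signatures are taken from the listing above.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Stand-ins so the snippet compiles alone; the real types live in FluentStorage.
public interface IMessenger { }

public sealed class QueueMessage
{
    public QueueMessage(string content) => Content = content;
    public string Content { get; }
}

// Subset of the utility methods listed above, with the same signatures.
public interface IAzureServiceBusMessenger : IMessenger
{
    Task CreateQueueAsync(string name, CancellationToken cancellationToken = default);
    Task SendToQueueAsync(string queue, QueueMessage message, CancellationToken cancellationToken = default);
    Task<long> CountQueueAsync(string queue, CancellationToken cancellationToken = default);
}

// Hypothetical in-memory fake; it does not talk to Azure.
public sealed class InMemoryServiceBusMessenger : IAzureServiceBusMessenger
{
    private readonly Dictionary<string, List<QueueMessage>> _queues =
        new Dictionary<string, List<QueueMessage>>();

    public Task CreateQueueAsync(string name, CancellationToken cancellationToken = default)
    {
        if (!_queues.ContainsKey(name)) _queues[name] = new List<QueueMessage>();
        return Task.CompletedTask;
    }

    public Task SendToQueueAsync(string queue, QueueMessage message, CancellationToken cancellationToken = default)
    {
        _queues[queue].Add(message);
        return Task.CompletedTask;
    }

    public Task<long> CountQueueAsync(string queue, CancellationToken cancellationToken = default)
        => Task.FromResult((long)_queues[queue].Count);
}

public static class MessengerSketch
{
    // Accepts the general IMessenger and casts it to reach the queue-specific helpers.
    public static async Task<long> SendAndCountAsync(IMessenger messenger, string queue)
    {
        if (!(messenger is IAzureServiceBusMessenger serviceBus))
            throw new InvalidOperationException("Messenger does not expose the Service Bus utility methods.");

        await serviceBus.CreateQueueAsync(queue);
        await serviceBus.SendToQueueAsync(queue, new QueueMessage("hello"));
        return await serviceBus.CountQueueAsync(queue);
    }
}
```

Against the real plugin the same calling code would take the messenger returned by FluentStorage instead of the fake. How the real implementation behaves when the queue already exists is not covered here.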

robinrodricks commented 10 months ago

Is this correct?

https://github.com/robinrodricks/FluentStorage/wiki/Azure-Service-Bus

robinrodricks commented 10 months ago

Looks great btw thanks so much for this huge PR and contribution!

robinrodricks commented 10 months ago

Released!

https://www.nuget.org/packages/FluentStorage.Azure.ServiceBus/6.0.0

robinrodricks commented 10 months ago

I added your name in the package authors as your contribution was so huge.