Closed asdbabil closed 1 year ago
hi @asdbabil , thank you very much for reaching out! You're completely right, I totally forgot to add a call to BasicAck :) I updated the code a bit (and even updated it to .NET7). Thanks for pointing this out!
Regarding the exception handling, the situation is a bit tricky. An exception thrown while writing to the Channel would be captured by RabbitSubscriber. However, the problem is that blocking in the Channel consumer would reduce throughput. An option would be to update the Consumer class, catch exceptions there, and publish a new message on a different queue (dead-letter perhaps?).
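A minimal sketch of that last option, assuming an in-memory System.Threading.Channels pipeline in place of RabbitMQ. Message, DeadLetterSketch, ConsumeAsync and RunDemoAsync are hypothetical names, and the dead-letter writer only stands in for a publish to a separate queue:

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Channels;
using System.Threading.Tasks;

// Hypothetical message type, standing in for the Message class used by the subscriber.
public record Message(Guid Id, string Text);

public static class DeadLetterSketch
{
    // Drains the reader; a failing handler does not stop the loop,
    // the message is forwarded to the dead-letter writer instead.
    public static async Task ConsumeAsync(
        ChannelReader<Message> reader,
        ChannelWriter<Message> deadLetters,
        Func<Message, Task> handler)
    {
        await foreach (var message in reader.ReadAllAsync())
        {
            try
            {
                await handler(message);
            }
            catch (Exception)
            {
                // With RabbitMQ this would be a publish to a dead-letter queue.
                await deadLetters.WriteAsync(message);
            }
        }
    }

    // Pushes two messages through the consumer, one of which makes the handler throw,
    // and returns whatever ended up in the dead-letter channel.
    public static async Task<List<Message>> RunDemoAsync()
    {
        var input = Channel.CreateUnbounded<Message>();
        var deadLetters = Channel.CreateUnbounded<Message>();

        await input.Writer.WriteAsync(new Message(Guid.NewGuid(), "ok"));
        await input.Writer.WriteAsync(new Message(Guid.NewGuid(), "boom"));
        input.Writer.Complete();

        await ConsumeAsync(input.Reader, deadLetters.Writer, message =>
            message.Text == "boom"
                ? Task.FromException(new InvalidOperationException("handler failed"))
                : Task.CompletedTask);
        deadLetters.Writer.Complete();

        var failed = new List<Message>();
        await foreach (var message in deadLetters.Reader.ReadAllAsync())
            failed.Add(message);
        return failed;
    }
}
```

The try/catch sits inside the read loop, so one bad message costs a single extra write and the worker keeps draining the channel.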
About your last question, I'd go for one channel per queue.
Hope this helps!
Hi @mizrael, thank you for the quick reply and updating the code! Yeah that makes sense to let the consumer workers handle failed messages.
Regarding one channel per queue, let me know what you think about the change below:
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Text;
using System.Text.Json;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
public class RabbitSubscriber : ISubscriber, IDisposable
{
    public event AsyncEventHandler<RabbitSubscriberEventArgs> OnMessage;
    public RabbitSubscriber(IBusConnection connection,
        RabbitSubscriberOptions options,
        ILogger<RabbitSubscriber> logger)
    {
        _connection = connection ?? throw new ArgumentNullException(nameof(connection));
        _options = options ?? throw new ArgumentNullException(nameof(options));
        _logger = logger ?? throw new ArgumentNullException(nameof(logger));
    }
    // Creates one channel and one consumer per queue.
    public void Start()
    {
        foreach (var queue in new List<string> { "queue1", "queue2" })
        {
            var channel = InitChannel(queue);
            InitSubscription(channel, queue);
        }
    }
    public void Dispose()
    {
        foreach (var channel in _channels.Values)
            channel.Dispose();
        _channels.Clear();
    }
    private readonly IBusConnection _connection;
    private readonly ILogger<RabbitSubscriber> _logger;
    private readonly RabbitSubscriberOptions _options;
    // Queue name -> channel currently serving that queue.
    private readonly ConcurrentDictionary<string, IModel> _channels = new ConcurrentDictionary<string, IModel>();
    private IModel InitChannel(string queue)
    {
        // Remove and dispose any previous channel for this queue before creating a new one.
        if (_channels.TryRemove(queue, out var oldChannel))
            oldChannel.Dispose();
        var channel = _connection.CreateChannel();
        _channels[queue] = channel;
        channel.ExchangeDeclare(exchange: _options.ExchangeName, type: ExchangeType.Fanout);
        channel.QueueDeclare(queue: queue,
            durable: false,
            exclusive: false,
            autoDelete: true,
            arguments: null);
        channel.QueueBind(queue, _options.ExchangeName, string.Empty, null);
        channel.CallbackException += (sender, ea) =>
        {
            // The queue name is captured by the closure, so no lookup by ChannelNumber is needed.
            // (SingleOrDefault would return a null key if the channel had already been replaced.)
            var newChannel = InitChannel(queue);
            InitSubscription(newChannel, queue);
        };
        return channel;
    }
    private void InitSubscription(IModel channel, string queue)
    {
        var consumer = new AsyncEventingBasicConsumer(channel);
        consumer.Received += OnMessageReceivedAsync;
        channel.BasicConsume(queue: queue, autoAck: false, consumer: consumer);
    }
    private async Task OnMessageReceivedAsync(object sender, BasicDeliverEventArgs eventArgs)
    {
        var consumer = sender as IBasicConsumer;
        var channel = consumer?.Model;
        if (channel is null)
        {
            // Without the channel the delivery can be neither acked nor rejected.
            _logger.LogError("unable to resolve the channel for delivery {DeliveryTag}", eventArgs.DeliveryTag);
            return;
        }
        Message message = null;
        try
        {
            var body = Encoding.UTF8.GetString(eventArgs.Body.Span);
            message = JsonSerializer.Deserialize<Message>(body);
            // The event is null when nobody has subscribed yet.
            var handler = OnMessage;
            if (handler != null)
                await handler(this, new RabbitSubscriberEventArgs(message));
            channel.BasicAck(eventArgs.DeliveryTag, multiple: false);
        }
        catch (Exception ex)
        {
            var errMsg = (message is null) ? $"an error has occurred while processing a message: {ex.Message}"
                : $"an error has occurred while processing message '{message.Id}': {ex.Message}";
            _logger.LogError(ex, errMsg);
            // Give the message one more attempt; reject it for good if it has already been redelivered.
            if (eventArgs.Redelivered)
                channel.BasicReject(eventArgs.DeliveryTag, requeue: false);
            else
                channel.BasicNack(eventArgs.DeliveryTag, multiple: false, requeue: true);
        }
    }
}
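One note on the BasicReject(requeue: false) branch: RabbitMQ discards a rejected message unless the queue was declared with a dead-letter exchange. A minimal sketch of the queue arguments for that, where DeadLetterArgs and the exchange name "my-dlx" are hypothetical; x-dead-letter-exchange and x-dead-letter-routing-key are the standard RabbitMQ queue arguments:

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper: builds the optional arguments for QueueDeclare
// so that rejected messages are routed to a dead-letter exchange.
public static class DeadLetterArgs
{
    public static Dictionary<string, object> For(string deadLetterExchange, string? routingKey = null)
    {
        if (string.IsNullOrWhiteSpace(deadLetterExchange))
            throw new ArgumentException("exchange name is required", nameof(deadLetterExchange));

        var args = new Dictionary<string, object>
        {
            ["x-dead-letter-exchange"] = deadLetterExchange
        };

        // Optional: without it the message keeps its original routing key.
        if (routingKey != null)
            args["x-dead-letter-routing-key"] = routingKey;

        return args;
    }
}
```

The result would replace arguments: null in the QueueDeclare call, for example arguments: DeadLetterArgs.For("my-dlx"). The dead-letter exchange itself still has to be declared and bound to a queue, and redeclaring an existing queue with different arguments fails, so the old queue would need to be gone first.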
Yeah, I've read that, especially the part that says to create one RMQ channel per thread, so my interpretation was to create one RMQ channel per queue. But it sounds like I can just create one RMQ channel and bind all queues to it, like you've done in your code.
If the channel is used only by the background worker, then it should be limited to a single thread. Should be pretty safe :)
Sounds good thank you!
Hello, first of all I just want to say thank you for this series, I like your approach. I'm trying to tweak it so that I can use some parts of it in my consumer application. I have some questions and I'm going to start off with this one:
I noticed autoAck is turned off but BasicAck is never called from anywhere. I assume it was missed by accident? Since the call to either BasicAck (when message processing is done) or BasicReject/BasicNack (in case of an exception) happens in the consumer, how would you handle passing the channel reference to the consumer class? I can only think of 2 ways:
1. Pass it along through await OnMessage(this, new RabbitSubscriberEventArgs(message, channel)) and System.Threading.Channels.Channel<RabbitSubscriberEventArgs>
2. Make IModel public on ISubscriber and import it through the consumer constructor
Also, I have more queues to subscribe to, so should I be creating one channel per queue or one channel for all queues?
I'd like to hear your thoughts.