BEagle1984 / silverback

Silverback is a simple but feature-rich message bus for .NET core (it currently supports Kafka, RabbitMQ and MQTT).
https://silverback-messaging.net
MIT License
260 stars 39 forks source link

MQTT broker connection status #176

Open aminmesbahi opened 1 year ago

aminmesbahi commented 1 year ago

The broker connection status is not working correctly when using silverback to produce/consume the MQTT message (ActiveMQ).

  1. In ProducerBackgroundService and protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    await broker.DisconnectAsync();
    _logger.LogError($"{broker.IsConnected}"); //RETURNS TRUE!!
    await broker.ConnectAsync();
    _logger.LogError($"{broker.IsConnected}"); //RETURNS TRUE!!
  2. The output:
    info: Silverback.Messaging.Broker.Mqtt.DefaultMqttNetLogger[4103]
    Information from MqttClient (MqttClient): 'Disconnected.'.
    fail: Silverback.Messaging.Broker.Mqtt.DefaultMqttNetLogger[4101]
    Error from MqttClient (MqttClient): 'Error while connecting with server.'.
    MQTTnet.Exceptions.MqttCommunicationException: Error while connecting with host 'localhost:1883'.
    ---> System.Net.Sockets.SocketException (10061): No connection could be made because the target machine actively refused it.
    at System.Net.Sockets.Socket.AwaitableSocketAsyncEventArgs.ThrowException(SocketError error, CancellationToken cancellationToken)
    at System.Net.Sockets.Socket.AwaitableSocketAsyncEventArgs.System.Threading.Tasks.Sources.IValueTaskSource.GetResult(Int16 token)
    at System.Net.Sockets.Socket.<ConnectAsync>g__WaitForConnectWithCancellation|281_0(AwaitableSocketAsyncEventArgs saea, ValueTask connectTask, CancellationToken cancellationToken)
    at MQTTnet.Implementations.CrossPlatformSocket.ConnectAsync(String host, Int32 port, CancellationToken cancellationToken)
    --- End of inner exception stack trace ---
    at MQTTnet.Implementations.CrossPlatformSocket.ConnectAsync(String host, Int32 port, CancellationToken cancellationToken)
    at MQTTnet.Implementations.MqttTcpChannel.ConnectAsync(CancellationToken cancellationToken)
    at MQTTnet.Adapter.MqttChannelAdapter.ConnectAsync(CancellationToken cancellationToken)
    at MQTTnet.Client.MqttClient.ConnectAsync(MqttClientOptions options, CancellationToken cancellationToken)

    While:

    
    private async Task ProduceMessageAsync(IPublisher publisher, int number)
    {
    try
    {
    await publisher.PublishAsync(
    new SampleMessage
    {
    Number = number
    },true);
        _logger.LogInformation($"Produced {number}");
    }
    catch(MqttCommunicationException ex)
    {
        _logger.LogError($"Failed to produce {number}, {ex.Message}"); //NEVER FIRES CATCH BLOCK
    }
}


3. `.AddSilverback().AddSingletonBrokerCallbackHandler<IMqttClientConnectedCallback>`
Not working properly, after couple of messages identifying the new status.

4. `.AddSilverback().AddSingletonBrokerCallbackHandler<IMqttClientDisconnectingCallback>`
Not working and in case of broker is not available not firing
BEagle1984 commented 1 year ago

I'm not sure I understand exactly all points. Can you maybe share the whole code to reproduce the issue?

In which case do you get the output from point 2? After calling DisconnectAsync() on the broker?

The IMqttClientConnectedCallback is not working at all or what's the behavior you are experiencing?

IMqttClientDisconnectingCallback is called when you disconnect the broker via DisconnectAsync() (or when the application is shutting down). The connection is managed internally and retried in case of failures. It would probably be more correct to invoke a callback also in this case, when the connection is dropped. What's your use case?

aminmesbahi commented 1 year ago

Firstly, thanks for your quick response :)

1: When the ActiveMQ is unavailable, I receive the log mentioned in "2". Also, in this situation, the "publisher.PublishAsync" method does not drop in the catch block.

2: The IMqttClientConnectedCallback does not work in all scenarios I've tested.

Generally speaking, I want to handle situations where ActiveMQ is not available. For example: try five times to connect and then kill the process or send a notification to the admin

Vorsortierung.zip

BEagle1984 commented 1 year ago

Which QoS level are you setting for the produced messages? You should definitely get an exception when producing with QoS >= 1.

For monitoring and alarming purpose you could try to rely on logs and health checks.

By the way, Silverback autonomously retries and reconnects, you don't have to take care of this aspect.