zeromq / netmq

A 100% native C# implementation of ZeroMQ for .NET
Other
2.94k stars 742 forks source link

Executing disk I/O operations breaks dealer-socket #998

Open nicolaspierre1990 opened 2 years ago

nicolaspierre1990 commented 2 years ago

Environment

NetMQ Version:    4.0.1.6
Operating System:  Win11 21H2 22000.434
.NET Version:  .NET6

Expected behaviour

When a disk I/O operation is executed, I still expect the dealer socket to be able to send its message to the router socket.

Actual behaviour

When a disk I/O operation is executed, the dealer socket cannot send a message to the router socket. At first I thought it was a threading issue and converted all the code to synchronous code, but to no avail.

Steps to reproduce the behaviour

I will abbreviate code samples for readability

Broker-code (router-socket)

        /// <summary>
        /// Creates the broker and builds one <see cref="QueueDefinition"/> for every queue
        /// exposed by the storage provider's <c>ConfiguredQueues</c>.
        /// </summary>
        /// <param name="storageProvider">Provider whose configured queues are wrapped in queue definitions.</param>
        /// <param name="options">Broker options; also passed to every created queue definition.</param>
        public MessageBroker(IStorageProvider storageProvider, MessageBrokerOptions options) {
            _logger = Log.Logger;
            _storageProvider = storageProvider;
            _options = options;

            // Reported problem: with this initialisation in place the sockets stop working;
            // without it everything runs as expected.
            var definitions = storageProvider.ConfiguredQueues
                .Select(tQueue => new QueueDefinition(tQueue, _options));
            _queueDefinitions = new HashSet<QueueDefinition>(definitions);
        }

 /// <summary>
 /// Creates the router socket on first use, binds or connects it to the configured TCP
 /// endpoint, and starts a poller that enqueues every received message on the queue
 /// named by the sending peer's identity. Calling it again once the socket exists only
 /// writes the final log line.
 /// </summary>
 /// <param name="cancellationToken">Currently not observed by this method.</param>
 public void Connect(CancellationToken cancellationToken = default) {
            if (_routerSocket == null) {
                // Decide bind vs. connect before the socket is created, as the original flow did.
                var bindToPort = CheckIfPortInUse(_options.Port);
                var endpoint = $"tcp://{_options.Host}:{_options.Port}";

                _routerSocket = new RouterSocket();
                if (bindToPort) {
                    _routerSocket.Options.RouterMandatory = true;
                    _routerSocket.Bind(endpoint);
                } else {
                    _routerSocket.Connect(endpoint);
                }

                var poller = new NetMQPoller { _routerSocket };

                _logger.Information($"{nameof(Connect)} begin Thread {Environment.CurrentManagedThreadId}");

                _routerSocket.ReceiveReady += (sender, eventArgs) => {
                    var frames = eventArgs.Socket.ReceiveMultipartMessage();

                    // A ROUTER socket prepends the identity of the sending peer as the first
                    // frame. The socket's own Options.Identity is the router's identity, not
                    // the sender's, so the queue name has to be read from that first frame.
                    // Both frames are decoded as UTF-8 because the dealer side encodes its
                    // identity and its payload with Encoding.UTF8.
                    var queueNameFrame = frames.First.ConvertToString(Encoding.UTF8);
                    var queueMessage = frames.Last.ConvertToString(Encoding.UTF8);

                    _logger.Information($"Received for queue {queueNameFrame} message => {queueMessage}");

                    var queueDefinition = _queueDefinitions.SingleOrDefault(x => x.Queue.QueueName.Equals(queueNameFrame));

                    // NOTE(review): this is thrown from inside the ReceiveReady callback, not to the caller of Connect.
                    if (queueDefinition == null) {
                        throw new ArgumentException(string.Format(Messages.QueueDefinition_Publish_QueueNotFound, queueNameFrame));
                    }

                    queueDefinition.Queue.Enqueue(JsonConvert.DeserializeObject<Message>(queueMessage));
                };

                poller.RunAsync();
                _logger.Information($"{nameof(Connect)} after poller Thread {Environment.CurrentManagedThreadId}");
            }

            _logger.Information($"{nameof(Connect)} end Thread {Environment.CurrentManagedThreadId}");
        }

Queue code (dealer-socket)

  /// <summary>
  /// Wraps a queue together with a new dealer socket whose identity is the
  /// UTF-8 encoded queue name.
  /// </summary>
  /// <param name="queue">Queue this definition represents.</param>
  /// <param name="messageBrokerOptions">Broker options stored for later use by this definition.</param>
  public QueueDefinition(Queue queue, MessageBrokerOptions messageBrokerOptions) {
            _queue = queue;
            _messageBrokerOptions = messageBrokerOptions;

            // Configure the socket through a local before publishing it to the field.
            var socket = new DealerSocket();
            socket.Options.Identity = Encoding.UTF8.GetBytes(queue.QueueName);
            _dealerSocket = socket;
        }

        /// <summary>Gets the queue this definition was created for.</summary>
        public Queue Queue => _queue;

        /// <summary>
        /// Calls <c>RunAsync</c> on the queue and then connects the dealer socket to the
        /// TCP endpoint built from the broker options' host and port.
        /// </summary>
        public void RunQueue() {
            _queue.RunAsync();

            var brokerEndpoint = $"tcp://{_messageBrokerOptions.Host}:{_messageBrokerOptions.Port}";
            _dealerSocket.Connect(brokerEndpoint);
        }

        /// <summary>
        /// Wraps <paramref name="body"/> in a new message for this queue and publishes it.
        /// </summary>
        /// <typeparam name="T">Type of the message body.</typeparam>
        /// <param name="body">Payload handed to the message factory.</param>
        /// <param name="cancellationToken">Forwarded to the message-based overload.</param>
        /// <returns>The result of the message-based <c>Publish</c> overload.</returns>
        public bool Publish<T>(T body, CancellationToken cancellationToken = default) {
            // The local is typed as Message on purpose so the call below binds to the
            // non-generic overload.
            Message envelope = MessageFactory.CreateNewMessage(
                body,
                _queue.QueueName,
                _messageBrokerOptions.MessageExpiry,
                new JsonSerializerSettings());

            return Publish(envelope, cancellationToken);
        }

        /// <summary>
        /// Serializes <paramref name="tMessage"/> to JSON and tries to send it over the dealer
        /// socket, retrying according to the broker options while the send reports <c>false</c>.
        /// </summary>
        /// <param name="tMessage">Message to serialize and send.</param>
        /// <param name="cancellationToken">Currently not observed by this method.</param>
        /// <returns><c>true</c> when an attempt succeeded; otherwise <c>false</c>.</returns>
        internal bool Publish(Message tMessage, CancellationToken cancellationToken = default) {
            // A single send attempt; any exception is logged and reported as a failed attempt.
            bool TrySendOnce() {
                try {
                    _logger.Information($"{nameof(Publish)} start Thread {Environment.CurrentManagedThreadId}");

                    var outgoing = new NetMQMessage(3);
                    var json = JsonConvert.SerializeObject(tMessage);
                    outgoing.Append(Encoding.UTF8.GetBytes(json));

                    _logger.Information($"Sending NetMQ Message {outgoing}");

                    return _dealerSocket.TrySendMultipartMessage(outgoing);
                } catch (Exception ex) {
                    _logger.Error(ex, $"{nameof(Publish)} failed with {ex.Message}");
                    return false;
                }
            }

            // Retry whenever an attempt yields false, waiting the configured delay in between.
            var retryPolicy = Policy
                .HandleResult(false)
                .WaitAndRetry(
                    _messageBrokerOptions.RetryCount,
                    retryAttempt => _messageBrokerOptions.RetryDelay,
                    (outcome, delay, attempt, context) => {
                        _logger.Warning($"{nameof(Publish)} failed for queue {Queue.QueueName}. Waiting {delay} before next retry. Retry attempt {attempt}");

                        // Escalate to an error once the configured retry count is reached.
                        if (attempt == _messageBrokerOptions.RetryCount) {
                            _logger.Error($"{nameof(Publish)} failed for queue {Queue.QueueName}.");
                        }
                    });

            return retryPolicy.Execute(() => TrySendOnce());
        }

StorageProvider

public class JsonStorageProvider : IStorageProvider
    {
        // Logger supplied through the constructor.
        private readonly ILogger _logger;
        // Base folder for stored data; set in the constructor from the local application data folder.
        private readonly string _storagePath;
        // Path of the queue configuration file (queue.tmq in a db folder below _storagePath).
        private readonly string _queueStoragePath;
        // Settings used when deserializing the queue configuration; null values are ignored.
        private readonly JsonSerializerSettings _jsonSerializerSettings = new() {
            NullValueHandling = NullValueHandling.Ignore
        };

        // Queues loaded from the configuration file; starts empty and is replaced by InitStorage.
        private List<Queue> _configuredQueues = new();

        /// <summary>Gets the queues read from the queue configuration file.</summary>
        public List<Queue> ConfiguredQueues { get => _configuredQueues; private set => _configuredQueues = value; }

        /// <summary>
        /// Resolves the storage paths below the local application data folder and loads the
        /// queue configuration from disk.
        /// </summary>
        /// <param name="logger">Logger stored for use by this provider.</param>
        public JsonStorageProvider(ILogger logger) {
            _storagePath = Environment.GetFolderPath(Environment.SpecialFolder.LocalApplicationData);

            // Pass the segments separately so Path.Combine inserts the platform's directory
            // separator instead of a hard-coded backslash (identical result on Windows).
            _queueStoragePath = Path.Combine(_storagePath, "db", "queue.tmq");

            _logger = logger;
            InitStorage();
        }

        /// <summary>Loads the queue configuration and stores it as the configured queues.</summary>
        private void InitStorage() => _configuredQueues = ReadQueueConfiguration();

        /// <summary>
        /// Reads the queue configuration file and deserializes it into a list of queues.
        /// </summary>
        /// <returns>The deserialized queues, or an empty list when deserialization yields <c>null</c>.</returns>
        public List<Queue> ReadQueueConfiguration() {
            var json = ReadJsonFile(_queueStoragePath);
            var parsed = JsonConvert.DeserializeObject<List<Queue>>(json, _jsonSerializerSettings);

            if (parsed == null) {
                return new List<Queue>();
            }

            return parsed;
        }

        /// <summary>
        /// Reads the complete text content of the file at <paramref name="filePath"/>.
        /// </summary>
        /// <param name="filePath">Path of the file to read.</param>
        /// <returns>The file's entire content as a string.</returns>
        public string ReadJsonFile(string filePath) {
            // File.ReadAllText opens, reads and closes the file in one call, replacing the
            // manual StreamReader whose explicit Close() was redundant inside a using block.
            return File.ReadAllText(filePath);
        }