zeromq / netmq

A 100% native C# implementation of ZeroMQ for .NET

Sending Multiple Messages using NetMQPoller #983

Open goforgold opened 3 years ago

goforgold commented 3 years ago

I have implemented a router-dealer pattern between my server and client applications. I need to send a large data set (up to 5 GB) from the server (router) to the client (dealer). I am sending the data in chunks rather than as one large message, both to save memory and to let the client read the data in parallel.

I have defined my RouterSocket and NetMQPoller as follows.

_socket = new RouterSocket();
_queue = new NetMQQueue<NetMQMessage>();
_poller = new NetMQPoller() { _socket, _queue };
_queue.ReceiveReady += Queue_ReceiveReady;
_socket.ReceiveReady += SocketServer_ReceiveReady;

_socket.Bind("tcp://127.0.0.1:8888");
_poller.RunAsync();

Queue_ReceiveReady and SocketServer_ReceiveReady are defined as follows:

private void Queue_ReceiveReady(object sender, NetMQQueueEventArgs<NetMQMessage> e)
{
  NetMQMessage msg = e.Queue.Dequeue();
  _socket.SendMultipartMessage(msg);
  _logger.Debug($"Called SendMultipartMessage. Framecount: {msg.FrameCount}");
}

private async void SocketServer_ReceiveReady(object sender, NetMQSocketEventArgs e)
{
  await ProcessMessage(e.Socket.ReceiveMultipartMessage());
}
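As an aside, an `async void` event handler cannot be observed by the poller: if `ProcessMessage` throws after the first `await`, the exception can bring down the process, and the continuation no longer runs on the poller thread. A sketch of a variant that keeps the receive on the poller thread and observes failures (`ProcessMessage` and `_logger` as in the snippets above):

```csharp
private void SocketServer_ReceiveReady(object sender, NetMQSocketEventArgs e)
{
  // Receive synchronously on the poller thread, then hand processing off to
  // the thread pool so the poller is never blocked by ProcessMessage and
  // exceptions are caught rather than crashing the process.
  NetMQMessage msg = e.Socket.ReceiveMultipartMessage();
  _ = Task.Run(async () =>
  {
    try
    {
      await ProcessMessage(msg);
    }
    catch (Exception ex)
    {
      _logger.Debug($"ProcessMessage failed: {ex}");
    }
  });
}
```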

I am enqueuing messages in a loop similar to the one below.

List<DataBar> chunk = new List<DataBar>();
long totalBarsCount = 0;
int chunkSerial = 1;

while (reader.Read())
{
  DataBar tqBar = new DataBar
  {
    Time = ((DateTimeOffset)reader.GetDateTime(0)).ToUnixTimeSeconds(),
    BarType = (BarType)reader.GetInt16(1),
    Open = reader.GetDouble(3),
    High = reader.GetDouble(4),
    Low = reader.GetDouble(5),
    Close = reader.GetDouble(6),
    Volume = reader.GetDouble(7),
  };

  totalBarsCount++;
  chunk.Add(tqBar);

  if (chunk.Count == MaxChunkSize)
  {
    MarketDataChunk marketDataChunk = new MarketDataChunk(chunk, chunkSerial++, totalBarsCount, isLast: false);
    EnqueueMessage(clientId, ResponseMessageType.DataBarCollection, marketDataChunk, isPriority);
    // Start a fresh list rather than calling Clear(): MarketDataChunk may
    // still hold a reference to the list we just handed it.
    chunk = new List<DataBar>();
  }
}

// Code to enqueue the last chunk 

EnqueueMessage is defined as follows:

private void EnqueueMessage(byte clientId, ResponseMessageType messageType, object message, bool isPriority)
{
  var msg = new NetMQMessage();
  msg.Append(new[] { clientId });
  msg.AppendEmptyFrame();
  msg.Append((int)messageType);

  var bytes = MessageParser.Serialize(message);
  msg.Append(bytes);
  int msgLength = bytes.Length;

  _logger.Debug($"ZMQ message enqueued. Message Type: {messageType}, Length: {msgLength}, Frame Count: {msg.FrameCount}");
  _queue.Enqueue(msg);
}

Expected behaviour

I expect Queue_ReceiveReady to fire for each Enqueue call (or after a few of them) so that I can stream messages to the client in chunks while I am still reading from the database.

Actual behaviour

Queue_ReceiveReady only starts triggering after my loop has finished. While debugging the issue, I see logs in the following sequence:

ZMQ message enqueued. ...
ZMQ message enqueued. ...
ZMQ message enqueued. ...
.... about a thousand times and then
Called SendMultipartMessage. Framecount ...
Called SendMultipartMessage. Framecount ...
Called SendMultipartMessage. Framecount ...
... same number of times (I don't miss any message)

This behaviour prevents me from using memory efficiently: usage climbs to about 7 GB and only starts to fall once Queue_ReceiveReady begins triggering.

Is this the expected behaviour of NetMQPoller? What is the fix or workaround that would let me stream multiple messages in a loop to a dealer?

goforgold commented 3 years ago

I think I have found a solution: using separate NetMQPollers for the RouterSocket and the NetMQQueue. Now Queue_ReceiveReady is triggered immediately when I enqueue messages into the NetMQQueue.
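For clarity, a sketch of the two-poller workaround described above (field names carried over from the earlier snippets; `_socketPoller` and `_queuePoller` are assumed names):

```csharp
_socket = new RouterSocket();
_queue = new NetMQQueue<NetMQMessage>();

// Give the queue its own poller, so its ReceiveReady callback can be
// dispatched even while the socket's poller thread is busy.
_socketPoller = new NetMQPoller { _socket };
_queuePoller = new NetMQPoller { _queue };

_socket.ReceiveReady += SocketServer_ReceiveReady; // runs on _socketPoller's thread
_queue.ReceiveReady += Queue_ReceiveReady;         // runs on _queuePoller's thread

_socket.Bind("tcp://127.0.0.1:8888");
_socketPoller.RunAsync();
_queuePoller.RunAsync();
```

One caveat: NetMQ sockets are not thread-safe, and with two pollers Queue_ReceiveReady now calls `_socket.SendMultipartMessage` from the queue poller's thread while the socket poller may be using the same socket, so this workaround trades the batching problem for a potential threading hazard.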

I wish we could document this behaviour properly and highlight this use case.

@somdoron Could you please shed some light on the technical reasons behind this behaviour? I can then try to raise a pull request with documentation changes.