zeromq / netmq

A 100% native C# implementation of ZeroMQ for .NET

Load balancing for long running tasks #946

Open Clyde256 opened 4 years ago

Clyde256 commented 4 years ago

Hello, on the server I am trying to distribute message processing among several workers, using a poller object. However, the behavior is not what I expected. If I run two workers on the server and send, for example, 10 messages, the poller splits the messages evenly between the two workers, 5 and 5. If, however, processing the first message takes a long time, 4 messages wait unnecessarily for it to complete, even though worker 2 is already free (it has already processed its 5 messages).

How can I ensure that messages are delivered only to free workers?

public static readonly string _frontendAddress = "tcp://*:5559";
public static readonly string _backednAddress = "tcp://*:5560";

static void Main(string[] args)
{
    Task.Run(() =>
    {
        using var frontend = new RouterSocket();
        frontend.Bind(_frontendAddress);

        using var backend = new DealerSocket();
        backend.Bind(_backednAddress);

        frontend.ReceiveReady += (s, e) =>
        {
            var msg = e.Socket.ReceiveMultipartMessage();
            backend.SendMultipartMessage(msg);
        };

        backend.ReceiveReady += (s, e) =>
        {
            var msg = e.Socket.ReceiveMultipartMessage();
            frontend.SendMultipartMessage(msg);
        };

        using var poller = new NetMQPoller { backend, frontend };
        poller.Run();
    });

    RunWorkers(2);
}

public static void RunWorkers(int count)
{
    var workers = new List<Task>();

    for (var i = 0; i < count; i++)
    {
        var task = Task.Run(() =>
        {
            Worker();
        });

        workers.Add(task);
    }

    Task.WaitAll(workers.ToArray());
}

public static void Worker()
{
    using var worker = new ResponseSocket();
    worker.Connect(_backednAddress);

    while (true)
    {
        var msg = worker.ReceiveMultipartMessage();
        var msgText = msg.Last.ConvertToString();
        Console.WriteLine("Processing Message {0}", msgText);

        if (msgText[msgText.Length - 1] == '0')
        {
            Thread.Sleep(8000);
        }

        worker.SendFrame(msg.Last.ConvertToString());
    }
}

b2yq commented 3 years ago

Hello, I had a similar problem a few years ago, so I solved it in a different way. I can't paste the whole code here, but I can give you some advice. Just like in your implementation, the Server has two sockets:

serverFront = new RouterSocket(); and serverBack = new DealerSocket();

Then I created something in the middle, between the server and the workers, that I called a Broker, with:

brokerFront = new RouterSocket(); and brokerBack = new RouterSocket();

Then the worker, with: workerSocket = new DealerSocket();

When a worker starts, it sends a "Ready" message to the Broker, and the Broker saves the worker's address in a ConcurrentQueue. When a new message arrives at the Broker, it is forwarded to the first free worker. My implementation uses a fixed number of workers and a buffer of 1000 messages; when the buffer overflows, the client starts getting errors. You can easily change this logic, for example to create workers dynamically.
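
The bookkeeping described above can be sketched without any sockets. This is a minimal illustration, not the original implementation: `FreeWorkerDispatcher`, `DispatchResult`, and all method names are hypothetical, requests are plain strings instead of `NetMQMessage` objects, and the buffer capacity is a constructor argument rather than the fixed 1000.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical outcome of handing a client request to the dispatcher.
public enum DispatchResult { Sent, Queued, Rejected }

// Hypothetical helper (not part of NetMQ): tracks idle workers and waiting requests.
public sealed class FreeWorkerDispatcher
{
    private readonly Queue<string> _freeWorkers = new Queue<string>();
    private readonly Queue<string> _pending = new Queue<string>();
    private readonly int _capacity;

    public FreeWorkerDispatcher(int capacity)
    {
        _capacity = capacity;
    }

    // Call when a worker sends "Ready" or returns a reply.
    // Returns a waiting request for that worker, or null if the worker was parked as idle.
    public string WorkerAvailable(string workerId)
    {
        if (_pending.Count > 0)
        {
            return _pending.Dequeue();
        }

        _freeWorkers.Enqueue(workerId);
        return null;
    }

    // Call when a client request arrives.
    // Sent: route the request to workerId now. Queued: buffered until a worker is free.
    // Rejected: the buffer is full and the caller should answer the client with an error.
    public DispatchResult RequestArrived(string request, out string workerId)
    {
        if (_freeWorkers.Count > 0)
        {
            workerId = _freeWorkers.Dequeue();
            return DispatchResult.Sent;
        }

        workerId = null;
        if (_pending.Count >= _capacity)
        {
            return DispatchResult.Rejected;
        }

        _pending.Enqueue(request);
        return DispatchResult.Queued;
    }
}

public static class Demo
{
    public static void Main()
    {
        var dispatcher = new FreeWorkerDispatcher(capacity: 1);
        dispatcher.WorkerAvailable("W1"); // both workers register as idle
        dispatcher.WorkerAvailable("W2");

        Console.WriteLine(dispatcher.RequestArrived("job-0", out var w) + " " + w); // Sent W1
        Console.WriteLine(dispatcher.RequestArrived("job-1", out w) + " " + w);     // Sent W2
        Console.WriteLine(dispatcher.RequestArrived("job-2", out w));               // Queued
        Console.WriteLine(dispatcher.RequestArrived("job-3", out w));               // Rejected

        // W2 finishes first and immediately picks up the waiting request.
        Console.WriteLine(dispatcher.WorkerAvailable("W2"));                        // job-2
    }
}
```

Because a request only goes to a worker that has announced itself as free, a slow job on W1 never holds back work that W2 could take, which is the difference from the round-robin distribution of a plain DealerSocket backend.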

I send serialized DTOs back and forth, so you will need to cover that part yourself ;) because it is specific to my program.

Parts of Server code: `public class Server {
private void ServerBody()
{
    try
    {
        _poller = new NetMQPoller();

        using (_worker = new Worker(...))
        using (_broker = new Broker(...))
        {
            _worker.Start();
            _broker.Start();

            using (_frontSocket = new RouterSocket())
            using (_backSocket = new DealerSocket())
            {
                _frontSocket.Bind(_frontAddress);
                _backSocket.Bind(_backAddress);

                _frontSocket.ReceiveReady += OnFrontReceiveReady;
                _backSocket.ReceiveReady += OnBackReceiveReady;

                _poller.Add(_frontSocket);
                _poller.Add(_backSocket);

                _poller.Run();
            }
        }
    }
    catch (Exception e)
    {
        _logger.Fatal(e);
    }
}

private void OnBackReceiveReady(object sender, NetMQSocketEventArgs netMqSocketEventArgs)
{
    NetMQMessage message = netMqSocketEventArgs.Socket.ReceiveMultipartMessage();
    _frontSocket.SendMultipartMessage(message);
}

private void OnFrontReceiveReady(object sender, NetMQSocketEventArgs netMqSocketEventArgs)
{
    NetMQMessage message = netMqSocketEventArgs.Socket.ReceiveMultipartMessage();
    _backSocket.SendMultipartMessage(message);
}

}`

Parts of Broker code: ` class Broker {
private Thread _brokerThread;

// List of free workers
private readonly ConcurrentQueue<string> _freeWorkerAddresses = new ConcurrentQueue<string>();

// Buffer of waiting requests
private readonly Queue<NetMQMessage> _buffer = new Queue<NetMQMessage>(1000);

public void Start()
{
    //start main broker thread
    _brokerThread = new Thread(() => BrokerBody(_brokerFrontPoller));
    _brokerThread.Start();
}

private void BrokerBody(NetMQPoller poller)
{
    try
    {
        using (_brokerFrontSocket = new RouterSocket())
        using (_brokerBackSocket = new RouterSocket())
        {
            //bind to sockets
            _brokerFrontSocket.Bind(_brokerFrontAddress);
            _brokerBackSocket.Bind(_brokerBackAddress);

            //attach to event from sockets
            _brokerBackSocket.ReceiveReady += BrokerBackSocketOnReceiveReady;
            _brokerFrontSocket.ReceiveReady += BrokerFrontSocketOnReceiveReady;

            //add sockets into poller
            poller.Add(_brokerBackSocket);
            poller.Add(_brokerFrontSocket);

            //start polling socket
            poller.Run();
        }
    }
    catch (Exception ex)
    {
        _logger.Fatal(ex);
    }
}

//Runs when Worker will send a message to Broker
private void BrokerBackSocketOnReceiveReady(object sender, NetMQSocketEventArgs netMqSocketEventArgs)
{
    try
    {
        NetMQMessage message = netMqSocketEventArgs.Socket.ReceiveMultipartMessage();

        //pop worker address from message
        string workerAddress = message.Pop().ConvertToString();
        if (message.FrameCount > 1) //worker response, route it to the original client
        {
            _brokerFrontSocket.SendMultipartMessage(message);
        }
        else // first registration: the worker's 'Ready' message
        {
            //string readyMessage = message.Pop().ConvertToString();
            //the worker sent 'Ready', so a new worker has connected to the broker
        }

        //check if queue of messages is empty before saving worker on list of free workers
        if (_buffer.Count > 0)
        {
            //use free worker address asap because there are requests in front socket
            GetRequestAndSendToWorker(workerAddress, _buffer.Dequeue());
        }
        else
        {
            //push the worker onto the free worker queue,
            //because it finished its previous work or sent 'Ready' at startup
            _freeWorkerAddresses.Enqueue(workerAddress);
        }
    }
    catch (Exception ex)
    {
        _logger.Fatal(ex);
    }
}

//Runs when client message arrived at front socket
private void BrokerFrontSocketOnReceiveReady(object sender, NetMQSocketEventArgs netMqSocketEventArgs)
{
    try
    {
        string workerAddress;

        NetMQMessage message = netMqSocketEventArgs.Socket.ReceiveMultipartMessage();

        //try to get a free worker address
        if (_freeWorkerAddresses.TryDequeue(out workerAddress))
        {
            GetRequestAndSendToWorker(workerAddress, message);
        }
        else if (_buffer.Count >= 1000)
        {
            //no free worker and the buffer is full: reject the request
            _logger.Debug(String.Format("Number of queued requests reached the 1000 limit. An error will be sent to the client. Number of waiting requests {0}", _buffer.Count));
            GetRequestAndSendErrorToClient(message);
        }
        else
        {
            //no free worker: keep the request until a worker reports back
            _buffer.Enqueue(message);
            _logger.Debug(String.Format("All workers are busy. Request will be queued into buffer. Number of waiting requests {0}", _buffer.Count));
        }
    }
    catch (Exception ex)
    {
        _logger.Fatal(ex);
    }
}

//Get request from client on front-socket and send to worker on back-socket
public void GetRequestAndSendToWorker(string workerAddress, NetMQMessage message)
{
    if (_numberOfWaitingRequests > 0)
    {
        _numberOfWaitingRequests--;
    }

    _logger.Debug(String.Format("Request has been received, and will be sent to worker, number of waiting requests {0}", _buffer.Count));

    message.Push(workerAddress);//add the worker address to the top of the message, like in typical Ethernet encapsulation

    _brokerBackSocket.SendMultipartMessage(message);//send message to worker
}

//Get request from client on front-socket and send error to client on front-socket
public void GetRequestAndSendErrorToClient(NetMQMessage message)
{
    //extract the message envelope, i.e. separate the address frames from the client message
    NetMQMessage envelope = new NetMQMessage();
    foreach (var frame in message)
    {
        if (frame.IsEmpty)
        {
            envelope.AppendEmptyFrame();
            break;//the empty frame is the delimiter between the address part and the DTO part of a message
        }

        envelope.Append(frame);
    }

    //add error message to envelope
    NetMQMessage errorMessage = OperationWorkerBase.AddErrorMessage(envelope, new Exception("All workers are busy! More than 1000 requests are waiting for handling!"), _serializer);
    _brokerFrontSocket.SendMultipartMessage(errorMessage);//send error message to client
}

} `

Parts of Worker code: ` public class Worker {
private readonly List<System.Threading.Tasks.Task> _workerTasks = new List<System.Threading.Tasks.Task>();

public void Start()
{
    var token = _cts.Token;

    for (int i = 0; i < _numberOfAsyncWorkers; i++)
    {
        int id = i;
        System.Threading.Tasks.Task workerTask = System.Threading.Tasks.Task.Factory.StartNew(
            () => WorkerBody(id,token),
            token,
            TaskCreationOptions.LongRunning,
            TaskScheduler.Default);

        _workerTasks.Add(workerTask);
    }

    _workersSemaphoreStarted.Wait();
}

private void WorkerBody(int workerId, CancellationToken cancellationToken)
{
    try
    {
        using (var workerSocket = new DealerSocket())
        {
            workerSocket.Options.Identity = Encoding.Unicode.GetBytes(String.Format("Worker{0}", workerId));
            workerSocket.Connect(_workersAddress);
            workerSocket.SendFrame("Ready");//send ready to broker, to register on free workers queue

            _workersSemaphoreStarted.Signal();

            while (cancellationToken.IsCancellationRequested == false)
            {
                NetMQMessage requestMessage = new NetMQMessage();
                bool success = workerSocket.TryReceiveMultipartMessage(TimeSpan.FromSeconds(5), ref requestMessage);

                if (success == false)
                    continue;

                _logger.Debug(String.Format("Worker task {0} received message", System.Threading.Tasks.Task.CurrentId));

                //DealerSocket won't unpack the message automatically, so we have to do it ourselves
                //in order to build a response with the same socket addresses
                NetMQMessage envelope = new NetMQMessage();
                for (int i = 0; i < requestMessage.FrameCount - 1; i++)
                {
                    envelope.Append(requestMessage[i]);
                }

                NetMQMessage message = new NetMQMessage();
                //the serialized request DTO is the last frame
                message.Append(requestMessage[requestMessage.FrameCount - 1]);

                var responseMessage = HandleRequestMessage(message);

                //pack response into envelope, response is always 2-framed
                envelope.AppendEmptyFrame();//empty delimiter frame: NetMQ needs an empty frame between the address frames and the message content
                envelope.Append(responseMessage[1]);//serialized response

                workerSocket.SendMultipartMessage(envelope);
            }
        }
    }
    catch (Exception e)
    {
        _logger.Error("IPC async worker error", e);
    }
    finally
    {
        cancellationToken.ThrowIfCancellationRequested();
    }
}

} `

stale[bot] commented 2 years ago

This issue has been automatically marked as stale because it has not had activity for 365 days. It will be closed if no further activity occurs within 56 days. Thank you for your contributions.