zeromq / libzmq

ZeroMQ core engine in C++, implements ZMTP/3.1
https://www.zeromq.org
Mozilla Public License 2.0

is using push/pull <-> router a valid socket combination? #4722

Open John-Trager opened 1 month ago

John-Trager commented 1 month ago


Issue description

I am writing an async client and server. The client needs to send and receive messages at the same time, i.e. one thread receives and processes messages while another thread sends messages to the server. Multiple clients will connect to the server at the same time, which is why I went with a ROUTER socket on the server side.

However, since I can't share a single DEALER socket between the client's threads (one thread receives while another sends), could I use PUSH/PULL sockets on the client to send to and receive from the server's ROUTER?

Environment

Minimal test code / Steps to reproduce the issue

N/A

What's the actual result? (include assertion message & call stack if applicable)

N/A

What's the expected result?

Use PUSH/PULL on the client and ROUTER on the server, and be able to set an identity on the PUSH/PULL sockets.

John-Trager commented 1 month ago

This discussion over on jeromq seems to suggest that it is a valid option, but I put together a simple example in Python that does not seem to work:

client:

import zmq
import threading

class ZmqClient:
    def __init__(self, server_address):
        self.context = zmq.Context()
        self.pull_socket = self.context.socket(zmq.PULL)
        self.push_socket = self.context.socket(zmq.PUSH)

        # same identity so server can send messages back to the client pull sock
        self.pull_socket.setsockopt(zmq.IDENTITY, b'client')
        self.push_socket.setsockopt(zmq.IDENTITY, b'client')

        self.pull_socket.connect(server_address)
        self.push_socket.connect(server_address)

    def start(self):
        threading.Thread(target=self.receive_messages, daemon=True).start()

    def receive_messages(self):
        while True:
            message = self.pull_socket.recv_string()
            print(f"Received message: {message}")

    def send_message(self, message):
        self.push_socket.send_string(message)

if __name__ == "__main__":
    server_address = "tcp://localhost:5555"
    client = ZmqClient(server_address)
    client.start()

    while True:
        message = input("Enter a message to send: ")
        client.send_message(message)

server:

import zmq

class Server:
    def __init__(self, address):
        self.context = zmq.Context()
        self.socket = self.context.socket(zmq.ROUTER)
        self.socket.bind(address)

    def run(self):
        while True:
            identity, _, message = self.socket.recv_multipart()
            print(f"Received message: {message} from {identity}")
            self.socket.send_multipart([identity, b"", b"Hello, " + message])

if __name__ == "__main__":
    server = Server("tcp://localhost:5555")
    server.run()

The result is the server never even receives the message sent by the client.
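For comparison, here is a minimal sketch of the same echo exchange with a DEALER on the client side. The `zmq_socket` reference lists only PULL as a compatible peer for PUSH, and DEALER, REQ, and ROUTER as compatible peers for ROUTER, which may explain the silence observed above. The function name `dealer_router_roundtrip` and the `inproc://` endpoint are assumptions made for illustration; both sockets live in one process so the sketch can run on its own.

```python
import zmq

def dealer_router_roundtrip(endpoint="inproc://dealer-router-demo"):
    """Send one message DEALER -> ROUTER and return (identity, reply).

    Hypothetical helper for illustration; not part of the original post.
    """
    context = zmq.Context()
    router = context.socket(zmq.ROUTER)
    dealer = context.socket(zmq.DEALER)
    # Do not block on close if something goes wrong.
    router.setsockopt(zmq.LINGER, 0)
    dealer.setsockopt(zmq.LINGER, 0)
    try:
        router.bind(endpoint)
        # The identity must be set before connecting.
        dealer.setsockopt(zmq.IDENTITY, b"client")
        dealer.connect(endpoint)

        dealer.send(b"world")

        # A DEALER adds no empty delimiter frame, so the ROUTER
        # sees two frames: [identity, message].
        if not router.poll(2000):
            raise TimeoutError("ROUTER received nothing")
        identity, message = router.recv_multipart()
        router.send_multipart([identity, b"Hello, " + message])

        # The ROUTER strips the identity frame before delivery.
        if not dealer.poll(2000):
            raise TimeoutError("DEALER received nothing")
        reply = dealer.recv()
        return identity, reply
    finally:
        dealer.close()
        router.close()
        context.term()
```

Calling `dealer_router_roundtrip()` should return the identity set on the DEALER together with the echoed reply. The threading question still applies: a single DEALER would have to be owned by one thread.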

John-Trager commented 1 month ago

Alternatively, I can use an agent model, with a PAIR socket and a poller, to get the desired result without push/pull, but it is still much less clean than just integrating with push/pull <-> router.

client code (a little long):

import zmq
import threading

class ThreadedDealerClient:
    def __init__(self, server_address):
        self.server_address = server_address
        self.dealer_id = b"client"
        self.context = zmq.Context()

        # create pair socket for thread comms:
        # sender passes messages to forwarder, which
        # then uses the dealer to send them to the server
        self.sender = self.context.socket(zmq.PAIR)
        self.sender.bind("inproc://sender")

    def start(self):
        threading.Thread(target=self.agent, daemon=True).start()

    def agent(self):
        # create dealer for comms with server
        dealer = self.context.socket(zmq.DEALER)
        dealer.setsockopt(zmq.IDENTITY, self.dealer_id)
        dealer.connect(self.server_address)

        # receiver pair for message from main thread
        forwarder = self.context.socket(zmq.PAIR)
        forwarder.connect("inproc://sender")

        poller = zmq.Poller()
        poller.register(dealer, zmq.POLLIN)
        poller.register(forwarder, zmq.POLLIN)

        while True:
            try:
                socks = dict(poller.poll())
            except KeyboardInterrupt:
                break

            if forwarder in socks:
                message = forwarder.recv()
                # forward message to server
                # single quotes inside the f-string: reusing double quotes
                # is a syntax error before Python 3.12
                print(f"Forwarding message: {message.decode('utf-8')}")
                dealer.send(message)

            if dealer in socks:
                message = dealer.recv_string()
                # process message from server
                # ... do some work ...
                print(f"Received message: {message}")

    def send_message(self, message):
        self.sender.send_string(message)

    def close(self):
        self.sender.close()
        self.context.term()

if __name__ == "__main__":
    client = ThreadedDealerClient("tcp://localhost:5555")
    client.start()
    while True:
        message = input("Enter message: ")
        client.send_message(message)
    client.close()
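One detail when pairing this agent client with a ROUTER server: the server posted earlier unpacks three frames (identity, empty delimiter, message), but a DEALER sends its payload without an empty delimiter, so the ROUTER receives only two frames. A minimal sketch of a matching server loop follows. The names `handle_request` and `run_server` and the bind address are assumptions made for illustration.

```python
import zmq

def handle_request(frames):
    """Build the reply frames for one message from a DEALER peer.

    Hypothetical helper: expects [identity, message], with no
    empty delimiter frame in between.
    """
    identity, message = frames
    return [identity, b"Hello, " + message]

def run_server(address="tcp://127.0.0.1:5555"):
    """Echo loop for DEALER clients; blocks until interrupted."""
    context = zmq.Context()
    router = context.socket(zmq.ROUTER)
    router.bind(address)
    while True:
        frames = router.recv_multipart()
        # The first frame routes the reply back to the sending DEALER.
        router.send_multipart(handle_request(frames))
```

Calling `run_server()` in a separate process starts the loop. Keeping the frame handling in its own function makes it possible to check the framing without opening a socket.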