zeromq / chumak

Pure Erlang implementation of ZeroMQ Message Transport Protocol.
Mozilla Public License 2.0
197 stars 47 forks source link

Is polling implemented? #17

Closed vegabook closed 6 years ago

vegabook commented 6 years ago

I've downloaded all the examples but ACK doesn't respond with any line in the code implementing socket polling. In Python (pyzmq) for multiple sockets I can use a poller:

    # now open up all the sockets
    context = zmq.Context()
    outsub = context.socket(zmq.SUB)
    outsub.bind("tcp://" + myip + ":" + str(args.outsubport))
    outsub.setsockopt(zmq.SUBSCRIBE, b"")
    inreq = context.socket(zmq.ROUTER)  
    inreq.bind("tcp://" + myip + ":" + str(args.inreqport))
    outref = context.socket(zmq.ROUTER)  
    outref.bind("tcp://" + myip + ":" + str(args.outrefport))
    req = context.socket(zmq.ROUTER)  
    req.bind("tcp://" + myip + ":" + str(args.reqport))
    repub = context.socket(zmq.PUB)  
    repub.bind("tcp://" + myip + ":" + str(args.repubport))

    # sort out the poller
    poller = zmq.Poller() 
    poller.register(inreq, zmq.POLLIN)
    poller.register(outsub, zmq.POLLIN)
    poller.register(outref, zmq.POLLIN)
    poller.register(req, zmq.POLLIN)

    # UDP socket setup for broadcasting this server's address 
    cs = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    cs.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    cs.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)

    # housekeeping variables
    pulsecheck = datetime.utcnow() + timedelta(seconds = 1)
    alivelist = dict()
    pulsetimeout = 5

    # here must check if a master node is alread running on the network !!!
    # and only run this one if there isn't another. 

    while True: 
        polls = dict(poller.poll(1000))
        if inreq in polls:
            msg = inreq.recv_multipart()
            if msg[1] == b"pulse":           # handle pluse
                ansi("cyan", False, textout = " pulse" + "-" + msg[0].decode())
                if not msg[0] in alivelist.keys():
                    handlechange(msg[0])
                alivelist[msg[0]] = datetime.utcnow() + timedelta(seconds = pulsetimeout)
        if outsub in polls:
            msgin = outsub.recv_multipart()[0]
            repub.send(msgin) # republish
            msg = unpacker(msgin)
            if isinstance(msg, dict):
                valu = msg.get("value")
                print(".", end = "", flush = True)
            else:
                ansi("green", False, textout = msg)

        if req in polls:
            msg = req.recv_multipart()
            valmsg = validate_request(msg)
            if not valmsg[0]:
                ansi("red", True); print(valmsg[1]); ansi()
            elif len(alivelist) > 0:
                targetnode = random.choice(list(alivelist.keys()))
                inreq.send_multipart([targetnode, packer(valmsg[1])])
                ansi("blue", True, textout = "sent to " + targetnode.decode())
            else:
                ansi("red", True, textout = "NO CONNECTED NODES TO SEND REQUEST TO")
        if outref in polls:
            msg = outref.recv_multipart()
            destinataire, correlid = msg[1].split(b"/")
            req.send_multipart([destinataire, correlid, msg[2]])

Is there something similar in chumak? Using successfully from Elixir but I want non-blocking receives from a router socket to a dealer socket and I can't seem to see how I would do this using Chumak?

drozzy commented 6 years ago

Hey, Do you have some examples of how you're trying to do it in Elixir?

If not, give me some time to look at this, and I'll get back to you.

drozzy commented 6 years ago

Ok, I just remembered that no such thing is available in Chumak (because I ran into a similar issue myself before).

But it seems like you don't really need this in erlang/Elixir. You can just start multiple processes, each for a different type of socket.

It might be nice to have a general wrapper that would simulate this polling. If you manage to come up with a nice abstraction for this, let me know.

Sorry if this wasn't helpful :-(

vegabook commented 6 years ago

Okay thanks Andriy,

I have moved my socket types to fire/forget pub-sub for now, but I am also investigating a new socket per request model each in separate process. I will let you know if I find something interesting.

Thomas

On Oct 07, 2017, at 05:55 AM, Andriy Drozdyuk notifications@github.com wrote:

Ok, I just remember that no such thing is available in Chumak (because I ran into a similar issue myself before).

But it seems like you don't really need this in erlang/Elixir. You can just start multiple processes, each for a different type of socket.

It might be nice to have a general wrapper that would simulate this polling. If you manage to come up with a nice abstraction for this, let me know.

— You are receiving this because you authored the thread. Reply to this email directly, view it on GitHub, or mute the thread.

vegabook commented 6 years ago

Andriy, here is my Stack Overflow question which might help. I have decided to go with either pub-sub or new connection per query. I need to evaluation performance and scalability of each.

https://stackoverflow.com/questions/46600255/how-do-i-implement-non-blocking-socket-receives-with-zeromq-in-erlang-or-elixir

On Oct 07, 2017, at 05:02 AM, Andriy Drozdyuk notifications@github.com wrote:

Hey, Do you have some examples of how you're trying to do it in Elixir?

If not, give me some time to look at this, and I'll get back to you.

— You are receiving this because you authored the thread. Reply to this email directly, view it on GitHub, or mute the thread.

vegabook commented 6 years ago

by the way, I have another question:

How does on close a socket with chumak?

On Oct 07, 2017, at 05:55 AM, Andriy Drozdyuk notifications@github.com wrote:

Ok, I just remember that no such thing is available in Chumak (because I ran into a similar issue myself before).

But it seems like you don't really need this in erlang/Elixir. You can just start multiple processes, each for a different type of socket.

It might be nice to have a general wrapper that would simulate this polling. If you manage to come up with a nice abstraction for this, let me know.

— You are receiving this because you authored the thread. Reply to this email directly, view it on GitHub, or mute the thread.

drozzy commented 6 years ago

Good question!

You should be able to just kill the process that requested the socket and chumak should clean it up.

Now, I just checked the source here, and it seems it is correctly trapping the exit: https://github.com/zeromq/chumak/blob/master/src/chumak_socket.erl#L34

However, I don't know if it actually shuts down the socket (only the peer): https://github.com/zeromq/chumak/blob/master/src/chumak_socket.erl#L109

If you play with this, could you double check that after you kill the parent process, the socket is still alive? (E.g. via observer or PID alive checks)

Regardless, I've created an issue for this (#18). But you should be ok using it for a while, as the sockets are pretty cheap to create, and unless you're creating them dynamically - you won't have memory issues. In fact, you'll run out of ports way before that.

vegabook commented 6 years ago

Thanks Andriy I will experiment some more and feedback.

Thomas

On Oct 11, 2017, at 05:56 AM, Andriy Drozdyuk notifications@github.com wrote:

Good question!

You should be able to just kill the process that requested the socket and chumak should clean it up.

Now, I just checked the source here, and it seems it is correctly trapping the exit: https://github.com/zeromq/chumak/blob/master/src/chumak_socket.erl#L34

However, I don't know if it actually shuts down the socket (only the peer): https://github.com/zeromq/chumak/blob/master/src/chumak_socket.erl#L109

If you play with this, could you double check that after you kill the parent process, the socket is still alive? (E.g. via observer or PID alive checks)

Regardless, I've created an issue for this (#18). But you should be ok using it for a while, as the sockets are pretty cheap to create, and unless you're creating them dynamically - you won't have memory issues. In fact, you'll run out of ports way before that.

— You are receiving this because you authored the thread. Reply to this email directly, view it on GitHub, or mute the thread.

vegabook commented 6 years ago

Andriy I have updated your issue 18. 

It seems that chumak is starting processes to manage the dealer socket, but not closing them even though the process the created the socket has terminated. So we have "hanging sockets" I think?

On Oct 11, 2017, at 05:56 AM, Andriy Drozdyuk notifications@github.com wrote:

Good question! You should be able to just kill the process that requested the socket and chumak should clean it up. Now, I just checked the source here, and it seems it is correctly trapping the exit: https://github.com/zeromq/chumak/blob/master/src/chumak_socket.erl#L34 However, I don't know if it actually shuts down the socket (only the peer): https://github.com/zeromq/chumak/blob/master/src/chumak_socket.erl#L109 If you play with this, could you double check that after you kill the parent process, the socket is still alive? (E.g. via observer or PID alive checks) Regardless, I've created an issue for this (#18). But you should be ok using it for a while, as the sockets are pretty cheap to create, and unless you're creating them dynamically - you won't have memory issues. In fact, you'll run out of ports way before that. — You are receiving this because you authored the thread. Reply to this email directly, view it on GitHub, or mute the thread.

drozzy commented 6 years ago

Thanks!

drozzy commented 6 years ago

If you don't mind, I'll close this for now. If you want, you can re-open it or file a new one.