nanomsg / nnpy

cffi-based Python bindings for nanomsg
MIT License

Subscribe to subprocess not receiving with multiple nnpy channels #18

Closed grodtron closed 8 years ago

grodtron commented 8 years ago

Problem

When I have two nnpy.Socket objects, a REQ socket created in the main process and a SUB socket created later inside a multiprocessing.Process, the SUB socket never receives anything. The two scripts below reproduce it; the configurations where the problem does not occur are listed after them.

main.py

```python
import nnpy
import multiprocessing

# REQ socket created in the parent process, before the fork
cmd_socket = nnpy.Socket(nnpy.AF_SP, nnpy.REQ)
cmd_socket.bind("ipc:///tmp/nnpy-test-cmd")

# Handshake with child.py (Python 2: send/recv use str)
cmd_socket.send("start")
assert cmd_socket.recv() == "Starting!"

def run_sub():
    # The SUB socket is created only inside the forked child process
    sub_socket = nnpy.Socket(nnpy.AF_SP, nnpy.SUB)
    sub_socket.setsockopt(nnpy.SUB, nnpy.SUB_SUBSCRIBE, "topic:")
    sub_socket.connect("ipc:///tmp/nnpy-test-pub")

    while True:
        print("received " + repr(sub_socket.recv()))

sub = multiprocessing.Process(target=run_sub)
sub.daemon = True
sub.start()
sub.join()  # run_sub loops forever, so this blocks
```

child.py

```python
import nnpy
import time

# REP side of the start/Starting! handshake
cmd_socket = nnpy.Socket(nnpy.AF_SP, nnpy.REP)
cmd_socket.connect("ipc:///tmp/nnpy-test-cmd")

assert 'start' == cmd_socket.recv()

# Bind the publisher before replying, so it exists once main.py forks its subscriber
pub_socket = nnpy.Socket(nnpy.AF_SP, nnpy.PUB)
pub_socket.bind("ipc:///tmp/nnpy-test-pub")

cmd_socket.send("Starting!")

# Publish ten messages, one every 0.25 s
for n in range(10):
    pub_socket.send("topic:It is now " + str(time.time()) + " o'clock!")
    time.sleep(0.25)
```

This code works if we either remove the 'start'/'Starting!' request/response exchange, or run run_sub in the main thread or in a threading.Thread instead of a multiprocessing.Process.

Here are test cases with more print statements that show the cases where this works and does not work.

The problem is definitely on the receiving end: other programs can subscribe to the same PUB socket and do receive its messages.

djc commented 8 years ago

Quick guess: multiprocessing.Process uses fork(), and I think sockets tend not to survive forks unless you do special stuff. So try only creating your sockets inside/after the fork?
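The fork() point can be made concrete with a stdlib-only sketch (POSIX only, no nnpy involved). fork() copies the parent's memory but only the thread that called it, so any background thread started earlier simply does not exist in the child. The `worker` thread here is a hypothetical stand-in for a library's internal I/O thread, not anything nnpy or nanomsg actually exposes.

```python
import os
import threading


def thread_counts_across_fork():
    """Return (threads in the parent before fork, threads seen in the child)."""
    stop = threading.Event()
    # Hypothetical stand-in for a library's background I/O thread
    worker = threading.Thread(target=stop.wait, daemon=True)
    worker.start()
    parent_count = threading.active_count()

    read_fd, write_fd = os.pipe()
    pid = os.fork()  # POSIX only
    if pid == 0:
        # Child: only the thread that called fork() was copied
        os.close(read_fd)
        os.write(write_fd, str(threading.active_count()).encode())
        os._exit(0)  # skip any cleanup inherited from the parent

    # Parent: read the child's report, then clean up
    os.close(write_fd)
    child_count = int(os.read(read_fd, 16))
    os.close(read_fd)
    os.waitpid(pid, 0)
    stop.set()
    worker.join()
    return parent_count, child_count


if __name__ == "__main__":
    print(thread_counts_across_fork())
```

With no other threads running, the parent reports 2 threads (main plus worker) and the child reports 1.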

grodtron commented 8 years ago

Hi @djc, actually the SUB socket is only created inside/after the fork (inside the multiprocessing.Process "thread").
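The report itself hints at why creating only the SUB socket after the fork may not be enough: the problem disappears when the 'start'/'Starting!' exchange is removed, that is, when the parent never uses nnpy before forking. One plausible mechanism, assuming nanomsg keeps process-wide state that includes a background worker thread started with the first socket, is that the child inherits that state as "already initialized" but not the thread itself. The sketch below models this with a hypothetical `LazyEngine` class (not part of nnpy or nanomsg) and multiprocessing's "fork" start method.

```python
import multiprocessing as mp
import threading


class LazyEngine:
    """Hypothetical model of a library that starts one background I/O thread
    when the first socket is created and reuses it afterwards."""

    def __init__(self):
        self._worker = None
        self._stop = threading.Event()

    def ensure_started(self):
        # Only the first call starts a thread; later calls assume it runs
        if self._worker is None:
            self._worker = threading.Thread(target=self._stop.wait, daemon=True)
            self._worker.start()

    def worker_alive(self):
        return self._worker is not None and self._worker.is_alive()


def create_socket_and_check(engine, conn):
    # "Create the SUB socket" in the child, then report whether the
    # I/O thread that would service it is actually running
    engine.ensure_started()
    conn.send(engine.worker_alive())
    conn.close()


def sub_socket_works(init_in_parent):
    engine = LazyEngine()
    if init_in_parent:
        engine.ensure_started()  # like creating the REQ socket before forking
    ctx = mp.get_context("fork")  # POSIX only; the engine is copied, not pickled
    recv_end, send_end = ctx.Pipe(duplex=False)
    child = ctx.Process(target=create_socket_and_check, args=(engine, send_end))
    child.start()
    alive_in_child = recv_end.recv()
    child.join()
    return alive_in_child


def sub_socket_works_in_thread():
    engine = LazyEngine()
    engine.ensure_started()  # REQ socket created first, as in main.py
    result = []

    def check():
        engine.ensure_started()
        result.append(engine.worker_alive())

    t = threading.Thread(target=check)
    t.start()
    t.join()
    return result[0]


if __name__ == "__main__":
    print(sub_socket_works(True), sub_socket_works(False), sub_socket_works_in_thread())
    # False True True
```

Under this model the three outcomes line up with the reported ones: a forked child fails only when the parent initialized the library first, while a thread shares the live worker. Whether nanomsg actually works this way is an assumption that would need confirming upstream.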

grodtron commented 8 years ago

Possibly this has the same root cause as #4?

djc commented 8 years ago

Yeah, you should probably talk to the nanomsg community first about what you're trying to do.

djc commented 8 years ago

Closing this, as it seems to be an upstream nanomsg issue.