kubemq-io / kubemq-Python

Python client for KubeMQ server https://kubemq.io
MIT License

Senders in multiple subprocesses fail #56

Open mikeawalker opened 11 months ago

mikeawalker commented 11 months ago

I'm having an issue running multiple KubeMQ Sender objects across separate processes in Python.

I am running the latest KubeMQ community Docker container (hash 520bdca15eef), with container port 50000 forwarded to port 5001 on the host.

I worked up a minimal example to repro:

from kubemq.events import Sender

import multiprocessing

def some_sub_proc( ):
    b = Sender( "localhost:5001")
    print("Can has ping", flush=True)
    b.ping()
    print("I never get here!", flush=True)
def main( ):
    print("Starting")
    a = Sender( "localhost:5001")
    a.ping()
    p = multiprocessing.Process(target=some_sub_proc)
    p.start()
    print("Ok trying stuff", flush=True)
    p.join()
if __name__=="__main__":
    main()

This results in the following exception:

python3 ex.py 
Starting
Ok trying stuff
Can has ping
Process Process-1:
Traceback (most recent call last):
  File "/home/linuxbrew/.linuxbrew/Cellar/python@3.11/3.11.6_1/lib/python3.11/multiprocessing/process.py", line 314, in _bootstrap
    self.run()
  File "/home/linuxbrew/.linuxbrew/Cellar/python@3.11/3.11.6_1/lib/python3.11/multiprocessing/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
  File "/home/mike/messaging/python/ex.py", line 8, in some_sub_proc
    b.ping()
  File "/home/linuxbrew/.linuxbrew/Cellar/python@3.11/3.11.6_1/lib/python3.11/site-packages/kubemq/events/lowlevel/sender.py", line 85, in ping
    ping_result = self.get_kubemq_client().Ping(Empty())
                  ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/linuxbrew/.linuxbrew/Cellar/python@3.11/3.11.6_1/lib/python3.11/site-packages/grpc/_channel.py", line 1161, in __call__
    return _end_unary_response_blocking(state, call, False, None)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/linuxbrew/.linuxbrew/Cellar/python@3.11/3.11.6_1/lib/python3.11/site-packages/grpc/_channel.py", line 1004, in _end_unary_response_blocking
    raise _InactiveRpcError(state)  # pytype: disable=not-instantiable
    ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
grpc._channel._InactiveRpcError: <_InactiveRpcError of RPC that terminated with:
        status = StatusCode.UNAVAILABLE
        details = "Socket operation on non-socket"
        debug_error_string = "UNKNOWN:Error received from peer  {grpc_message:"Socket operation on non-socket", grpc_status:14, created_time:"2023-12-07T16:11:13.449738652-07:00"}"
>
mikeawalker commented 11 months ago

What is strange is that the following variation, which creates Senders only in the subprocesses, works fine:

from kubemq.events import Sender
import time
import multiprocessing

def some_sub_proc( ):
    b = Sender( "localhost:5001")
    print("Can has ping", flush=True)
    b.ping()
    print("I never get here!", flush=True)
    time.sleep(10)
def main( ):
    print("Starting")
    #a = Sender( "localhost:5001")
    #a.ping()
    p = multiprocessing.Process(target=some_sub_proc)
    p.start()
    p2 = multiprocessing.Process(target=some_sub_proc)
    p2.start() 
    print("Ok trying stuff", flush=True)
    p.join()
    p2.join()
if __name__=="__main__":
    main()

And results in:

python3 ex.py 
Starting
Ok trying stuff
Can has ping
Can has ping
I never get here!
I never get here!
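
A possible explanation, not confirmed in this thread: the failing variant creates a Sender and pings from the parent before the child process starts, while the working variant creates Senders only in the children. On Linux with Python 3.11, multiprocessing starts children with fork by default, and gRPC's Python library documents limitations around forking a process that has already used a channel. A minimal sketch of one workaround, assuming that is the cause, is to start the child with the spawn method so it gets a fresh interpreter. The helper names ping_from_child and run_in_fresh_process are hypothetical; Sender and ping() are taken from the repro above.

```python
import multiprocessing

# Assumption: the failure comes from fork()ing after gRPC was used in the parent.
START_METHOD = "spawn"


def ping_from_child(address):
    """Create a Sender inside the child and ping the server."""
    # Imported here so the parent never loads kubemq (or gRPC) just to run this.
    from kubemq.events import Sender

    sender = Sender(address)
    sender.ping()
    print("ping ok in child", flush=True)


def run_in_fresh_process(target, *args):
    """Run target(*args) in a spawned child and return the child's exit code."""
    # 'spawn' starts a new interpreter instead of copying the parent's state,
    # so nothing gRPC-related in the parent is inherited by the child.
    ctx = multiprocessing.get_context(START_METHOD)
    proc = ctx.Process(target=target, args=args)
    proc.start()
    proc.join()
    return proc.exitcode
```

Called as run_in_fresh_process(ping_from_child, "localhost:5001") from under the `if __name__ == "__main__":` guard, this returns 0 when the child exits normally. spawn needs the target to be importable from the top level of the module, which is why both helpers are plain module-level functions.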
kubemq commented 9 months ago

Hi, I cannot reproduce what you are experiencing. When I run the code you provided, I get:

Starting
Ok trying stuff
Can has ping
I never get here!

mikeawalker commented 9 months ago

Interesting.

Let me dig into this a little more.