Shiny380 opened this issue 3 years ago
As @Shiny380 said, @zero88 has started looking into it. I was also doing some inspection and changes on the project https://github.com/NubeIO/lora-raw, which has the same issue.
My inspection so far:
When we use `time.sleep(<second>)`, it starts executing multiple runs. Setting `pre-load=False` for Gunicorn and moving that setup to `on load` will solve the issue, like this: https://github.com/NubeIO/lora-raw/compare/fix/misc?expand=1

Both of us working on the same task won't be fruitful, so I will let @zero88 work on it. Or, if you want to continue this inspection, please let me know.
P.S.: This task is a priority.
P.S.: Check this one: https://github.com/NubeIO/lora-raw/pull/29
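The `pre-load` / `on load` idea can be sketched as a Gunicorn config fragment. This is only an illustration of the approach, not the actual change in the compare link above: `preload_app` and the `post_worker_init` server hook are Gunicorn's names for these knobs, while `setup_app` and its import path are hypothetical placeholders.

```python
# gunicorn.conf.py (sketch, illustrative only)
preload_app = False   # Gunicorn's name for the "pre-load" setting discussed above
workers = 1

def post_worker_init(worker):
    # run db/thread setup after the worker is fully initialized,
    # instead of at module import time
    from src.app import setup_app  # hypothetical helper
    setup_app()
```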
Here is my result after running a simple test on rubix-bacnet-server, without any change to the current code structure.
As you can see, the MQTT client still connects and receives messages successfully; however, the MQTT client keeps restarting constantly.
So the first assumption about the cause of the constant MQTT reconnection issue is addressed correctly.
However, @Shiny380's explanation about the dummy thread is not correct: the Flask thread and the dummy thread still point to the same id 140669658012112, and the count is not growing.
More importantly, it is not a blocking issue, so I think you're still able to develop the project.
After investigating more, I realized that the `paho-mqtt` client manages its own event loop, which overlaps and conflicts with the current `gevent` event loop, the main event loop in our Python project.
So I think we should make an adapter between `paho-mqtt` and `gevent`, like these docs and example.
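A minimal sketch of such an adapter, assuming the standard `paho-mqtt` and `gevent` APIs (the helper name `spawn_mqtt_loop` is mine, not from the linked docs): instead of `client.loop_start()`, which spawns a real OS thread that fights with gevent's scheduler, paho's network loop is driven from a greenlet so gevent stays the only scheduler in the process.

```python
# Sketch of a gevent <-> paho-mqtt adapter (hypothetical helper, not the
# project's code): drive the client's network loop from a greenlet
# instead of paho's own background thread.
import gevent


def spawn_mqtt_loop(client, interval=0.1):
    """Run client.loop() forever inside a gevent greenlet."""
    def _loop():
        while True:
            client.loop(timeout=interval)  # one paho network read/write cycle
            gevent.sleep(0)                # cooperatively yield to other greenlets
    return gevent.spawn(_loop)
```

After `client.connect(host, port)`, calling `spawn_mqtt_loop(client)` would take the place of `client.loop_start()`; on shutdown, kill the greenlet instead of calling `loop_stop()`.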
Furthermore, I've not yet looked at your code in detail regarding the `Event Dispatcher` or anything similar, but you might be trying to implement an event-bus system with multi-threaded programming. I think @Shiny380 should learn and use `gevent.events` instead of building it from scratch; that also keeps the whole application seamless.
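For illustration, an event bus between greenlets can be sketched with `gevent.queue` (my own minimal example, not the project's Event Dispatcher and not the `gevent.events` module itself): greenlets replace hand-rolled threads, and the queue hands events across them safely.

```python
# Illustrative sketch: a tiny event bus on gevent primitives.
import gevent
from gevent.queue import Queue

bus = Queue()      # shared event queue
received = []      # what the dispatcher greenlet observed


def dispatcher_loop(n):
    # the "target thread" is just a greenlet draining the queue
    for _ in range(n):
        received.append(bus.get())


worker = gevent.spawn(dispatcher_loop, 2)
bus.put({"event": "point_update"})
bus.put({"event": "point_delete"})
worker.join(timeout=1)
```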
Regarding the PR from @RaiBnod, I don't recommend it. It creates a custom thread outside the server context. If the server receives a signal to stop/restart, it might stop the Gunicorn master thread but not the custom thread. I haven't tested it yet, but if run with multiple workers, it will call the db setup and thread creation once per worker, and thread context is not shared between workers.
@zero88 it doesn't create a custom thread outside the server context; you can see we are wrapping things inside the application context. If it were outside that context, it would fail at runtime on db access.
Just go through the simple inspection in my PR, NubeIO/lora-raw#29, and test it; it works perfectly fine without any issue like yours above. If you hit any issue, let me know, and if you can do anything better, that would also be appreciated.
My inspection, as I said above, is:
If we set `pre-load=True` and then start, it gives us the flexibility to run code after the application is ready. So you do the `db.create_all()` for creating tables, and the `Threading`-related work, there. But on the other hand, it starts two processes when you call `time.sleep(<seconds>)` inside threads. See the example below:
After my change:
And lastly, those two consecutive calls in that flow are causing the issue: one gets the exact context value, and one gets `None`.
Do you see the problem? @RaiBnod
With more than one worker, `createDB` and `Thread` are invoked more than once:
import os
os.environ['GEVENT_SUPPORT'] = 'True'
runfile('/data/projects/nubeio/rubix-py/rubix-lora/run.py', args=['-s', 'config.json', '--workers', '2'], wdir='/data/projects/nubeio/rubix-py/rubix-lora')
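The per-worker duplication can be reproduced in miniature (an illustration only, not the project's code): every forked worker process re-executes the startup path independently, so per-process setup runs once per worker, not once per application.

```python
# Minimal illustration of why createDB and the background Thread
# run once per worker: each forked process repeats the startup path.
import multiprocessing as mp

_ctx = mp.get_context("fork")  # fork, as Gunicorn does on Linux


def fake_worker_startup(counter):
    # stands in for db.create_all() plus background-thread creation
    with counter.get_lock():
        counter.value += 1


def run_workers(n):
    counter = _ctx.Value("i", 0)
    procs = [_ctx.Process(target=fake_worker_startup, args=(counter,))
             for _ in range(n)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()
    return counter.value


setup_calls = run_workers(2)  # startup ran once in each of the 2 workers
```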
After `gevent.monkey.patch_all()`, the whole Python threading concept has already been turned into coroutines, a.k.a. an event loop.
So `time.sleep()` becomes `gevent.sleep()`; when it is invoked, it suspends only the calling greenlet (the "worker thread") instead of the current main thread, and that is what creates the dummy thread.
I don't see anything wrong with `time.sleep()` here.
{'_gevent_saved_patch_all_module_settings': {'socket': True, 'dns': True, 'time': True, 'select': True, 'thread': True, 'os': True, 'ssl': True, 'subprocess': True, 'sys': False, 'aggressive': True, 'Event': True, 'builtins': True, 'signal': True, 'queue': True, 'contextvars': True}, 'os': {'fork': <built-in function fork>, 'forkpty': <built-in function forkpty>, 'waitpid': <built-in function waitpid>, 'posix_spawn': <built-in function posix_spawn>, 'posix_spawnp': <built-in function posix_spawnp>}, '_thread': {'allocate_lock': <built-in function allocate_lock>, 'get_ident': <built-in function get_ident>, 'exit': <built-in function exit>, 'LockType': <class '_thread.lock'>, 'stack_size': <built-in function stack_size>, 'start_new_thread': <built-in function start_new_thread>, '_local': <class '_thread._local'>}, 'threading': {'local': <class '_thread._local'>, '_start_new_thread': <built-in function start_new_thread>, '_allocate_lock': <built-in function allocate_lock>, 'Lock': <built-in function allocate_lock>, '_DummyThread': <class 'threading._DummyThread'>, 'Thread': <class 'threading.Thread'>, 'Timer': <class 'threading.Timer'>, '_set_sentinel': <built-in function _set_sentinel>, 'get_ident': <built-in function get_ident>, '_CRLock': <class '_thread.RLock'>, 'Event': <class 'threading.Event'>, '_shutdown': <function _shutdown at 0x7fb68ead9b80>}, 'logging': {'_lock': <unlocked _thread.RLock object owner=0 count=0 at 0x7fb68e8e7720>}, '_threading_local': {'local': <class '_threading_local.local'>}, 'time': {'sleep': <built-in function sleep>}, 'socket': {'create_connection': <function create_connection at 0x7fb68ea7c1f0>, 'socket': <class 'socket.socket'>, 'SocketType': <class '_socket.socket'>, 'fromfd': <function fromfd at 0x7fb68ea5d3a0>, 'socketpair': <function socketpair at 0x7fb68ea77af0>, 'getaddrinfo': <function getaddrinfo at 0x7fb68ea7c3a0>, 'gethostbyname': <built-in function gethostbyname>, 'gethostbyname_ex': <built-in function gethostbyname_ex>, 
'gethostbyaddr': <built-in function gethostbyaddr>, 'getnameinfo': <built-in function getnameinfo>, 'getfqdn': <function getfqdn at 0x7fb68ea77b80>}, 'select': {'select': <built-in function select>, 'poll': <built-in function poll>, 'epoll': <class 'select.epoll'>}, 'selectors': {'DefaultSelector': <class 'selectors.EpollSelector'>, 'EpollSelector': <class 'selectors.EpollSelector'>}, 'ssl': {'SSLContext': <class 'ssl.SSLContext'>, 'SSLSocket': <class 'ssl.SSLSocket'>, 'wrap_socket': <function wrap_socket at 0x7fb68e445a60>, 'get_server_certificate': <function get_server_certificate at 0x7fb68e447430>}, 'subprocess': {'Popen': <class 'subprocess.Popen'>, 'call': <function call at 0x7fb68e8c9310>, 'check_call': <function check_call at 0x7fb68e8c93a0>, 'check_output': <function check_output at 0x7fb68e8c9430>, '_posixsubprocess': <module '_posixsubprocess' from '/usr/lib64/python3.8/lib-dynload/_posixsubprocess.cpython-38-x86_64-linux-gnu.so'>, 'run': <function run at 0x7fb68e8c94c0>, 'CompletedProcess': <class 'subprocess.CompletedProcess'>, '_use_posix_spawn': <function _use_posix_spawn at 0x7fb68e8c98b0>, '_USE_POSIX_SPAWN': True}, 'signal': {'signal': <function signal at 0x7fb68e966940>, 'getsignal': <function getsignal at 0x7fb68e9669d0>}, 'queue': {'SimpleQueue': <class '_queue.SimpleQueue'>}}
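The settings dump above can be sanity-checked directly; this is a small standalone check, not project code, showing that after `patch_all()` the stdlib sleep really is gevent's cooperative sleep:

```python
# After patch_all(), stdlib time.sleep has been replaced by gevent's
# cooperative sleep, so "sleeping" parks the current greenlet instead
# of blocking the worker's OS thread.
from gevent import monkey
monkey.patch_all()

import time

# the patched function comes from the gevent package, not the C builtin
sleep_is_patched = time.sleep.__module__.startswith("gevent")
```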
If you want to manage your threading yourself, just don't use `gevent` anymore; use `worker_class: sync`.
Moreover, your fix is just a tricky workaround to avoid starting the thread at worker initialization time. I'm not sure it works if you create a new thread at runtime, and should `time.sleep()` then be forbidden?
And when I say "custom thread out of server context":

- server context is `gunicorn`
- application context is `flask`

Spawning a custom thread that is not managed by `gunicorn` is bad practice; it causes thread-leak issues when the threading is not well managed.

If all of you want to leverage the power of `gevent` to build an asynchronous application, I invite you to learn from here before implementing more multi-threading complexity.
Otherwise: if you want to manage your threading yourself, just don't use `gevent` anymore; use `worker_class: sync`.
Even when we use `worker_class: sync`, it shows the same issue with multiple workers.
So let's come to a conclusion:
Issue: it appears when we have multiple workers.
When we turn on multiple workers, all our executable code gets forked into multiple processes. In our case, the background tasks are also forked once per worker. In the background task we establish a connection with MQTT; since we have multiple workers, the program tries to establish multiple connections with the same client id, and the broker shows:
1609991993: Client lora-raw-mqtt already connected, closing old connection.
1609991993: New client connected from 172.17.0.1 as lora-raw-mqtt (p2, c1, k60).
1609991994: New connection from 172.17.0.1 on port 1883.
1609991994: Client lora-raw-mqtt already connected, closing old connection.
1609991994: New client connected from 172.17.0.1 as lora-raw-mqtt (p2, c1, k60).
1609991995: New connection from 172.17.0.1 on port 1883.
1609991995: Client lora-raw-mqtt already connected, closing old connection.
So I was running with only one worker, believing that we could work out some sort of solution later for establishing the MQTT connection from a single run.
Do you have any solution for handling this? If you have one, please apply it; otherwise we will proceed with 1 worker for now and dig into it a bit later.
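One possible mitigation, sketched here as an assumption rather than the project's actual fix: derive the MQTT client id from the worker's pid, so each worker connects with a distinct identity and the broker no longer closes the "already connected" older session (`unique_client_id` is a hypothetical helper; the `lora-raw-mqtt` base matches the client name in the broker log above).

```python
# Hypothetical helper: per-worker MQTT client ids avoid the
# "Client lora-raw-mqtt already connected, closing old connection" loop.
import os


def unique_client_id(base="lora-raw-mqtt"):
    # suffix with the worker's pid; each Gunicorn worker gets a distinct id
    return f"{base}-{os.getpid()}"
```

The result would then be passed as the client id when constructing the MQTT client, e.g. `mqtt.Client(client_id=unique_client_id())`.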
Sorry @RaiBnod, you're trying your solution with `sync`.
Just use the latest code with my configuration tweaks, and follow the setup idea I described here.
It is my result: https://github.com/NubeIO/rubix-bacnet-server/pull/75
Note
Btw, if the `mqtt-broker` kicks out the `mqtt-client` in a simple case, it is due to a duplicate `mqtt-client` name. So please look into your code, ensure only one MQTT client is initialized, and take care of your threading issue.
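A sketch of one way to enforce that, with illustrative names (not code from the repo): a process-wide holder that initializes the client at most once, even if several code paths race to create it.

```python
# Illustrative singleton guard: only one MQTT client per process.
import threading


class MqttClientHolder:
    _lock = threading.Lock()
    _client = None

    @classmethod
    def get(cls, factory):
        # create the client on the first call only; later calls reuse it
        with cls._lock:
            if cls._client is None:
                cls._client = factory()
            return cls._client
```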
Conclusion @RaiBnod @Shiny380
As I said above, to repeat myself:
> If all of you want to leverage the power of `gevent` to build an asynchronous application, I invite you to learn from here before implementing more multi-threading complexity. Otherwise: if you want to manage your threading yourself, just don't use `gevent` anymore; use `worker_class: sync`.

If you still want to use `gevent`, you must make an adapter for `paho.mqtt`, as I repeat:

> After investigating more, I realized that the `paho-mqtt` client manages its own event loop, which overlaps and conflicts with the current `gevent` event loop, the main event loop in our Python project. So I think we should make an adapter between `paho-mqtt` and `gevent`, like these docs and example.
So, to simplify your development, just make the changes as I did in https://github.com/NubeIO/rubix-bacnet-server/pull/75
gevent monkey patching seems to be breaking threads. This can be seen in `mqtt_client_base.py`:
https://github.com/NubeIO/rubix-point-server/blob/master/src/services/mqtt_client/mqtt_client_base.py#L32

This line somehow splits the thread into two separate threads: the normal `FlaskThread` and a `gevent.threading._DummyThread`. It seems to be linked to accessing `threading.currentThread()` or similar. This is causing a constant reconnection issue for MQTT, and probably other issues. It stops happening if `monkey.patch_all()` (https://github.com/NubeIO/rubix-point-server/blob/master/src/server.py#L14) is removed.

Debug code from `mqtt_client_base.py`:

Output:
Preventing threading from being patched with `curious_george.patch_all(thread=False)`:

But then event dispatching across threads breaks in a weird way: it appears to function correctly, but the event never gets added to the event queue. The event queue object is the exact same object (same address) across the different threads, yet the loop in the target thread never receives the event. This can be seen at https://github.com/NubeIO/rubix-point-server/blob/master/src/services/event_service_base.py#L70 , where it operates on the correct object address but the event just disappears. It can be reproduced by calling https://github.com/NubeIO/rubix-point-server/blob/master/src/source_drivers/modbus/resources/point/point_singular.py#L88 from a Flask HTTP request to `/api/modbus/poll/point`.
Some links: