Closed Set4now closed 4 years ago
Edit:- am now being able to send messages with broadcast and namespace. But can't use multithreading anymore.
when i use for loop i can see messages in the UI but with multithreading pool i don't see them anymore.
Working code :-
def send_msg(event, msg):
emit(event, msg, broadcast=True, namespace='/')
def long_stage_task(i):
print ("started executing for {}".format(i))
time.sleep(5)
send_msg('step1', {"nodename":i,"status":"completed"})
time.sleep(5)
send_msg('step2', {"nodename":i,"status":"completed"})
# if running_threads[i] == False:
# raise Exception("Thread Aborted by the user")
time.sleep(5)
send_msg('step3', {"nodename":i,"status":"completed"})
time.sleep(5)
send_msg('step4', {"nodename":i,"status":"failed"})
time.sleep(5)
send_msg('step5', {"nodename":i,"status":"failed"})
time.sleep(5)
send_msg('step6', {"nodename":i,"status":"failed"})
time.sleep(5)
send_msg('step7', {"nodename":i,"status":"failed"})
time.sleep(5)
send_msg('step8', {"nodename":i,"status":"failed"})
time.sleep(5)
send_msg('step9', {"nodename":i,"status":"failed"})
print ("I am finshed with all the stpes {}".format(i))
@blueprint.route('/stageworkflow', methods=['POST'])-----> This doesn't work, it was working in the test phase when i was doing without this project directory.
def longtask():
global pool
if request.method == "POST":
node = request.form["node"]
print (node)
executor = ThreadPoolExecutor(4)
for i in node.split(','):
executor.submit(long_stage_task, i)
# @blueprint.route('/stageworkflow', methods=['POST']) ----> This works
# def longtask():
# # lock = Lock()
# global pool
# if request.method == "POST":
# node = request.form["node"]
# for i in node.split(','):
# long_stage_task(i)
Is there any way i can achieve the same with multithreading?
I was trying to capture the same from result = executor.submit(long_stage_task, i) print (result.result())
TraceBack....
This typically means that you attempted to use functionality that needed to interface with the current application object in a way. To solve this set up an application context with app.app_context(). See the
code with multithreading
def send_msg(event, msg):
emit(event, msg, broadcast=True, namespace='/')
def long_stage_task(i):
print ("started executing for {}".format(i))
time.sleep(5)
send_msg('step1', {"nodename":i,"status":"completed"})
time.sleep(5)
send_msg('step2', {"nodename":i,"status":"completed"})
# if running_threads[i] == False:
# raise Exception("Thread Aborted by the user")
time.sleep(5)
send_msg('step3', {"nodename":i,"status":"completed"})
time.sleep(5)
send_msg('step4', {"nodename":i,"status":"failed"})
time.sleep(5)
send_msg('step5', {"nodename":i,"status":"failed"})
time.sleep(5)
send_msg('step6', {"nodename":i,"status":"failed"})
time.sleep(5)
send_msg('step7', {"nodename":i,"status":"failed"})
time.sleep(5)
send_msg('step8', {"nodename":i,"status":"failed"})
time.sleep(5)
send_msg('step9', {"nodename":i,"status":"failed"})
print ("I am finshed with all the stpes {}".format(i))
@blueprint.route('/stageworkflow', methods=['POST'])
def longtask():
global pool
if request.method == "POST":
node = request.form["node"]
print (node)
executor = ThreadPoolExecutor(4)
for i in node.split(','):
result = executor.submit(long_stage_task, i)
print (result.result())
First, if you are going to use the emit()
function outside of a request context, then you need to provide all arguments, since there is no context from where to get defaults. Your code is missing the room
argument, which indicates who is the recipient(s) of the message.
Second, if you are using multithreading along with eventlet or gevent, you have to monkey patch the standard library so that it becomes compatible with those frameworks. Without monkey patching your application will block.
def send_msg(event, msg): emit(event, msg, broadcast=True, namespace='/')
am i not passing the arguments?
Also do i need rooms, since am using this mainly because of server side events...and broadcast=True serves my purpose. It's only not working when am using multithreading. if am not wrong each thread will have its own context.. so i guess am missing somewhere in this.
Sorry, but please make an effort to explain exactly what happens. You are not being clear.
Sorry, i will try to give as much info as possible
yes when am trying to use with threading pool ( native python) its not working.. The same code works fine when i use it in a standalone test module like i explained earlier.
YES i am getting "outside of request context"
@blueprint.route('/stageworkflow', methods=['POST'])
def longtask():
global pool
if request.method == "POST":
node = request.form["node"]
print (node)
executor = ThreadPoolExecutor(4)
for i in node.split(','):
result = executor.submit(long_stage_task, i)
print (result.result())
Traceback:-
Traceback (most recent call last): File "/local/home/sumdeb/workplace/IeeWebPortal/src/IeeWebPortal/manage.py", line 25, in call return self.app(environ, start_response) File "/local/home/sumdeb/bra-pkg-cache/packages/Python-Flask/Python-Flask-0.11.x.1209.0/RHEL5_64/DEV.STD.PTHREAD/build/lib/python3.6/site-packages/flask/app.py", line 1991, in wsgi_app response = self.make_response(self.handle_exception(e)) File "/local/home/sumdeb/bra-pkg-cache/packages/Python-Flask/Python-Flask-0.11.x.1209.0/RHEL5_64/DEV.STD.PTHREAD/build/lib/python3.6/site-packages/flask/app.py", line 1567, in handle_exception reraise(exc_type, exc_value, tb) File "/local/home/sumdeb/bra-pkg-cache/packages/Python-Flask/Python-Flask-0.11.x.1209.0/RHEL5_64/DEV.STD.PTHREAD/build/lib/python3.6/site-packages/flask/_compat.py", line 33, in reraise raise value File "/local/home/sumdeb/bra-pkg-cache/packages/Python-Flask/Python-Flask-0.11.x.1209.0/RHEL5_64/DEV.STD.PTHREAD/build/lib/python3.6/site-packages/flask/app.py", line 1988, in wsgi_app response = self.full_dispatch_request() File "/local/home/sumdeb/bra-pkg-cache/packages/Python-Flask/Python-Flask-0.11.x.1209.0/RHEL5_64/DEV.STD.PTHREAD/build/lib/python3.6/site-packages/flask/app.py", line 1641, in full_dispatch_request rv = self.handle_user_exception(e) File "/local/home/sumdeb/bra-pkg-cache/packages/Python-Flask/Python-Flask-0.11.x.1209.0/RHEL5_64/DEV.STD.PTHREAD/build/lib/python3.6/site-packages/flask/app.py", line 1544, in handle_user_exception reraise(exc_type, exc_value, tb) File "/local/home/sumdeb/bra-pkg-cache/packages/Python-Flask/Python-Flask-0.11.x.1209.0/RHEL5_64/DEV.STD.PTHREAD/build/lib/python3.6/site-packages/flask/_compat.py", line 33, in reraise raise value File "/local/home/sumdeb/bra-pkg-cache/packages/Python-Flask/Python-Flask-0.11.x.1209.0/RHEL5_64/DEV.STD.PTHREAD/build/lib/python3.6/site-packages/flask/app.py", line 1639, in full_dispatch_request rv = self.dispatch_request() File "/local/home/sumdeb/bra-pkg-cache/packages/Python-Flask/Python-Flask-0.11.x.1209.0/RHEL5_64/DEV.STD.PTHREAD/build/lib/python3.6/site-packages/flask/app.py", line 1625, in dispatch_request return self.view_functionsrule.endpoint File "/local/home/sumdeb/workplace/IeeWebPortal/src/IeeWebPortal/src/iee_web_portal/osUpgrade.py", line 262, in longtask print (result.result()) File "/local/home/sumdeb/bra-pkg-cache/packages/CPython36Runtime/CPython36Runtime-release.276154.0/RHEL5_64/DEV.STD.PTHREAD/build/python3.6/lib/python3.6/concurrent/futures/_base.py", line 432, in result return self.get_result() File "/local/home/sumdeb/bra-pkg-cache/packages/CPython36Runtime/CPython36Runtime-release.276154.0/RHEL5_64/DEV.STD.PTHREAD/build/python3.6/lib/python3.6/concurrent/futures/_base.py", line 384, in __get_result raise self._exception File "/local/home/sumdeb/bra-pkg-cache/packages/CPython36Runtime/CPython36Runtime-release.276154.0/RHEL5_64/DEV.STD.PTHREAD/build/python3.6/lib/python3.6/concurrent/futures/thread.py", line 56, in run result = self.fn(*self.args, **self.kwargs) File "/local/home/sumdeb/workplace/IeeWebPortal/src/IeeWebPortal/src/iee_web_portal/osUpgrade.py", line 224, in long_stage_task send_msg('step1', {"nodename":i,"status":"completed"}) File "/local/home/sumdeb/workplace/IeeWebPortal/src/IeeWebPortal/src/ieewebportal/osUpgrade.py", line 217, in send_msg emit(event, msg, broadcast=True, namespace='/') File "/local/home/sumdeb/bra-pkg-cache/packages/Python-Flask-SocketIO/Python-Flask-SocketIO-4.x.4.0/RHEL5_64/DEV.STD.PTHREAD/build/lib/python3.6/site-packages/flask_socketio/init.py", line 761, in emit socketio = flask.current_app.extensions['socketio'] File "/local/home/sumdeb/bra-pkg-cache/packages/Python-Werkzeug/Python-Werkzeug-0.11.x.206470.0/RHEL5_64/DEV.STD.PTHREAD/build/lib/python3.6/site-packages/werkzeug/local.py", line 343, in getattr__ return getattr(self._get_current_object(), name) File "/local/home/sumdeb/bra-pkg-cache/packages/Python-Werkzeug/Python-Werkzeug-0.11.x.206470.0/RHEL5_64/DEV.STD.PTHREAD/build/lib/python3.6/site-packages/werkzeug/local.py", line 302, in _get_current_object return self.__local() File "/local/home/sumdeb/bra-pkg-cache/packages/Python-Flask/Python-Flask-0.11.x.1209.0/RHEL5_64/DEV.STD.PTHREAD/build/lib/python3.6/site-packages/flask/globals.py", line 51, in _find_app raise RuntimeError(_app_ctx_err_msg) RuntimeError: Working outside of application context.
This typically means that you attempted to use functionality that needed to interface with the current application object in a way. To solve this set up an application context with app.app_context(). See the documentation for more information.
This test code works smoothly..
from flask import Flask, render_template, request, flash
from flask_socketio import SocketIO, emit, disconnect
import time
import multiprocessing
from multiprocessing import Lock
import threading
# import gevent
# from gevent.pool import Pool
# from gevent.pool import Group
# from gevent.greenlet import Greenlet
from concurrent.futures import ThreadPoolExecutor
app = Flask(__name__)
app.debug = True
app.config['SECRET_KEY'] = 'secret!'
socketio = SocketIO(app)
# socketio.on
def send_msg(event, msg):
socketio.emit(event, msg, broadcast=True)
num_of_nodes = ['node1', 'node2','node3','node4','node5', 'node6','node7','node8']
@app.route('/')
def home():
return render_template('index.html', num_of_nodes=num_of_nodes)
def long_task(i):
print "started executing for {}".format(i)
time.sleep(5)
send_msg('step1', {"nodename":i,"status":"completed"})
time.sleep(5)
send_msg('step2', {"nodename":i,"status":"completed"})
# else:
# raise("Aborted")
if running_threads[i] == False:
raise Exception("Thread Aborted by the user")
time.sleep(5)
# if run_threads[i] == True:
send_msg('step3', {"nodename":i,"status":"completed"})
# else:
# raise("Aborted")
time.sleep(5)
# if run_threads[i] == True:
send_msg('step4', {"nodename":i,"status":"failed"})
# else:
# raise("Aborted")
time.sleep(5)
# if run_threads[i] == True:
send_msg('step5', {"nodename":i,"status":"failed"})
# else:
# raise("Aborted")
print "I am finshed with all the stpes {}".format(i)
# group = Group()
# pool = Pool(4)
running_threads = {}
@app.route('/longtask', methods=['POST', 'GET'])
def longtask():
# lock = Lock()
global pool
if request.method == "POST":
node = request.form["node"]
# Code for python multithreading
# for i in node.split(','):
# p = threading.Thread(target=long_task, args=(i,))
# process_list.append(p)
# for p in process_list:
# p.start()
# for p in process_list:
# p.join()
# python gevent , kill specfic thread
# for i in node.split(','):
# # threads = [gevent.spawn(long_task, i)]
# p = gevent.spawn(long_task, i)
# process_list.append(p)
# time.sleep(5)
# process_list[0].kill()
# gevent.joinall(threads)
executor = ThreadPoolExecutor(4)
for i in node.split(','):
temp_dict = {}
temp_dict[i] = True
running_threads.update(temp_dict)
executor.submit(long_task, i)
print running_threads
abort("node1")
def abort(i):
running_threads[i] = False
if __name__ == "__main__":
#app.run()
socketio.run(app)
I guess i have reproduced the problem with the working test module.. if replace either socket.emit with only emit, remove socket.run with app.run , it's giving me context error. So i guess am doing wrong in importing something may be.. Am sorry, am an beginner in flask stuff...
Sorry, but you are still not explaining the problem properly. All this "if I change A to B and C to D" only confuses things even more.
If you get an "out of request context" error, then you are missing arguments in the emit call. If you get an "out of application context" error, then you just need to install an app context before you call emit. If you don't get any errors, but messages aren't received on the other side, then you need to get the logs that I requested before. Finally, you continue to ignore monkey patching. If you are using eventlet or gevent and don't know what monkey patching is, then likely this is your problem. Please go to eventlet's or gevent's site and read on monkey patching, as that is required for threading work with these frameworks.
AM getting application context error. RuntimeError: Working outside of application context. Can you provide me a link or doc for install a app context ?
Now i can see it's working.. I can't thank you enough, you helped me like anything.. Thank you for your patience..
or testing purpose i was using the generic flask start methods. cat app.py
and it's working as expected..
But the problem happens when am trying to use it in our stage env where we already have existing project structure. may be am unable to use or set it properly
Stage project structure :- cd myproject bin build build-tools Config configuration DEVELOPMENT.md doc etc manage.py README.md runpy setup.cfg setup.py setup.pybak src test var
all the src codes are inside src folder
cat manage.py
in src/projectname directory alarming_class.py constant.py get_variables_dualcontroller.py main.py nsm_client.py static wibutils.py carnaval_alarm.py dbconn.py get_variables.py mcm_generate.py osUpgrade.py templates config_generator.py get_dualcontroller_vlan.py init.py monitor_portal.py pycache utils.py
the init.py has all the existing routes functions. for my work i have created a blueprint (osUpgrade.py)
cat init.py
..... from .osUpgrade import blueprint as osupgrade app.register_blueprint(osupgrade, url_prefix='') ...... cat osUpgrade.py
from flask_socketio import SocketIO, emit, disconnect def send_msg(event, msg): print ("working") emit(event, msg) I have also setup the ajax javascript... Now when i send request in the browser with debugger mode..i see clients getting connected to socketio and same with flask but am unable to see those messages in the UI.
May i am not able to understand how to set this up. Any help or suggestion would be appreciated.