Open rd-marcel-haugwitz opened 3 years ago
How many PEs did you use to run the above code? Entry methods will run to completion or until the coroutine running the entry method voluntarily yields execution. If the above is running on one PE, one entry method will run forever. Here you will probably want at least 3 PEs: one for each Ox/Ax and one for the main chare. To ensure the Ox/Ax chares are placed on different PEs, you can change the chare creation to the following:
ax_proxy = Chare(Ax, onPE=1)
ox_proxy = Chare(Ox, onPE=2)
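To illustrate the "runs to completion or until it voluntarily yields" point, here is a minimal sketch in plain Python. It is not Charm4py code: the toy scheduler and the names `worker`, `greedy`, and `run_round_robin` are made up for illustration, and generators stand in for coroutines.

```python
from collections import deque

def worker(name, steps, log):
    """Cooperative task: does one unit of work, then yields control."""
    for i in range(steps):
        log.append((name, i))
        yield  # voluntarily hand control back to the scheduler

def greedy(name, steps, log):
    """Task that does all of its work before yielding (like a loop using time.sleep)."""
    for i in range(steps):
        log.append((name, i))
    yield  # yields only once, after everything is done

def run_round_robin(tasks):
    """Toy single-threaded scheduler: resume each task until its next yield."""
    queue = deque(tasks)
    while queue:
        task = queue.popleft()
        try:
            next(task)
            queue.append(task)  # task yielded, so it gets another turn later
        except StopIteration:
            pass  # task finished

# Both tasks yield after every step, so their work interleaves.
cooperative_log = []
run_round_robin([worker("AX", 3, cooperative_log), worker("OX", 3, cooperative_log)])

# The first task never yields mid-loop, so the second one waits until it is done.
greedy_log = []
run_round_robin([greedy("AX", 3, greedy_log), worker("OX", 3, greedy_log)])
```

If the `greedy` loop were `while True`, the second task would never get a turn, which is the single-PE situation described above.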
Hi ZwFink,
Hmm, I applied your suggestions and tried 3 PEs and it still does not work...
import time
from charm4py import Chare, charm, coro, Array

starttime = time.time()

class Ax(Chare):
    @coro
    def run(self):
        while True:
            print('AX - I am element', self.thisIndex, 'on PE', charm.myPe())
            time.sleep(1)

class Ox(Chare):
    @coro
    def run(self):
        while True:
            print('OX - I am element', self.thisIndex, 'on PE', charm.myPe())
            time.sleep(1)

def main(args):
    ax_proxy = Chare(Ax, onPE=1)
    ox_proxy = Chare(Ox, onPE=2)
    combined_proxies = charm.combine(ax_proxy, ox_proxy)
    combined_proxies.run()

charm.start(main)
In IntelliJ the output looks like this
C:\Users\a12f206\.conda\envs\py36_scos\python.exe -m charmrun.start +p3 C:/dev/IdeaProjects/sco-backend-dummy/src/charm4py_test.py
C:\Users\a12f206\.conda\envs\py36_scos\lib\site-packages\numpy\_distributor_init.py:32: UserWarning: loaded more than 1 DLL from .libs:
C:\Users\a12f206\.conda\envs\py36_scos\lib\site-packages\numpy\.libs\libopenblas.PYQHXLVVQ7VESDPUVUADXEVJOBGHJPAY.gfortran-win_amd64.dll
C:\Users\a12f206\.conda\envs\py36_scos\lib\site-packages\numpy\.libs\libopenblas.QVLO2T66WEPI7JZ63PS3HMOHFEY472BC.gfortran-win_amd64.dll
stacklevel=1)
C:\Users\a12f206\.conda\envs\py36_scos\lib\site-packages\numpy\_distributor_init.py:32: UserWarning: loaded more than 1 DLL from .libs:
C:\Users\a12f206\.conda\envs\py36_scos\lib\site-packages\numpy\.libs\libopenblas.PYQHXLVVQ7VESDPUVUADXEVJOBGHJPAY.gfortran-win_amd64.dll
C:\Users\a12f206\.conda\envs\py36_scos\lib\site-packages\numpy\.libs\libopenblas.QVLO2T66WEPI7JZ63PS3HMOHFEY472BC.gfortran-win_amd64.dll
stacklevel=1)
Charmrun> started all node programs in 1.555 seconds.
Converse/Charm++ Commit ID: v6.10.0-beta1-17-ga5b6b3259
Charm++> Disabling isomalloc because mmap() does not work.
Charm++> scheduler running in netpoll mode.
CharmLB> Load balancer assumes all CPUs are same.
Charm4py> Running Charm4py version 1.0 on Python 3.6.12 (CPython). Using 'cython' interface to access Charm++
Charm++> Running on 1 hosts (1 sockets x 2 cores x 2 PUs = 4-way SMP)
Charm++> cpu topology info is gathered in 0.001 seconds.
AX - I am element (0,) on PE 1
AX - I am element (0,) on PE 1
AX - I am element (0,) on PE 1
AX - I am element (0,) on PE 1
AX - I am element (0,) on PE 1
AX - I am element (0,) on PE 1
AX - I am element (0,) on PE 1
AX - I am element (0,) on PE 1
AX - I am element (0,) on PE 1
AX - I am element (0,) on PE 1
AX - I am element (0,) on PE 1
AX - I am element (0,) on PE 1
AX - I am element (0,) on PE 1
AX - I am element (0,) on PE 1
AX - I am element (0,) on PE 1
AX - I am element (0,) on PE 1
AX - I am element (0,) on PE 1
AX - I am element (0,) on PE 1
AX - I am element (0,) on PE 1
AX - I am element (0,) on PE 1
AX - I am element (0,) on PE 1
AX - I am element (0,) on PE 1
AX - I am element (0,) on PE 1
AX - I am element (0,) on PE 1
AX - I am element (0,) on PE 1
AX - I am element (0,) on PE 1
AX - I am element (0,) on PE 1
AX - I am element (0,) on PE 1
AX - I am element (0,) on PE 1
AX - I am element (0,) on PE 1
AX - I am element (0,) on PE 1
AX - I am element (0,) on PE 1
AX - I am element (0,) on PE 1
AX - I am element (0,) on PE 1
AX - I am element (0,) on PE 1
AX - I am element (0,) on PE 1
AX - I am element (0,) on PE 1
AX - I am element (0,) on PE 1
AX - I am element (0,) on PE 1
AX - I am element (0,) on PE 1
AX - I am element (0,) on PE 1
AX - I am element (0,) on PE 1
Hi @ZwFink, this issue is related to #194, which you helped me with a few days ago. @rd-marcel-haugwitz and I are working on this together.
The last hint you gave me was this
class ClientMap(ArrayMap):
    def procNum(self, index):
        return 0
...
myMap = Group(ClientMap)
controller = Array(Controller, args=[num_clients, num_cameras_per_client], map=myMap)
controller.run_forever()
This works perfectly fine. Now we have a second Controller with totally different tasks and functions. Both Controllers have a run() function with no arguments. So we tried to combine both Controller proxies into one, just like in the example Marcel provided. Maybe this helps to get a bigger picture.
Is this bad practice? Is there a better way to achieve this?
Hi @ZwFink and @martinkoenig,
with the ClientMap it works when I am starting with at least 2 PEs :-)
import time
from charm4py import Chare, charm, coro, Array, ArrayMap, Group

starttime = time.time()

class ClientMap(ArrayMap):
    pNum = -1
    def __init__(self, pNum):
        self.pNum = pNum
    def procNum(self, index):
        return self.pNum

class Ax(Chare):
    @coro
    def run(self):
        while True:
            print('AX - I am element', self.thisIndex, 'on PE', charm.myPe())
            time.sleep(1)

class Ox(Chare):
    @coro
    def run(self):
        while True:
            print('OX - I am element', self.thisIndex, 'on PE', charm.myPe())
            time.sleep(1)

def main(args):
    my_map1 = Group(ClientMap, args=[0])
    controllerAx = Array(Ax, 1, args=[], map=my_map1)
    my_map2 = Group(ClientMap, args=[1])
    controllerOx = Array(Ox, 1, args=[], map=my_map2)
    controllerOx.run()
    controllerAx.run()

charm.start(main)
Thank you both for your help!
Marcel
Hi,
the client map helped, but unfortunately we still have a problem.
In our project we are making use of Reducers and Futures. In combination with endless loops this is not working. The method Ax.runAx() never finishes...
Do you have any idea? Are we doing something wrong?
import time
from charm4py import Chare, charm, coro, Array, ArrayMap, Group, Future, Reducer

class ClientMap(ArrayMap):
    pNum = -1
    def __init__(self, pNum):
        self.pNum = pNum
    def procNum(self, index):
        return self.pNum

class Wax(Chare):
    def test(self, args):
        print("I NEVER REACH THIS POINT :-(")
        delta = args[0][0]
        future = args[0][1]
        future(delta + 1)
    @coro
    def run(self, future):
        print('WAX START - I am element', self.thisIndex, 'on PE', charm.myPe())
        delta = 5
        self.reduce(self.thisProxy[0].test, [delta, future], Reducer.gather)

class Ax(Chare):
    def __init__(self):
        self.wax = Chare(Wax)
    @coro
    def runAx(self):
        print('AX START - I am element', self.thisIndex, 'on PE', charm.myPe())
        future = Future()
        self.wax.run(future)
        results = future.get()
        print('I NEVER REACH THIS POINT :-( .... Result of wax is', results)

class Ox(Chare):
    @coro
    def run_endless(self):
        while True:
            print('OX - I am element', self.thisIndex, 'on PE', charm.myPe())
            time.sleep(1)

def main(args):
    my_map1 = Group(ClientMap, args=[0])
    controllerAx = Array(Ax, 1, args=[], map=my_map1)
    controllerAx.runAx()
    my_map2 = Group(ClientMap, args=[1])
    controllerOx = Array(Ox, 1, args=[], map=my_map2)
    controllerOx.run_endless()

charm.start(main)
That's interesting that using a PE map solves the problem. I think there are two problems here:
print() is changed in Charm4Py to use that of Charm++. You can read more about it here. It appears that when chares do not voluntarily yield execution, the output is buffered locally in some cases. I'm not sure yet if there is a bug causing one chare's output to be seen while the other's is not. I think this issue was first a case of number 1, but then perhaps became a case of number 2. If we change the original example you provided to use charm.sleep(time), which, when called from a coroutine, suspends the chare that is running for at least time seconds, you will see the output of both chares regardless of whether they are put on a single PE or not:
import time
from charm4py import Chare, charm, coro, Array, Future

starttime = time.time()

class Ax(Chare):
    def __init__(self, done_future):
        self.done_future = done_future
    @coro
    def run(self):
        iter_num = 0
        while iter_num < 1000:
            iter_num += 1
            print(f'AX - I am element with counter {iter_num} ', self.thisIndex, 'on PE', charm.myPe())
            if not iter_num % 100:
                charm.sleep(1)
        self.done_future()

class Ox(Chare):
    def __init__(self, done_future):
        self.done_future = done_future
    @coro
    def run(self):
        iter_num = 0
        while iter_num < 1000:
            iter_num += 1
            print(f'OX - I am element with counter {iter_num} ', self.thisIndex, 'on PE', charm.myPe())
            if not iter_num % 100:
                charm.sleep(1)
        self.done_future()

def main(args):
    done_future = Future(2)
    ax_proxy = Chare(Ax, onPE=1, args=[done_future])
    ox_proxy = Chare(Ox, onPE=1, args=[done_future])
    combined_proxies = charm.combine(ax_proxy, ox_proxy)
    combined_proxies.run()
    done_future.get()
    charm.exit()

charm.start(main)
I also changed the code so a finite number of iterations are done, but this was just to control the amount of output put to the screen. In the output you will see that both Ax and Ox are printing. Also, if you change the above code so Ax/Ox are on separate PEs, you will see that both run at the same time but one always displays its output before the other. Also, changing the code so Ax/Ox write to a file rather than stdout shows that both run in parallel when used in different PEs.
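A rough sketch of the file-based check, in plain Python so it can be tried without Charm4py. The helper names `log_line` and `merged_timeline` and the file names are hypothetical; in the real example each chare would call `log_line` from its loop with its own file, and the PE number would come from charm.myPe().

```python
import os
import tempfile
import time

def log_line(path, tag, pe):
    """Append one timestamped line to this worker's own log file."""
    with open(path, "a") as f:
        f.write(f"{time.time():.6f} {tag} on PE {pe}\n")

def merged_timeline(paths):
    """Read all log files and return their lines sorted by timestamp."""
    lines = []
    for path in paths:
        with open(path) as f:
            lines.extend(line.rstrip("\n") for line in f)
    return sorted(lines, key=lambda line: float(line.split()[0]))

# Stand-in for two chares logging from different PEs.
tmp_dir = tempfile.mkdtemp()
ax_path = os.path.join(tmp_dir, "ax.log")
ox_path = os.path.join(tmp_dir, "ox.log")
for _ in range(2):
    log_line(ax_path, "AX", 1)
    log_line(ox_path, "OX", 2)

timeline = merged_timeline([ax_path, ox_path])
for line in timeline:
    print(line)
```

Because each worker writes to its own file, the stdout buffering does not affect what you see, and the merged timestamps show whether the two loops actually overlapped in time.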
And regarding your most recent code, it looks like reductions might currently require all PEs to participate in some form, even if the PE's chares do not contribute anything. This might be a technical limitation of Charm++, which I will look into. For now, you can call charm.sleep(0) (or some small amount of time) after each loop iteration, and this will get @rd-marcel-haugwitz's latest example to run correctly. For example, I changed the run_endless function to the following:
@coro
def run_endless(self):
    while True:
        print('OX - I am element', self.thisIndex, 'on PE', charm.myPe())
        time.sleep(1)
        charm.sleep(0)
And the output contains:
AX START - I am element (0,) on PE 0
WAX START - I am element (0,) on PE 0
OX - I am element (0,) on PE 1
OX - I am element (0,) on PE 1
I NEVER REACH THIS POINT :-(
I NEVER REACH THIS POINT :-( .... Result of wax is 6
OX - I am element (0,) on PE 1
OX - I am element (0,) on PE 1
It's important to call charm.sleep from within a coroutine for this to work; otherwise time.sleep is used, which will not fix this problem.
Hi @ZwFink,
Thanks for your help!
The code sample I provided is just dummy code. In our real code we do not have the endless loop with time.sleep() but a stream of camera images that have to be processed. That means that charm.sleep unfortunately won't help in our case.
(@martinkoenig)
Of course, but I'm saying that charm.sleep(0) could be used to overcome the issues with reductions outlined above at the cost of a few microseconds per camera image. You could do something like:
while True:
    image_data = get_image(image_source)
    image_processor.process(image_data)
    charm.sleep(0)
Here the runtime won't actually sleep, but it will do the bookkeeping necessary for reductions happening on other PEs to make progress, and then continue with the next loop iteration. Does that make sense? It's a bit wonky but will fix that particular issue.
I should have specified earlier that charm.sleep(n), when called from a coroutine, is not "do nothing until n seconds have passed", but is rather "do any useful work you might have until at least n seconds have passed". Charm4Py will always check for other useful work, even when n=0. So the charm.sleep(0) above allows the runtime to do what it needs for the reductions to make progress on other PEs. charm.sleep will result in sending one message. In a quick measurement on the default Charm4Py build, I found that each call to charm.sleep(0) takes 25-30 microseconds.
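As an analogy only (this is standard-library asyncio, not Charm4py, and the function names are made up), a zero-length sleep in a cooperative scheduler behaves the same way: it costs almost nothing, but it gives other ready tasks a chance to run.

```python
import asyncio

async def busy_loop(iterations, yield_each_iteration, events):
    """Loop standing in for the image-processing loop."""
    for i in range(iterations):
        events.append(("loop", i))
        if yield_each_iteration:
            # No real delay: just let the event loop run other ready tasks.
            await asyncio.sleep(0)

async def other_work(events):
    """Stands in for work that needs the scheduler's attention (e.g. a reduction)."""
    events.append(("other", "done"))

async def run(yield_each_iteration):
    events = []
    loop_task = asyncio.create_task(busy_loop(3, yield_each_iteration, events))
    other_task = asyncio.create_task(other_work(events))
    await asyncio.gather(loop_task, other_task)
    return events

with_yield = asyncio.run(run(True))
without_yield = asyncio.run(run(False))
print(with_yield)
print(without_yield)
```

With the zero-length sleep, the other task gets to run after the first loop iteration; without it, the other task only runs once the loop has finished, and with `while True` it would never run.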
Hi @ZwFink ,
thanks a lot for your quick help. That makes sense. And it works in the sample app I provided.
But unfortunately it is still more complicated on our side...
Our application needs some REST endpoints. What we are trying to do is run a Flask web server on one PE.
The endless loop is not under our control but somewhere deep in the Flask code. We cannot simply add charm.sleep somewhere.
I added the webserver to the sample code (Installation of flask is required):
import time
from charm4py import Chare, charm, coro, Array, ArrayMap, Group, Future, Reducer
import flask
from flask import Flask

### web server classes
##############################################
class EndpointAction(object):
    def __init__(self, action):
        self.action = action
    def __call__(self, *args):
        # Perform the action
        answer = self.action()
        # Create the answer (bundle it in a correctly formatted HTTP answer)
        self.response = flask.Response(answer, status=200, headers={})
        # Send it
        return self.response

class FlaskAppWrapper(Chare):
    app = None
    def __init__(self, name):
        self.app = Flask(name)
        # Add root endpoint
        self.add_endpoint(endpoint="/", endpoint_name="/", handler=self.indexAction)
        # You can also add options here : "... , methods=['POST'], ... "
    @coro
    def run(self):
        self.app.run()  ##### I can't add charm.sleep here :-/
    def add_endpoint(self, endpoint=None, endpoint_name=None, handler=None):
        self.app.add_url_rule(endpoint, endpoint_name, EndpointAction(handler))
    def indexAction(self):
        # Dummy action
        return "Web server is running..."
        # Test it with curl 127.0.0.1:5000

### charm4py application
##############################################
class ClientMap(ArrayMap):
    pNum = -1
    def __init__(self, pNum):
        self.pNum = pNum
    def procNum(self, index):
        return self.pNum

class Wax(Chare):
    def test(self, args):
        print("I NEVER REACH THIS POINT :-(")
        delta = args[0][0]
        future = args[0][1]
        future(delta + 1)
    @coro
    def run(self, future):
        print('WAX START - I am element', self.thisIndex, 'on PE', charm.myPe())
        delta = 5
        self.reduce(self.thisProxy[0].test, [delta, future], Reducer.gather)

class Ax(Chare):
    def __init__(self):
        self.wax = Chare(Wax)
    @coro
    def runAx(self):
        print('AX START - I am element', self.thisIndex, 'on PE', charm.myPe())
        future = Future()
        self.wax.run(future)
        results = future.get()
        print('I NEVER REACH THIS POINT :-( .... Result of wax is', results)

def main(args):
    my_map1 = Group(ClientMap, args=[0])
    controllerAx = Array(Ax, 1, args=[], map=my_map1)
    controllerAx.runAx()
    my_map2 = Group(ClientMap, args=[1])
    web_server = Array(FlaskAppWrapper, 1, args=["my_web_server"], map=my_map2)
    print("Starting server on http://127.0.0.1:5000/")
    web_server.run()

charm.start(main)
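One general Python pattern for a blocking loop you cannot modify is to push it into a background thread so the calling code keeps control. The sketch below is plain Python with a stand-in function (`blocking_serve` is hypothetical and takes the place of a call like app.run()). Whether this plays well with the Charm4py scheduler is an assumption that has not been tested here.

```python
import threading
import time

def blocking_serve(stop_event, served):
    """Stand-in for a blocking call such as a web server's run loop (hypothetical)."""
    while not stop_event.is_set():
        served.append(time.time())  # pretend to handle a request
        time.sleep(0.01)

stop_event = threading.Event()
served = []

# Run the blocking loop in a daemon thread so it cannot keep the process alive.
server_thread = threading.Thread(
    target=blocking_serve, args=(stop_event, served), daemon=True
)
server_thread.start()

# The caller keeps running; in a coroutine this is where a periodic
# yield to the runtime (e.g. charm.sleep(0)) would be placed.
main_progress = 0
for _ in range(5):
    main_progress += 1
    time.sleep(0.01)

# Shut the stand-in server down cleanly.
stop_event.set()
server_thread.join(timeout=1.0)
print("main iterations:", main_progress, "requests handled:", len(served))
```

The point of the design is only that the loop you control stays free to yield regularly, while the loop you do not control runs elsewhere.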
@martinkoenig
Hi, I would like to run two classes with endlessly running loops in parallel. But only one endless loop is starting.... Am I missing something? I am seeing only the output of one method in the console...