ChimeraPy / Engine

Distributed computing framework for multimodal data, written in Python
https://chimerapy-engine.readthedocs.io
GNU General Public License v3.0

Node creation and destruction slow. Closes #233 #263

Closed edavalosanaya closed 1 year ago

edavalosanaya commented 1 year ago

Closes #233

Objective

This PR aims to make Node creation and destruction (along with other Node operations, such as P2P connections) faster. This meant using an Executor with async methods to control subprocess start and stop. Previously we used AsyncLoopThread as a bridge to combine sync/async APIs. However, this had the negative effect of creating multiple event loops that cannot await another event loop's tasks. This prevented the reuse of resources like aiohttp.ClientSession. Through this refactoring, a single event loop can be used and resources are better managed.
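The single-loop idea can be illustrated with a minimal sketch. Here `SharedResource` is a hypothetical stand-in for a loop-bound resource such as `aiohttp.ClientSession`; none of this is the actual ChimeraPy code:

```python
import asyncio

class SharedResource:
    """Stand-in for a loop-bound resource such as aiohttp.ClientSession
    (hypothetical; not part of the ChimeraPy codebase)."""

    def __init__(self):
        self.uses = 0

    async def use(self):
        self.uses += 1
        return self.uses

async def worker(resource):
    # Every coroutine runs on the same event loop, so one shared
    # resource can serve them all without duplication.
    return await resource.use()

async def main():
    resource = SharedResource()
    await asyncio.gather(*(worker(resource) for _ in range(3)))
    return resource.uses

uses = asyncio.run(main())
print(uses)  # 3
```

With separate event loops per thread, each loop would need its own copy of the resource, which is exactly the duplication this refactor removes.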

Major Breaking Changes:

Before using a Manager or Worker instance, it is now required to call aserve or serve to start their services. This change was necessary so that the async versions of the Manager/Worker use the pre-existing event loop and avoid resource duplication, as await cannot be used within __init__.

Before

# Create Manager and Worker
manager = cpe.Manager(logdir=CWD / "runs")
manager.zeroconf()
worker = cpe.Worker(name="local", id="local")
worker.connect(host=manager.host, port=manager.port)

After

# Create default manager and desired graph
manager = cpe.Manager(logdir=CWD / "runs")
await manager.aserve() # or manager.serve() if in sync
await manager.async_zeroconf()
worker = cpe.Worker(name="local", id="local")
await worker.aserve() # or worker.serve() if in sync
await worker.async_connect(host=manager.host, port=manager.port)
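The reason for the two-step construct-then-serve pattern above can be sketched with a hypothetical class (not the real Manager/Worker API): `__init__` cannot be a coroutine, so async setup has to move into a separate coroutine that runs on the already-existing loop.

```python
import asyncio

class Service:
    """Hypothetical sketch (not the real Manager/Worker API)."""

    def __init__(self):
        # __init__ cannot be a coroutine, so no `await` here;
        # only cheap, synchronous setup.
        self.ready = False

    async def aserve(self):
        # Async setup happens on the already-running event loop,
        # so resources are created once and shared with callers.
        await asyncio.sleep(0)
        self.ready = True
        return self

async def main():
    svc = Service()     # construct synchronously
    await svc.aserve()  # then start services asynchronously
    return svc.ready

ready = asyncio.run(main())
print(ready)  # True
```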

Minor Breaking Changes

The aserve/serve change also affects the Node -- this should not affect external dependencies much, but it does change how Node implementations are debugged.

Before

# Create resource
thread = cpe.AsyncLoopThread()
thread.start()
eventbus = cpe.EventBus(thread=thread)
node = cpe.Node(name="example")

# Running
node.run(blocking=False, eventbus=eventbus)

# Wait
time.sleep(0.5)
eventbus.send(Event("start")).result()

time.sleep(0.5)
eventbus.send(Event("record")).result()

time.sleep(0.5)
eventbus.send(Event("stop")).result()

time.sleep(0.5)
eventbus.send(Event("collect")).result()
node.shutdown()

After

# Create resources
eventbus = cpe.EventBus()
node = cpe.Node(name="example")

# Running
await node.arun(eventbus=eventbus)

# Wait
await eventbus.asend(Event("start"))
await asyncio.sleep(0.5)

await eventbus.asend(Event("record"))
await asyncio.sleep(0.5)

await eventbus.asend(Event("stop"))
await asyncio.sleep(0.5)

await eventbus.asend(Event("collect"))
await node.ashutdown()
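One way the sync `serve` variant mentioned earlier ("or `serve()` if in sync") can coexist with the async API is a thin wrapper, sketched here with a hypothetical class; the actual implementation in this PR may differ:

```python
import asyncio

class Service:
    """Hypothetical sketch of pairing aserve (async) with serve (sync)."""

    async def aserve(self):
        # Async setup on the current event loop.
        await asyncio.sleep(0)
        self.ready = True
        return self

    def serve(self):
        # Sync entry point: spins up a fresh event loop, so it is
        # only valid when no loop is already running in this thread.
        return asyncio.run(self.aserve())

svc = Service().serve()
print(svc.ready)  # True
```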
edavalosanaya commented 1 year ago

Created benchmarks for node creation from the Worker's NodeHandlerService and the Manager's WorkerHandlerService. These are the average times:

Node Handler
(single)
Create time: 0.23887269528006072
Destroy time: 1.9214153029000136

(10)
Create time: 0.5530374649000805
Destroy time: 12.352607617899775

Worker Handler
(single)
Create time: 0.9725189961399974
Destroy time: 2.345808336850014

(10)
Create time: 10.788499685900002
Destroy time: 23.04709231760007

It seems like the WorkerHandlerService is not working correctly when creating multiple nodes. Need to determine why.
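For reference, average times like the ones above can be gathered with a small `time.perf_counter` harness (a hypothetical sketch, not the benchmark code used in this PR):

```python
import time

def average_time(fn, n=10):
    """Hypothetical helper: average wall-clock seconds per call of fn."""
    start = time.perf_counter()
    for _ in range(n):
        fn()
    return (time.perf_counter() - start) / n

# Stand-in workload; in the real benchmark this would be node
# creation/destruction through the handler services.
avg = average_time(lambda: sum(range(1000)), n=10)
print(f"Create time: {avg:.6f}")
```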

edavalosanaya commented 1 year ago

These are the latest benchmark results (after successful tests):

Node Handler
(10)
Create time: 0.382928
Destroy time: 0.012464

Worker Handler
(10)
Create time: 0.900446
Destroy time: 0.322753