operand / agency

A fast and minimal framework for building agent-integrated systems
https://createwith.agency
MIT License
412 stars, 21 forks

Improve multiprocessing and concurrency #33

Closed by operand 1 year ago

operand commented 1 year ago

This issue is to discuss and track work on concurrency-related improvements. v1.4.0 recently added multiprocessing support, but there are more improvements that could be made.


Another multiprocessing improvement to explore would be to allow individual agents to process messages in parallel. Currently each agent must finish processing one message before it starts the next. See https://github.com/operand/agency/issues/95 for an example implementation from @wwj718.

hidaris commented 1 year ago

I noticed that the topic of actions being currently executed in a blocking manner was discussed in #95. To avoid disrupting your context, I'm noting down my thoughts here.

If we build a perception-action feedback loop in the agent through actions, a situation might occur where the agent, while executing a 'move' action, hears or observes other events from the environment and may stop its current movement. However, in the current blocking scenario, such a case may not be easy to construct.

This case might not be very typical, and blocking has its uses in many situations. Perhaps we could follow pyee's approach and provide both blocking and non-blocking spaces.

operand commented 1 year ago

This is a good point. I think that makes the case for allowing both.

wwj718 commented 1 year ago

> If we build a perception-action feedback loop in the agent through actions, a situation might occur where the agent, while executing a 'move' action, hears or observes other events from the environment and may stop its current movement. However, in the current blocking scenario, such a case may not be easy to construct.

@hidaris Implementing the perception-action feedback loop may not be difficult with the current synchronization mechanism. In the SyncAgent implementation, messages are processed in parallel, so a new message can arrive while an action is blocked. If the new message needs to interrupt the earlier blocking action (which is implemented as a future), any running future can be looked up through self._futures and canceled.

That said, perception-action feedback may be better suited to an asynchronous Agent.
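A minimal sketch of the interruption idea discussed above, written with asyncio rather than the actual SyncAgent code. The class `InterruptibleAgent` and its `receive` and `interrupt` methods are hypothetical; only the `_futures` name is borrowed from the comment. One caveat worth stating: a `concurrent.futures.Future` that is already running cannot be cancelled (`cancel()` returns `False`), whereas an asyncio task can be cancelled at its next `await`, which is why this sketch assumes asyncio.

```python
import asyncio


class InterruptibleAgent:
    """Hypothetical agent: each action runs as an asyncio task kept in _futures."""

    def __init__(self):
        self._futures = {}  # action name -> running asyncio.Task
        self.log = []

    async def move(self):
        try:
            for step in range(100):
                self.log.append(f"step {step}")
                # Awaiting yields control, so other messages can be handled.
                await asyncio.sleep(0.01)
        except asyncio.CancelledError:
            self.log.append("move interrupted")
            raise

    def receive(self, action_name):
        """Start an action without waiting for it to finish."""
        task = asyncio.create_task(getattr(self, action_name)())
        self._futures[action_name] = task
        return task

    def interrupt(self, action_name):
        """Cancel a running action, e.g. in response to a newly observed event."""
        task = self._futures.pop(action_name, None)
        if task is not None and not task.done():
            task.cancel()


async def demo():
    agent = InterruptibleAgent()
    task = agent.receive("move")
    await asyncio.sleep(0.05)  # the agent "hears" something mid-move
    agent.interrupt("move")
    try:
        await task
    except asyncio.CancelledError:
        pass  # expected: the move was cancelled on purpose
    return agent.log


log = asyncio.run(demo())
```

After the run, `log` holds a handful of `step` entries followed by `"move interrupted"`, showing that the move stopped partway through instead of running all 100 steps.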

operand commented 1 year ago

I just merged 1.3.0 and I wanted to add here that I was able to abstract out the threading implementation to a single class.

https://github.com/operand/agency/blob/main/agency/processors/native_thread_processor.py

This should hopefully make it simple to support other concurrency models, such as green threads and the multiprocessing module.

This doesn't address the issue of serial "one-at-a-time" processing on the agent, but it could be the start of how we abstract that as well.
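To illustrate what isolating the concurrency mechanism in a single class can look like, here is a rough sketch. It is not the contents of native_thread_processor.py: the `Processor` interface, its `start` and `join` methods, and the `PoolProcessor` alternative are all assumptions made for the example. The point is only that calling code depends on the interface, so the backend can be swapped.

```python
import threading
from abc import ABC, abstractmethod
from concurrent.futures import ThreadPoolExecutor


class Processor(ABC):
    """Hypothetical interface: run callables and wait for them to finish."""

    @abstractmethod
    def start(self, fn, *args):
        ...

    @abstractmethod
    def join(self):
        ...


class NativeThreadProcessor(Processor):
    """Runs each callable in its own OS thread."""

    def __init__(self):
        self._threads = []

    def start(self, fn, *args):
        thread = threading.Thread(target=fn, args=args, daemon=True)
        self._threads.append(thread)
        thread.start()

    def join(self):
        for thread in self._threads:
            thread.join()
        self._threads.clear()


class PoolProcessor(Processor):
    """Stand-in for another backend: a bounded thread pool."""

    def __init__(self, max_workers=4):
        self._pool = ThreadPoolExecutor(max_workers=max_workers)
        self._futures = []

    def start(self, fn, *args):
        self._futures.append(self._pool.submit(fn, *args))

    def join(self):
        for future in self._futures:
            future.result()  # re-raises any exception from the callable
        self._futures.clear()


def run_agents(processor, names):
    """Caller code only sees the Processor interface."""
    results = []
    lock = threading.Lock()

    def agent_loop(name):
        with lock:
            results.append(f"{name} ran")

    for name in names:
        processor.start(agent_loop, name)
    processor.join()
    return sorted(results)
```

With this shape, `run_agents(NativeThreadProcessor(), ["a", "b"])` and `run_agents(PoolProcessor(), ["a", "b"])` behave the same from the caller's point of view. A process-based or green-thread backend would be another subclass.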

operand commented 1 year ago

Just wanted to note here that I'm working through the update right now that will bring multiprocessing support. I think I need a ~couple~ few more days still but it's coming soon.

operand commented 1 year ago

This has been taking longer than I thought. It's still in progress and it's close but there are still some issues to work out.

There's currently an issue with the Gradio app, for example, which has been tricky and is forcing me to refactor some more. I just wanted to give another update since this is taking longer than expected.

operand commented 1 year ago

Alright this ended up being a larger change than I thought but I just released 1.4.0 with multiprocessing and it addresses some of what we discussed here.

In this update, I changed it so that actions are processed immediately and in parallel using threads, rather than serially (one at a time) as we discussed above.

The only issue with the implementation is that it uses threads. I didn't go so far as to spawn processes for each action (only each agent). So action processing per-agent is not yet fully parallel, only "concurrent".
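As a rough illustration of thread-per-action processing (not the library's actual code), the sketch below starts a new thread for every incoming action. `ThreadedAgent`, `receive`, and `wait` are hypothetical names. Because `time.sleep` releases the GIL, the two actions overlap, and the shorter one finishes first even though it arrived second.

```python
import threading
import time


class ThreadedAgent:
    """Hypothetical agent that starts one thread per incoming action."""

    def __init__(self):
        self.finished = []  # action names, in completion order
        self._lock = threading.Lock()
        self._threads = []

    def _slow_action(self, name, seconds):
        # Sleeping (like blocking IO) releases the GIL, so other actions proceed.
        time.sleep(seconds)
        with self._lock:
            self.finished.append(name)

    def receive(self, name, seconds):
        thread = threading.Thread(target=self._slow_action, args=(name, seconds))
        self._threads.append(thread)
        thread.start()  # returns immediately; the action runs in the background

    def wait(self):
        for thread in self._threads:
            thread.join()


agent = ThreadedAgent()
agent.receive("long", 0.3)
agent.receive("short", 0.1)
agent.wait()
```

Under serial processing the completion order would match the arrival order. Here `agent.finished` ends up with `"short"` before `"long"`. CPU-bound actions would still contend for the GIL, which is the "concurrent, not parallel" limitation mentioned above.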

Another possibility after this update is to introduce a cancel method of some kind to achieve the real-time control characteristics that were brought up.

I'm going to leave this discussion open for now in case we run into issues with this update or want to discuss further improvements regarding concurrency.

hidaris commented 1 year ago

Awesome, I'll read the updated documentation now. 👍

operand commented 1 year ago

I'm happy to say that I think this issue can be closed and that the concurrency and multiprocessing picture is pretty good now with 1.6.

The sub-topic we mentioned here regarding serial vs. parallel processing has essentially been supported since 1.4. In that version I changed it so that each incoming action spawns a thread immediately. Unfortunately this is subject to the GIL, but the logic is there, and it can be taken advantage of if an action sleep()s or uses IO. What's not supported is making this behavior optional, but I think that's okay for now.

Pre-empting or canceling an action is also possible if you implement the action as some form of loop that can be stopped. So I believe the basic building blocks for a real-time system are there. If and when we decide we need more direct support for those kinds of use cases, we can work on it then.
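One way to build an action as a stoppable loop is to check a `threading.Event` between steps. This is a sketch under assumptions, not an API the library provides: `StoppableAction`, `run`, and `stop` are hypothetical names chosen for the example.

```python
import threading
import time


class StoppableAction:
    """Hypothetical long-running action that checks a stop flag each step."""

    def __init__(self):
        self._stop = threading.Event()
        self.steps_done = 0

    def run(self, total_steps=1000):
        for _ in range(total_steps):
            if self._stop.is_set():
                return "stopped"  # exit early when asked to stop
            self.steps_done += 1
            time.sleep(0.005)  # one small unit of work
        return "completed"

    def stop(self):
        """Request cancellation; takes effect at the next loop iteration."""
        self._stop.set()


action = StoppableAction()
result = {}
worker = threading.Thread(target=lambda: result.update(status=action.run()))
worker.start()

time.sleep(0.05)  # another message arrives while the action is running
action.stop()
worker.join()
```

After the join, `result["status"]` is `"stopped"` and `action.steps_done` is well below 1000. The granularity of each loop step bounds how quickly the action reacts to a stop request.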