conductor-sdk / conductor-python

Conductor OSS SDK for Python programming language
Apache License 2.0

Allow sharing task definitions amongst a worker process in order to conserve memory #219

Closed -- matteius closed this 10 months ago

matteius commented 11 months ago

Background: when running a Conductor worker that has many tasks, some of which only have work to do infrequently, from a Django monolith (to use its ORM), the process ends up consuming a lot of memory. With this optimization we can group tasks into lists that are shared and round-robined within a single worker process, thus conserving memory.
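A minimal sketch of the idea (class and method names here are illustrative, not the SDK's actual API): a single polling loop cycles through a shared group of task definitions in round-robin order, so one Django process serves many tasks instead of each task paying its own process-level memory cost.

```python
import itertools


class SharedTaskWorker:
    """Hypothetical sketch: one worker process polls a group of task
    definitions in round-robin order, so a single process (and its
    ORM memory footprint) serves many tasks."""

    def __init__(self, task_definition_names):
        self._names = list(task_definition_names)
        # Endlessly cycle through the shared group of task names.
        self._cycle = itertools.cycle(self._names)

    def next_task_definition_name(self):
        # Each polling iteration handles the next task in the group.
        return next(self._cycle)


worker = SharedTaskWorker(["task_a", "task_b", "task_c"])
polled = [worker.next_task_definition_name() for _ in range(4)]
# the fourth poll cycles back to "task_a"
```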

Running an example worker set of tasks on a large Django monolith today:

*(screenshot: memory usage before the optimization)*

Running the same worker tasks all within one worker using this optimization:

*(screenshot: memory usage with the optimization)*
matteius commented 11 months ago

I think you are right -- I've pushed a refactor commit. By leveraging `cached_property` and clearing it at the start of `run_once`, we can be sure the name is consistent across those operations.
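For reference, the pattern being described could look roughly like this (a sketch, not the actual PR code): `functools.cached_property` caches the first access, and popping the cached attribute at the top of `run_once` forces one fresh lookup per iteration while every helper called during that iteration sees the same name.

```python
import itertools
from functools import cached_property


class Worker:
    """Sketch: the task name advances once per run_once iteration,
    but stays stable within an iteration thanks to cached_property."""

    def __init__(self, names):
        self._cycle = itertools.cycle(names)

    @cached_property
    def current_task_definition_name(self):
        # Computed once, then reused until explicitly invalidated.
        return next(self._cycle)

    def run_once(self):
        # Clear the cache so this iteration picks up the next task;
        # repeated reads below this point all return the same name.
        self.__dict__.pop("current_task_definition_name", None)
        return self.current_task_definition_name


w = Worker(["a", "b"])
seen = [w.run_once() for _ in range(3)]
# "a", "b", then back to "a"
```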

matteius commented 11 months ago

Actually -- I am testing the `cached_property` change and it is not quite working yet; let me give it another take.

matteius commented 11 months ago

OK -- with the latest commit I pushed, I was able to test a worker with 13 simple tasks; the 3 tasks I triggered executed successfully and within a short time.

matteius commented 11 months ago

> Results might be accurate now but we still will end up referencing the wrong `task_definition_name` for logging and metrics. See `__execute_task` and `__update_task`; they have this: `task_definition_name = self.worker.get_task_definition_name()`. I think we might need to pass the `task_definition_name` from `run_once` and use it inside the two methods mentioned above.

I think those only get executed from within `run_once`? That is why I added the caching, so it remains consistent for each iteration of the loop -- but maybe I am not seeing the issue yet.
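The alternative being suggested could look roughly like this (illustrative sketch; the real runner's `__execute_task` and `__update_task` differ): resolve the name once in `run_once` and pass it down, instead of each helper re-querying the worker, whose round-robin position may have advanced.

```python
class TaskRunner:
    """Sketch of threading the resolved name through the call chain."""

    def __init__(self, worker):
        self.worker = worker
        self.log = []

    def run_once(self):
        # Resolve the name once per iteration and pass it down, so
        # logging and metrics in the private helpers stay consistent
        # even if the worker's round-robin position advances.
        task_definition_name = self.worker.get_task_definition_name()
        result = self.__execute_task({"taskId": "t1"}, task_definition_name)
        self.__update_task(result, task_definition_name)

    def __execute_task(self, task, task_definition_name):
        self.log.append(("execute", task_definition_name))
        return {"status": "COMPLETED"}

    def __update_task(self, result, task_definition_name):
        self.log.append(("update", task_definition_name))


class FakeWorker:
    """Simulates a round-robin worker whose name advances per call."""

    def __init__(self):
        self._names = iter(["task_a", "task_b"])

    def get_task_definition_name(self):
        return next(self._names)


runner = TaskRunner(FakeWorker())
runner.run_once()
# both log entries reference "task_a", not a mix of "task_a"/"task_b"
```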

coderabhigupta commented 11 months ago

> Results might be accurate now but we still will end up referencing the wrong `task_definition_name` for logging and metrics. See `__execute_task` and `__update_task`; they have this: `task_definition_name = self.worker.get_task_definition_name()`. I think we might need to pass the `task_definition_name` from `run_once` and use it inside the two methods mentioned above.
>
> I think those only get executed from within `run_once`? That is why I added the caching, so it remains consistent for each iteration of the loop -- but maybe I am not seeing the issue yet.

Can you turn on debug and check the logs to confirm what `task_definition_name` is getting logged?

matteius commented 11 months ago

@coderabhigupta It wasn't quite working, as I found when writing a unit test, but I simplified how I was doing the caching and now it is working.

The logs were consistent with this change, but integration-wise I am testing with a private worker and should not disclose what those logs look like.

matteius commented 11 months ago

Also, FWIW, I think the integration tests failed on the last Actions run because my fork's settings lack some required environment variables.

matteius commented 10 months ago

@coderabhigupta Thanks for your approval -- is anything else needed from me to see this change make it into a release?

coderabhigupta commented 10 months ago

> @coderabhigupta Thanks for your approval -- is anything else needed from me to see this change make it into a release?

@matteius I will merge and push this as part of our next release which should happen sometime next week.