codemation / easyjobs

A celery like jobs framework for managing and distributing async / non-async tasks
https://pypi.org/project/easyjobs/
MIT License

Orchestrating tasks #8

Open avico78 opened 2 years ago

avico78 commented 2 years ago

First, GREAT(!) project, and I believe it deserves much more attention.

I went through the documentation and the examples but still have questions, as I couldn't make it work the way I expected.

So first, for triggering a task flow without scheduling it (i.e. on request), let's say I have this basic flow:

          task2
        /
task1
        \
          task3 - task4

task2 and task3 depend on task1; task4 depends on task3.

worker:


import asyncio

from fastapi import FastAPI
from easyjobs.workers.worker import EasyJobsWorker

server = FastAPI()

@server.on_event('startup')
async def setup():
    worker = await EasyJobsWorker.create(
        server,
        server_secret='abcd1234',
        manager_host='0.0.0.0',
        manager_port=8222,
        manager_secret='abcd1234',
        jobs_queue='ETL',
        max_tasks_per_worker=5
    )

    @worker.task()
    async def task1 (run_before=['task2', 'task3']):
        print(f"task1  - starting")
        await asyncio.sleep(5)
        print(f"task1 - finished")
        return f"task1!"

    @worker.task()
    async def task2(run_after=['task1']):
        print(f"task1 - starting")
        await asyncio.sleep(5)
        print(f"task2 - finished")
        return f"task2!"

    @worker.task(run_after=['task1'])
    async def task3(run_after=['task1']):
        print(f"task3 - starting")
        await asyncio.sleep(5)
        print(f"task3 - finished")
        return f"task3!"

    @worker.task()
    async def task4(run_after=['task3'],run_before=['task3']):
        print(f"task4 - starting")
        await asyncio.sleep(5)
        print(f"task4 - finished")
        return f"task4"

Based on the task plan I described above, are both run_after and run_before required? Since a schedule is not set for any of the tasks, I expected that triggering task1 would automatically trigger the dependent tasks, but it doesn't; it triggers just task1.

04-30 20:31 EasyRpc-server /ws/jobs WARNING  worker 5fc6e54c_c8aa_11ec_bd8d_252c84f8bbc8 - pulled job {'job_id': '4d0c86ae-c8ab-11ec-bd8d-252c84f8bbc8', 'namespace': 'ETL', 'node_id': '5fc6e54c_c8aa_11ec_bd8d_252c84f8bbc8-REQ-60fcb1bc-c8aa-11ec-bd8d-252c84f8bbc8', 'status': 'reserved', 'name': 'task1', 'args': {'args': []}, 'kwargs': {'run_before': '[ "task2", "task3" ]'}, 'retry_policy': 'retry_once', 'on_failure': None, 'run_before': {'run_before': []}, 'run_after': {'run_after': []}, 'last_update': '2022-04-30T20:31:00'}
04-30 20:31 EasyRpc-server /ws/jobs WARNING  WORKER_MONITOR: working 1 / 5
task1  - starting
task1 - finished

A question regarding pipeline tasks: is it possible to pipeline data between tasks, for example so that the return value of task1 is used in task2? Is there an option to reuse a task in different nodes, while providing kwargs dynamically, so I can trigger the same task with different run_before and run_after?

I suggest adding a Discussions tab and, if possible, an examples folder; that could be really helpful. Great project!

codemation commented 2 years ago

@avico78 glad that you are enjoying the project so far, I wish I could update it more as well.

Accomplishing what you are describing should be doable via the following:

server = FastAPI()

@server.on_event('startup')
async def setup():
    worker = await EasyJobsWorker.create(
        server,
        server_secret='abcd1234',
        manager_host='0.0.0.0',
        manager_port=8222,
        manager_secret='abcd1234',
        jobs_queue='ETL',
        max_tasks_per_worker=5
    )

    @worker.task(run_after=['task4'])
    async def task1 ():
        print(f"task1  - starting")
        await asyncio.sleep(5)
        print(f"task1 - finished")
        return f"task1!"

    @worker.task()
    async def task2():
        print(f"task1 - starting")
        await asyncio.sleep(5)
        print(f"task2 - finished")
        return f"task2!"

    @worker.task(run_before=['task2'])
    async def task3():
        print(f"task3 - starting")
        await asyncio.sleep(5)
        print(f"task3 - finished")
        return f"task3!"

    @worker.task(run_before=['task3'])
    async def task4():
        print(f"task4 - starting")
        await asyncio.sleep(5)
        print(f"task4 - finished")
        return f"task4"

run_after - the named task is triggered once the current task completes. run_before - the named tasks (and their dependencies) are run before the current task completes.
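
As a rough illustration of those two semantics (a minimal sketch reusing the same worker object as above; task_a / task_b / task_c are placeholder names, not tasks from your example):

    @worker.task(run_after=['task_b'])
    async def task_a():
        # run_after: once task_a completes, task_b is enqueued next
        return 'task_a done'

    @worker.task(run_before=['task_b'])
    async def task_c():
        # run_before: task_b (and any of its dependencies) runs before task_c completes
        return 'task_c done'

    @worker.task()
    async def task_b():
        return 'task_b done'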

avico78 commented 2 years ago

@codemation - thanks so much for your answer,

Do both run_after and run_before provide the same functionality for triggering dependencies / the correct task order, where run_after triggers everything "from the current task onward" and run_before triggers "all tasks up to the current one"?

for "run_after":

I tried triggering the task flow as you advised, and it is not working. It seems that each task, starting from the second one, requires at least one parameter and a return statement. I'm not sure what is required and why, but for some reason it only works this way. Maybe there's a reason for that, e.g. an assumption that each task must accept some *args/**kwargs to perform the task pipeline... If you could explain more about how it works and what is required, that would help.

Working example for task1 --> task2:

    @worker.task(run_after=['task2'],default_args=default_args)
    async def task1(*args,**kwargs):  # actually will work even without it for first task
        func_name = sys._getframe().f_code.co_name
        print(f"{func_name} started")
        await asyncio.sleep(1)
        print(f"{func_name} finished")
        return {'data': None}

    @worker.task()
    async def task2(*args,**kwargs):
        func_name = sys._getframe().f_code.co_name
        print(f"{func_name} started")
        await asyncio.sleep(1)
        print(f"{func_name} finished")
        return {'data': None}
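
(For completeness, a sketch of how such a flow can be kicked off, following the same pattern as the pipeline task further below:)

    @worker.task()
    async def pipeline():
        # awaiting task1 enqueues it; task2 is then triggered via run_after
        result = await task1(data={'test': 'data'})
        return result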

for "run_before": I tried below tasks setup: task2 depends on task1 task1 depends on task0

Here I couldn't understand if it must require some parameter(ars/kwargs) as it doesn't work properly for tasks which has more than one level of dependencies , meaning:

task2--> task1--> task0

where for one level it does work for:

task2 --> task1 && task0

see code for 2 levels(not working):

    @worker.task()
    async def task0(*args,**kwargs):
        func_name = sys._getframe().f_code.co_name
        print(f"{func_name} started")
        await asyncio.sleep(1)
        print(f"{func_name} finished")
        return {'data': None}

    @worker.task(run_before=['task0'])
    async def task1(*args,**kwargs):
        func_name = sys._getframe().f_code.co_name
        print(f"{func_name} started")
        await asyncio.sleep(1)
        print(f"{func_name} finished")
        return {'data': None}

    @worker.task(run_before=['task1'] ,default_args=default_args)
    async def task2(*args,**kwargs):
        func_name = sys._getframe().f_code.co_name
        print(f"{func_name} started")
        await asyncio.sleep(1)
        print(f"{func_name} finished")
        return {'data': None}

    @worker.task()
    async def pipeline():
        print(f"pipline started")
        result = await task2(data={'test': 'data'})
        print(f"pipline - result is {result} - finished")
        return result

While running run_before with one level of dependencies:

task2 -> task1 && task0

Code:

    @worker.task()
    async def task0(*args,**kwargs):
        func_name = sys._getframe().f_code.co_name
        print(f"{func_name} started")
        await asyncio.sleep(1)
        print(f"{func_name} finished")
        return {'data': None}

    @worker.task()
    async def task1(*args,**kwargs):
        func_name = sys._getframe().f_code.co_name
        print(f"{func_name} started")
        await asyncio.sleep(1)
        print(f"{func_name} finished")
        return {'data': None}

    @worker.task(run_before=['task1','task0'] ,default_args=default_args)
    async def task2(*args,**kwargs):
        func_name = sys._getframe().f_code.co_name
        print(f"{func_name} started")
        await asyncio.sleep(1)
        print(f"{func_name} finished")
        return {'data': None}

    @worker.task()
    async def pipeline():
        print(f"pipline started")
        result = await task2(data={'test': 'data'})
        print(f"pipline - result is {result} - finished")
        return result

It works 1-3 times and then it fails:

05-02 16:40 EasyRpc-server /ws/jobs WARNING  WORKER_MONITOR: working 3 / 5
pipline started
05-02 16:40 EasyRpc-server /ws/jobs WARNING  worker 44090c30_ca1d_11ec_9ae2_8d908ace95c7 - pulled job {'job_id': '766817c0-ca1d-11ec-9ae2-8d908ace95c7', 'namespace': 'ETL', 'node_id': '44090c30_ca1d_11ec_9ae2_8d908ace95c7-REQ-5ea28ca6-ca1d-11ec-9ae2-8d908ace95c7', 'status': 'reserved', 'name': 'task2', 'args': {'args': []}, 'kwargs': {'data': {'test': 'data'}}, 'retry_policy': 'retry_once', 'on_failure': None, 'run_before': {'run_before': ['task1', 'task0']}, 'run_after': {'run_after': []}, 'last_update': '2022-05-02T16:40:43'}
05-02 16:40 EasyRpc-server /ws/jobs WARNING  WORKER_MONITOR: working 4 / 5
task1 started
05-02 16:40 EasyRpc-server /ws/jobs WARNING  worker 44090c30_ca1d_11ec_9ae2_8d908ace95c7 - pulled job {'job_id': '76801348-ca1d-11ec-9ae2-8d908ace95c7', 'namespace': 'ETL', 'node_id': '44090c30_ca1d_11ec_9ae2_8d908ace95c7-REQ-5f20d41c-ca1d-11ec-9ae2-8d908ace95c7', 'status': 'reserved', 'name': 'task0', 'args': {'args': []}, 'kwargs': {}, 'retry_policy': 'retry_once', 'on_failure': None, 'run_before': {'run_before': []}, 'run_after': {'run_after': []}, 'last_update': '2022-05-02T16:40:44'}
05-02 16:40 EasyRpc-server /ws/jobs WARNING  WORKER_MONITOR: working 5 / 5
task0 started
task1 finished
task0 finished
05-02 16:40 EasyRpc-server /ws/jobs WARNING  WORKER_MONITOR: working 4 / 5
pipline started
05-02 16:41 EasyRpc-server /ws/jobs WARNING  worker 44090c30_ca1d_11ec_9ae2_8d908ace95c7 - pulled job {'job_id': '807ad806-ca1d-11ec-9ae2-8d908ace95c7', 'namespace': 'ETL', 'node_id': '44090c30_ca1d_11ec_9ae2_8d908ace95c7-REQ-779a17f6-ca1d-11ec-9ae2-8d908ace95c7', 'status': 'reserved', 'name': 'task2', 'args': {'args': []}, 'kwargs': {'data': {'test': 'data'}}, 'retry_policy': 'retry_once', 'on_failure': None, 'run_before': {'run_before': ['task1', 'task0']}, 'run_after': {'run_after': []}, 'last_update': '2022-05-02T16:41:00'}
05-02 16:41 EasyRpc-server /ws/jobs WARNING  WORKER_MONITOR: working 5 / 5
05-02 16:41 EasyRpc-server /ws/jobs ERROR    error with ws_sender
Traceback (most recent call last):
  File "/testusers/env/test/nadavp/anaconda3/lib/python3.8/site-packages/easyrpc/proxy.py", line 320, in ws_sender
    raise last_exception
easyrpc.exceptions.ServerConnectionError: (ServerConnectionError(...), 'Proxy -> Server connection error: server 0.0.0.0 - port: 8222')
05-02 16:41 EasyRpc-server /ws/jobs WARNING  started connection to server 0.0.0.0:8222
05-02 16:41 asyncio      ERROR    Task was destroyed but it is pending!
task: <Task pending name='Task-32' coro=<EasyRpcProxy.get_upstream_registered_functions() done, defined at /testusers/env/test/nadavp/anaconda3/lib/python3.8/site-packages/easyrpc/proxy.py:224> wait_for=<Future cancelled>>

Also, sometimes the error below shows up; I couldn't understand why:


Traceback (most recent call last):
  File "/testusers/env/test/nadavp/anaconda3/lib/python3.8/site-packages/easyrpc/proxy.py", line 320, in ws_sender
    raise last_exception
easyrpc.exceptions.ServerConnectionError: (ServerConnectionError(...), 'Proxy -> Server connection error: server 0.0.0.0 - port: 8222')

Suggestions to add:

1. Add an endpoint for getting the task workflow tree (even a JSON view); this could help to visualize the dependencies (see the rough sketch after this list).

2. For reloading changes I'm running the APIs as:

# manager:
python -m uvicorn --host 0.0.0.0 --port 8222 job_manager:server --reload

# worker
python -m uvicorn --host 0.0.0.0 --port 8221 job_worker:server --workers=1 --reload

but it seems very slow to reload all the changes. Maybe adding an option to reset & reload would really help, or can you suggest any other alternative?

3. It could be really interesting if this were a more generic solution for orchestrating tasks, meaning a task could be configured dynamically for both run_before/run_after and also for its functionality, so the user can build up their ETL flow more dynamically with better usability.
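
Regarding suggestion 1, something like this rough sketch is what I have in mind (hypothetical only; TASK_GRAPH is maintained by hand here, since as far as I can tell easyjobs doesn't expose this metadata today):

    # hypothetical: hand-maintained dependency metadata, returned as JSON
    TASK_GRAPH = {
        'task1': {'run_after': [], 'run_before': []},
        'task2': {'run_after': ['task1'], 'run_before': []},
        'task3': {'run_after': ['task1'], 'run_before': []},
        'task4': {'run_after': ['task3'], 'run_before': []},
    }

    @server.get('/tasks/tree')
    async def task_tree():
        # a UI (or even a plain JSON view) could render this as a workflow tree
        return TASK_GRAPH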

I really think you came up with a great idea, and I hope you continue developing this.