ovh / celery-dyrygent

Celery extension that allows orchestrating 100/1000/10000 tasks combined into a complex workflow

on_error callbacks don't fire in task chains #20

Open kriberg opened 2 years ago

kriberg commented 2 years ago

I'm having some issues with triggering the on_error task callbacks from chains. I've created a test project here: https://github.com/kriberg/dyrygent-test

This defines three tasks:

import logging
import time

# Minimal setup shown here for readability; the linked test project defines
# the Celery `app` and `log` objects itself.
log = logging.getLogger(__name__)

@app.task
def normal_task():
    log.info("normal task")
    time.sleep(2)

@app.task(throws=(Exception,))
def failing_task():
    log.info("failing task")
    time.sleep(2)
    raise Exception("failure")

@app.task
def callback(msg, *args, **kwargs):
    log.error(f"error called: {msg} {args} {kwargs}")

These are put into a chain:

chain1 = chain(normal_task.si(), normal_task.si(), failing_task.si())
chain1.on_error(callback.si(f"Leaf chain 1 failed"))

Calling this with celery-dyrygent:

wf = Workflow()
wf.set_retry_policy("random", 1, 3)
wf.add_celery_canvas(chain1)
result = wf.apply_async(options={"link_error": callback.si("wf error")})

This produces the following log:

[2021-11-08 12:55:24,163: INFO/MainProcess] Task workflow-processor[1e02286f-b836-4269-bcca-f35fdc5523ea] received
[2021-11-08 12:55:24,168: INFO/ForkPoolWorker-8] Scheduling execution of task 15b38035-8e7b-4857-be5c-9c6e44f4f438 with options {'link_error': {'task': 'test.celery.callback', 'args': ['wf error'], 'kwargs': {}, 'options': {}, 'subtask_type': None, 'immutable': True}}
[2021-11-08 12:55:24,180: INFO/ForkPoolWorker-8] Tick done, took 0.015532ms, workflow running, waiting for 1 tasks
[2021-11-08 12:55:24,180: INFO/MainProcess] Task test.celery.normal_task[15b38035-8e7b-4857-be5c-9c6e44f4f438] received
[2021-11-08 12:55:24,181: INFO/ForkPoolWorker-1] normal task
[2021-11-08 12:55:24,183: INFO/MainProcess] Task workflow-processor[1e02286f-b836-4269-bcca-f35fdc5523ea] received
[2021-11-08 12:55:24,184: INFO/ForkPoolWorker-8] Task workflow-processor[1e02286f-b836-4269-bcca-f35fdc5523ea] retry: Retry in 2s
[2021-11-08 12:55:26,197: INFO/ForkPoolWorker-1] Task test.celery.normal_task[15b38035-8e7b-4857-be5c-9c6e44f4f438] succeeded in 2.0154468250002537s: None
[2021-11-08 12:55:26,201: INFO/ForkPoolWorker-8] Task 15b38035-8e7b-4857-be5c-9c6e44f4f438 is done, success
[2021-11-08 12:55:26,204: INFO/ForkPoolWorker-8] Scheduling execution of task 504684c4-257e-495f-8419-237c7442d954 with options {'link_error': {'task': 'test.celery.callback', 'args': ['wf error'], 'kwargs': {}, 'options': {}, 'subtask_type': None, 'immutable': True}}
[2021-11-08 12:55:26,206: INFO/ForkPoolWorker-8] Tick done, took 0.005649ms, workflow running, waiting for 1 tasks
[2021-11-08 12:55:26,207: INFO/MainProcess] Task test.celery.normal_task[504684c4-257e-495f-8419-237c7442d954] received
[2021-11-08 12:55:26,208: INFO/ForkPoolWorker-1] normal task
[2021-11-08 12:55:26,211: INFO/MainProcess] Task workflow-processor[1e02286f-b836-4269-bcca-f35fdc5523ea] received
[2021-11-08 12:55:26,212: INFO/ForkPoolWorker-8] Task workflow-processor[1e02286f-b836-4269-bcca-f35fdc5523ea] retry: Retry in 2s
[2021-11-08 12:55:28,212: INFO/ForkPoolWorker-1] Task test.celery.normal_task[504684c4-257e-495f-8419-237c7442d954] succeeded in 2.004066970999702s: None
[2021-11-08 12:55:28,217: INFO/ForkPoolWorker-8] Task 504684c4-257e-495f-8419-237c7442d954 is done, success
[2021-11-08 12:55:28,220: INFO/ForkPoolWorker-8] Scheduling execution of task 3f02678a-c8b8-4e89-b348-4e4ef2ad2251 with options {'link_error': {'task': 'test.celery.callback', 'args': ['wf error'], 'kwargs': {}, 'options': {}, 'subtask_type': None, 'immutable': True}}
[2021-11-08 12:55:28,222: INFO/ForkPoolWorker-8] Tick done, took 0.006836ms, workflow running, waiting for 1 tasks
[2021-11-08 12:55:28,224: INFO/MainProcess] Task test.celery.failing_task[3f02678a-c8b8-4e89-b348-4e4ef2ad2251] received
[2021-11-08 12:55:28,225: INFO/ForkPoolWorker-1] failing task
[2021-11-08 12:55:28,227: INFO/MainProcess] Task workflow-processor[1e02286f-b836-4269-bcca-f35fdc5523ea] received
[2021-11-08 12:55:28,228: INFO/ForkPoolWorker-8] Task workflow-processor[1e02286f-b836-4269-bcca-f35fdc5523ea] retry: Retry in 2s
[2021-11-08 12:55:30,230: ERROR/ForkPoolWorker-1] error called: wf error () {}
[2021-11-08 12:55:30,230: INFO/ForkPoolWorker-1] Task test.celery.failing_task[3f02678a-c8b8-4e89-b348-4e4ef2ad2251] raised expected: Exception('failure')
[2021-11-08 12:55:30,233: INFO/ForkPoolWorker-8] Tick done, took 0.000878ms, workflow running, waiting for 1 tasks
[2021-11-08 12:55:30,237: INFO/MainProcess] Task workflow-processor[1e02286f-b836-4269-bcca-f35fdc5523ea] received
[2021-11-08 12:55:30,238: INFO/ForkPoolWorker-8] Task workflow-processor[1e02286f-b836-4269-bcca-f35fdc5523ea] retry: Retry in 1s
[2021-11-08 12:55:32,425: INFO/ForkPoolWorker-8] Tick done, took 0.001262ms, workflow running, waiting for 1 tasks
[2021-11-08 12:55:32,431: INFO/MainProcess] Task workflow-processor[1e02286f-b836-4269-bcca-f35fdc5523ea] received
[2021-11-08 12:55:32,433: INFO/ForkPoolWorker-8] Task workflow-processor[1e02286f-b836-4269-bcca-f35fdc5523ea] retry: Retry in 2s
[2021-11-08 12:55:34,458: INFO/ForkPoolWorker-8] Tick done, took 0.000845ms, workflow running, waiting for 1 tasks
[2021-11-08 12:55:34,462: INFO/MainProcess] Task workflow-processor[1e02286f-b836-4269-bcca-f35fdc5523ea] received
[2021-11-08 12:55:34,463: INFO/ForkPoolWorker-8] Task workflow-processor[1e02286f-b836-4269-bcca-f35fdc5523ea] retry: Retry in 1s
[2021-11-08 12:55:36,429: INFO/ForkPoolWorker-8] Task 3f02678a-c8b8-4e89-b348-4e4ef2ad2251 has final state after 4 checks
[2021-11-08 12:55:36,429: INFO/ForkPoolWorker-8] Task 3f02678a-c8b8-4e89-b348-4e4ef2ad2251 is done, failure, result '<class 'Exception'>(failure)'
[2021-11-08 12:55:36,429: INFO/ForkPoolWorker-8] Tick done, took 0.000660ms, workflow finished after 7 ticks
[2021-11-08 12:55:36,430: INFO/ForkPoolWorker-8] Task workflow-processor[1e02286f-b836-4269-bcca-f35fdc5523ea] succeeded in 0.0014809360000072047s: None

Here we see that the callback linked to the overall workflow fires as intended, but the callback set on the chain never fires.

Calling the same chain with celery apply_async:

chain1.on_error(callback.si("master error"))
chain1.apply_async()

Produces this:

[2021-11-08 12:55:46,447: INFO/MainProcess] Task test.celery.normal_task[15b38035-8e7b-4857-be5c-9c6e44f4f438] received
[2021-11-08 12:55:46,449: INFO/ForkPoolWorker-8] normal task
[2021-11-08 12:55:48,455: INFO/MainProcess] Task test.celery.normal_task[504684c4-257e-495f-8419-237c7442d954] received
[2021-11-08 12:55:48,455: INFO/ForkPoolWorker-8] Task test.celery.normal_task[15b38035-8e7b-4857-be5c-9c6e44f4f438] succeeded in 2.0059917730031884s: None
[2021-11-08 12:55:48,456: INFO/ForkPoolWorker-8] normal task
[2021-11-08 12:55:50,462: INFO/ForkPoolWorker-8] Task test.celery.normal_task[504684c4-257e-495f-8419-237c7442d954] succeeded in 2.0055490619997727s: None
[2021-11-08 12:55:50,463: INFO/MainProcess] Task test.celery.failing_task[3f02678a-c8b8-4e89-b348-4e4ef2ad2251] received
[2021-11-08 12:55:50,465: INFO/ForkPoolWorker-8] failing task
[2021-11-08 12:55:52,470: ERROR/ForkPoolWorker-8] error called: Leaf chain 1 failed () {}
[2021-11-08 12:55:52,471: ERROR/ForkPoolWorker-8] error called: master error () {}
[2021-11-08 12:55:52,471: INFO/ForkPoolWorker-8] Task test.celery.failing_task[3f02678a-c8b8-4e89-b348-4e4ef2ad2251] raised expected: Exception('failure')

Here both callbacks are triggered correctly. Now, as we know, Celery doesn't handle a large, complex canvas well, so just using plain Celery isn't a good option.

Is this a limitation with dyrygent or is it a bug?

brabiega commented 2 years ago

Hello, thanks for the report. This looks like a design flaw or a bug in dyrygent.

I'll take a closer look into it once I have a spare moment.

brabiega commented 2 years ago

So this issue is a limitation of the current version of dyrygent.

When the chain's on_error is defined as follows:

chain1.on_error(callback.si(f"Leaf chain 1 failed"))

Celery does not immediately attach on_error to all tasks within the chain; it does that only when chain1.apply_async() is executed. Now, when the chain is consumed by dyrygent with

wf.add_celery_canvas(chain1)

it is immediately dismantled into primitive tasks (signatures). Since the signatures do not yet have on_error attached, the information is lost.
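
A quick way to illustrate this (a hypothetical inspection, reusing the names from the snippets above) is to check where the error callback actually lives before apply_async() runs:

chain1 = chain(normal_task.si(), normal_task.si(), failing_task.si())
chain1.on_error(callback.si("Leaf chain 1 failed"))

# The error callback is stored only on the chain signature itself...
print(chain1.options.get("link_error"))
# ...while the child signatures have nothing attached yet; Celery only
# propagates it to them when chain1.apply_async() runs.
print([t.options.get("link_error") for t in chain1.tasks])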

This could potentially be fixed by attaching on_error to each signature when add_celery_canvas is executed. However, I'm not yet sure if this would always work consistently.

It should be simple when you want to have on_error on a simple chain:

chain = A -> B -> C

A, B, C - simple tasks

But when we consider a more complex example:

chain = A -> B -> C

A - group
B - chain
C - chord

  A1           C1
 /  \         /  \
A-A2-B-B1-B2-C-C2-C3
 \  /
  A3

In this case, doing chain.on_error would have to attach on_error to all tasks (A1, A2, A3, B1, B2, C1, C2, C3).

I think you could try to work around this limitation by doing:

chain1 = chain(normal_task.si(), normal_task.si(), failing_task.si())
for task in chain1.tasks:
    task.on_error(callback.si(f"Leaf chain 1 failed"))
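
For a nested canvas like the A/B/C example above, a recursive variant of that loop could be used. This is only a sketch (attach_on_error is a hypothetical helper, not part of dyrygent or Celery) that relies on the .tasks/.body attributes of canvas signatures:

from celery.canvas import Signature

def attach_on_error(sig, errback):
    # Hypothetical helper: walk a canvas and attach errback to every leaf signature.
    tasks = getattr(sig, "tasks", None)   # chain/group children, or a chord header
    body = getattr(sig, "body", None)     # chord body/callback
    if tasks is None and body is None:
        sig.on_error(errback)             # plain task signature: a leaf
        return sig
    if isinstance(tasks, Signature):      # chord header given as a group signature
        attach_on_error(tasks, errback)
    elif tasks is not None:
        for sub in tasks:                 # chain or group: iterate children
            attach_on_error(sub, errback)
    if body is not None:
        attach_on_error(body, errback)
    return sig

attach_on_error(chain1, callback.si("Leaf chain 1 failed"))

That said, as mentioned above, I'm not sure attaching the callback this deep would always behave consistently with what plain Celery does.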

kriberg commented 2 years ago

I switched to

    chain1 = chain(
        normal_task.si(), 
        normal_task.si(), 
        failing_task.si(),
    )
    for task in chain1.tasks:
        task.on_error(callback.si("Leaf chain 1 failed"))

    wf = Workflow()
    wf.set_retry_policy("random", 1, 3)
    wf.add_celery_canvas(chain1)
    result = wf.apply_async(options={"link_error": callback.si("wf error")})
    result.get()

but it doesn't make any difference. I also tried:

    chain1 = chain(
        normal_task.si().on_error(callback.si("Leaf chain 1 failed")), 
        normal_task.si().on_error(callback.si("Leaf chain 1 failed")), 
        failing_task.si().on_error(callback.si("Leaf chain 1 failed")),
    ).on_error(callback.si("Leaf chain 1 failed"))

Still the same behaviour.

brabiega commented 2 years ago

I've slightly modified your last piece of code.

chain1 = chain(
    normal_task.si(), 
    normal_task.si(),
    failing_task.si(),
)   
for task in chain1.tasks:
    task.on_error(callback.si("Leaf chain 1 failed"))

wf = Workflow()
wf.set_retry_policy("random", 1, 3)
wf.add_celery_canvas(chain1)
# result = wf.apply_async(options={"link_error": callback.si("wf error")})
result = wf.apply_async()
result.get()

Now it seems to be working as desired; the only real difference is that no workflow-level link_error option is passed to wf.apply_async() this time.

This most certainly needs further investigation.