the-wondersmith / celery-aio-pool

Celery worker pool with support for asyncio coroutines as tasks
GNU Affero General Public License v3.0

Long running tasks blocking Kombu heartbeats #11

Open Skorpyon opened 1 year ago

Skorpyon commented 1 year ago

Hello, @the-wondersmith!

I found an issue in your pool (celery-pool-asyncio behaves the same way):

Long-running tasks seem to block the pool (?), and Kombu doesn't send AMQP heartbeats until the task completes.

Regular heartbeats look like this:

[2023-04-20 00:23:00,575: DEBUG/MainProcess] heartbeat_tick : for connection 8a76d4c9f12f42a08bf401ce434eea51
[2023-04-20 00:23:00,576: DEBUG/MainProcess] heartbeat_tick : Prev sent/recv: None/None, now - 28/58, monotonic - 1202601.45058425, last_heartbeat_sent - 1202601.450567083, heartbeat int. - 60 for connection 8a76d4c9f12f42a08bf401ce434eea51
[2023-04-20 00:23:20,578: DEBUG/MainProcess] heartbeat_tick : for connection 8a76d4c9f12f42a08bf401ce434eea51
[2023-04-20 00:23:20,579: DEBUG/MainProcess] heartbeat_tick : Prev sent/recv: 28/58, now - 28/89, monotonic - 1202621.45346225, last_heartbeat_sent - 1202601.450567083, heartbeat int. - 60 for connection 8a76d4c9f12f42a08bf401ce434eea51
[2023-04-20 00:23:40,581: DEBUG/MainProcess] heartbeat_tick : for connection 8a76d4c9f12f42a08bf401ce434eea51
[2023-04-20 00:23:40,584: DEBUG/MainProcess] heartbeat_tick : Prev sent/recv: 28/89, now - 28/119, monotonic - 1202641.459304791, last_heartbeat_sent - 1202601.450567083, heartbeat int. - 60 for connection 8a76d4c9f12f42a08bf401ce434eea51

With the default settings (60s timeout in RabbitMQ), they are sent every 20-30s.
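To check whether the ticks stall on a given worker, the gaps between them can be measured from the debug log. This is a minimal sketch that assumes the log format shown above; `tick_gaps` and `TICK` are hypothetical names, not part of Celery or Kombu.

```python
import re
from datetime import datetime

# Matches only the first line of each tick ("heartbeat_tick : for connection ..."),
# not the follow-up "Prev sent/recv" line.
TICK = re.compile(
    r"^\[(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2},\d{3}): DEBUG/MainProcess\] "
    r"heartbeat_tick : for connection"
)


def tick_gaps(lines):
    """Return the seconds elapsed between consecutive heartbeat_tick log entries."""
    stamps = []
    for line in lines:
        match = TICK.match(line)
        if match:
            stamps.append(datetime.strptime(match.group(1), "%Y-%m-%d %H:%M:%S,%f"))
    return [(later - earlier).total_seconds() for earlier, later in zip(stamps, stamps[1:])]
```

A gap noticeably larger than the usual interval while a task is running would point to the stall described here.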

So after a long task (>1min), Celery tries to ACK the task, fails with a broken pipe because the connection was closed, re-establishes the connection, receives the same task again, and ends up in an infinite loop )))

[2023-04-20 00:29:05,760: WARNING/MainProcess] consumer: Connection to broker lost. Trying to re-establish the connection...

If you want to reproduce it, just run:

import asyncio

# `app` is your existing Celery application instance.
@app.task()
async def long_task() -> None:
    # Runs well past the 60s heartbeat timeout.
    await asyncio.sleep(300)

PS. It also blocks Basic.Ack, so Celery's default acks_late=False effectively behaves like True ))) The worker sends the Ack for the task message only after the task has finished. This is especially annoying with long tasks, because the unacked task returns to the queue, and after the connection is lost the worker starts it again and again.
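One possible explanation, not confirmed against the pool's code, is that the coroutine is driven to completion on the same thread that should be sending heartbeats. The toy loop below only illustrates that effect with plain asyncio and threading. It does not use Celery or Kombu, and `max_heartbeat_gap` and its parameters are hypothetical.

```python
import asyncio
import threading
import time


async def long_task(seconds: float) -> None:
    # Stand-in for the long-running task from the report, scaled down.
    await asyncio.sleep(seconds)


def max_heartbeat_gap(run_inline: bool, task_seconds: float = 0.3,
                      tick_interval: float = 0.05) -> float:
    """Return the longest pause between two ticks of a toy single-threaded loop."""
    ticks = [time.monotonic()]
    worker = None
    started = False
    deadline = time.monotonic() + 2 * task_seconds
    while time.monotonic() < deadline:
        if not started:
            started = True
            if run_inline:
                # The ticking thread is stuck here until the coroutine finishes.
                asyncio.run(long_task(task_seconds))
            else:
                # The coroutine gets its own thread and its own event loop.
                worker = threading.Thread(
                    target=asyncio.run, args=(long_task(task_seconds),)
                )
                worker.start()
        ticks.append(time.monotonic())  # stand-in for sending a heartbeat
        time.sleep(tick_interval)
    if worker is not None:
        worker.join()
    return max(later - earlier for earlier, later in zip(ticks, ticks[1:]))
```

With `run_inline=True` the longest gap is roughly the task duration, while with `run_inline=False` it stays near the tick interval. If the pool behaves like the inline case, that would match both the missing heartbeats and the delayed Ack.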

h0rn3t commented 11 months ago

same thing