celery / billiard

Multiprocessing Pool Extensions

Non-daemon processes in Pool #280

Open Borda opened 5 years ago

Borda commented 5 years ago

Hello, I'm wondering whether it is possible to run a few processes inside a PoolWorker. So far it has been failing with AssertionError: daemonic processes are not allowed to have children. I found a couple of workarounds for the standard multiprocessing package, but they usually fail on Windows with some PickleError message... Do you have any suggestions?

graingert commented 5 years ago

@Borda make sure all code uses billiard instead of multiprocessing

habibutsu commented 3 years ago

The same story here. We use DataLoader from PyTorch in a Celery task and would rather not rewrite already working code. We used the following trick:

from celery import Celery
from celery.concurrency import ALIASES
from celery.concurrency.prefork import TaskPool
from billiard import get_context

ALIASES.update({
    'nodaemon-prefork': 'celery_app:NoDaemonTaskPool'
})

class NoDaemonPool(TaskPool.Pool):

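    def __init__(self, *args, **kwargs):
        # Pop 'context' so it is not passed twice to super().__init__ below.
        ctx = kwargs.pop('context', None) or get_context()

        class Process(ctx.Process):
            # Always report non-daemonic, so the worker may start children.

            @property
            def daemon(self):
                return False

            @daemon.setter
            def daemon(self, value):
                # Ignore the pool's attempt to mark its workers as daemonic.
                pass

        # Note: this replaces Process on the context object itself.
        ctx.Process = Process
        super().__init__(*args, context=ctx, **kwargs)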

class NoDaemonTaskPool(TaskPool):
    Pool = NoDaemonPool

app = Celery('tasks')

if __name__ == "__main__":
    app.worker_main([
        'celery',
        '--app=celery_app.app',
        '-Q', 'celery',
        '--concurrency=1',
        '--loglevel=info',
        '--pool=nodaemon-prefork'
    ])