celery / celery

Distributed Task Queue (development branch)
https://docs.celeryq.dev

serializer="pickle" -> worker hang #5035

Open jedie opened 6 years ago

jedie commented 6 years ago

apply_async() with serializer="pickle" hangs, but with serializer="json" it works.

Simple example:

import time

from celery import shared_task


@shared_task(bind=True)
def sleep_task(self, sleep_time=0):
    time.sleep(sleep_time)
    return "I have wait %s sec." % sleep_time

Test code:

import pytest


@pytest.mark.celery(
    result_backend="cache+memory:///",  # or "rpc": same result
    broker_url="memory://",
)
def test_no_wait(celery_worker):
    # This call passes:
    r = sleep_task.apply_async(
        kwargs={"sleep_time": 0.1},
        serializer="json",
    )
    assert r.get(propagate=True) == "I have wait 0.1 sec."

    # This call hangs forever:
    r = sleep_task.apply_async(
        kwargs={"sleep_time": 0.1},
        serializer="pickle",
    )
    assert r.get(propagate=True) == "I have wait 0.1 sec."

report:

software -> celery:4.2.1 (windowlicker) kombu:4.2.1 py:3.6.3
            billiard:3.5.0.3 memory:N/A
platform -> system:Linux arch:64bit, ELF imp:CPython
loader   -> celery.loaders.app.AppLoader
settings -> transport:memory results:cache+memory:///

worker_hijack_root_logger: False
worker_log_color: False
accept_content: {'json'}
enable_utc: True
timezone: 'UTC'
broker_url: 'memory://localhost//'
result_backend: 'cache+memory:///'
broker_heartbeat: 0

Maybe related to: https://github.com/celery/celery/issues/5013 ?

auvipy commented 5 years ago

Is it still happening with the new release?

jedie commented 5 years ago

I don't know... I switched to huey ;)

tko commented 4 years ago

@auvipy I can confirm this is happening with a newly created test project, python 3.7.7, celery==4.4.6 and pytest==5.4.3

Writing the sleep_time argument to a file at the beginning of the task shows the task is never called.
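
A minimal sketch of that check, assuming a marker file in the system temp directory. The path `MARKER` and the function name `sleep_task_body` are hypothetical. The function is the plain task body without the `@shared_task` decorator, so it runs without Celery:

```python
import tempfile
import time
from pathlib import Path

# Hypothetical marker file; any path the worker process can write to would do.
MARKER = Path(tempfile.gettempdir()) / "sleep_task_called.txt"


def sleep_task_body(sleep_time=0):
    # Record the argument before doing anything else, so that an existing
    # file proves the worker actually entered the task.
    MARKER.write_text(str(sleep_time))
    time.sleep(sleep_time)
    return "I have wait %s sec." % sleep_time
```

If the file is still missing after apply_async() has been called, the message never reached the task body, which points at delivery or deserialization rather than at the task itself.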

jiahuei commented 3 years ago

Hi there, I managed to make it work with pickle, by simply allowing the pickle content type. I feel that the documentation can be clearer on this.

app.conf.update(
    task_serializer="pickle",
    accept_content=["pickle", "json"],
    result_serializer="pickle",
)

jiahuei commented 3 years ago

@thedrow in my case, adding the line accept_content=["pickle", "json"], is key to making it work
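
This fits the report in the original post, which lists accept_content: {'json'}. A pickle-serialized message is then outside what the worker accepts. The sketch below is a simplified, hypothetical model of such a consumer-side allowlist, written with the standard library only. It is not Celery's or kombu's actual code, and the names `encode`, `decode` and `ContentRefused` are made up for illustration:

```python
import json
import pickle

# Hypothetical serializer registry; NOT Celery's or kombu's implementation.
ENCODERS = {
    "json": lambda obj: json.dumps(obj).encode("utf-8"),
    "pickle": pickle.dumps,
}
DECODERS = {
    "json": json.loads,
    "pickle": pickle.loads,
}


class ContentRefused(Exception):
    """Raised when a message uses a serializer the consumer does not accept."""


def encode(payload, serializer):
    # A message is modelled as a (serializer name, encoded body) pair.
    return serializer, ENCODERS[serializer](payload)


def decode(message, accept):
    serializer, body = message
    # Check the allowlist before touching the body, so that untrusted
    # pickle data is never unpickled.
    if serializer not in accept:
        raise ContentRefused(serializer)
    return DECODERS[serializer](body)


if __name__ == "__main__":
    message = encode({"sleep_time": 0.1}, "pickle")
    try:
        decode(message, accept={"json"})
    except ContentRefused as exc:
        print("refused:", exc)
    print(decode(message, accept={"pickle", "json"}))
```

In this model a refused message never reaches the task, so a caller waiting on the result would wait indefinitely, which resembles the hang described in this issue.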

jiahuei commented 3 years ago

Thanks!