celery / celery

Distributed Task Queue (development branch)
https://docs.celeryq.dev
Other
24.64k stars 4.65k forks source link

Worker hangs at startup when running without mingle and gossip and many messages in queue #1847

Closed sabw8217 closed 9 years ago

sabw8217 commented 10 years ago

See test case here: https://github.com/sabw8217/celery_test

If I enqueue a number of tasks, and then start a worker with --without-mingle and --without-gossip, the worker seems to hang and not actually run any of the tasks I enqueued. I'm running this on debian. On my dev box I have pretty consistently been able to cause the worker to "wake up" and start consuming tasks by issuing a 'celery inspect' command. 'celery inspect active' will return something like: -> celery@aaron-dev.localdomain: OK

Debug output from the worker looks like: [2014-02-04 13:53:47,628: DEBUG/MainProcess] | Worker: Starting Hub [2014-02-04 13:53:47,628: DEBUG/MainProcess] ^-- substep ok [2014-02-04 13:53:47,628: DEBUG/MainProcess] | Worker: Starting Pool [2014-02-04 13:53:47,633: DEBUG/MainProcess] ^-- substep ok [2014-02-04 13:53:47,634: DEBUG/MainProcess] | Worker: Starting Consumer [2014-02-04 13:53:47,635: DEBUG/MainProcess] | Consumer: Starting Connection [2014-02-04 13:53:47,644: INFO/MainProcess] Connected to amqp://guest@localhost:5672// [2014-02-04 13:53:47,645: DEBUG/MainProcess] ^-- substep ok [2014-02-04 13:53:47,645: DEBUG/MainProcess] | Consumer: Starting Events [2014-02-04 13:53:47,659: DEBUG/MainProcess] ^-- substep ok [2014-02-04 13:53:47,661: DEBUG/MainProcess] | Consumer: Starting Heart [2014-02-04 13:53:47,663: DEBUG/MainProcess] ^-- substep ok [2014-02-04 13:53:47,663: DEBUG/MainProcess] | Consumer: Starting Tasks [2014-02-04 13:53:47,666: DEBUG/MainProcess] basic.qos: prefetch_count->4 [2014-02-04 13:53:47,667: DEBUG/MainProcess] ^-- substep ok [2014-02-04 13:53:47,667: DEBUG/MainProcess] | Consumer: Starting Control [2014-02-04 13:53:47,669: DEBUG/MainProcess] ^-- substep ok [2014-02-04 13:53:47,670: DEBUG/MainProcess] | Consumer: Starting event loop [2014-02-04 13:53:47,670: WARNING/MainProcess] celery@aaron-dev.localdomain ready. [2014-02-04 13:53:47,670: DEBUG/MainProcess] | Worker: Hub.register Pool...

ionelmc commented 10 years ago

Some syscalls while this is happening:

epoll_ctl(3, EPOLL_CTL_DEL, 25, {EPOLLWRNORM|EPOLLWRBAND|EPOLLMSG|EPOLLHUP|0x460800, {u32=0, u64=20368074947428352}}) = -1 ENOENT (No such file or directory)
gettimeofday({1391710010, 439910}, NULL) = 0
clock_gettime(CLOCK_MONOTONIC, {24859, 783344100}) = 0
gettimeofday({1391710010, 440013}, NULL) = 0
gettimeofday({1391710010, 440049}, NULL) = 0
clock_gettime(CLOCK_MONOTONIC, {24859, 783456800}) = 0
wait4(6914, 0x7fff9cc56490, WNOHANG, NULL) = 0
wait4(6913, 0x7fff9cc56490, WNOHANG, NULL) = 0
wait4(6912, 0x7fff9cc56490, WNOHANG, NULL) = 0
wait4(6911, 0x7fff9cc56490, WNOHANG, NULL) = 0
wait4(6910, 0x7fff9cc56490, WNOHANG, NULL) = 0
wait4(6909, 0x7fff9cc56490, WNOHANG, NULL) = 0
gettimeofday({1391710010, 440221}, NULL) = 0
gettimeofday({1391710010, 440243}, NULL) = 0
epoll_wait(3, {}, 1023, 4999)           = 0
epoll_ctl(3, EPOLL_CTL_DEL, 5, {EPOLLWRNORM|EPOLLWRBAND|EPOLLMSG|EPOLLHUP|0x460800, {u32=0, u64=20368074947428352}}) = -1 ENOENT (No such file or directory)
epoll_ctl(3, EPOLL_CTL_DEL, 9, {EPOLLWRNORM|EPOLLWRBAND|EPOLLMSG|EPOLLHUP|0x460800, {u32=0, u64=20368074947428352}}) = -1 ENOENT (No such file or directory)
epoll_ctl(3, EPOLL_CTL_DEL, 13, {EPOLLWRNORM|EPOLLWRBAND|EPOLLMSG|EPOLLHUP|0x460800, {u32=0, u64=20368074947428352}}) = -1 ENOENT (No such file or directory)
epoll_ctl(3, EPOLL_CTL_DEL, 17, {EPOLLWRNORM|EPOLLWRBAND|EPOLLMSG|EPOLLHUP|0x460800, {u32=0, u64=20368074947428352}}) = -1 ENOENT (No such file or directory)
epoll_ctl(3, EPOLL_CTL_DEL, 21, {EPOLLWRNORM|EPOLLWRBAND|EPOLLMSG|EPOLLHUP|0x460800, {u32=0, u64=20368074947428352}}) = -1 ENOENT (No such file or directory)
epoll_ctl(3, EPOLL_CTL_DEL, 25, {EPOLLWRNORM|EPOLLWRBAND|EPOLLMSG|EPOLLHUP|0x460800, {u32=0, u64=20368074947428352}}) = -1 ENOENT (No such file or directory)
gettimeofday({1391710015, 449391}, NULL) = 0
clock_gettime(CLOCK_MONOTONIC, {24864, 792822100}) = 0
gettimeofday({1391710015, 449477}, NULL) = 0
gettimeofday({1391710015, 449508}, NULL) = 0
clock_gettime(CLOCK_MONOTONIC, {24864, 792915800}) = 0
wait4(6914, 0x7fff9cc56490, WNOHANG, NULL) = 0
wait4(6913, 0x7fff9cc56490, WNOHANG, NULL) = 0
wait4(6912, 0x7fff9cc56490, WNOHANG, NULL) = 0
wait4(6911, 0x7fff9cc56490, WNOHANG, NULL) = 0
wait4(6910, 0x7fff9cc56490, WNOHANG, NULL) = 0
wait4(6909, 0x7fff9cc56490, WNOHANG, NULL) = 0
gettimeofday({1391710015, 449715}, NULL) = 0
gettimeofday({1391710015, 449744}, NULL) = 0
epoll_wait(3, {}, 1023, 4999)           = 0
epoll_ctl(3, EPOLL_CTL_DEL, 5, {EPOLLWRNORM|EPOLLWRBAND|EPOLLMSG|EPOLLHUP|0x460800, {u32=0, u64=20368074947428352}}) = -1 ENOENT (No such file or directory)
epoll_ctl(3, EPOLL_CTL_DEL, 9, {EPOLLWRNORM|EPOLLWRBAND|EPOLLMSG|EPOLLHUP|0x460800, {u32=0, u64=20368074947428352}}) = -1 ENOENT (No such file or directory)
epoll_ctl(3, EPOLL_CTL_DEL, 13, {EPOLLWRNORM|EPOLLWRBAND|EPOLLMSG|EPOLLHUP|0x460800, {u32=0, u64=20368074947428352}}) = -1 ENOENT (No such file or directory)
epoll_ctl(3, EPOLL_CTL_DEL, 17, {EPOLLWRNORM|EPOLLWRBAND|EPOLLMSG|EPOLLHUP|0x460800, {u32=0, u64=20368074947428352}}) = -1 ENOENT (No such file or directory)
epoll_ctl(3, EPOLL_CTL_DEL, 21, {EPOLLWRNORM|EPOLLWRBAND|EPOLLMSG|EPOLLHUP|0x460800, {u32=0, u64=20368074947428352}}) = -1 ENOENT (No such file or directory)
epoll_ctl(3, EPOLL_CTL_DEL, 25, {EPOLLWRNORM|EPOLLWRBAND|EPOLLMSG|EPOLLHUP|0x460800, {u32=0, u64=20368074947428352}}) = -1 ENOENT (No such file or directory)
gettimeofday({1391710020, 458955}, NULL) = 0
clock_gettime(CLOCK_MONOTONIC, {24869, 802396600}) = 0
gettimeofday({1391710020, 459069}, NULL) = 0
gettimeofday({1391710020, 459108}, NULL) = 0
clock_gettime(CLOCK_MONOTONIC, {24869, 802520200}) = 0
wait4(6914, 0x7fff9cc56490, WNOHANG, NULL) = 0
wait4(6913, 0x7fff9cc56490, WNOHANG, NULL) = 0
wait4(6912, 0x7fff9cc56490, WNOHANG, NULL) = 0
wait4(6911, 0x7fff9cc56490, WNOHANG, NULL) = 0
wait4(6910, 0x7fff9cc56490, WNOHANG, NULL) = 0
wait4(6909, 0x7fff9cc56490, WNOHANG, NULL) = 0
gettimeofday({1391710020, 459349}, NULL) = 0
gettimeofday({1391710020, 459385}, NULL) = 0
epoll_wait(3,
ionelmc commented 10 years ago

Stacktrace

--Call--
> /usr/lib/python2.7/contextlib.py(21)__exit__()
-> def __exit__(self, type, value, traceback):
(Pdb) w
  /home/ionel/projects/.x1/bin/celery(9)<module>()
-> load_entry_point('celery==3.1.8', 'console_scripts', 'celery')()
  /home/ionel/projects/.x1/local/lib/python2.7/site-packages/celery/__main__.py(30)main()
-> main()
  /home/ionel/projects/.x1/local/lib/python2.7/site-packages/celery/bin/celery.py(80)main()
-> cmd.execute_from_commandline(argv)
  /home/ionel/projects/.x1/local/lib/python2.7/site-packages/celery/bin/celery.py(746)execute_from_commandline()
-> super(CeleryCommand, self).execute_from_commandline(argv)))
  /home/ionel/projects/.x1/local/lib/python2.7/site-packages/celery/bin/base.py(308)execute_from_commandline()
-> return self.handle_argv(self.prog_name, argv[1:])
  /home/ionel/projects/.x1/local/lib/python2.7/site-packages/celery/bin/celery.py(738)handle_argv()
-> return self.execute(command, argv)
  /home/ionel/projects/.x1/local/lib/python2.7/site-packages/celery/bin/celery.py(692)execute()
-> ).run_from_argv(self.prog_name, argv[1:], command=argv[0])
  /home/ionel/projects/.x1/local/lib/python2.7/site-packages/celery/bin/worker.py(175)run_from_argv()
-> return self(*args, **options)
  /home/ionel/projects/.x1/local/lib/python2.7/site-packages/celery/bin/base.py(271)__call__()
-> ret = self.run(*args, **kwargs)
  /home/ionel/projects/.x1/local/lib/python2.7/site-packages/celery/bin/worker.py(208)run()
-> state_db=self.node_format(state_db, hostname), **kwargs
  /home/ionel/projects/.x1/local/lib/python2.7/site-packages/celery/worker/__init__.py(206)start()
-> self.blueprint.start(self)
  /home/ionel/projects/.x1/local/lib/python2.7/site-packages/celery/bootsteps.py(123)start()
-> step.start(parent)
  /home/ionel/projects/.x1/local/lib/python2.7/site-packages/celery/bootsteps.py(373)start()
-> return self.obj.start()
  /home/ionel/projects/.x1/local/lib/python2.7/site-packages/celery/worker/consumer.py(270)start()
-> blueprint.start(self)
  /home/ionel/projects/.x1/local/lib/python2.7/site-packages/celery/bootsteps.py(123)start()
-> step.start(parent)
  /home/ionel/projects/.x1/local/lib/python2.7/site-packages/celery/worker/consumer.py(786)start()
-> c.loop(*c.loop_args())
  /home/ionel/projects/.x1/local/lib/python2.7/site-packages/celery/worker/loops.py(71)asynloop()
-> next(loop)
  /home/ionel/projects/.x1/local/lib/python2.7/site-packages/kombu/async/hub.py(296)create_loop()
-> events = poll(poll_timeout)
  /home/ionel/projects/.x1/local/lib/python2.7/site-packages/kombu/utils/eventio.py(65)poll()
-> return self._poll(timeout)
  /home/ionel/projects/.x1/local/lib/python2.7/site-packages/kombu/utils/eventio.py(93)_poll()
-> return self._epoll.poll(timeout if timeout is not None else -1)
  /home/ionel/projects/.x1/local/lib/python2.7/site-packages/celery/apps/worker.py(360)rdb_handler()
-> set_trace(frame)
  /home/ionel/projects/.x1/local/lib/python2.7/site-packages/celery/contrib/rdb.py(180)set_trace()
-> return debugger().set_trace(frame)
  /home/ionel/projects/.x1/local/lib/python2.7/site-packages/celery/contrib/rdb.py(160)set_trace()
-> Pdb.set_trace(self, frame)
> /usr/lib/python2.7/contextlib.py(21)__exit__()
-> def __exit__(self, type, value, traceback):
(Pdb)
ionelmc commented 10 years ago

Nevermind.

ionelmc commented 10 years ago

Ok so I've patched kombu a bit to have some extra logging:

CELERY_RDBSIG=1 MP_LOG=1 strace -s 6500 -o STRACE .ve/bin/celery -A hang1847 worker --without-gossip --without-mingle --without-heartbeat -Q celery_test --loglevel=DEBUG -c 2
[2014-02-06 20:53:50,088: DEBUG/MainProcess] | Worker: Preparing bootsteps.
[2014-02-06 20:53:50,097: DEBUG/MainProcess] | Worker: Building graph...
[2014-02-06 20:53:50,097: DEBUG/MainProcess] | Worker: New boot order: {Timer, Hub, Queues (intra), Pool, Autoreloader, Beat, Autoscaler, StateDB, Consumer}
[2014-02-06 20:53:50,107: DEBUG/MainProcess] | Consumer: Preparing bootsteps.
[2014-02-06 20:53:50,107: DEBUG/MainProcess] | Consumer: Building graph...
[2014-02-06 20:53:50,117: DEBUG/MainProcess] | Consumer: New boot order: {Connection, Events, Mingle, Tasks, Control, Agent, Gossip, Heart, event loop}
[2014-02-06 20:53:50,118: WARNING/MainProcess] /home/ionel/projects/celery/celery/apps/worker.py:161: CDeprecationWarning:
Starting from version 3.2 Celery will refuse to accept pickle by default.

The pickle serializer is a security concern as it may give attackers
the ability to execute any command.  It's important to secure
your broker from unauthorized access when using pickle, so we think
that enabling pickle should require a deliberate action and not be
the default choice.

If you depend on pickle then you should set a setting to disable this
warning and to be sure that everything will continue working
when you upgrade to Celery 3.2::

    CELERY_ACCEPT_CONTENT = ['pickle', 'json', 'msgpack', 'yaml']

You must only enable the serializers that you will actually use.

  warnings.warn(CDeprecationWarning(W_PICKLE_DEPRECATED))

 -------------- celery@dkbox v3.1.8 (Cipater)
---- **** -----
--- * ***  * -- Linux-3.8.0-33-generic-x86_64-with-Ubuntu-12.04-precise
-- * - **** ---
- ** ---------- [config]
- ** ---------- .> app:         tasks:0x168e110
- ** ---------- .> transport:   amqp://guest@localhost:5672//
- ** ---------- .> results:     disabled
- *** --- * --- .> concurrency: 2 (prefork)
-- ******* ----
--- ***** ----- [queues]
 -------------- .> celery_test      exchange=celery_test(direct) key=celery_test

[tasks]
  . celery.backend_cleanup
  . celery.chain
  . celery.chord
  . celery.chord_unlock
  . celery.chunks
  . celery.group
  . celery.map
  . celery.starmap
  . hang1847.add

[2014-02-06 20:53:50,121: DEBUG/MainProcess] | Worker: Starting Hub
[2014-02-06 20:53:50,121: DEBUG/MainProcess] ^-- substep ok
[2014-02-06 20:53:50,121: DEBUG/MainProcess] | Worker: Starting Pool
[2014-02-06 20:53:50,125: INFO/Worker-1] child process 12039 calling self.run()
[2014-02-06 20:53:50,125: DEBUG/MainProcess] ^-- substep ok
[2014-02-06 20:53:50,126: DEBUG/MainProcess] | Worker: Starting Consumer
[2014-02-06 20:53:50,127: INFO/Worker-2] child process 12040 calling self.run()
[2014-02-06 20:53:50,126: DEBUG/MainProcess] | Consumer: Starting Connection
[2014-02-06 20:53:50,137: DEBUG/MainProcess] Start from server, version: 0.9, properties: {u'information': u'Licensed under the MPL.  See http://www.rabbitmq.com/', u'product': u'RabbitMQ', u'copyright': u'Copyright (C) 2007-2013 GoPivotal, Inc.', u'capabilities': {u'exchange_exchange_bindings': True, u'connection.blocked': True, u'authentication_failure_close': True, u'basic.nack': True, u'consumer_priorities': True, u'consumer_cancel_notify': True, u'publisher_confirms': True}, u'platform': u'Erlang/OTP', u'version': u'3.2.3'}, mechanisms: [u'PLAIN', u'AMQPLAIN'], locales: [u'en_US']
[2014-02-06 20:53:50,138: DEBUG/MainProcess] Open OK!
[2014-02-06 20:53:50,138: INFO/MainProcess] Adding 14 (<socket._socketobject object at 0x18b1f30>) from Hub from hub.py:168:add < hub.py:220:add_reader < pyamqp.py:127:register_with_event_loop < consumer.py:378:connect
[2014-02-06 20:53:50,139: INFO/MainProcess] Connected to amqp://guest@127.0.0.1:5672//
[2014-02-06 20:53:50,139: DEBUG/MainProcess] ^-- substep ok
[2014-02-06 20:53:50,139: DEBUG/MainProcess] | Consumer: Starting Events
[2014-02-06 20:53:50,145: DEBUG/MainProcess] Start from server, version: 0.9, properties: {u'information': u'Licensed under the MPL.  See http://www.rabbitmq.com/', u'product': u'RabbitMQ', u'copyright': u'Copyright (C) 2007-2013 GoPivotal, Inc.', u'capabilities': {u'exchange_exchange_bindings': True, u'connection.blocked': True, u'authentication_failure_close': True, u'basic.nack': True, u'consumer_priorities': True, u'consumer_cancel_notify': True, u'publisher_confirms': True}, u'platform': u'Erlang/OTP', u'version': u'3.2.3'}, mechanisms: [u'PLAIN', u'AMQPLAIN'], locales: [u'en_US']
[2014-02-06 20:53:50,147: DEBUG/MainProcess] Open OK!
[2014-02-06 20:53:50,147: INFO/MainProcess] Adding 15 (<socket._socketobject object at 0x18c0130>) from Hub from hub.py:168:add < hub.py:220:add_reader < pyamqp.py:127:register_with_event_loop < consumer.py:378:connect
[2014-02-06 20:53:50,148: DEBUG/MainProcess] using channel_id: 1
[2014-02-06 20:53:50,149: DEBUG/MainProcess] Channel open
[2014-02-06 20:53:50,149: DEBUG/MainProcess] ^-- substep ok
[2014-02-06 20:53:50,149: DEBUG/MainProcess] | Consumer: Starting Tasks
[2014-02-06 20:53:50,153: DEBUG/MainProcess] using channel_id: 1
[2014-02-06 20:53:50,154: DEBUG/MainProcess] Channel open
[2014-02-06 20:53:50,156: DEBUG/MainProcess] basic.qos: prefetch_count->8
[2014-02-06 20:53:50,157: DEBUG/MainProcess] ^-- substep ok
[2014-02-06 20:53:50,157: DEBUG/MainProcess] | Consumer: Starting Control
[2014-02-06 20:53:50,157: DEBUG/MainProcess] using channel_id: 2
[2014-02-06 20:53:50,157: DEBUG/MainProcess] Channel open
[2014-02-06 20:53:50,159: DEBUG/MainProcess] ^-- substep ok
[2014-02-06 20:53:50,160: DEBUG/MainProcess] | Consumer: Starting event loop
[2014-02-06 20:53:50,161: WARNING/MainProcess] celery@dkbox ready.
[2014-02-06 20:53:50,161: DEBUG/MainProcess] | Worker: Hub.register Pool...
[2014-02-06 20:53:50,161: INFO/MainProcess] Adding 12 (12) from Hub from hub.py:168:add < hub.py:220:add_reader < asynpool.py:410:register_with_event_loop < prefork.py:134:register_with_event_loop
[2014-02-06 20:53:50,161: INFO/MainProcess] Adding 13 (13) from Hub from hub.py:168:add < hub.py:220:add_reader < asynpool.py:410:register_with_event_loop < prefork.py:134:register_with_event_loop
[2014-02-06 20:53:50,162: INFO/MainProcess] Adding 10 (10) from Hub from hub.py:168:add < hub.py:220:add_reader < asynpool.py:414:register_with_event_loop < prefork.py:134:register_with_event_loop
[2014-02-06 20:53:50,162: INFO/MainProcess] Adding 6 (6) from Hub from hub.py:168:add < hub.py:220:add_reader < asynpool.py:414:register_with_event_loop < prefork.py:134:register_with_event_loop
[2014-02-06 20:53:50,162: INFO/MainProcess] Adding 10 (10) from Hub from hub.py:168:add < hub.py:220:add_reader < asynpool.py:238:_recv_message < asynpool.py:263:on_result_readable
[2014-02-06 20:53:50,162: INFO/MainProcess] Adding 6 (6) from Hub from hub.py:168:add < hub.py:220:add_reader < asynpool.py:238:_recv_message < asynpool.py:263:on_result_readable
[2014-02-06 20:53:50,163: INFO/MainProcess] Removing 9 (9) from Hub from hub.py:184:remove < asynpool.py:655:on_poll_start < hub.py:286:create_loop < loops.py:72:asynloop
[2014-02-06 20:53:50,163: INFO/MainProcess] Removing 5 (5) from Hub from hub.py:184:remove < asynpool.py:655:on_poll_start < hub.py:286:create_loop < loops.py:72:asynloop
[2014-02-06 20:53:55,167: INFO/MainProcess] Removing 9 (9) from Hub from hub.py:184:remove < asynpool.py:655:on_poll_start < hub.py:286:create_loop < loops.py:72:asynloop
[2014-02-06 20:53:55,167: INFO/MainProcess] Removing 5 (5) from Hub from hub.py:184:remove < asynpool.py:655:on_poll_start < hub.py:286:create_loop < loops.py:72:asynloop
[2014-02-06 20:54:00,176: INFO/MainProcess] Removing 9 (9) from Hub from hub.py:184:remove < asynpool.py:655:on_poll_start < hub.py:286:create_loop < loops.py:72:asynloop
[2014-02-06 20:54:00,177: INFO/MainProcess] Removing 5 (5) from Hub from hub.py:184:remove < asynpool.py:655:on_poll_start < hub.py:286:create_loop < loops.py:72:asynloop
[2014-02-06 20:54:05,186: INFO/MainProcess] Removing 9 (9) from Hub from hub.py:184:remove < asynpool.py:655:on_poll_start < hub.py:286:create_loop < loops.py:72:asynloop
[2014-02-06 20:54:05,186: INFO/MainProcess] Removing 5 (5) from Hub from hub.py:184:remove < asynpool.py:655:on_poll_start < hub.py:286:create_loop < loops.py:72:asynloop
[2014-02-06 20:54:10,196: INFO/MainProcess] Removing 9 (9) from Hub from hub.py:184:remove < asynpool.py:655:on_poll_start < hub.py:286:create_loop < loops.py:72:asynloop
[2014-02-06 20:54:10,196: INFO/MainProcess] Removing 5 (5) from Hub from hub.py:184:remove < asynpool.py:655:on_poll_start < hub.py:286:create_loop < loops.py:72:asynloop
[2014-02-06 20:54:15,205: INFO/MainProcess] Removing 9 (9) from Hub from hub.py:184:remove < asynpool.py:655:on_poll_start < hub.py:286:create_loop < loops.py:72:asynloop
[2014-02-06 20:54:15,205: INFO/MainProcess] Removing 5 (5) from Hub from hub.py:184:remove < asynpool.py:655:on_poll_start < hub.py:286:create_loop < loops.py:72:asynloop
[2014-02-06 20:54:20,211: INFO/MainProcess] Removing 9 (9) from Hub from hub.py:184:remove < asynpool.py:655:on_poll_start < hub.py:286:create_loop < loops.py:72:asynloop
[2014-02-06 20:54:20,211: INFO/MainProcess] Removing 5 (5) from Hub from hub.py:184:remove < asynpool.py:655:on_poll_start < hub.py:286:create_loop < loops.py:72:asynloop
[2014-02-06 20:54:25,220: INFO/MainProcess] Removing 9 (9) from Hub from hub.py:184:remove < asynpool.py:655:on_poll_start < hub.py:286:create_loop < loops.py:72:asynloop
[2014-02-06 20:54:25,220: INFO/MainProcess] Removing 5 (5) from Hub from hub.py:184:remove < asynpool.py:655:on_poll_start < hub.py:286:create_loop < loops.py:72:asynloop
[2014-02-06 20:54:30,230: INFO/MainProcess] Removing 9 (9) from Hub from hub.py:184:remove < asynpool.py:655:on_poll_start < hub.py:286:create_loop < loops.py:72:asynloop
[2014-02-06 20:54:30,230: INFO/MainProcess] Removing 5 (5) from Hub from hub.py:184:remove < asynpool.py:655:on_poll_start < hub.py:286:create_loop < loops.py:72:asynloop
[2014-02-06 20:54:35,239: INFO/MainProcess] Removing 9 (9) from Hub from hub.py:184:remove < asynpool.py:655:on_poll_start < hub.py:286:create_loop < loops.py:72:asynloop
[2014-02-06 20:54:35,239: INFO/MainProcess] Removing 5 (5) from Hub from hub.py:184:remove < asynpool.py:655:on_poll_start < hub.py:286:create_loop < loops.py:72:asynloop
[2014-02-06 20:54:40,249: INFO/MainProcess] Removing 9 (9) from Hub from hub.py:184:remove < asynpool.py:655:on_poll_start < hub.py:286:create_loop < loops.py:72:asynloop
[2014-02-06 20:54:40,249: INFO/MainProcess] Removing 5 (5) from Hub from hub.py:184:remove < asynpool.py:655:on_poll_start < hub.py:286:create_loop < loops.py:72:asynloop
[2014-02-06 20:54:45,254: INFO/MainProcess] Removing 9 (9) from Hub from hub.py:184:remove < asynpool.py:655:on_poll_start < hub.py:286:create_loop < loops.py:72:asynloop
[2014-02-06 20:54:45,255: INFO/MainProcess] Removing 5 (5) from Hub from hub.py:184:remove < asynpool.py:655:on_poll_start < hub.py:286:create_loop < loops.py:72:asynloop
[2014-02-06 20:54:50,264: INFO/MainProcess] Removing 9 (9) from Hub from hub.py:184:remove < asynpool.py:655:on_poll_start < hub.py:286:create_loop < loops.py:72:asynloop
[2014-02-06 20:54:50,264: INFO/MainProcess] Removing 5 (5) from Hub from hub.py:184:remove < asynpool.py:655:on_poll_start < hub.py:286:create_loop < loops.py:72:asynloop
[2014-02-06 20:54:55,273: INFO/MainProcess] Removing 9 (9) from Hub from hub.py:184:remove < asynpool.py:655:on_poll_start < hub.py:286:create_loop < loops.py:72:asynloop
[2014-02-06 20:54:55,274: INFO/MainProcess] Removing 5 (5) from Hub from hub.py:184:remove < asynpool.py:655:on_poll_start < hub.py:286:create_loop < loops.py:72:asynloop
[2014-02-06 20:55:00,279: INFO/MainProcess] Removing 9 (9) from Hub from hub.py:184:remove < asynpool.py:655:on_poll_start < hub.py:286:create_loop < loops.py:72:asynloop
[2014-02-06 20:55:00,279: INFO/MainProcess] Removing 5 (5) from Hub from hub.py:184:remove < asynpool.py:655:on_poll_start < hub.py:286:create_loop < loops.py:72:asynloop
[2014-02-06 20:55:05,288: INFO/MainProcess] Removing 9 (9) from Hub from hub.py:184:remove < asynpool.py:655:on_poll_start < hub.py:286:create_loop < loops.py:72:asynloop
[2014-02-06 20:55:05,289: INFO/MainProcess] Removing 5 (5) from Hub from hub.py:184:remove < asynpool.py:655:on_poll_start < hub.py:286:create_loop < loops.py:72:asynloop
ionelmc commented 10 years ago

Fds to worker processes are from 4 to 13 (inclusive). It looks none of those are added in the hub for writing purpose (thus tasks don't get run).

ionelmc commented 10 years ago

It appears that the amqp client is stalling (has data in internal buffers, but the hub doesn't know about it). I have identified the commit adding buffering in pyamqp Connection object to this commit: https://github.com/celery/py-amqp/commit/737fa589bfdea60d158ecceb8d388b0aba1a9164

ionelmc commented 10 years ago

I would appear that librabbitmq has the same stalling issue but looks harder to fix as it just reads up to the buffer's size, see https://github.com/ask/rabbitmq-c/blob/bfbe693e88f8495073f43571904cdbb817dc50ae/librabbitmq/amqp_socket.c#L211

ionelmc commented 10 years ago

@sabw8217 You can test this fix by using 'pyamqp://' broker protocol (in case you have librabbitmq installed). Don't forget to install the code from the 2 PRs mentioned above.

sabw8217 commented 10 years ago

I think there is still an issue here - it looks to me like what it is happening is that this call in asynloop() on line 41 of celery/worker/loops.py can receive messages up to the entire prefetch count of the worker in addition to the basic_consume_ok.

consumer.consume()

And when that happens the messages end up in the method_queue in the amqp Channel. If the entire prefetch count has been received, RMQ won't send anything to us until a message gets acked, but because RMQ isn't sending anything the socket FD never becomes ready to read, and we never end up calling the task handler(which would call drain_events and kick us out of this bad state, where there are prefetch_count basic_deliver messages in the pyamqp's internal method queue). It looks to me like doing this will fix it, but I'm guessing there should be a timeout on this call, and possibly this whole issue should be being addressed at a lower level:

diff --git a/celery/worker/loops.py b/celery/worker/loops.py
index 0891f51..a5ac57a 100644
--- a/celery/worker/loops.py
+++ b/celery/worker/loops.py
@@ -49,6 +49,11 @@ def asynloop(obj, connection, consumer, blueprint, hub, qos,
     if not obj.restart_count and not obj.pool.did_start_ok():
         raise WorkerLostError('Could not start worker processes')

+    # consumer.consume() may have prefetched up to our
+    # limit - drain an event so we are in a clean state
+    # prior to starting our event loop.
+    connection.drain_events()
+
     # FIXME: Use loop.run_forever
     # Tried and works, but no time to test properly before release.
     hub.propagate_errors = errors
ionelmc commented 10 years ago

@sabw8217 are you still having the issue or is this a different problem ?

sabw8217 commented 10 years ago

It's the same issue. The worker hangs on startup, and you can wake it up by issuing a celery inspect active command, which I think causes RabbitMQ to deliver another message to the worker, and then the socket becomes readable and we go through drain_events and consume all the messages.

I added some debugging code to amqp in connection.Connection._wait_method and got output like this:

[2014-03-07 08:06:44,093: WARNING/MainProcess] called _wait_method for channel 2, there was 0 in the queue [2014-03-07 08:06:44,096: WARNING/MainProcess] Queueing up (60, 60) for later [2014-03-07 08:06:44,096: WARNING/MainProcess] Queueing up (60, 60) for later [2014-03-07 08:06:44,097: WARNING/MainProcess] Queueing up (60, 60) for later [2014-03-07 08:06:44,097: WARNING/MainProcess] Queueing up (60, 60) for later [2014-03-07 08:06:44,097: WARNING/MainProcess] Queueing up (60, 60) for later [2014-03-07 08:06:44,097: WARNING/MainProcess] Queueing up (60, 60) for later [2014-03-07 08:06:44,098: WARNING/MainProcess] Queueing up (60, 60) for later [2014-03-07 08:06:44,098: WARNING/MainProcess] Queueing up (60, 60) for later [2014-03-07 08:06:44,098: WARNING/MainProcess] Queueing up (60, 60) for later [2014-03-07 08:06:44,099: WARNING/MainProcess] Queueing up (60, 60) for later [2014-03-07 08:06:44,099: WARNING/MainProcess] Queueing up (60, 60) for later [2014-03-07 08:06:44,099: WARNING/MainProcess] Queueing up (60, 60) for later [2014-03-07 08:06:44,099: WARNING/MainProcess] Queueing up (60, 60) for later [2014-03-07 08:06:44,100: WARNING/MainProcess] Queueing up (60, 60) for later [2014-03-07 08:06:44,100: WARNING/MainProcess] Queueing up (60, 60) for later [2014-03-07 08:06:44,100: WARNING/MainProcess] Queueing up (60, 60) for later [2014-03-07 08:06:44,101: WARNING/MainProcess] called _wait_method for channel 1, there was 16 in the queue [2014-03-07 08:06:45,105: WARNING/MainProcess] called _wait_method for channel 1, there was 0 in the queue [2014-03-07 08:06:45,105: WARNING/MainProcess] called _wait_method for channel 0, there was 0 in the queue [2014-03-07 08:06:45,106: WARNING/MainProcess] called _wait_method for channel 1, there was 16 in the queue [2014-03-07 08:06:45,106: WARNING/MainProcess] called _wait_method for channel 0, there was 0 in the queue

I was running with CELERYD_PREFETCH_MULTIPLIER = 16, and I can see 16 messages go into that method_queue property, and I don't ever see them get read out of there until I trigger a read from the socket with a management command.

ionelmc commented 10 years ago

@sabw8217 can you try different client library ? (eg: uninstall or install rabbitmq)

Also, you're not using SSL right?

sabw8217 commented 10 years ago

Reinstalling the rabbitmq-server package(which I assume is what you meant) didn't have any affect. I'm not using SSL, I'm just connecting to local RabbitMQ: pyamqp://localhost:5672/

ionelmc commented 10 years ago

@sabw8217 This may sound silly, but are you using the latest packages of kombu/billiard/celery/pyamqp ?

sabw8217 commented 10 years ago

Sorry about the slow response. Here's the output of pip freeze in the ve I am running this in: amqp==1.4.2 anyjson==0.3.3 argparse==1.2.1 billiard==3.3.0.14 celery==3.1.9 distribute==0.6.24 kombu==3.0.12 librabbitmq==1.0.3 pytz==2013.9 thrift==0.9.1 wsgiref==0.1.2

bialecki commented 10 years ago

Having the same problem. Running celery=3.1.10, latest versions of amqp/billiard/kombu. Last log line is Worker: Hub.register Pool.... Same behavior that a management command call to inspect queues frees them up.

I can also confirm that adding the call to connection.drain_events() fixes the issue. I've also been able to reproduce, so let me know if there's something else you want.

pykler commented 10 years ago

having the same issue, inspect active seems to get things going. What version of celery is this supposed to be fixed in?

smurfix commented 10 years ago

Hmm. The "fix" in py-amqp is to never read more from the socket than absolutely necessary.

Doesn't Celery have a way to tell its scheduler to mark a task as ready, without having a readable/writeable socket? I really don't like the inefficiency caused by not buffering any more.

ask commented 10 years ago

What do you mean by mark as ready?

I'm currently in the progress of rewriting py-amqp, and that includes buffering like crazy

Sent from my iPhone

On Jun 29, 2014, at 2:01 AM, Matthias Urlichs notifications@github.com wrote:

Hmm. The "fix" in py-amqp is to never read more from the socket than absolutely necessary.

Doesn't Celery have a way to tell its scheduler to mark a task as ready, without having a readable/writeable socket? I really don't like the inefficiency caused by not buffering any more.

— Reply to this email directly or view it on GitHub.

ask commented 10 years ago

There shouldn't be any method queue, I'm not sure why amqplib has one in the first place but I've removed it in amqp master.

Note also that the socket read is blocking, so while the event loop is async it blocks once the amqp socket is readable.

This is also something I'm fixing in amqp master/callbacks branches. The goal is that the worker should have conn.setblocking(0) and async ack + publish by 3.2, which are the only remaining blocking sections in the worker unless you also count task error emails

smurfix commented 10 years ago

Hi,

Ask Solem Hoel:

What do you mean by mark as ready?

Buffering is good. The problem of some systems is that they tie the reading task's executability to the readiness of the file descriptor. So I can send two packets' worth of data and only one will be processed while the other languishes in the buffer.

The converse problem is when you decide to schedule the reader when the buffer is non-empty, without regard to whether there's enough data in there. That would cause the code to busy-loop.

-- Matthias Urlichs

lra commented 10 years ago

Hi,

We have hundreds of celery workers running, and we experience this problem: Stuck workers doing nothing on a queue.

Some of our workers are alone on a given queue, some have brothers working on the same queue. The problem seems to appear when a worker is alone on one given queue. As soon as we launch another one on the same queue, it unstuck the 1st one.

In our cluster, we upgraded celery with:

celery==3.1.12
billiard==3.3.0.18
kombu==3.0.19
amqp==1.4.5
anyjson==0.3.3

We launch celery like this:

celery worker --without-gossip --without-heartbeat --without-mingle

We don't really need the fancy stuff...

Our fix now is to have a cron run this:

*/5 * * * * ubuntu /usr/local/bin/celery inspect active_queues > /dev/null

It fixed it...

boffbowsh commented 10 years ago

We ran into this issue too. After a lot of experimentation, using the threaded worker solved our problems. Our theory is that celery will then use separate threads for RabbitMQ communication and actual work. We're running with -c 1 -P threads and then scaling out with processes using celery multi. We don't want the Python GIL interfering with our workers, but I guess if you have the right workload then larger thread concurrency per worker may work for you.

One thing to bear in mind is that we also had to switch from librabbitmq to py-amqp for this to work, I guess due to calling C code wrapping everything in a giant Mutex or something.

Hope this helps others.

public commented 9 years ago

What @boffbowsh said isn't entirely accurate. Turned out that librabbitmq + -P threads was a sufficient work around in this case.

Rigdon commented 9 years ago

Any recent discoveries on this issue? We're experiencing this problem intermittently and a celery inspect active will usually resolve it, but sometimes it doesn't and we have to restart the worker processes.

thedrow commented 9 years ago

No, we're really sorry but we're suffering from low number of maintainers and @ask is pretty busy lately. I'm gonna try to dive into celery this weekend, hopefully resolving some issues.

Rigdon commented 9 years ago

Thanks for the update!

thedrow commented 9 years ago

@Rigdon feel free to contribute. We need pull requests fixing bugs like these.

ask commented 9 years ago

Closed by #2823 Thanks everyone :)

tumb1er commented 9 years ago

Applying patch to 3.1.18 doesn't work for me: celery runs with keys:

celery worker ... --without-gossip -c 1
  1. First celery task is successfully consumed and then retries with delay 10 seconds
  2. When second celery task appears in queue, RabbitMQ admin shows 1 message unacknowledged and 1 ready.
  3. After that nothing happens until celery worker restart.