python / cpython

Shutting down consumer on a remote queue #73640

Open d0540a6e-554e-4f37-9441-cfac3534c61d opened 7 years ago

d0540a6e-554e-4f37-9441-cfac3534c61d commented 7 years ago
BPO 29454
Nosy @rhettinger, @applio, @aptrishu

Note: these values reflect the state of the issue at the time it was migrated and might not reflect the current state.


GitHub fields:
```python
assignee = 'https://github.com/applio'
closed_at = None
created_at =
labels = ['type-feature']
title = 'Shutting down consumer on a remote queue'
updated_at =
user = 'https://bugs.python.org/Semi'
```
bugs.python.org fields:
```python
activity =
actor = 'davin'
assignee = 'davin'
closed = False
closed_date = None
closer = None
components = []
creation =
creator = 'Semi'
dependencies = []
files = []
hgrepos = []
issue_num = 29454
keywords = []
message_count = 5.0
messages = ['287055', '288942', '288973', '288980', '288981']
nosy_count = 4.0
nosy_names = ['rhettinger', 'davin', 'Semi', 'aptrishu']
pr_nums = []
priority = 'normal'
resolution = None
stage = 'needs patch'
status = 'open'
superseder = None
type = 'enhancement'
url = 'https://bugs.python.org/issue29454'
versions = ['Python 3.6']
```

d0540a6e-554e-4f37-9441-cfac3534c61d commented 7 years ago

Using code adapted from the example in the docs (https://docs.python.org/3/library/multiprocessing.html#using-a-remote-manager), suppose you create a remote queue server, a producer which puts items into the queue, and a consumer which consumes items from the queue. If the consumer gets killed and restarted, it misses one item from the queue. For a reproducible example see the Stack Overflow reference below.
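
The setup described is roughly the following (a minimal sketch modeled on the docs' remote manager example rather than the exact Stack Overflow reproduction; the address, authkey, and the registered name `get_queue` follow the docs, everything else is illustrative):

```python
from multiprocessing.managers import BaseManager
import queue

# --- server process: hosts a plain queue.Queue() behind a manager ---
def run_server():
    q = queue.Queue()

    class QueueManager(BaseManager):
        pass

    QueueManager.register('get_queue', callable=lambda: q)
    m = QueueManager(address=('', 50000), authkey=b'abracadabra')
    s = m.get_server()
    s.serve_forever()

# --- producer process: puts items into the remote queue ---
def run_producer():
    class QueueManager(BaseManager):
        pass

    QueueManager.register('get_queue')
    m = QueueManager(address=('localhost', 50000), authkey=b'abracadabra')
    m.connect()
    q = m.get_queue()
    q.put('some work item')

# --- consumer process: blocks on get(); killing it while it is blocked
# --- and then restarting it is what appears to lose an item
def run_consumer():
    class QueueManager(BaseManager):
        pass

    QueueManager.register('get_queue')
    m = QueueManager(address=('localhost', 50000), authkey=b'abracadabra')
    m.connect()
    q = m.get_queue()
    while True:
        print(q.get())
```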

Expected: items stay in the queue until a consumer consumes them.
Happens: one item is still removed from the queue even after the consumer is killed.

Version: Python 3.6.0
OS: Ubuntu 16.10

reference: http://stackoverflow.com/questions/42052248/remote-queue-consumer-misses-first-message-after-restart

b81cbc8b-a9e3-45eb-8b17-0521b8b6d3b2 commented 7 years ago

I'd like to work on this issue.

rhettinger commented 7 years ago

> Expected: items stay in the queue until a consumer consumes them

That would be my expectation as well.

Davin, do you know why the example in the docs uses queue.Queue() instead of multiprocessing.Queue()? Would there be a difference?

Also, I'm curious about the best practice for message queues when a consumer is killed. Even if getting a message is atomic and never loses a message, what do people normally do to resurrect a task that was already underway when the consumer is killed? I presume there is no easy way to find out whether the task had just started, was in progress and had changed the state of the system, or was mostly finished. Is there some sort of coding pattern for begin_transaction, commit, and rollback? ISTM that killing consumers is a perilous business.

The only reliable pattern I can think of is for the consumer to send back messages through another queue to indicate that a task was received and underway, and to indicate that a task was completed.
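
A rough sketch of that pattern, just to make the idea concrete (the queue names, message shapes, and the `consumer` function here are made up for illustration, not anything from the stdlib or the docs example):

```python
import multiprocessing as mp

def consumer(task_q, ack_q):
    # The worker reports progress on a second queue so a supervisor can
    # tell which tasks were merely received and which actually finished.
    while True:
        task_id, payload = task_q.get()
        ack_q.put(('received', task_id))
        result = payload.upper()          # stand-in for real work
        ack_q.put(('completed', task_id, result))

if __name__ == '__main__':
    task_q, ack_q = mp.Queue(), mp.Queue()
    worker = mp.Process(target=consumer, args=(task_q, ack_q), daemon=True)
    worker.start()

    task_q.put((1, 'hello'))
    print(ack_q.get())   # ('received', 1)
    print(ack_q.get())   # ('completed', 1, 'HELLO')
```

A supervisor that sees 'received' but never 'completed' for a task at least knows that task may need to be re-queued or rolled back, even though it still cannot tell how far the work got.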

applio commented 7 years ago

My understanding is that the example uses a queue.Queue() to demonstrate how to create a custom, remote service from scratch. The implementation in this simple example lacks the sophistication of multiprocessing.Queue() for handling situations such as the one raised by the OP. The example was not attempting to demonstrate a comprehensive replacement for multiprocessing.Queue(); rather, it was attempting to demonstrate the mechanism for creating and consuming a callable service hosted by a remote manager. The documentation currently does not introduce this example well, nor does it describe the above motivation.

As to why this simplistic implementation of a distributed queue appears to lose an item when the client is killed, it works in the following way:

  1. Let's say a server is started to hold a queue.Queue() which is populated with 1 item.
  2. A client requests an item from the server.
  3. The server receives the request and performs a blocking q.get() (where q is the queue.Queue() object held by the server).
  4. When the q.get() releases and returns an item, q has had one item removed leaving a queue size of 0 in our scenario, and then that item is sent from the server to the client.
  5. A client requests another item from the server.
  6. The server receives the request and performs a blocking q.get() on the queue. Because there's nothing left to grab from the queue, the server blocks and waits for something to magically appear in the queue. We'll have a "producer" put something into the queue in a moment but for the time being the server is stuck waiting on the q.get() and likewise the client is waiting on a response from the server.
  7. That client dies an unexpected, horrible death because someone accidentally hits it with a Ctrl-C.
  8. A "producer" comes along and puts a new item into the server's queue.
  9. The server's blocking q.get() call releases, q has had one item removed leaving a queue size of 0 again, and then that item is sent from the server to the client, but the client is dead and the transmission fails (see the sketch just after this list).
  10. A "producer" comes along and puts another new item into the server's queue.
  11. Whoever accidentally, horribly killed the client now frantically restarts it; the client requests an item from the server and the server responds with a new item. However, this is the item introduced in step 10, not the item from step 8. Hence the item from step 8 appears lost.
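
The key detail in step 9 is that the item is pulled out of the queue before the result is transmitted, so a failed transmission cannot put it back. Very roughly, and purely as a simplification (this is not the actual code of multiprocessing's Server, and the function name is made up):

```python
def handle_get_request(conn, q):
    # Simplified picture of what the manager's server does for a get():
    item = q.get()          # the item is now gone from the queue
    try:
        conn.send(item)     # if the client died, this raises...
    except OSError:
        pass                # ...and nothing puts the item back
```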

Note that in our simplistic example from the docs, there is no functionality to repopulate the queue object when communication of the item fails to complete. In general, the multiprocessing manager machinery has no idea what a given manager will contain and has no insight into what to do when a connection to a client is severed.
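
For illustration only, one application-level shape such "repopulate on failure" machinery could take is a reserve/acknowledge wrapper around the queue; this echoes the acknowledgement pattern Raymond described rather than anything the manager machinery itself can do about a failed send, and every name here is hypothetical:

```python
import itertools
import queue
import threading

class ReservingQueue:
    """Toy wrapper: items handed out by get() stay 'pending' until acked,
    so an unacknowledged item can later be put back into the queue."""

    def __init__(self):
        self._q = queue.Queue()
        self._pending = {}              # ticket -> item handed out but not acked
        self._tickets = itertools.count()
        self._lock = threading.Lock()

    def put(self, item):
        self._q.put(item)

    def get(self):
        item = self._q.get()            # blocks, like the plain example
        with self._lock:
            ticket = next(self._tickets)
            self._pending[ticket] = item
        return ticket, item

    def ack(self, ticket):
        with self._lock:
            self._pending.pop(ticket, None)

    def requeue_unacked(self):
        # Something (a timeout, a supervisor) must decide when to call this.
        with self._lock:
            items, self._pending = list(self._pending.values()), {}
        for item in items:
            self._q.put(item)
```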

Augmenting the example in the docs to cover situations like this would significantly complicate it, and there are many other situations to consider on the way to building a comprehensive solution -- instead, a person should choose multiprocessing.Queue() unless they have something particular in mind.

I think the example should be better introduced (the intro is terse) to explain its purpose and warn that it does not offer a comprehensive replacement for multiprocessing.Queue(). It does not need to go into all of the above explanation.

applio commented 7 years ago

My understanding of other message queueing systems is that many are motivated by speed to the point that they will permit messages to be "lost" due to specific scenarios that would be overly costly to defend against. Other message queueing systems adopt a philosophy that no message should ever be lost but as a compromise to speed do not promise that a message will be immediately recovered when caught in one of these problematic scenarios, only that it will eventually be recovered and processed fully.

It appears that the philosophy adopted (or, really, the solution requirements) leads to different best practices.