Closed d96b17f4-fadc-43d7-ad4e-30f76abdc4d3 closed 13 years ago
I have recently begun using multiprocessing for a variety of batch jobs. It's a great library, and it's been quite useful. However, I have been bitten several times by situations where a worker process in a Pool will unexpectedly die, leaving multiprocessing hanging in a wait. A simple example of this is produced by the following: """
import multiprocessing, sys def foo(x): sys.exit(1) multiprocessing.Pool(1).apply(foo, [1]) """ The child will exit and the parent will hang forever. A similar occurrence happens if one pushes C-c while a child process is running (this special case is noted in http://bugs.python.org/issue8296) or killed by a signal.
Attached is a patch to handle unexpected terminations of children processes and prevent the parent process from hanging. A test case is included. (Developed and tested on 64-bit Ubuntu.) Please let me know what you think. Thanks!
thanks greg; I'm going to take a look and think about this. I'd like to resolve bug 9207 first though
Cool, thanks. I'll note that with this patch applied, using the test program from 9207 I consistently get the following exception:
"""
Exception in thread Thread-1 (most likely raised during interpreter shutdown):
Traceback (most recent call last):
File "/usr/lib/python2.6/threading.py", line 532, in __bootstrap_inner
File "/usr/lib/python2.6/threading.py", line 484, in run
File "/home/gdb/repositories/multiprocessing/pool.py", line 312, in _handle_workers
File "/home/gdb/repositories/multiprocessing/pool.py", line 190, in _maintain_pool
File "/home/gdb/repositories/multiprocessing/pool.py", line 158, in _join_exited_workers
<type 'exceptions.TypeError'>: 'NoneType' object is not callable
"""
This is line 148 in the unpatched source, namely the 'reversed(range(len(self._pool)))' line of _join_exited_workers. Looks like the same issue, where instead reversed/range/len have been set to None.
So I think by changing how much time the worker_handler spends in various functions, I've made it possible (or just more likely?) that if we lose the race with interpreter shutdown the worker_handler will be in the middle of _join_exited_workers. This may mean that someone should keep around a local reference to reversed/range/len... not sure if there's a better solution.
Ugh. I'm going to have to think about the cleanest way of handling this case of functions vanishing from us since this is going to be more widespread inside the code. Suggestions welcome.
What about just catching the exception? See e.g. the attached patch. (Disclaimer: not heavily tested).
A+ for creativity; I wouldn't have thought of that ;)
termination.patch, in the result handler you've added:
while cache and thread._state != TERMINATE and not failed
why are you terminating the second pass after finding a failed process?
Unpickleable errors and other errors occurring in the worker body are not exceptional cases, at least not now that the pool is supervised by _handle_workers. I think the result should be set also in this case, so the user can inspect the exception after the fact.
I have some other suggestions too, so I will review this patch tomorrow.
For shutdown.patch, I thought this only happened in the worker handler, but you've enabled this for the result handler too? I don't care about the worker handler, but with the result handler I'm worried that I don't know what ignoring these exceptions actually means. For example, is there a possibility that we may lose results at shutdown?
Thanks much for taking a look at this!
why are you terminating the second pass after finding a failed process? Unfortunately, if you've lost a worker, you are no longer guaranteed that cache will eventually be empty. In particular, you may have lost a task, which could result in an ApplyResult waiting forever for a _set call.
More generally, my chief assumption that went into this is that the unexpected death of a worker process is unrecoverable. It would be nice to have a better workaround than just aborting everything, but I couldn't see a way to do that.
Unpickleable errors and other errors occurring in the worker body are not exceptional cases, at least not now that the pool is supervised by _handle_workers. I could be wrong, but that's not what my experiments were indicating. In particular, if an unpickleable error occurs, then a task has been lost, which means that the relevant map, apply, etc. will wait forever for completion of the lost task.
I think the result should be set also in this case, so the user can inspect the exception after the fact. That does sound useful. Although, how can you determine the job (and the value of i) if it's an unpickleable error? It would be nice to be able to retrieve job/i without having to unpickle the rest.
For shutdown.patch, I thought this only happened in the worker handler, but you've enabled this for the result handler too? I don't care about the worker handler, but with the result handler I'm worried that I don't know what ignoring these exceptions actually means. You have a good point. I didn't think about the patch very hard. I've only seen these exceptions from the worker handler, but AFAICT there's no guarantee that bad luck with the scheduler wouldn't result in the same problem in the result handler. One option would be to narrow the breadth of the exceptions caught by _make_shutdown_safe (do we need to catch anything but TypeErrors?). Another option would be to enable only for the worker handler. I don't have a particularly great sense of what the Right Thing to do here is.
Greg - I asked Ask to take a look - his celery package is a huge consumer of multiprocessing, and so I tend to run things past him as well.
That said - to both of you - the fundamental problem the shutdown patch is trying to scratch is located in bpo-9207 - greg's termination patch just exposes the problem in 9207 a lot more.
Focusing specifically on the shutdown patch; our issue is that during interpreter shutdown, sys.modules is iterated, and entries are set to None - for threads which "live on" well into that cycle can end up losing imported functions/modules/etc. The multiple daemon threads in the Pool code are exposing this as code which executed imported functions (such as the debug() statement in handle_workers) which will fire after the pool has exited and the interpreter is shut down.
We can work around the shutdown issue (really, bug 9207) by ignoring the exception such as shutdown.patch does, or passing in references/adding references to the functions those methods need. Or (as Brett suggested) converting them to class methods and adding references to the class. Or passing them in via the signature like this _handle_workers(arg, _debug=debug), etc.
Unfortunately, if you've lost a worker, you are no longer guaranteed that cache will eventually be empty. In particular, you may have lost a task, which could result in an ApplyResult waiting forever for a _set call.
More generally, my chief assumption that went into this is that the unexpected death of a worker process is unrecoverable. It would be nice to have a better workaround than just aborting everything, but I couldn't see a way to do that.
It would be a problem if the process simply disappeared, But in this case you have the ability to put a result on the queue, so it doesn't have to wait forever.
For processes disappearing (if that can at all happen), we could solve that by storing the jobs a process has accepted (started working on), so if a worker process is lost, we can mark them as failed too.
I could be wrong, but that's not what my experiments were indicating. In particular, if an unpickleable error occurs, then a task has been lost, which means that the relevant map, apply, etc. will wait forever for completion of the lost task.
It's lost now, but not if we handle the error... For a single map operation this behavior may make sense, but what about someone running the pool as s long-running service for users to submit map operations to? Errors in this context are expected to happen, even unpickleable errors.
I guess that the worker handler works as a supervisor is a side effect, as it was made for the maxtasksperchild feature, but for me it's a welcome one. With the supervisor in place, multiprocessing.pool is already fairly stable to be used for this use case, and there's not much to be done to make it solid (Celery is already running for months without issue, unless there's a pickling error...)
That does sound useful. Although, how can you determine the job (and the value of i) if it's an unpickleable error? It would be nice to be able to retrieve job/i without having to unpickle the rest.
I was already working on this issue last week actually, and I managed to do that in a way that works well enough (at least for me): http://github.com/ask/celery/commit/eaa4d5ddc06b000576a21264f11e6004b418bda1#diff-1
For processes disappearing (if that can at all happen), we could solve that by storing the jobs a process has accepted (started working on), so if a worker process is lost, we can mark them as failed too. Sure, this would be reasonable behavior. I had considered it but decided it as a larger change than I wanted to make without consulting the devs.
I was already working on this issue last week actually, and I managed to do that in a way that works well enough (at least for me): If I'm reading this right, you catch the exception upon pickling the result (at which point you have the job/i information already; totally reasonable). I'm worried about the case of unpickling the task failing. (Namely, the "task = get()" line of the "worker" method.) Try running the following: """
!/usr/bin/env python
import multiprocessing p = multiprocessing.Pool(1) def foo(x): pass p.apply(foo, [1]) """ And if "task = get()" fails, then the worker doesn't know what the relevant job/i values are.
Anyway, so I guess the question that is forming in my mind is, what sorts of errors do we want to handle, and how do we want to handle them? My answer is I'd like to handle all possible errors with some behavior that is not "hang forever". This includes handling children processes dying by signals or os._exit, raising unpickling errors, etc.
I believe my patch provides this functionality. By adding the extra mechanism that you've written/proposed, we can improve the error handling in specific recoverable cases (which probably constitute the vast majority of real-world cases).
I think I misunderstood the purpose of the patch. This is about handling errors on get(), not on put() like I was working on. So sorry for that confusion.
What kind of errors are you having that makes the get() call fail?
If the queue is not working, then I guess the only sensible approach is to shutdown the pool like suggested. I'll open up another issue for unpickleable errors then.
For reference I opened up a new issue for the put() case here: http://bugs.python.org/issue9244
What kind of errors are you having that makes the get() call fail? Try running the script I posted. It will fail with an AttributeError (raised during unpickling) and hang.
I'll note that the particular issues that I've run into in practice are:
This AttributeError problem is one that I discovered while generating test cases for the patch.
While looking at your patch in bpo-9244, I realized that my code fails to handle an unpickleable task, as in: """
import multiprocessing foo = lambda x: x p = multiprocessing.Pool(1) p.apply(foo, [1]) """ This should be fixed by the attached pickling_error.patch (independent of my other patches).
There's one more thing
if exitcode is not None:
cleaned = True
if exitcode != 0 and not worker._termination_requested:
abnormal.append((worker.pid, exitcode))
Instead of restarting crashed worker processes it will simply bring down the pool, right?
If so, then I think it's important to decide whether we want to keep the supervisor functionality, and if so decide on a recovery strategy.
Some alternatives are:
A) Any missing worker brings down the pool.
B) Missing workers will be replaced one-by-one. A maximum-restart-frequency decides when the supervisor should give up trying to recover the pool, and crash it.
C) Same as B, except that any process crashing when trying to get() will bring down the pool.
I think the supervisor is a good addition, so I would very much like to keep it. It's also a step closer to my goal of adding the enhancements added by Celery to multiprocessing.pool.
Using C is only a few changes away from this patch, but B would also be possible in combination with my accept_callback patch. It does pose some overhead, so it depends on the level of recovery we want to support.
accept_callback: this is a callback that is triggered when the job is reserved by a worker process. The acks are sent to an additional Queue, with an additional thread processing the acks (hence the mentioned overhead). This enables us to keep track of what the worker processes are doing, also get the PID of the worker processing any given job (besides from recovery, potential uses are monitoring and the ability to terminate a job (ApplyResult.terminate?). See http://github.com/ask/celery/blob/master/celery/concurrency/processes/pool.py
Jesse wrote,
We can work around the shutdown issue (really, bug 9207) by ignoring the exception such as shutdown.patch does, or passing in references/adding references to the functions those methods need. Or (as Brett suggested) converting them to class methods and adding references to the class. Or passing them in via the signature like this _handle_workers(arg, _debug=debug), etc.
Greg wrote,
Another option would be to enable only for the worker handler. I don't have a particularly great sense of what the Right Thing to do here is.
I don't think _make_shutdown_safe should be added to the result handler. If the error can indeed happen there, then we need to solve it in a way that enables it to finish the work.
Jesse, how hard is it to fix the worker handler by passing the references? Note that _worker_handler is not important to complete shutdown at this point, but it may be in the future (it seems termination.patch already changes this)
Passing the references seems to be a losing game; for _handle_workers - we only need 1 function (debug) - for others (say _join_exited_workers), we need references to reversed/range/len.
A possible alternative is to make those threads non-daemon threads; but I'd have to test that.
Before I forget, looks like we also need to deal with the result from a worker being un-unpickleable: """
import multiprocessing def foo(x): global bar def bar(x): pass return bar p = multiprocessing.Pool(1) p.apply(foo, [1]) """
This shouldn't require much more work, but I'll hold off on submitting a patch until we have a better idea of where we're going in this arena.
Instead of restarting crashed worker processes it will simply bring down the pool, right? Yep. Again, as things stand, once you've lost an worker, you've lost a task, and you can't really do much about it. I guess that depends on your application though... is your use-case such that you can lose a task without it mattering? If tasks are idempotent, one could have the task handler resubmit them, etc.. But really, thinking about the failure modes I've seen (OOM kills/user-initiated interrupt) I'm not sure under what circumstances I'd like the pool to try to recover.
The idea of recording the mapping of tasks -> workers seems interesting. Getting all of the corner cases could be hard (e.g. making removing a task from the queue and recording which worker did the removing atomic, detecting if the worker crashed while still holding the queue lock) and doing this would require extra mechanism. This feature does seem to be useful for pools running many different jobs, because that way a crashed worker need only terminate one job.
Anyway, I'd be curious to know more about the kinds of crashes you've encountered from which you'd like to be able to recover. Is it just Unpickleable exceptions, or are there others?
Greg,
Before I forget, looks like we also need to deal with the result from a worker being un-unpickleable:
This is what my patch in bug 9244 does...
Yep. Again, as things stand, once you've lost an worker, you've lost a task, and you can't really do much about it. I guess that depends on your application though... is your use-case such that you can lose a task without it mattering? If tasks are idempotent, one could have the task handler resubmit them, etc.. But really, thinking about the failure modes I've seen (OOM kills/user-initiated interrupt) I'm not sure under what circumstances I'd like the pool to try to recover.
Losing a task is not fun, but there may still be other tasks running that are just as important. I think you're thinking from a map_async perspective here.
user-initiated interrupts, this is very important to recover from, think of some badly written library code suddenly raising SystemExit, this shouldn't terminate other jobs, and it's probably easy to recover from, so why shouldn't it try?
The idea of recording the mapping of tasks -> workers seems interesting. Getting all of the corner cases could be hard (e.g. making removing a task from the queue and recording which worker did the removing atomic, detecting if the worker crashed while still holding the queue lock) and doing this would require extra mechanism. This feature does seem to be useful for pools running many different jobs, because that way a crashed worker need only terminate one job.
I think I may have an alternative solution. Instead of keeping track of what the workers are doing, we could simply change the result handler so it gives up when there are no more alive processes.
while state != TERMINATE:
result = get(timeout=1)
if all_processes_dead():
break;
Ok. I implemented my suggestions in the patch attached (multiprocessing-trunk@82502-termination2.patch) What do you think?
Greg, Maybe we could keep the behavior in termination.patch as an option for map jobs? It is certainly a problem that map jobs won't terminate until the pool is joined.
> Before I forget, looks like we also need to deal with the > result from a worker being un-unpickleable: This is what my patch in bug 9244 does... Really? I could be misremembering, but I believe you deal with the case of the result being unpickleable. I.e. you deal with the put(result) failing, but not the get() in the result handler. Does my sample program work with your patch applied?
while state != TERMINATE: result = get(timeout=1) if all_processes_dead(): break; Will this sort of approach work with the supervisor, which continually respawns workers?
user-initiated interrupts, this is very important to recover from, think of some badly written library code suddenly raising SystemExit, this shouldn't terminate other jobs, and it's probably easy to recover from, so why shouldn't it try? To be clear, in this case I was thinking of KeyboardInterrupts.
I'll take a look at your patch in a bit. From our differing use-cases, I do think it could make sense as a configuration option, but where it probably belongs is on the wait() call of ApplyResult.
Just some small cosmetic changes to the patch. (added multiprocessing-trunk@82502-termination3.patch)
Really? I could be misremembering, but I believe you deal with the case of the result being unpickleable. I.e. you deal with the put(result) failing, but not the get() in the result handler.
Your example is demonstrating the pickle error on put(), not on get().
Does my sample program work with your patch applied?
Yeah, check this out:
/opt/devel/Python/trunk(master)$> patch -p1 < multiprocessing-trunk@82502-handle_worker_encoding_errors2.patch
patching file Lib/multiprocessing/pool.py
patching file Lib/test/test_multiprocessing.py
/opt/devel/Python/trunk(master)$> ./python.exe
Python 2.7 (unknown, Jul 13 2010, 13:28:35)
[GCC 4.2.1 (Apple Inc. build 5659)] on darwin
Type "help", "copyright", "credits" or "license" for more information.
>>> import multiprocessing
>>> def foo():
... return lambda: 42
...
>>> p = multiprocessing.Pool(2)
>>> p.apply_async(foo).get()
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "/opt/devel/Python/trunk/Lib/multiprocessing/pool.py", line 518, in get
raise self._value
multiprocessing.pool.MaybeEncodingError: Error sending result: '<function <lambda> at 0x1005477d0>'. Reason: 'Can't pickle <type 'function'>: attribute lookup __builtin__.function failed'
>>> import operator
>>> p.apply_async(operator.add, (2, 2)).get()
4
To be clear, in this case I was thinking of KeyboardInterrupts.
In termination2.patch I handle BaseExceptions, by exiting the worker process, and then letting the _worker_handler replace the process.
It's very useful, because then people can kill -INT the worker process if they want to cancel the job, and without breaking other jobs running.
From our differing use-cases, I do think it could make sense as a configuration option, but where it probably belongs is on the wait() call of ApplyResult.
Indeed! This could be done by adding listeners for this type of errors.
pool.add_worker_missing_callback(fun)
So MapResults could install a callback like this:
def __init__():
...
_pool.add_worker_missing_callback(self._on_worker_missing)
...
def _on_worker_missing(self):
err = WorkerLostError(
"Worker lost while running map job")
self._set(None, (False, err))
What do you think about that?
IMHO, even though the worker lost could be unrelated to the map job in question, it would still be a better alternative than crashing the whole pool.
Actually, the program you demonstrate is nonequivalent to the one I posted. The one I posted pickles just fine because 'bar' is a global name, but doesn't unpickle because it doesn't exist in the parent's namespace. (See http://docs.python.org/library/pickle.html#what-can-be-pickled-and-unpickled.) Although, if you're able to run my test program verbatim, then it's entirely possible I'm just missing something.
Anyway, I do think that adding a 'worker_missing_callback' could work. You'd still have to make sure the ApplyResult (or MapResult) can crash the pool if it deems necessary though.
Started looking at your patch. It seems to behave reasonably, although it still doesn't catch all of the failure cases. In particular, as you note, crashed jobs won't be noticed until the pool shuts down... but if you make a blocking call such as in the following program, you'll get a hang: """
import multiprocessing, os, signal def foo(x): os.kill(os.getpid(), signal.SIGKILL) multiprocessing.Pool(1).apply(foo, [1]) """
The tests also occasionally hang in e.g. test_job_killed_by_signal (main.WithProcessesTestPoolSupervisor) ...
but if you make a blocking call such as in the following program, you'll get a hang
Yeah, and for that we could use the same approach as for the maps.
But, I've just implemented the accept callback approach, which should be superior. Maps/Apply fails instantly as soon as a worker process crashes, but the pool remains fully functional. Patch multiprocessing-trunk@82502-termination-trackjobs.patch added.
There seems to be some race conditions left, because some of the tests breaks from time to time. Maybe you can pinpoint it before me.
At first glance, looks like there are a number of sites where you don't change the blocking calls to non-blocking calls (e.g. get()). Almost all of the get()s have the potential to be called when there is no possibility for them to terminate.
I might recommend referring to my original termination.patch... I believe I tracked down the majority of such blocking calls.
In the interest of simplicity though, I'm beginning to think that the right answer might be to just do something like termination.patch but to conditionalize crashing the pool on a pool configuration option. That way the behavior would no worse for your use case. Does that sound reasonable?
At first glance, looks like there are a number of sites where you don't >change the blocking calls to non-blocking calls (e.g. get()). Almost >all of the get()s have the potential to be called when there is no >possibility for them to terminate.
I might recommend referring to my original termination.patch... I >believe I tracked down the majority of such blocking calls.
I thought the EOF errors would take care of that, at least this has been running in production on many platforms without that happening.
In the interest of simplicity though, I'm beginning to think that the >right answer might be to just do something like termination.patch but >to conditionalize crashing the pool on a pool configuration option. >That way the behavior would no worse for your use case. Does that >sound reasonable?
How would you shut down the pool then? And why is that simpler?
Btw, the current problem with termination3.patch seems to be that the MainProcess somehow appears in self._pool. I have no idea how it gets there. Maybe some unrelated issue that appears when forking that late in the tests.
I thought the EOF errors would take care of that, at least this has been running in production on many platforms without that happening. There are a lot of corner cases here, some more pedantic than others. For example, suppose a child dies while holding the queue read lock... that wouldn't trigger an EOF error anywhere. Would a child being OOM-killed raise an EOF error? (It very well could, but I seem to recall that it does not.)
I've said most of this before, but I still believe it's relevant, so here goes. In the context where I'm using this library, I'll often run jobs that should complete in O(10 minutes). I'll often start a job, realize I did something wrong and hit C-c (which could catch the workers anywhere). I've seen workers be OOM killed, silently dropping the tasks they had. As we've established, at the moment any of these failures results in a hang; I'd be very happy to see any sort of patch that improves my chances of seeing the program terminate in a finite amount of time. (And I'd be happiest if this is guaranteed.)
It's possible that my use case isn't supported... but I just want to make sure I've made clear how I'm using the library. Does that make sense?
How would you shut down the pool then? A potential implementation is in termination.patch. Basically, try to shut down gracefully, but if you timeout, just give up and kill everything.
And why is that simpler? It's a lot less code (one could write an even shorter patch that doesn't try to do any additional graceful error handling), doesn't add a new monitor thread, doesn't add any more IPC mechanism, etc.. FWIW, I don't see any of these changes as bad, but I don't feel like I have a sense of how generally useful they would be.
A potential implementation is in termination.patch. Basically, try to shut down gracefully, but if you timeout, just give up and kill everything.
You can't have a sensible default timeout, because the worker may be processing something important...
It's a lot less code (one could write an even shorter patch that doesn't try to do any additional graceful error handling), doesn't add a new monitor thread, doesn't add any more IPC mechanism, etc.. FWIW, I don't see any of these changes as bad, but I don't feel like I have a sense of how generally useful they would be.
Not everything can be simple. Getting this right may require a bit of code. I think we can get rid of the ack_handler thread by making the result handler responsible for both acks and results, but I haven't tried it yet, and this code is already running in production by many so didn't want to change it unless I had to.
You can't have a sensible default timeout, because the worker may be processing something important... In my case, the jobs are either functional or idempotent anyway, so aborting halfway through isn't a problem. In general though, I'm not sure what kinds of use cases would tolerate silently-dropped jobs. And for example, if an OOM kill has just occurred, then you're already in a state where a job was unexpectedly terminated... you wouldn't be violating any more contracts by aborting.
In general, I can't help but feel that the approach of "ignore errors and keep going" leads to rather unexpected bugs (and in this case, it leads to infinite hangs). But even in languages where errors are ignored by default (e.g. sh), there are mechanisms for turning on abort-on-error handlers (e.g. set -e).
So my response is yes, you're right that there's no great default here. However, I think it'd be worth (at least) letting the user specify "if something goes wrong, then abort". Keep in mind that this will only happen in very exceptional circumstances anyway.
Not everything can be simple. Sure, but given the choice between a simple solution and a complex one, all else being equal the simple one is desirable. And in this case, the more complicated mechanism seems to introduce subtle race conditions and failures modes.
Anyway, Jesse, it's been a while since we've heard anything from you... do you have thoughts on these issues? It would probably be useful to get a fresh opinion :).
You two are bigger users of this then I currently am (the curse/blessing of switching jobs), which is why I've let you hash it out.
Let me point out: my goal is to deal with errors in a way which does not cause a total crash, a lockup, or hanging processes. Whether or not we lose jobs is another thing entirely, and something I'm torn on.
I also need to mention - I think we can add fixes to the behavior to 2.7 - we can not, however, change the API. If we change the API, this fix will be only on Python 3.2 which is not what I suspect either of you want.
Thanks for the comment. It's good to know what constraints we have to deal with.
we can not, however, change the API. Does this include adding optional arguments?
(sorry, I thought I had replied to your comment when I hadn't!) I think we can get away with a new optional kwarg.
I'll take another stab at this. In the attachment (assign-tasks.patch), I've combined a lot of the ideas presented on this issue, so thank you both for your input. Anyway:
Some notes:
Anyway, please let me know what you think of this approach/sample implementation. If we decide that this seems promising, I'd be happy to built it out further.
@greg
Been very busy lately, just had some time now to look at your patch. I'm very ambivalent about using one SimpleQueue per process. What is the reason for doing that?
Thanks for looking at it! Basically this patch requires the parent process to be able to send a message to a particular worker. As far as I can tell, the existing queues allow the children to send a message to the parent, or the parent to send a message to one child (whichever happens to win the race; not a particular one).
I don't love introducing one queue per child either, although I don't have a sense of how much overhead that would add.
Does the problem make sense/do you have any ideas for an alternate solution?
Does the problem make sense/do you have any ideas for an alternate solution?
Well, I still haven't given up on the trackjobs patch. I changed it to use a single queue for both the acks and the result (see new patch attached: multiprocessing-trunk@82502-termination-trackjobs2.patch)
Been running it in production for a few days now, and it seems to work. But the tests still hangs from time to time, it seems they hang more frequent now than in the first patch (this may actually be a good thing:)
Would you like to try and identify the cause of this hang? Still haven't been able to.
I'm not sure about the overhead of using one queue per process either, but I'm usually running about 8 processes per CPU core for IO bound jobs (adding more processes after that usually doesn't affect performance in positive ways). There's also the overhead of the synchronization (ACK). Not sure if this is important performance-wise, but at least this makes it harder for me to reason about the problem.
Hmm, a few notes. I have a bunch of nitpicks, but those can wait for a later iteration. (Just one style nit: I noticed a few unneeded whitespace changes... please try not to do that, as it makes the patch harder to read.)
New patch attach (termination-trackjobs3.patch).
Hmm, a few notes. I have a bunch of nitpicks, but those can wait for a later iteration. (Just one style nit: I noticed a few unneeded whitespace changes... please try not to do that, as it makes the patch harder to read.)
Yeah, nitpicks can wait. We need a satisfactory solution first. I forgot about the whitespace, the reason is that the patch was started from the previous trackjobs patch.
- Am I correct that you handle a crashed worker by aborting all running jobs?
No. The job's result is marked with the WorkerLostError, the process is replaced by a new one, and the pool continue to be functional.
- If you're going to the effort of ACKing, why not record the mapping of tasks to workers so you can be more selective in your >termination?
I does have access to that. There's ApplyResult.worker_pids(). It doesn't terminate anything, it just clean up after whatever terminated. The MapResult could very well discard the job as a whole, but my patch doesn't do that (at least not yet).
Otherwise, what does the ACKing do towards fixing this particular issue?
It's what lets us find out what PID is processing the job. (It also happens to be a required feature to reliably take advantage of external ack semantics (like in AMQP), and also used by my job timeout patch to know when a job was started, and then it shows to be useful in this problem.
- I think in the final version you'd need to introduce some interthread locking, because otherwise you're going to have weird race > conditions. I haven't thought too hard about whether you can get away with just catching unexpected exceptions, but it's probably better to do the locking.
Where is this required?
- I'm getting hangs infrequently enough to make debugging annoying, and I don't have time to track down the bug right now.
Try this:
for i in 1 2 3 4 5; ./python.exe test.regrtest -v test_multiprocessing
it should show up quickly enough (at least on os x)
Why don't you strip out any changes that are not needed (e.g. AFAICT, > the ACK logic), make sure there aren't weird race conditions, and if we start converging on a patch that looks right from a high > level we can try to make it work on all the corner case?
See the updated patch. I can't remove the ACK, but I removed the accept_callback, as it's not strictly needed to solve this problem.
Ah, you're right--sorry, I had misread your code. I hadn't noticed the usage of the worker_pids. This explains what you're doing with the ACKs. Now, the problem is, I think doing it this way introduces some races (which is why I introduced the ACK from the task handler in my most recent patch). What happens if:
You're now reading from self._cache in one thread but writing it in another. What happens if a worker sends a result and then is killed? Again, I haven't thought too hard about what will happen here, so if you have a correctness argument for why it's safe as-is I'd be happy to hear it.
Also, I just noted that your current way of dealing with child deaths doesn't play well with the maxtasksperchild variable. In particular, try running: """ import multiprocessing def foo(x): return x multiprocessing.Pool(1, maxtasksperchild=1).map(foo, [1, 2, 3, 4]) """ (This should be an easy fix.)
- A worker removes a job from the queue and is killed before sending an ACK.
Yeah, this may be a problem. I was thinking we could make sure the task is acked before child process shutdown. Kill -9 is then not safe, but do we really want to guarantee that in multiprocessing? In celery we're safe by using AMQP's ack trasnaction anyway. The same could be said if there's a problem with the queue though. Maybe using ack timeouts? We know how many worker processes are free already.
A worker removes a job from the queue, sends an ACK, and then is killed. Due to bad luck with the scheduler, the parent cleans the worker before the parent has recorded the worker pid. Guess we need to consume from the result queue until it's empty.
You're now reading from self._cache in one thread but writing it in another.
Yeah, I'm not sure if SimpleQueue is managed by a lock already. Should maybe use a lock if it isn't.
What happens if a worker sends a result and then is killed? In the middle of sending? Or, if not I don't think this matters.
By the way, I'm also working on writing some simple benchmarks for the multiple queues approach, just to see if theres at all an overhead to worry about.
Issue bpo-11663 has been marked as a duplicate.
The problem with this approach is that it won't help concurrent.futures. Detection of killed endpoints should ideally happen at a lower level, e.g. in Process or Queue or Connection objects.
Speaking of which, I wonder why we have both multiprocessing.Pool and concurrent.futures.ProcessPoolExecutor. They seem to fill the exact same needs, with different APIs...
On Thu, Mar 31, 2011 at 8:25 AM, Antoine Pitrou \report@bugs.python.org\ wrote:
Antoine Pitrou \pitrou@free.fr\ added the comment:
Speaking of which, I wonder why we have both multiprocessing.Pool and concurrent.futures.ProcessPoolExecutor. They seem to fill the exact same needs, with different APIs...
Brian and I need to work on the consolidation we intend(ed) to occur as people got comfortable with the APIs. My eventual goal is to remove anything but the basic multiprocessing.Process/Queue stuff out of MP and into concurrent.* and support threading backends for it.
Possible plan for POSIX, where a connection uses a pipe() or socketpair(): exploit the fact that an endpoint becomes ready for reading (indicating EOF) when the other endpoint is closed:
>>> r, w = os.pipe()
>>> select.select([r], [], [r], 0)
([], [], [])
>>> os.close(w)
>>> select.select([r], [], [r], 0)
([4], [], [])
>>> a, b = socket.socketpair()
>>> select.select([b], [], [b], 0)
([], [], [])
>>> a.close()
>>> select.select([b], [], [b], 0)
([<socket.socket object, fd=8, family=1, type=1, proto=0>], [], [])
So, each Process could have a sentinel fd in the parent process, which becomes ready when the process exits. These sentinel fds can be used in the various select() calls underlying Queue.get().
(I don't understand why _multiprocessing/socket_connection.c in written in C. Rewriting it in Python would make improvements much easier)
(certainly not easy, sorry)
Note: these values reflect the state of the issue at the time it was migrated and might not reflect the current state.
Show more details
GitHub fields: ```python assignee = None closed_at =
created_at =
labels = ['type-bug', 'library']
title = 'Parent process hanging in multiprocessing if children terminate unexpectedly'
updated_at =
user = 'https://bugs.python.org/gdb'
```
bugs.python.org fields:
```python
activity =
actor = 'dan.oreilly'
assignee = 'jnoller'
closed = True
closed_date =
closer = 'pitrou'
components = ['Library (Lib)']
creation =
creator = 'gdb'
dependencies = ['11743', '12040']
files = ['17905', '17934', '17987', '18015', '18026', '18513', '18657', '18664', '21865', '21923', '21928', '21937', '22266', '36454']
hgrepos = []
issue_num = 9205
keywords = ['patch']
message_count = 78.0
messages = ['109585', '109867', '109885', '109910', '109922', '109936', '110129', '110136', '110139', '110142', '110152', '110169', '110174', '110197', '110207', '110256', '110283', '110285', '110288', '110353', '110366', '110369', '110370', '110386', '110387', '110399', '110428', '110979', '111025', '111028', '111124', '111559', '111690', '111696', '111706', '111915', '113828', '114423', '114449', '115065', '115107', '115114', '115118', '115125', '115128', '132646', '132660', '132661', '132664', '132665', '132666', '135013', '135021', '135025', '135026', '135027', '135028', '135461', '135493', '135502', '135513', '135544', '135624', '135895', '135896', '135901', '135904', '135968', '136090', '137767', '137855', '137912', '137913', '138065', '225833', '226699', '226794', '226806']
nosy_count = 16.0
nosy_names = ['jcea', 'bquinlan', 'pitrou', 'vstinner', 'jnoller', 'hongqn', 'asksol', 'vlasovskikh', 'neologix', 'gdb', 'Albert.Strasheim', 'aljungberg', 'python-dev', 'sbt', 'gkcn', 'dan.oreilly']
pr_nums = []
priority = 'normal'
resolution = 'fixed'
stage = 'resolved'
status = 'closed'
superseder = None
type = 'behavior'
url = 'https://bugs.python.org/issue9205'
versions = ['Python 3.3']
```