uqfoundation / pathos

parallel graph management and execution in heterogeneous computing
http://pathos.rtfd.io

BrokenPipeError: [Errno 32] Broken pipe #143

Closed. ShaohongBai closed this issue 5 years ago.

ShaohongBai commented 6 years ago

I am getting a BrokenPipeError: [Errno 32] Broken pipe when using the ProcessPool module. Could you let me know what is likely to be the issue?

```
Process ForkPoolWorker-9:
Traceback (most recent call last):
  File "/usr/lib/python3.5/multiprocessing/pool.py", line 125, in worker
    put((job, i, result))
  File "/home/ubuntu/.local/lib/python3.5/site-packages/sklearn/externals/joblib/pool.py", line 386, in put
    return send(obj)
  File "/home/ubuntu/.local/lib/python3.5/site-packages/sklearn/externals/joblib/pool.py", line 372, in send
    self._writer.send_bytes(buffer.getvalue())
  File "/usr/lib/python3.5/multiprocessing/connection.py", line 200, in send_bytes
    self._send_bytes(m[offset:offset + size])
  File "/usr/lib/python3.5/multiprocessing/connection.py", line 404, in _send_bytes
    self._send(header + buf)
  File "/usr/lib/python3.5/multiprocessing/connection.py", line 368, in _send
    n = write(self._handle, buf)
BrokenPipeError: [Errno 32] Broken pipe

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/lib/python3.5/multiprocessing/process.py", line 249, in _bootstrap
    self.run()
  File "/usr/lib/python3.5/multiprocessing/process.py", line 93, in run
    self._target(*self._args, **self._kwargs)
  File "/usr/lib/python3.5/multiprocessing/pool.py", line 130, in worker
    put((job, i, (False, wrapped)))
  File "/home/ubuntu/.local/lib/python3.5/site-packages/sklearn/externals/joblib/pool.py", line 386, in put
    return send(obj)
  File "/home/ubuntu/.local/lib/python3.5/site-packages/sklearn/externals/joblib/pool.py", line 372, in send
    self._writer.send_bytes(buffer.getvalue())
  File "/usr/lib/python3.5/multiprocessing/connection.py", line 200, in send_bytes
    self._send_bytes(m[offset:offset + size])
  File "/usr/lib/python3.5/multiprocessing/connection.py", line 404, in _send_bytes
    self._send(header + buf)
  File "/usr/lib/python3.5/multiprocessing/connection.py", line 368, in _send
    n = write(self._handle, buf)
BrokenPipeError: [Errno 32] Broken pipe
```

(the same pair of tracebacks repeats for ForkPoolWorker-8 and ForkPoolWorker-7)

mmckerns commented 6 years ago

Can you provide a simple bit of code that reproduces your error? A minimal reproduction is more useful than the traceback.

I think the most likely case is that you have an error in your code that is shipped to multiprocess, and the code dies... and thus breaks the pipe. A broken pipe can occur if you don't terminate all processes before returning. There are also a few additional causes on Windows, but it looks like you are on Ubuntu. Regardless, this isn't a normal error... it usually happens when there's an error in the code that causes a breakage elsewhere in the pipeline.
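For example, something like this can leave the pool's plumbing broken (a sketch with hypothetical names; os._exit stands in for any abrupt worker death, such as an OOM kill or a crash in a native extension):

```python
import os
from pathos.pools import ProcessPool

def fragile(x):
    # simulate a worker killed mid-task: os._exit skips all cleanup,
    # so the pool's internal pipes are left in a broken state
    if x == 3:
        os._exit(1)
    return x * x

pool = ProcessPool(2)
try:
    # depending on timing and Python version, this hangs or raises a
    # pipe error rather than reporting the real failure cleanly
    print(pool.map(fragile, range(6)))
finally:
    pool.terminate()
```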

ShaohongBai commented 6 years ago

Hi @mmckerns,

Thank you for the suggestions. The structure of the code looks like this:

```python
from pathos.pools import ProcessPool

pool = ProcessPool(5)
try:
    segment_lists = pool.map(segmentModelMaster, list(args_iteration))
    segment_grid_list, segment_evl_list = zip(*segment_lists)
except Exception:
    logger.error('Multi-process individual task failed, aborting!')
    pool.close()
    pool.terminate()
```

where segmentModelMaster is an ML training process applied to the batch segments we established in the data-preparation stage. The code runs fine for other data streams; it only errors on the biggest stream. However, the total data size should not matter, since we segment the data into chunks before feeding it to the multiprocess training. Each segment has an index within args_iteration (day 1, 2, ..., n), which is the key input the function uses to retrieve its data chunk from the database and start modelling.

The machine is an AWS instance with 120 GB of memory and 16 cores; we are using ncores = 5 for the multiprocess pool and njobs = 3 within the training model. Hope this helps. I will see whether I can give you something else to look at, as it is a very large proportion of the codebase.

Regards

Shaohong

mmckerns commented 6 years ago

@ShaohongBai: the code you are providing above is generic. I meant that it helps if you provide minimal code that I can run that produces the error you are seeing. If you gist a huge sample of code, that's also not terribly helpful (generally). However, if your code works for your other data streams and only fails on the one data stream... it's not likely an issue in the code itself. It could be something like too big a data chunk, an empty chunk of data, an error of some sort in the data, or something like that.

You can try a few things to test... like doing a run that uses multiprocess but only one worker... or doing the same with a different kind of map or pool... or looking at dill.copy and dill.check to see if there's some pickling issue.
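For example, a quick checklist along those lines (a sketch, with a toy task standing in for your worker function):

```python
import dill
from pathos.pools import ProcessPool

def task(x):    # stand-in for segmentModelMaster
    return x * x

# 1) does the function survive a pickle round-trip in this process?
dill.copy(task)

# 2) does it pickle and unpickle cleanly across process boundaries?
#    (dill.check spawns a separate python process to find out)
dill.check(task)

# 3) run the same map with a single worker -- failures are easier to read
pool = ProcessPool(1)
print(pool.map(task, range(10)))
pool.close(); pool.join()
```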

Again, a broken pipe is typically an indication that some other error on one of the worker processes has broken the expected sequence of worker shutdown, so trying the code out serially, or maybe even investigating with pathos.profile, might help you uncover the root cause.

ShaohongBai commented 6 years ago

@mmckerns, thank you for the tips. I have tried the serial loop version of the code and it runs fine; I am now testing ProcessingPool from pathos.multiprocessing.

My gut feeling is it might be something to do with long-running memory growth in the workers, as in this failing case we run 300 segments. Do you think I need to do something to refresh the workers after, say, 20 iterations? I remember there is a maxtasksperchild parameter?

mmckerns commented 6 years ago

pathos.pools.ProcessPool and pathos.multiprocessing.ProcessingPool are the same... (the latter is just the deprecated API).
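A quick way to convince yourself of that:

```python
from pathos.pools import ProcessPool
from pathos.multiprocessing import ProcessingPool

print(ProcessPool is ProcessingPool)  # True: one class, two names
```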

If you think it's a memory issue, it might be worthwhile to try pathos.pools._ProcessPool. The difference there is that the ProcessPool isn't terminated after a close, which can lead to memory leaks when there's too much reuse, and sometimes it's better to do a pool._clear() and/or pool.restart(). The _ProcessPool is an exact match to multiprocessing.Pool, so there's a maxtasksperchild setting and the like, and it does terminate after a close.
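For example (a sketch, with a toy task and workload standing in for your job):

```python
from pathos.pools import _ProcessPool

def task(x):
    return x * x

# _ProcessPool mirrors multiprocessing.Pool: each worker is recycled
# after maxtasksperchild tasks, capping any per-worker memory growth
pool = _ProcessPool(processes=5, maxtasksperchild=20)
print(pool.map(task, range(300)))
pool.close()
pool.join()   # unlike ProcessPool, the workers really go away here
```

whereas with the pathos ProcessPool you would periodically call pool._clear() (and pool.restart() after a terminate) to flush the cached pool state between batches.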

ShaohongBai commented 6 years ago

@mmckerns, an update on the issue.

What basically happened was: we saw the workers' memory usage spike during the run.

What else could you suggest in this regard?

mmckerns commented 6 years ago

So if you are seeing a memory spike, then yes, that's probably the root cause of the broken pipe. In that case you definitely want to use maxtasksperchild and _ProcessPool (or restart/_clear and ProcessPool). Depending on how long each of the worker tasks takes (and possibly what you need to serialize), you might try one of the other pools -- there's ParallelPool, which has weaker serialization, but the pool is launched through a subprocess.Popen call, so it's a tad slower while the memory handling is cleaner. There's also a package called pyina (it was originally included in pathos) that provides an MPI-based pool, which also shouldn't have some of the same memory issues, but again, it's typically for larger parallel jobs. Or there's another route, where you could use multiprocess (which is what pathos uses under the covers) and go down to the Process level of control over your workers. In the latter case, you can greatly customize how you manage your workers.
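For instance, sketches of two of those options (toy functions standing in for your training job):

```python
from pathos.pools import ParallelPool
from multiprocess import Process, Queue

def square_all(chunk):
    return [x * x for x in chunk]

chunks = [list(range(i, i + 5)) for i in (0, 5, 10)]

# ParallelPool: workers launched via subprocess.Popen -- weaker
# serialization and a bit slower, but cleaner memory handling
pool = ParallelPool(nodes=3)
print(pool.map(square_all, chunks))
pool.close(); pool.join()

# Process-level control with multiprocess: each worker lives for exactly
# one chunk, so its memory returns to the OS when it exits
def worker(q, chunk):
    q.put(square_all(chunk))

q = Queue()
procs = [Process(target=worker, args=(q, chunk)) for chunk in chunks]
for p in procs:
    p.start()
results = [q.get() for _ in procs]  # note: arrival order is not guaranteed
for p in procs:
    p.join()
print(results)
```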

If this sufficiently addresses your concerns, please close the issue.