Closed ef6b2a61-f027-4805-a66a-cde4eee277c3 closed 5 years ago
It would be terribly nice if the Popen class in the subprocess module would allow a programmer to easily say "send some data right now (if I have some to send) and receive whatever information is available right now". Essentially the equivalent of asyncore.loop(count=1), only that it returns the data received, instead of placing it in a buffer.
Why would this functionality be useful? Currently, when using the subprocess module with pipes, the interaction with a pipe is limited to "send data if desired, close the subprocess' stdin, read all output from the subprocess' stdout, ...".
Certainly one can pull the pipes out of the Popen instance, and perform the necessary functions on posix systems (with select or poll), but the additional magic on WIndows is a little less obvious (but still possible).
There is a post by Paul Du Bois with an example using win32pipe.PeekNamedPipe: http://groups-beta.google.com/group/comp.lang.python/msg/115e9332cc1ca09d?hl=en
And a message from Neil Hodgeson stating that PeekNamedPipe works on anonymous pipes: http://mail.python.org/pipermail/python-dev/2000-May/004200.html
With this modification, creating Expect-like modules for any platform, as well as embedded shells inside any GUI with a text input widget, becomes easy. Heck, even Idle could use it rather than a socket for its interactive interpreter.
Logged In: YES user_id=473331
More control of sub-processes would be great. Several times in the past few years I've done stuff with os.system, popenN, or spawn* and it would be useful to have even more cross-platform consistency and support in this area. Anyway, this is just a vote to encurage other developers.
Logged In: YES user_id=341410
I've got a version of subprocess that has this functionality with pywin32. Making it work on *nix systems with usable select is trivial.
About the only question is whether the functionality is desireable, and if so, what kind of API is reasonable.
Perhaps adding an optional argument 'wait_for_completion' to the communicate method, which defaults to True for executing what is currently in the body of communicate.
If the 'wait_for_completion' argument is untrue, then it does non-waiting asynchronous IO, returning either a 2 or 3-tuple on completion. If input is None, it is a 2-tuple of (stdout, stderr). If input is a string, it is a 3-tuple of (bytes_sent, stdout, stderr).
How does that sound?
Logged In: YES user_id=341410
I suppose I should mention one side-effect of all this. It requires three more functions from pywin32 be included in the _subprocess driver; win32file.ReadFile, win32file.WriteFile, and win32pipe.PeekNamedPipe . Read and Peek are for reading data from stdout and stderr, and Write is for the support for partial writes to stdin.
Logged In: YES user_id=341410
How about if subprocesses have 3 new methods, send(input), recv(maxlen), and recv_stderr(maxlen).
send(input) would perform like socket.send(), sending as much as it currently can, returning the number of bytes sent. recv(maxlen) and recv_stderr(maxlen) would recieve up to the provided number of bytes from the stdout or stderr pipes respectively.
I currently have an implementation of the above on Windows and posix. I include the context diff against revision 1.20 of subprocess.py in current CVS.
Logged In: YES user_id=341410
I've implemented this as a subclass of subprocess.Popen in the Python cookbook, available here: http://aspn.activestate.com/ASPN/Cookbook/Python/Recipe/440554
Essentially this is a request for inclusion in the standard library for Python 2.5 .
Logged In: YES user_id=341410
I've implemented this as a subclass of subprocess.Popen in the Python cookbook, available here: http://aspn.activestate.com/ASPN/Cookbook/Python/Recipe/440554
I would also like to see this feature. I'm using Josiah Carlson's recipe for the time being. I'm agnostic about whether the asynchronicity is a "mode" or just a case of using different functions. However, if the former is chosen, then one should be able to switch modes at will, because sometimes I want blocking and sometimes I don't.
The way subprocess is currently written, if you were to use a blocking semantic, you would no longer be able to use an asynchronous semantic afterwards (it will read the result until there is nothing more to read). Note that this is the case whether it is mode or method based.
If there were a blocking "read x bytes" type call, could you not do some non-blocking stuff afterwards? If you use the "read" method, with a byte limit, directly on the stdout member of Popen, that could return with your bytes, and then you do some non-blocking stuff afterwards. That's the external view of course, I suspect that you're saying there's some internal lower-level reason this is currently not possible.
Thanks for your interim class BTW, it is proving to be very useful.
Unless you use PeekNamedPipe on Windows, there is no guarantee that the pipe will return *any* data until the process ends, even if you say pipe.read(1) and there is 1k of data already sent.
Note that the two async reading methods that I provide (recv() and recv_err()) take a 'maximum bytes' argument. Just like I have written a 'send_all()' utility function, I (or you) can easily write a 'recv_exact()' utility function that receives an exact number of bytes before returning. That functionality would be more or less required for one of the expected use-cases I specify in the recipe, "writing a multi-platform 'expect' module".
Stick with async calls (with the utility recv_exact()) until you need to use the .communicate() method.
Josiah: can this be closed?
I don't believe this should be closed. The functionality is still desired by me and others who have posted on and off since the patch was created. This patch definitely needs some love (tests mostly).
Hello, I am working on this patch and some additional features for my Google Summer of Code project (subdev.blogspot.com) and will eventually attempt to get the code committed to Python 3.1 or 3.2 and Python 2.7. I will have the unit tests completed shortly and will post a patch for Python 2.7 and / or Python 3.1rc1.
Retargeting to 3.2 since 3.1 is now feature-frozen.
PEP-3145 has been written in response to this request.
What is the status? What is required to get this into 3.3? See also bpo-14872.
Comments on Josiah's patch:
It uses pywin32 for PeekNamedPipe -- this is now available from _winapi.
I don't think send(), recv() and recv_exact() will work correctly if buffering is used -- an error should be raised in this case.
I think send(), recv(), recv_exact() should raise errors if the associated stream is None instead of returning 0 or ''.
Additional comments on http://code.google.com/p/subprocdev
In a few places the code does "raise Exception(...)" -- some other exception class should be used.
It is not clear to me why TextIOWrapper should try to implement seek() and tell(). (The file-like wrapper for a socket does not.)
I don't like the hardwired timeouts for faking asynchronicity: Popen.asyncread(), Popen.asyncwrite() use 0.1 seconds, and TextIOWrapper.read() uses 1.25 seconds.
There is no binary counterpart to TextIOWrapper.
Using fcntl.fcntl() to switch back and forth between blocking and unblocking is unnecessary. select() guarantees the read/write on a pipe will succeed without blocking. (The same is not necessarily true with a socket.)
__closecheck() should be renamed _closecheck().
FileWrapper is just a wrapper for TextIOWrapper, except that it has an ignored newlines argument.
I can't see any documentation.
Personally, I would factor out the code for Popen.communicate() in to a Communicator class which wraps a Popen object and has a method
communicate(input, timeout=None) -> (bytes_written, output, error)
On Windows this would use threads, and on Unix, select.
There's documentation, but you have to switch to the Python 3k branch -- http://code.google.com/p/subprocdev/source/browse/?name=python3k#hg%2Fdoc.
As for the other criticisms, I'm sure there are plenty of things that need to be improved upon; I was not a very experienced when I started the project.
Personally, I would factor out the code for Popen.communicate() in to a > Communicator class which wraps a Popen object and has a method
communicate(input, timeout=None) -> (bytes_written, output, error)
How would this differ from the normal communicate()?
It seems like there are two different ideas for why people want an "asynchronous subprocess":
One is that they want to use communicate() but not be limited by memory issues. I think a good API for this case is an asyncore style API or like the one from the patch in bpo-1260171.
Another use case is for an expect-type interface where you read and write based on a timeout or some kind of delimiter like a newline.
These should probably be addressed independently.
See also bpo-10482.
How would this differ from the normal communicate()?
It would block until one of the following occurs:
The normal communicate() could be implemented by running this in a loop. The amount of data returned at once could be limited by an argument of the constructor.
Can anybody write a sum up on this?
Due to some rumblings over on the mentors list and python-dev, this is now getting some love.
Guido has stated that something should make it into the subprocess module for 3.5 in this email: https://groups.google.com/d/msg/dev-python/I6adJLIjNHk/Usrvxe_PVJIJ
His suggested API is: proc.write_nonblocking(data) data = proc.read_nonblocking() data = proc.read_stderr_nonblocking()
I've implemented the API for Windows and Linux, and below are my findings.
On the Linux side of things, everything seems to be working more or less as expected, though in writing tests I did need to add an optional argument to qcat.py in order to have it flush its stdout to be able to read from the parent process. Odd, but whatever.
Also, Richard Oudkerk's claims above about not needing to use fcntl to swap flags is not correct. It's necessary to not block on reading, even if select is used. Select just guarantees that there is at least 1 byte or a closed handle, not that your full read will complete.
On the Windows side of things, my assumptions about WriteFile() were flawed, and it seems that the only way to make it actually not block if/when the outgoing buffer is full is to create a secondary thread to handle the writing*. I've scoured the MSDN docs and there doesn't seem to be an available API for interrogating the pipe to determine whether the buffer is full, how full it is, or whether the write that you'd like to do will succeed or block.
Considering that the Windows communicate() method already uses threads to handle reading and writing, I don't believe it is a big deal to add it for this situation too. Any major objections?
Using asyncio and the IOCP eventloop it is not necessary to use threads. (Windows may use worker threads for overlapped IO, but that is hidden from Python.) See
https://code.google.com/p/tulip/source/browse/examples/child_process.py
for vaguely "expect-like" interaction with a child python process which works on Windows. It writes commands to stdin, and reads results/tracebacks from stdout/stderr.
Of course, it is also possible to use overlapped IO directly.
Had some time to work on this today.
I was missing something in my earlier versions of the code, and have managed to get overlapped IOs to work, so at least I'm not quite so far behind the dozen or so core developers who know more about the Windows pieces than I do. Richard, thank you for the post, I wasn't looking hard enough for how to get overlapped IOs to work, and your message made me look harder.
On Linux, it is trivial to support the blocking communicate() and non-blocking additions. There's only one weirdness, and that's the fcntl bit flipping during write.
On Windows, it's not all that difficult to switch to using overlapped IOs for *all writes, and maybe even for communicate()-based reads, which would remove the need for threads. Ironically enough, non-blocking reads actually *don't require overlapped IO thanks to PeekNamedPipe, which could even be used to cut the number of extra threads from 2 to 1 in communicate().
Now that I've got it working, I can do one of the following (from least changes to the most):
Unless someone brings up an important counterpoint, I'll work on #3 tonight or tomorrow evening to get an initial code/test patch, with docs coming shortly after (if not immediately).
multiprocessing.connection uses the _winapi module for overlapped I/O and named pipes in case you're looking for examples:
http://hg.python.org/cpython/file/3.4/Lib/multiprocessing/connection.py
I would recommended using _overlapped instead of _winapi. I intend to move multiprocessing over in future.
Also note that you can do nonblocking reads by starting an overlapped read then cancelling it immediately if it fails with "incomplete". You will need to recheck if it completes anyway because of a race.
Quick update before I head to bed.
Thank you for the input, I had gotten the individual async calls working a couple days ago, and I was just working to replace the communicate() method for Windows.
Yes, I'm using asyncio._overlapped, though asyncio uses subprocessing, so I needed to defer import of _overlapped until one of a couple crucial methods were being called in order to prevent issues relating to circular imports.
I also ended up moving asyncio.windows_utils.pipe out to subprocess, as copying/pasting it for a third 'create an overlapped-io pipe' implementation for the standard library just doesn't make a lot of sense, and another circular import seemed like a bad idea. If/when subprocess goes away completely, someone else can move the function back.
With overlapped IOs able to be cancelled, it's not all that bad to make a completely re-entrant communicate() without threads, though I made a few mistakes in my first pass tonight that I need to fix tomorrow.
All of the standard tests plus another few that I added all pass on Windows and Linux. I've got some cleanup and a couple more tests to add tomorrow, then I'll post a patch.
I ended up not using any overlapped IO cancellation in the Windows variant of communicate(), as there is no benefit to doing so, and the function is as re-entrant as the original.
Should have uploaded this yesterday, but I got caught up with typical weekend activities. The docs probably need more, and I hear that select.select() is disliked, so that will probably need to be changed too.
Some initial comments here: http://bugs.python.org/review/1191964/#msg1. Also, if I think about integrating this into an IO loop it seems natural to me to think about timeouts. How complicated would it be to do this?
data = proc.read_nonblocking(timeout=0)
data = proc.read_stderr_nonblocking(timeout=0)
Also, Richard Oudkerk's claims above about not needing to use fcntl to swap flags is not correct. It's necessary to not block on reading, even if select is used. Select just guarantees that there is at least 1 byte or a closed handle, not that your full read will complete.
On Linux, fcntl
is not necessary if you use os.read(pipe, bufsize)
after select
instead of pipe.read(bufsize)
. os.read
may just return less than bufsize
bytes if they are not available.
Could read_nonblocking()
, write_nonblocking()
raise OSError(EAGAIN) (it could be named ResourceTemporarilyUnavailableError
) if read/write would block?
It would allow to distinguish EOF (permanent condition) from "read/write would block" (temporarily condition) without additional API calls.
Submitting an updated patch addressing Giampaolo's comments.
Can you explain why you write in 512 byte chunks. Writing in one chunk should not cause a deadlock.
I added the chunking for Windows because in manual testing before finishing the patch, I found that large sends on Windows without actually waiting for the result can periodically result in zero data sent, despite a child process that wants to read.
Looking a bit more, this zero result is caused by ov.cancel() followed by ov.getresult() raising an OSError, specifically: [WinError 995] The I/O operation has been aborted because of either a thread exit or an application request
That causes us to observe on the Python side of things that zero data was sent for some writes, but when looking at what the child process actually received, we discover that some data was actually sent. How much compared to what we thought we sent? That depends. I observed in testing today that the client could receive \~3.5 megs when we thought we had sent \~3 megs.
To make a long story short-ish, using Overlapped IO with WriteFile() and Overlapped.cancel(), without pausing between attempts with either a sleep or something else results in a difference in what we think vs. reality roughly 87% of the time with 512 byte chunks (87 trials out of 100), and roughly 100% of the time with 4096 byte chunks (100 trials out of 100). Note that this is when constantly trying to write data to the pipe. (each trial is as many Popen.write_nonblocking() calls as can complete in .25 seconds)
Inducing a 1 ms sleep between each overlapped.WriteFile() attempt drops the error rate to 0/100 trials and 1/100 trials for 512 byte and 4096 byte writes, respectively. Testing for larger block sizes suggests that 2048 bytes is the largest send that we can push through and actually get correct results.
So, according to my tests, there isn't a method by which we can both cancel an overlapped IO while at the same time guaranteeing that we will account exactly for the data that was actually sent without adding an implicit or explicit delay. Which makes sense as we are basically trying to interrupt another process in their attempts to read data that we said they could read, but doing so via a kernel call that interrupts another kernel call that is doing chunk-by-chunk copies from our write buffer (maybe to some kernel memory then) to their read buffer.
Anyway, by cutting down what we attempt to send at any one time, and forcing delays between attempted sends, we can come pretty close to guaranteeing that child processes don't have any sends that we can't account for. I'll try to get a patch out this weekend that encompasses these ideas with a new test that demonstrates the issue on Windows (for those who want to verify my results).
I added some comments.
Your problem with lost data may be caused by the fact you call ov.cancel() and expect ov.pending to tell you whether the write has/will succeed. Instead you should use ov.getresult() and expect either success or an "aborted" error.
No, the problem is that that ov.cancel() will attempt to cancel the IO, but a subsequent ov.getresult(True) doesn't always return what was *actually written to the pipe unless you explicitly wait for the result to be available. But even if you explicitly wait for ov.pending to be False, even if you alternate between checking for ov.pending to be False and for WaitForSingleObject() to return that the event was signaled (which may never actually happen, according to my tests), ov.getresult(True) could *still raise an exception (that same WinError 995), and we *still* don't know how much data was sent.
As one of your comments on subprocess_2.patch, you said:
Should probably add an assertion that 512 bytes was written.
That's not always the case. I found several odd byte writes, and some writes that were whatever blocksize I'd chosen minus 32 bytes (though not reported on the write side due to the aforementioned exception/counting error, but the child process read an odd number of bytes).
How about you take a look at the patch I'm uploading now. It gets rid of the loop in write_nonblocking(), as per Giampaolo's request adds a blocksize parameter on reading, and because I was in there, I added a timeout to writing.
If in this new patch I'm doing something wrong, and you can explain via code how to guarantee that ProcessTestCase._test_multiple_passes doesn't fail, I'd really appreciate it.
I ended up eliminating the overlapped IO cancel call on Windows. Better to be correct than to minimize internal state. Instead, we keep a reference to the overlapped IO object, and any attempts to write to the child stdin before the previous overlapped IO completes are kicked back as a 0 byte write. The communicate() method does the right thing with pre-existing non-blocking writes, whether input is passed or not.
I also eliminated universal_newline support for non-blocking reads and writes to prevent error conditions on edge cases:
On the write side of things, you could end up with a partial multi-byte character write, which with universal newlines, would be impossible to finish the send using the public API without first modifying state attributes on the Popen object (disabling universal newlines for a subsequent bytes write_nonblocking() call).
On the read side of things, if you get a partial read of a multi-byte character, then the subsequent decode operation would fail with a UnicodeDecodeError. Though you could pull the original bytes from the exception, that's an awful API to create.
I suggest to change the title of the issue to: "subprocess: add non-blocking read and write methods" to avoid the confusion with asyncio subprocess module which runs read and write "in the background" for you.
I started to review the patch 5: http://bugs.python.org/review/1191964/#ps11598
When I read unit tests, I realized that I don't like "write_nonblocking" name. It's too generic. A process has many files (more than just stdin, stdout, stderr: see pass_fds parameter of Popen). I would like an explicit "write_stdin_nonblocking" and "read_stdout_nonblocking".
Victor, I addressed the majority of your comments except for a couple stylistic pieces. Your annoyance with the short poll time for Windows made me re-read the docs from MS, which made me realize that my interpretation was wrong. It also made me confirm Richard Oudkerk's earlier note about ReadFile on overlapped IOs.
I left similar notes next to your comments.
On the method naming side of things, note that you can only write to "stdin", and you can only read from "stdout" or "stderr". This is documented behavior, so I believe the method names are already quite reasonable (and BDFL approved ;)).
If you use the short timeouts to make the wait interruptible then you can use waitformultipleobjects (which automatically waits on an extra event object) instead of waitforsingleobject.
Richard: short timeouts are no longer an issue. Nothing to worry about :)
First off, thank you everyone who has reviewed and commented so far. I very much appreciate your input and efforts.
Does anyone have questions, comments, or concerns about the patch? If no one mentions anything in the next week or so, I'll ping the email thread.
@Josiah: Modifications of the asyncio module should be done first in the Tulip project because we try to keep the same code base in Tulip, Python 3.4 and 3.5. You may duplicate the code the avoid this dependency?
For the documentation, I don't think that you need ".. note" and ".. warning", just write paragraphs of text.
I submitted an issue to the tulip/asyncio bug tracker: https://code.google.com/p/tulip/issues/detail?id=170
And I am uploading a new patch that only includes non-tulip/asyncio related changes, as tulip/asyncio changes will eventually be propagated to Python.
FWIW while the Tulip changes should indeed go into the Tulip repo first, the rest should be committed to the CPython 3.5 tree first. Then I'll commit the Tulip changes, first to the Tulip repo, then to the CPython 3.4 branch (yes!) and then merge that into the 3.5 (default) branch. (Yes, I know it's complicated, but it beats other ways of keeping the Tulip and CPython trees in sync. And I have a script in the Tulip tree that automates most of the work.)
Does anyone have questions, comments, or concerns about the patch?
It seems the current API doesn't distinguish between BlockingIOError (temporary error), BrokenPipeError (permanent error) and EOF (permanent non-error condition) -- everything is treated as EOF. Is it intentional?
Note: these values reflect the state of the issue at the time it was migrated and might not reflect the current state.
Show more details
GitHub fields: ```python assignee = None closed_at =
created_at =
labels = ['type-feature', 'library', 'expert-asyncio']
title = 'add non-blocking read and write methods to subprocess.Popen'
updated_at =
user = 'https://github.com/josiahcarlson'
```
bugs.python.org fields:
```python
activity =
actor = 'vstinner'
assignee = 'none'
closed = True
closed_date =
closer = 'josiahcarlson'
components = ['Library (Lib)', 'asyncio']
creation =
creator = 'josiahcarlson'
dependencies = []
files = ['8252', '34743', '34774', '34794', '34851', '34861', '34941', '35407', '38691']
hgrepos = []
issue_num = 1191964
keywords = ['patch']
message_count = 64.0
messages = ['54505', '54506', '54507', '54508', '54509', '54510', '54511', '54512', '54513', '54514', '54515', '84692', '84719', '89309', '89433', '114495', '161326', '161352', '161356', '161362', '161400', '161409', '177139', '215171', '215178', '215411', '215420', '215426', '215496', '215582', '215668', '215708', '215709', '215711', '215826', '215833', '215956', '215969', '215990', '216226', '216442', '216445', '216665', '216669', '216692', '218826', '218861', '219373', '219374', '219432', '219616', '219656', '223799', '223827', '223828', '238874', '238887', '239277', '239280', '239282', '239299', '239366', '239372', '341245']
nosy_count = 21.0
nosy_names = ['gvanrossum', 'josiahcarlson', 'astrand', 'parameter', 'vstinner', 'techtonik', 'giampaolo.rodola', 'ajaksu2', 'ooooooooo', 'v+python', 'r.david.murray', 'cvrebert', 'ericpruitt', 'akira', 'Andrew.Boettcher', 'rosslagerwall', 'sbt', 'martin.panter', 'janzert', 'yselivanov', 'eryksun']
pr_nums = []
priority = 'normal'
resolution = 'out of date'
stage = 'resolved'
status = 'closed'
superseder = None
type = 'enhancement'
url = 'https://bugs.python.org/issue1191964'
versions = ['Python 3.5']
```