njsmith opened this issue 7 years ago (status: open).
Oh, in the above discussion I forgot about send_eof, which can also be called concurrently with the other methods. I think it's exactly like send_all though, so the same analysis holds.
Another factor that came up in chat: many other frameworks guarantee no interleaving, so new users may expect this. (Possibly without even realizing that this is something they're expecting.)
A potentially serious issue I just realized: this interacts a lot with #54 and #32. Basically the question is: if we have automagic locking inside Stream, then should it be a regular Lock or a StrictFIFOLock? Right now they're equivalent, but if we ever get a smarter scheduler or relax our fairness guarantees then they won't be. Why does this matter? Suppose you have some sans-io protocol wrapper kind of method, like:
```python
async def send_message(self, msg):
    encoded = self._sansio_conn.encode_message(msg)
    await self._stream.send_all(encoded)
```
If send_all uses a StrictFIFOLock, then this is always correct: we know that the data will eventually be written to the stream in the same order as the calls to _sansio_conn.encode_message. However, this may be inefficient: e.g. if we have a fancy scheduler and some task overruns its time slice, then not only will it get punished by being made to sleep for an appropriate amount of time, but any tasks that get stuck behind it in the StrictFIFOLock acquisition queue will also end up being punished. This would be bad. It would be sorta-possible to work around: just put a regular Lock around the whole method body, and if everyone holds this lock before calling send_all then the internal StrictFIFOLock will never be contended and thus become harmless. But I don't like this; the problem and workaround are really obscure.
OTOH if Stream.send_all internally uses a regular Lock that doesn't guarantee strict FIFO fairness, then the above code is not necessarily correct. In particular, suppose we have a protocol with something like a sequence number in each message. Then we could have the following sequence:

1. Task A calls encoded = _sansio_conn.encode_message(msg) and gets back encoded message number 1, then blocks waiting to acquire the send lock.
2. Task B calls encoded = _sansio_conn.encode_message(msg) and gets back encoded message number 2, then blocks waiting to acquire the send lock.
3. The send lock becomes free and, since a regular Lock makes no FIFO promise, task B acquires it first and writes message 2 to the stream before task A writes message 1.

And now our protocol is broken. Again, the solution would be to put a Lock around the whole send_message method.
Well, sounds like StrictFIFOLock would be consistent with the behavior with other frameworks (which is the main concern here, yes?).
Yes and no. StrictFIFOLock would have the same behavior as say Twisted about what order data appeared on the wire. But in Twisted you can't have one task calling transport.write() cause another task calling transport.write() to block for an arbitrary amount of time for no good reason. StrictFIFOLock plus a fair scheduler could do that.
One possible solution is to pass a nursery and a max-pending-messages limit to the Stream object, along with a synchronous serializer callback. The Stream would spawn a sender task that reads from a bounded queue, then serializes and sends each message. It's purely for convenience, and it's basically what the code would have to look like anyway had the user opted for the lower-level socket API. This solves both the locking and backpressure problems, and removes the need to check for interleaving or re-ordering of messages as they become 'unlocked'.
Am I missing anything here? Is a solution like that considered too heavy to be part of standard trio?
@parity3 A nursery and a queue seems like a lot of machinery to -- basically reimplement a simple Lock? I don't think it gives you any benefits over that? (Except that it allows you to add some buffering, but I don't think buffering is useful here.)
It's an interesting idea to tell the stream how to serialize messages, so that (using my example from above) it could effectively move the call to _sansio_conn.encode_message inside its internal Lock. But I think this mixes layers in a way that will cause awkwardness: what if you need to pass extra arguments to the encode function, what if you want multiple encode functions for different messages types, what if the encode function needs to be async, etc. etc. Doing the locking by hand isn't hard; the hard part is doing something sensible for people who don't realize there's a problem to worry about.
Also, something to keep in mind is that Stream is an abstract interface with multiple implementations (SocketStream, SSLStream, StapledStream, memory streams for testing, etc.), so even if we do think there's value in having a reusable "serialize a message and then send it on a Stream with appropriate locking" abstraction, it's probably best implemented as a single implementation that can be wrapped around any Stream, rather than something that has to be redone inside each Stream implementation.
The queue usage was supposed to address the ordering of the messages, but I now realize the problem still remains once the queue becomes full; i.e. you're back to the StrictFIFOLock plus fair scheduler requirement.
In terms of the serializer idea, maybe that's too magical and restrictive like you said. Perhaps something like this would do the trick:
```python
async with stream.sending_FIFO_lock as stream_sender:
    serialized = await serialize(data)
    await stream_sender.send(serialized)
```
Yes, that's an extra line required in userland, but honestly the developer really needs to be educated about what can go wrong when they have a multiple-tasks-sending-to-the-same-stream problem. IMO it's hard to think of a design where a single sender 'agent' would not be in the picture, especially with stateful communications. So we're probably trying to handle a case that may not be very common, and hence we can afford an extra line in order to be more explicit to the user about what's really going on here.
For higher-level abstractions and connections that are built for multiple senders, like HTTP/2, this isn't really a problem, because the API will be designed so that these problems are solved in the library (either via sans-IO or otherwise).
Huh, yeah, that's another interesting option we haven't considered: writing the API in such a way that the user is forced to take the lock before they can call send_all. Sort of rust-ish :-)
It has some advantages, but on consideration I think it's probably a little too awkward. Probably most streams are only used from one task, in which case this would force some unnecessary complexity into user code. And I can imagine that on the other end of the scale, really complex code might want to use some bespoke synchronization strategy to work with multiple streams at once, and then being forced to acquire a lock on each stream might make things even trickier (e.g. maybe it could create spurious lock ordering problems?).
http/2
Yeah, HTTP/2 is a complicated beast that needs much more elaborate cross-task synchronization than a stream can provide by default. (In particular because of implicit channel id assignment and flow control.) One thing I'm struggling with here is to get a clear picture of exactly what the situations are where multiple tasks might be calling send_all. There are highly stateful protocols like HTTP/1, SMTP, or the postgres/mysql/redis/etc. protocols, where you generally only want one task working with a given stream at a given time, possibly with some higher-level system like a "connection pool" for allowing streams to be handed off between tasks. There are arbitrarily complex protocols like HTTP/2. There's websockets, which have a very simple stateless framing that might benefit from automagic locking in Stream... though even here, you might want to fragment a large message into multiple frames and allow these to be interleaved with PING/PONG frames, but not other message frames, which would require custom synchronization. (I guess you'd want a message lock and a frame lock, and the latter is the one that could potentially be handled automatically by Stream.) Are there other popular protocols that look like websocket in that they're easy to write in a way that's correct with implicit locking inside send_all but broken otherwise?
As I understand the purpose @njsmith has in mind for Streams, we want them to hide some rough edges and implementation details of the low-level transport-layer APIs, so that we have a uniform interface across implementations. Now, the abstraction provided by Streams is not very distant from the one provided by, e.g., sockets or TLS sockets. I appreciate that, because I don't have to learn a new abstraction in order to use Streams, and they actually solve some problems for me.
send_all() is kind of an exception, though, because it doesn't map to a low-level equivalent. What we see here is an abstraction mismatch. On one side, you have send(), which is your interface for filling a socket's send buffer in the OS (or the equivalent for other stream types): you ask the system to try to add data to the buffer that will be used to send data. What do we have on the other side? The suggestion of a send_all() with implicit locking is that you want a Stream object that acts as a pipe where you can just pour data in and, if the data cannot be accepted in its entirety yet, just wait. This introduces some additional buffering (implicit in the pending send_all() calls), which is seen as convenient because it decouples producing data from sending it. Data producers only have to worry about, well, producing data, and awaiting send_all() implicitly modulates their production rate and provides backpressure.
I know I'm very likely missing use cases and details, but it seems to me that this model is actually convenient and correct only if the following assumptions about the application hold:
1. data producers' basic unit of data is an unbreakable chunk of bytes of any length;
2. chunks of bytes are to be sent in the order in which they are produced;
3. I don't need control over buffering behavior.
Any case in which a stream is controlled by a single task at a time, or you have a single data producer for that stream, is trivially adaptable to these requirements. In those cases send_all() behaves de facto like socket.sendall(), or like repeating calls to send() until all data is through. But locking is not necessary for this.
More complicated cases are those with multiple data producers for a given stream. My argument is that the case for unbreakable chunks of bytes is not as universal as it might seem at first glance. If you have multiple producers, you likely handle something like frames, not bytes. If those producers are not synchronized (otherwise you wouldn't have the issue under discussion), you most likely don't need strict ordering. You might be OK with implicit infinite buffering if your number of tasks is bounded (which is likely a common case, e.g. bounded by the networking or monitoring code, or by the number of sources), but you lose some control, and you might find yourself in situations where a task is waiting for another task that is sending a batch of 1000 frames.
(I cannot envision a case with multiple producers in which you don't deal with frames, even if these frames are exactly 1-byte long. Anything I'm missing?)
A stream is not a multiplexer, and a multiplexer is a specialized component that might implement behavior specific to your application code. Serializing chunks of bytes poured down a stream is a naive form of multiplexing, and not a universal one. I'm criticizing this because it seems like an invitation to corrupt abstractions in a way which is not immediately wrong, and maybe produces acceptable results, but introduces design flaws. On top of that, this might mask bugs when a program has multiple tasks writing to a stream by accident, and not by design.
I don't have much to say about the argument that other libraries implement the same behavior proposed here (implicit locking). If it's dangerous, it's possibly dangerous in other libraries as well. I reject the notion that a locking send_all() is needed for feature parity or anything like that, because implementing an explicit lock is trivial, and because there might be other ways to be explored here, especially given the increasing interest in sans-io protocols.
When I discussed this in chat I proposed that the problem of multiplexing should be explicitly left to protocol code, which has the means to decide the best policy for prioritizing and buffering. If it turns out that this is such a common case, then trio might decide to include a basic multiplexer (as an additional layer, or built into streams) later.
tl;dr: no because it incentivizes bad separation of responsibilities and masks some bugs.
I think the resolution here might depend on the resolution of #467.
One consideration I hadn't fully realized when writing my comments above: if you have multiple tasks using the same stream, calling send_all directly can be problematic, because as soon as any send_all is cancelled, your whole stream is corrupted and can't be used. So I guess that's another reason why you need to think carefully when sharing a stream object between multiple tasks, and having an implicit lock wouldn't actually fix all your problems.
I came here because I was just wondering to myself why streams don't have a built-in lock. I am persuaded by the argument that automatic serialization of sends won't prevent all bugs, and cancellation with multiple senders is inherently complex.
This might be addressed better with documentation.
trio.BusyResourceError – if another task is already executing a send_all(), wait_send_all_might_not_block(), or HalfCloseableStream.send_eof() on this stream.
It might be helpful to have a note for this method that says, "calling send_all() from multiple tasks is complicated, so we recommend that you design your code to send from a single task, if possible. But if you really want to, here are some things to keep in mind..."
Most low-level operations in trio provide a guarantee: if they raise trio.Cancelled, this means that they had no effect
I didn't understand this passage at first, because of the phrase "if they raise trio.Cancelled", which makes it sound like it got cancelled for some internal reason. From the user's point of view, it would make more sense to say, "if a call to send_all() is cancelled..."
It might be helpful to have a note for this method that says, "calling send_all() from multiple tasks is complicated, so we recommend that you design your code to send from a single task, if possible. But if you really want to, here are some things to keep in mind..."
Hmm. I'd hesitated to make a generic recommendation either way, because this is complicated enough that I wasn't sure we actually knew what the generic recommendation should be :-). But "you should generally only call send_all from a single task" does cover both the cases where a single task owns the connection (like asks.get) and cases where multiple tasks want to use the connection in a cancel-safe way, so they have to rely on a background task to do the actual send_all (like trio-websocket). And maybe those really are pretty much all the cases you'd want to use in practice.
I guess there is a third one, which is really the asks.get case but just different enough that people might get confused: in libraries that do connection pooling, like asks or a sql client library or whatever, the rule isn't actually "only one task should own the stream", it's "only one task should own the stream at a time (but it's OK if which task owns it changes)". So if we're going to make a recommendation like this, we should think about how to phrase it to avoid confusing people.
I didn't understand this passage at first, because of the phrase "if they raise trio.Cancelled", which makes it sound like it got cancelled for some internal reason. From the user's point of view, it would make more sense to say, "if a call to send_all() is cancelled..."
That's... a really good point. I opened #885 to discuss further.
It seems like we're converging on an idiom where if you want to speak a protocol over a Stream, you wrap it with some background task(s) that provide a Channel interface, and then sending and receiving from multiple tasks just works. There's definitely still a lot to be ironed out about how to provide such interfaces robustly, but I'm not sure this issue is the best place to do it. Given the cancellation issues with Stream.send_all(), requiring send_all() calls to use an implicit lock seems like it would confuse more than help. If others agree, maybe we can close this? (or change it into a doc issue about clarifying why send_all() has the semantics it does)
[Original title: Should SendStream automatically serialize calls to send_all?]
(Context: discussion with @agronholm at https://gitter.im/python-trio/general?at=59c03f58c101bc4e3ae28c81)
Currently, if two tasks call SendStream.send_all at the same time, then the second one raises an error, and if you want to write to the same stream from multiple tasks then you have to protect the stream with a Lock. We could alternatively require that stream implementations make this "just work", basically moving the lock inside send_all.

Changing this wouldn't be a compat breaker for the users of the Stream API, because it would convert an error into something that works. But it would be a compat breaker for implementors of the Stream API, because now their users might start depending on this new behavior. So we should probably make a decision sooner rather than later.

receive_some
For receive_some, I don't think this makes sense; if you have multiple tasks trying to read from the same stream then you generally need to rethink things. (How do you make sure that the correct bytes go to the correct task? There's no easy answer.) Of course, it's also hard to make receive_some actually broken even if we did allow concurrent calls – there's no equivalent to "interleaving" like can happen with send_all. But it will raise errors sometimes if there's no explicit locking, because trio.hazmat.wait_readable raises errors if two tasks try to block in it at the same time. ...I guess really these are exactly the same cases where it raises an error now with the explicit conflict detection though, give or take a checkpoint.

send_all - is it possible?
For one task calling send_all and another calling wait_send_all_might_not_block, or two tasks calling wait_send_all_might_not_block... bleh. Giving an error is fairly reasonable, but maybe we can do better. If we allow two tasks to concurrently call send_all, then we should probably also allow two tasks to concurrently call wait_send_all_might_not_block followed by send_all, which would mean supporting all combinations of send_all and wait_send_all_might_not_block.

What if we simply protected both methods with the same lock?
- wait_send_all_might_not_block is holding the lock, send_all arrives: send_all ends up blocking until wait_send_all_might_not_block returns. OK, sure, by definition this was going to happen anyway. The send_all might mean that whoever called wait_send_all_might_not_block is surprised when their own send_all does block, but that's part of the contract anyway (hence the word "might").
- send_all is holding the lock, wait_send_all_might_not_block arrives: an immediate call to send_all would block until it got the lock, so wait_send_all_might_not_block should block until the lock is available. OK.
- wait_send_all_might_not_block is holding the lock, wait_send_all_might_not_block arrives: this is a little weird, but I guess it works out OK. The second one can't proceed until the first one returns. But we know that the first one will return as soon as send_all might not block, so this can't directly cause the second one to block longer than it should have. And then when it finally gets the lock, it should detect the same stream state that the first one did, and return immediately. (Unless something else happened in the meantime to change the state, but in that case blocking longer is appropriate anyway.)

is it a good idea?
So, it looks like we could potentially change this. Is it a good idea?
In theory a trio.Lock is a little more heavyweight than a trio._util.ConflictDetector, but the difference is pretty minor. Mostly the Lock needs a bit more memory to hold the ParkingLot used when there's contention; a ParkingLot is just an object holding an OrderedDict. OrderedDict is bigger than you'd think (sys.getsizeof(OrderedDict()) == 416 on my laptop), but we could allocate it lazily if it really became an issue, and Streams are somewhat substantial objects anyway (they generally hold kernel buffers, etc.). And acquiring/releasing an uncontended Lock is barely any more expensive than acquiring/releasing a ConflictDetector. One difference is that blocking to get a Lock requires async context, but for send_all and friends this is fine.

A bigger question is whether this actually gives more friendly semantics. It's certainly convenient for message-based protocols like websocket, where you might want to have a WSConnection object where you do await conn.send_message("...") which packs that string into a websocket frame and passes the frame into the underlying byte-stream in a single call -- right now this requires explicit locking if you want send_message to be usable from different tasks, and if send_all did its own locking then potentially it wouldn't. Specifically, it would work OK if you make sure to write send_message so that all the internal protocol state manipulation happens synchronously, and then the send_all call happens at the end. This is probably the most natural way to write this (especially if working on top of a sans-io library like wsproto), but it's certainly possible to get it wrong if you don't pay attention. OTOH if streams require users to do their own locking, then the natural way to do this locking is to put it around the whole send_message body, and then you don't have to worry about auditing send_message to make sure that it contains exactly one checkpoint.

There are also ways of working with streams that are inherently task-unsafe regardless of what kind of implicit locking we do. E.g. still with the websocket example, someone could write:
Now this method is definitely not safe to call concurrently from multiple tasks. If you do it anyway, then with the current design, it may or may not raise an error and point out the problem; with implicit locking, it definitely never raises an error. So arguably the current design does better here?