njsmith opened this issue 5 years ago
I literally just wrote one of these yesterday, with the interface:

from typing import AsyncIterator, Optional, Tuple, Union

import attr
import trio


@attr.s(auto_attribs=True)
class BufferedReceiveStream(trio.abc.AsyncResource):
    transport_stream: trio.abc.ReceiveStream
    chunk_size: int = 4096

    async def aclose(self) -> None: ...
    async def receive(self, num_bytes: int) -> bytes: ...
    async def receive_all_or_none(self, num_bytes: int) -> Optional[bytes]: ...
    async def receive_exactly(self, num_bytes: int) -> bytes: ...


class TextReceiveStream(trio.abc.AsyncResource):
    transport_stream: trio.abc.ReceiveStream
    encoding: str
    errors: Optional[str]
    newlines: Union[str, Tuple[str, ...], None]
    chunk_size: int

    def __init__(
        self,
        transport_stream: trio.abc.ReceiveStream,
        encoding: Optional[str] = None,
        *,
        errors: Optional[str] = None,
        newline: Optional[str] = "",
        chunk_size: int = 8192,
    ): ...

    async def aclose(self) -> None: ...
    def __aiter__(self) -> AsyncIterator[str]: ...
    async def receive_line(self, max_chars: int = -1) -> str: ...
I haven't tested it yet and I'm not totally sure about the interface, but if people think either of these interfaces would be useful, I'll post code once I'm more convinced it works. The three receive methods in BufferedReceiveStream differ only in how they handle EOF; I wrote it to help with "you have a length and then that many bytes of data" style binary protocols.
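Since the three methods differ only at EOF, here's a minimal sketch of the semantics I'd guess for them; the names match the interface above, but the bodies are my assumption, not the poster's actual code:

from typing import Optional

import trio


class BufferedReceiveStream(trio.abc.AsyncResource):
    def __init__(self, transport_stream: trio.abc.ReceiveStream,
                 chunk_size: int = 4096):
        self.transport_stream = transport_stream
        self.chunk_size = chunk_size
        self._buf = bytearray()

    async def aclose(self) -> None:
        await self.transport_stream.aclose()

    async def _fill(self, target: int) -> None:
        # Read until we have `target` bytes buffered, or hit EOF.
        while len(self._buf) < target:
            data = await self.transport_stream.receive_some(self.chunk_size)
            if not data:
                return  # EOF: leave whatever we have in the buffer
            self._buf += data

    def _take(self, num_bytes: int) -> bytes:
        data = bytes(self._buf[:num_bytes])
        del self._buf[:num_bytes]
        return data

    async def receive(self, num_bytes: int) -> bytes:
        # Lenient: returns up to num_bytes; short (or empty) result at EOF.
        await self._fill(num_bytes)
        return self._take(num_bytes)

    async def receive_all_or_none(self, num_bytes: int) -> Optional[bytes]:
        # EOF at a frame boundary returns None; EOF mid-frame is an error.
        await self._fill(num_bytes)
        if not self._buf:
            return None
        if len(self._buf) < num_bytes:
            raise ValueError("EOF in the middle of a frame")
        return self._take(num_bytes)

    async def receive_exactly(self, num_bytes: int) -> bytes:
        # Strict: any EOF before num_bytes is an error.
        result = await self.receive_all_or_none(num_bytes)
        if result is None:
            raise ValueError("EOF at start of frame")
        return result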
This may go without saying, but just in case I'll say it anyway :-): I'm being a bit cautious in the issue because if we add something directly to trio itself, then we want to make sure it's the right thing we can support for a long time. But for helpers and utilities and stuff that's outside trio, that doesn't apply, so everyone should at the least feel free to implement what they find useful, share it, whatever.
In fact, it can only help to see examples of what people come up with...
Speaking of which, @oremanj, what's your use case? (If you can share.)
Any movement on this? I'm looking for an equivalent of `StreamReader.readuntil` so I can continuously listen for special termination chars in websocket messages.
@pawarren I think this is up to date at the moment. I'm a bit confused about how `readuntil` and websockets fit together, though – usually websocket messages are small, and you read them in as single `bytes` or `str` objects? Are you using trio-websocket, or something else?
I'm currently using asyncio.open_connection() and am referring to https://docs.python.org/3/library/asyncio-stream.html#asyncio.StreamReader.readuntil
I connect to 58 web sockets, keep the connections permanently open, and for each web socket continuously loop over the following steps asynchronously: 1) send one of three different types of messages asking for something 2) listen for one of four different types of responses (one per request type plus a heartbeat) 3) do something with response
The responses themselves are typically small XML messages or images.
It seems like there's a default EOF for most websockets, which I imagine is why libraries like websockets can do things like "for message in socket". But these messages generally have a separate end-of-message separator, b'\r\n\r\n', and normal practice for dealing with that seems to be maintaining a buffer while repeatedly reading chunks of ~4096 bytes and looking for the separator. reader.readuntil(b'\r\n\r\n') was a nice way of avoiding that.
I'm new to async + websockets, so I might be missing an obvious solution or best practice.
I hadn't seen trio-websockets before; thank you for the reference.
Ah, OK, I think you mean regular TCP sockets :-). "Web socket" is a specific somewhat complicated protocol that you can use on top of regular sockets. For some reason the people designing WebSocket decided to give it a really confusing name.
It sounds like you have your own simple ad-hoc protocol on top of sockets. You might say you're transmitting a series of "frames", where each frame is terminated by `\r\n\r\n`. Which is, indeed, exactly the sort of thing that this issue is about :-).
While you're waiting for us to get our ducks in a row and provide a more comprehensive solution, here's some code you can use:
import trio

_RECEIVE_SIZE = 4096  # pretty arbitrary


class TerminatedFrameReceiver:
    """Parse frames out of a Trio stream, where each frame is terminated by a
    fixed byte sequence.

    For example, you can parse newline-terminated lines by setting the
    terminator to b"\n".

    This uses some tricks to protect against denial of service attacks:

    - It puts a limit on the maximum frame size, to avoid memory overflow; you
      might want to adjust the limit for your situation.

    - It uses some algorithmic trickiness to avoid "slow loris" attacks. All
      algorithms are amortized O(n) in the length of the input.
    """

    def __init__(self, stream, terminator, max_frame_length=16384):
        self.stream = stream
        self.terminator = terminator
        self.max_frame_length = max_frame_length
        self._buf = bytearray()
        self._next_find_idx = 0

    async def receive(self):
        while True:
            terminator_idx = self._buf.find(
                self.terminator, self._next_find_idx
            )
            if terminator_idx < 0:
                # no terminator found
                if len(self._buf) > self.max_frame_length:
                    raise ValueError("frame too long")
                # next time, start the search where this one left off
                self._next_find_idx = max(0, len(self._buf) - len(self.terminator) + 1)
                # add some more data, then loop around
                more_data = await self.stream.receive_some(_RECEIVE_SIZE)
                if more_data == b"":
                    if self._buf:
                        raise ValueError("incomplete frame")
                    raise trio.EndOfChannel
                self._buf += more_data
            else:
                # terminator found in buf, so extract the frame
                frame = self._buf[:terminator_idx]
                # Update the buffer in place, to take advantage of bytearray's
                # optimized delete-from-beginning feature.
                del self._buf[:terminator_idx + len(self.terminator)]
                # next time, start the search from the beginning
                self._next_find_idx = 0
                return frame

    def __aiter__(self):
        return self

    async def __anext__(self):
        try:
            return await self.receive()
        except trio.EndOfChannel:
            raise StopAsyncIteration


################################################################
# Example
################################################################

from trio.testing import memory_stream_pair


async def main():
    sender_stream, receiver_stream = memory_stream_pair()

    async def sender():
        await sender_stream.send_all(b"hello\r\n\r\n")
        await trio.sleep(1)
        await sender_stream.send_all(b"split-up ")
        await trio.sleep(1)
        await sender_stream.send_all(b"message\r\n\r")
        await trio.sleep(1)
        await sender_stream.send_all(b"\n")
        await trio.sleep(1)
        await sender_stream.send_all(b"goodbye\r\n\r\n")
        await trio.sleep(1)
        await sender_stream.aclose()

    async def receiver():
        chan = TerminatedFrameReceiver(receiver_stream, b"\r\n\r\n")
        async for message in chan:
            print(f"Got message: {message!r}")

    async with trio.open_nursery() as nursery:
        nursery.start_soon(sender)
        nursery.start_soon(receiver)


trio.run(main)
Wow, this is fantastic!
I particularly like the use of bytearray's optimized delete-from-beginning feature.
What do you mean by "It uses some algorithmic trickiness to avoid "slow loris" attacks."? What's the trickiness?
And thank you for pointing me towards the definition of websocket. That makes a lot of the docs I've been reading over the past few days make more sense...
> What do you mean by "It uses some algorithmic trickiness to avoid "slow loris" attacks."? What's the trickiness?
There are two mistakes people often make that cause this kind of code to have O(n^2) complexity:

1. They use a regular `bytes` object as the buffer, or instead of using `del` to remove stuff from the front, they copy the back (`buf = buf[new_start:]`). This is especially common on Python 2, because it doesn't have the optimized `bytearray` `del` method built in, so you have to implement it by hand. But even on py3, a lot of people don't realize how important it is to write this operation in exactly the right way.

2. After a failed search, they restart the next search from the beginning, instead of tracking how much of the buffer they already searched and only searching the new part.

If you make either of these mistakes, then it's easy for a peer that's malicious, or just misbehaving, to burn up a ton of your CPU time doing repeated work. For example, say someone sends you a 2,000 byte frame, but they do it one byte at a time. Now a naive implementation ends up scanning through ~1,000,000 bytes looking for the `\r\n\r\n`, because it scans the first part of the buffer over and over and over.
It's surprisingly subtle!
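To make the contrast concrete, here's a small illustrative toy (mine, not from the thread) showing both mistakes next to the amortized pattern used in TerminatedFrameReceiver:

def naive_frames(chunks, terminator=b"\r\n\r\n"):
    # O(n^2): both classic mistakes, on purpose
    buf = b""
    for chunk in chunks:
        buf += chunk                               # copies the whole buffer
        idx = buf.find(terminator)                 # re-scans from position 0
        while idx >= 0:
            yield buf[:idx]
            buf = buf[idx + len(terminator):]      # copies the tail, too
            idx = buf.find(terminator)

def amortized_frames(chunks, terminator=b"\r\n\r\n"):
    # amortized O(n): bytearray + in-place deletion + resumable search
    buf = bytearray()
    next_find_idx = 0
    for chunk in chunks:
        buf += chunk
        while True:
            idx = buf.find(terminator, next_find_idx)
            if idx < 0:
                # resume the next search just before the unscanned tail, in
                # case the terminator straddles a chunk boundary
                next_find_idx = max(0, len(buf) - len(terminator) + 1)
                break
            yield bytes(buf[:idx])
            del buf[:idx + len(terminator)]        # optimized front deletion
            next_find_idx = 0

# Feeding a frame one byte at a time makes the difference visible:
#     list(amortized_frames([bytes([b]) for b in b"x" * 2000 + b"\r\n\r\n"]))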
Here's my hack on TerminatedFrameReceiver that extends the sans-I/O idea, possibly too far. It's similar to the sort/cmp split: just as a single definition of cmp allows lots of sort implementations, a single read & write allows interchangeable framing methods. My trio implementation leaves MUCH to be desired. It's more a POC that shows how a definition like receive_some() is sufficient for framing variations without pestering the runner (nursery) too much. Comments welcomed.
import trio
import attr


@attr.s
class Framer:
    '''
    a mash-up of an accumulator with a parser for frames with a terminator
    does no I/O, no runner dependencies, only lazy evaluation via async calls
    '''
    MAX_FRAME_SIZE = 16384

    terminator = attr.ib()
    more = attr.ib()  # async f() returns next chunk for buf
    done = attr.ib()  # async f(buf) put as frame, return ignored
    _buf = attr.ib(factory=bytearray)
    _next_find_idx = attr.ib(init=False, default=0)

    async def deframe(self):
        while True:
            terminator_idx = self._buf.find(
                self.terminator, self._next_find_idx
            )
            if terminator_idx < 0:
                more_data = await self.more()
                # paranoid that a goof in more() may return too much
                if len(self._buf) + len(more_data) > self.MAX_FRAME_SIZE:
                    raise ValueError("frame too long")
                if more_data == b"":
                    if self._buf:
                        raise ValueError("incomplete frame")
                    # clean EOF: stop instead of spinning forever
                    raise trio.EndOfChannel
                self._next_find_idx = max(0, len(self._buf) - len(self.terminator) + 1)
                self._buf += more_data
            else:
                frame = self._buf[:terminator_idx]
                del self._buf[:terminator_idx + len(self.terminator)]
                self._next_find_idx = 0
                await self.done(frame)

    # be nice to have this here, but need another done() dest
    # ... and another more() source to use as a pump
    async def enframe(self, msg):
        return msg + self.terminator


################################################################
# Example
################################################################

_RECEIVE_SIZE = 4096  # pretty arbitrary

from trio.testing import memory_stream_pair


async def main():
    # so sender.aclose() ripples to receiver
    wire = trio.StapledStream(*memory_stream_pair())

    async def sender():
        await wire.send_all(b"split-up ")
        await trio.sleep(1)
        await wire.send_all(b"message\r\n\r")
        await trio.sleep(1)
        await wire.send_all(b"\n")
        await trio.sleep(1)
        await wire.aclose()

    async def trio_more():
        return await wire.receive_some(_RECEIVE_SIZE)

    async def trio_done(msg):
        print(f"Got message: {msg}")

    pump = Framer(b"\r\n\r\n", trio_more, trio_done)
    try:
        async with trio.open_nursery() as nursery:
            nursery.start_soon(sender)
            nursery.start_soon(pump.deframe)
    except trio.ClosedResourceError:
        print("done")
    except ValueError as e:
        print(f'Unrecoverable error: {e}')


trio.run(main)
The trio-ish way would not use a `done` callback. Instead, the `deframe` method would be an async iterator, so it would say `yield frame` instead of `self.done(frame)`, and the caller would do:

async for msg in pump.deframe():
    print(f"Got message: {msg}")

so there'd be no need to start `pump.deframe` as a separate task.
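For concreteness, a sketch of that async-iterator variant, using the same Framer fields as above (my rendering of the suggestion, not tested against the original):

    async def deframe(self):
        while True:
            terminator_idx = self._buf.find(self.terminator, self._next_find_idx)
            if terminator_idx < 0:
                more_data = await self.more()
                if len(self._buf) + len(more_data) > self.MAX_FRAME_SIZE:
                    raise ValueError("frame too long")
                if more_data == b"":
                    if self._buf:
                        raise ValueError("incomplete frame")
                    return  # clean EOF ends the async iteration
                self._next_find_idx = max(0, len(self._buf) - len(self.terminator) + 1)
                self._buf += more_data
            else:
                frame = self._buf[:terminator_idx]
                del self._buf[:terminator_idx + len(self.terminator)]
                self._next_find_idx = 0
                yield frame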
I think I missed a crucial point about sans I/O; it's synchronous. It needs to have no dependence on trio.
Here's another attempt at a utility for creating basic protocols. I've omitted the clunky implementation of `deframe` (closures and `yield from` spaghetti). I'd first like to get some feedback on this kind of approach before addressing the implementation.

It seems the buffer mgmt in wsproto, hyper-h2, et al. is just different enough that each needs its own specialized glue code to be usable in trio. A 'standard mechanism' for how to supply bytes and get out objects for simple protocols might also help uniform adoption of these heftier protocols.
from deframe import deframe

import trio
from trio.testing import memory_stream_pair

_RECEIVE_SIZE = 4096  # pretty arbitrary


def netstring_proto(rd, max_msg=9999, **kw_decode):
    ''' synchronous generator using rd() from deframe() for buffer mgmt '''
    max_hdr = len(str(max_msg)) + 1
    while True:
        size = yield from rd(b':', stop=max_hdr)
        body = yield from rd(b',', exact=int(size))
        yield body.decode(**kw_decode)


async def main():
    wire = trio.StapledStream(*memory_stream_pair())

    async def sender():
        await wire.send_all(b"4:hiya,")
        await wire.send_all(b"12:two ")
        await wire.send_all(b"split-")
        await wire.send_all(b"up,9: messages,")
        await wire.send_eof()

    try:
        async with trio.open_nursery() as nursery:
            nursery.start_soon(sender)
            make_msgs = deframe(netstring_proto)
            while True:
                data = await wire.receive_some(_RECEIVE_SIZE)
                if not data:
                    raise trio.ClosedResourceError
                # loop through a generator of msgs; stops when no more msgs in buffer
                for msg in make_msgs(data):
                    print(f"Got message: {msg.upper()}")
    except trio.ClosedResourceError:
        print("done")


if __name__ == '__main__':
    trio.run(main)
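The `deframe` module itself was omitted above. Purely as a guess at its intended semantics (not the poster's actual code), here's one way it could work: `rd()` is a sub-generator that yields None whenever the buffer lacks data, and `deframe()` bridges that to the feed-bytes-in, iterate-messages-out API used in the example:

def deframe(proto_factory, **kw):
    buf = bytearray()

    def rd(delim, stop=None, exact=None):
        # Sub-generator for protocol code: returns the parsed field, or
        # yields None to signal "need more data".
        while True:
            if exact is not None:
                # fixed-size field followed by the delimiter
                if len(buf) >= exact + len(delim):
                    if buf[exact:exact + len(delim)] != delim:
                        raise ValueError("missing delimiter")
                    data = bytes(buf[:exact])
                    del buf[:exact + len(delim)]
                    return data
            else:
                idx = buf.find(delim, 0, stop)
                if idx >= 0:
                    data = bytes(buf[:idx])
                    del buf[:idx + len(delim)]
                    return data
                if stop is not None and len(buf) >= stop:
                    raise ValueError("field too long")
            yield None  # suspend until more bytes arrive

    gen = proto_factory(rd, **kw)

    def make_msgs(data):
        buf.extend(data)
        while True:
            msg = next(gen)
            if msg is None:
                return  # buffer exhausted; call again with more data
            yield msg

    return make_msgs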
The rust `tokio` crate actually has a fully-worked-out version of exactly the ideas we're talking about here:
I'm having a crisis of faith about sans-io. Well, at least a partial crisis.
Here's what happened:
I was looking again at my nascent library for writing sans-io protocols. Let's call it `byoio`, just to have some name to refer to it. (reference) The goal is to make it such a good way to write protocols that no-one is tempted to write protocols directly against Trio's own APIs.
The core technical foundation is a receive buffer with O(n) algorithms for basic operations like consuming data and searching for delimiters. But on top of that foundation, we want to be able to write clean, readable, composable protocol code. For example, if we're defining a sans-io implementation of sending/receiving single lines, then (a) we want our implementation to be easy to read and reason about, (b) we want to make it easy to use it as a foundation for building new sans-io libraries on top, for line-oriented protocols like SMTP or FTP.
The basic pattern in defining a sans-io parser is that you have a buffer, and you have some kind of "read more from the buffer" operation that either returns an event description or says that it needs more data (e.g. by returning `h11.NEED_DATA`), in which case the user has to feed in more data and then retry. This leads to pretty awkward code, because you might get half-way through some complicated parsing job and realize you've run out of data and have to unwind – BUT you have to store enough information about where you were when you ran out of data that you can resume in O(1) time, without re-processing all the early data, because if each attempt is O(n) and you make O(n) attempts then now you're O(n^2) and trivially DoSable.
The best trick I've seen for making this kind of code readable is to write the parser code itself as a generator, with a little wrapper to convert between the sans-io conventions and the generator conventions: see for example this version of wsproto's parser, which uses the convention that when you run out of data you `yield`, the wrapper converts that into `NEED_DATA`, and then after more data arrives the wrapper resumes the generator. Lomond uses a similar trick. Ohneio is a prototype for a more general sans-io parsing framework with similar ideas.
The main downside of this approach is that it's very convenient to have `yield from` so you can factor out reusable logic, and Python 2 has no `yield from`. That's why we ended up rewriting wsproto's parser. There are ways to hack around this (lomond has some clever tricks here), but it's definitely more awkward. But, I realized, for my sans-io library... well, that's just not very important. h11 might need to keep py2 support for a while yet, but no-one is starting new protocol libraries with py2 support. In my original vision, one of the design goals for the new framework was to switch h11 to using it, but it's actually not that important... h11 works fine as it is. And if we really need py2 support, then we can invent a `byoio.py2compat` submodule that uses the awkward hacks to implement the same API, keep `yield from` as the main API, and eventually deprecate `py2compat` and tell everyone to move to the good py3 version.
So my sketch at this point has a generic sans-io-ish interface for users:
class ByoioParser(Protocol, Generic[EventType]):
    @abstractmethod
    def next_event(self) -> Union[NEED_DATA, EventType]:
        ...

    @abstractmethod
    def feed_bytes(self, new_data: BytesLike) -> None:
        ...
But I'm imagining you wouldn't normally implement this interface by hand, just like you don't normally implement the context manager interface by hand... instead you use a decorator:
@byoio.protocol
def receive_uint32_prefixed_message(receive_buffer):
    prefix = yield from receive_exactly(receive_buffer, 4)
    (length,) = struct.unpack("<I", prefix)
    return (yield from receive_exactly(receive_buffer, length))

proto = receive_uint32_prefixed_message()
assert isinstance(proto, ByoioParser)
And now composing protocols together is easy: either we provide some way for higher-level protocols to use the generator directly, like
@byoio.protocol
def higher_level_protocol(receive_buffer):
    message = yield from receive_uint32_prefixed_message.gen(receive_buffer)
    # ... do something with 'message' ...
Or better yet, provide some way to convert `ByoioParser` objects back into generators:

message = yield from receive_uint32_prefixed_message(receive_buffer).asgen()
But there's some awkwardness here... `yield from` is really convenient for implementing parsers that return a single value, like `receive_exactly` or `receive_uint32_prefixed_message`. But most sans-io parsers return a stream of events – you can keep calling `next_event` as long as you want, or at least until EOF.
If we want a generator that returns a stream of real results, that's not too hard – that's how my old wsproto parser works. We just declare that you can `yield NEED_DATA` or `yield event`, and either way that's what `next_event` returns. So that works on its own. But as soon as we want to build a more complex protocol on top, things get harder – like, say, `parse_lines` repeatedly yields lines or `NEED_DATA`. Now say we want to use `parse_lines` to implement an SMTP parser. How? We can't use `yield from parse_lines(...)`, because if our convention is to use the `yield` channel to pass back final events, then we'll end up yielding raw lines instead of processing them. We need to write something like:
@byoio.protocol
def parse_smtp(buffer):
    subprotocol = parse_lines(buffer)
    while True:
        next_line = yield from subprotocol.gen_for_next()
        ...
Except we also need some convention to signal that there is no next item... maybe we should use an exception?
Anyway, that's the point where I realized that I'd just reinvented async iterators, except more awkward, because generators can't use `async for`.
OK, so, new plan: instead of generators, we'll define `byoio` protocols using async/await. We can keep the `ByoioParser` abstract interface, but now we'll write something like:
# One-shot protocol is a regular async function
@byoio.protocol
async def receive_uint32_prefixed_message(receive_buffer: bytearray):
    prefix = await receive_exactly(receive_buffer, 4)
    (length,) = struct.unpack("<I", prefix)
    return await receive_exactly(receive_buffer, length)

# Multi-shot protocol is an async generator
@byoio.protocol
async def receive_lines(receive_buffer):
    while True:
        yield await receive_until(receive_buffer, b"\r\n")
Internally, `byoio.protocol` would implement a tiny little coroutine runner, with just a single trap that means "return `NEED_DATA`", and it iterates the coroutine each time someone calls `.next_event()`.
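To make that concrete, here's a hedged sketch of such a runner for the one-shot case; all names here (`NEED_DATA`, `wait_for_data`, `ProtocolRunner`) are made up for illustration, not the real byoio API:

class _NeedData:
    def __repr__(self):
        return "NEED_DATA"

NEED_DATA = _NeedData()

class _WaitForData:
    # The single "trap": awaiting this suspends the protocol coroutine
    # until the runner resumes it.
    def __await__(self):
        yield

async def wait_for_data():
    await _WaitForData()

class ProtocolRunner:
    """Drive a one-shot protocol coroutine under sans-io conventions."""

    def __init__(self, coro, receive_buffer: bytearray):
        self._coro = coro
        self._buffer = receive_buffer

    def feed_bytes(self, new_data):
        self._buffer += new_data

    def next_event(self):
        try:
            self._coro.send(None)  # run until the coroutine traps or finishes
        except StopIteration as exc:
            return exc.value       # protocol finished: its return value
        return NEED_DATA           # coroutine awaited wait_for_data()

# The protocol code just awaits wait_for_data() when the buffer runs dry:
async def receive_exactly(buffer: bytearray, n: int) -> bytes:
    while len(buffer) < n:
        await wait_for_data()
    result = bytes(buffer[:n])
    del buffer[:n]
    return result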
Of course we also need some way to send data, so maybe we should have a way to do that too. For convenience maybe we should bundle these primitives together on a little interface:
class ByoioStream:
    async def receive_some(self): ...
    def send_all(self, data): ...
...but OK, that's basically a subset of the trio `Stream` interface. `send_all` is synchronous here because we assume that our caller will receive backpressure when they go to actually send out the bytes we've written, but that's a pretty trivial difference.
So it turns out that my grand plan is: write the protocol code against the abstract `ByoioStream` interface, so you can plug in any kind of transport underneath, or test against canned data, etc., and then wrap it in sans-io machinery. But alternatively, we could take the exact same code we wrote in step 1, pass in any trio `Stream` object as our abstract `ByoioStream`, and bam, we have a nice async Trio API. Steps 2-4 are totally superfluous. So I'm feeling kinda silly. Apparently at the end of all that, my grand sans-io plans have looped around and turned into "just write code directly against the Trio API" (and then add some extra fluff that just makes things slower).
One of the major advantages of sans-io code is that because it has such a narrow and controllable interface with the outside world, it's easy to test. And of course the other advantage is that the public API is neutral and can be used anywhere, including in sync code. The `byoio` idea does have some advantages on both counts: because you only get access to a very narrow set of primitives, you're guaranteed you aren't accidentally doing something hard to test or non-portable, like using trio primitives directly instead of just calling `stream.receive_some` and `stream.send_all`. And maybe it would have some value as a way to export sync APIs for sync code? And just generally signal that these protocol implementations really are neutral, instead of looking like a Trio-only thing.
But it does suggest that we can feel free to start writing all the obvious primitives – `receive_exactly`, `receive_until`, `receive_until_re`, `LineChannel`, `NetstringChannel`, `LengthPrefixedChannel`, etc. – directly against the Trio `Stream` API, and it would be OK to put them directly into Trio. We wouldn't lose the chance to have a `byoio` – we can always extract the code into a standalone library and have Trio re-export it.
One nasty issue I don't have a good answer for: the send/receive/bidi split makes everything super annoying to type, in the mypy sense. It's super annoying to have `ReceiveLineChannel`, `SendLineChannel`, `LineChannel`, ad infinitum. It's tempting to just have `LineChannel` and tell people that if they're wrapping a `ReceiveStream` then that's fine, just don't call `send`; it will raise an error. Or for the basic receive-buffer+stream primitive, do I wrap those up in an object that implements `Stream`, or just `ReceiveStream`, since it's just a receive buffer? I'm half-tempted to say screw it, it's all runtime all the way down, there's just `Stream` and `Channel` and that's it. But this feels like something the type system should be able to check, so I suspect erasing it won't make people happy!
On another note: as I write `receive_exactly` and `receive_until` and receive receive receive, I'm somewhat regretting the decision to name our primitives `send`/`receive`. I do like the emphasis that these are active transfers "in the moment", versus `read`/`write` that refer to modifying passive media – and also it's good to avoid associations with Python's conventions around `read`, which are really inappropriate for `Stream`s and for `Channel`s, for different reasons. But `receive` is just an awkward word: kinda long, annoying to spell. I'd be tempted by `get`/`put`, except then we'd have `Stream.get_some` and that seems like a bad idea. `give`/`take`? `put`/`take`? `put_all`/`get_any`? Or just stick with `send`/`receive`... Maybe this should be a separate issue.
You've described in a lot of words why I didn't bother with sans-io-ing any of my code yet.

There are two missing building blocks in sans-io. One is timeout handling, including the requirement to send frequent keepalives. The other is request multiplexing.
WRT put/get: Umm … thinking about it, put/get works for single messages, while send/receive makes more sense for byte/character streams. This is because I associate put/get with things like channels or queues that you can use to put some single thing in on one end and get it out the other end.
`send`/`receive`, on the other hand, is for bytestreams and similar, where the focus of these methods is on partitioning the stream into semantic chunks.

Calling `receive_exactly` on four bytes (and then N bytes) to de-partition a byte-encoded message stream makes sense. Calling `get_exactly` to read N HTTP requests or MQTT messages? Not so much.
Yes, `receive` is awkward. So … use `recv` instead?
> thinking about it, put/get works for single messages, while send/receive makes more sense for byte/character streams. This is because I associate put/get with things like channels or queues that you can use to put some single thing in on one end and get it out the other end.
That's why I find them attractive :-). For single objects we currently have `Channel.send` and `Channel.receive`, and for byte streams we have `Stream.send_all` and `Stream.receive_some`. So in all cases, the verb on its own is being used to refer to single objects, and then sometimes we have a modifier to make it plural.
And `get_exactly` would take a `Stream` specifically, not a `Channel`, so I don't think there'd be any confusion about using it for MQTT.
> There are two missing building blocks in sans-io. One is timeout handling, including the requirement to send frequent keepalives. The other is request multiplexing.
Yeah, one big limitation of the whole `byoio` idea is that it only helps with writing protocols like h11/h2/wsproto that are purely managing a single TCP connection. There are a bunch of useful protocols that fit within this constraint – that list, plus basic protocols like line splitting, plus some others like SMTP. But there are also a lot of protocols that have some quirky extra requirement.
h2 does request multiplexing... not sure what you're missing there. But yeah, timing is a common one – protocols can specify timeouts, sometimes in odd corners. The ssh protocol doesn't seem like it cares about time, except that ssh servers need to rate-limit password checks.
Some protocols need to be able to make new connections. Think SOCKS, or anything with automatic reconnect on failure, or http redirects.
Some protocols need UDP. There's an interesting discussion happening at https://github.com/aiortc/aioquic/issues/4 about how to make a sans-io interface for QUIC.
All these could in principle be fit into the sans-io paradigm: all you need to do is be extremely explicit about what your interactions with the environment are, and expose them as public API. You can even imagine a library like `byoio` growing interfaces for things like time or UDP, where each protocol has to declare what resources it needs. That starts moving in the direction of anyio... The main difference is that so far anyio has focused on providing maximal access to all possible functionality, and has actually avoided abstractions like `Stream`. But here we'd be focusing much more on abstractions and minimizing the interface.
There's not going to be a single universal answer here, but I think it's helpful to try to map out the space of options.
I came to some of the same conclusions. `recv_exact` and `recv_until` work better as distinct calls. Nesting protocols is hard. With `yield from` wrappers, there's a need for boilerplate conversion from sync generators to async access.
However, the last one doesn't seem so bad. If I want one `await a_stream.receive_some(BIG_CHUNK)` to become one list of MsgObjs synchronously, a different conversion can do that. It seems that if recv functions are async, one is forced to go through the reactor for every small field. I'm not sure if some benefits of cooperative scheduling are possible with such an approach. My first pass had `feed` and `next` as one function, which seemed to make different await tweaking easier outside of the protocol. Maybe this is a SAX vs DOM thing; different enough to necessitate different parsers.
Some of the headaches with nesting are a consequence of the Stream/Channel distinction. Utils for byte->obj can't be easily reused for obj->bigger_obj.
So my prototype so far has the signatures:
async def receive_exactly(rstream: ReceiveStream, rbuf: bytearray, size: int): ...

async def receive_until(rstream: ReceiveStream, rbuf: bytearray, delim: BytesLike, *, max_size: int): ...

class LineChannel(Channel[bytes]):
    def __init__(self, stream: Stream, *, max_size: int, eol=b"\r\n", initial_data=b""): ...
    async def send(...): ...
    async def receive(...): ...
    def detach(self) -> Tuple[Stream, BytesLike]: ...

class LengthPrefixedChannel(Channel[bytes]):
    def __init__(self, stream: Stream, prefix_format: str, *, max_size: int, initial_data=b""): ...
    # ... rest of API the same as LineChannel ...
I'm assuming #1123 or something like it will be merged, so we don't have to pass in `receive_size` constants everywhere.
Please notice the `rbuf` argument on the free-standing functions; it's the weird part. The idea is that this has to be a mutable `bytearray` object. On entry, it contains any data that's been received on this stream but hasn't been parsed yet. The function updates it in place, so that on exit it still contains that, but after removing the data that this function parsed. So if using these directly, you do something like:
# Start out with an empty bytearray
rbuf = bytearray()

# Pass it into every function call
header = await receive_exactly(stream, rbuf, 8)
length, flags = struct.unpack("<HH", header)
body = await receive_exactly(stream, rbuf, length)
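For concreteness, here's a sketch of how those two free functions might be implemented against the signatures above. This is my guess at the semantics (matching the EOF discussion below: EOF raises `EOFError` and leaves unread data in `rbuf`), not a final design:

import trio

async def receive_exactly(rstream: trio.abc.ReceiveStream, rbuf: bytearray, size: int) -> bytes:
    while len(rbuf) < size:
        data = await rstream.receive_some()  # assumes a #1123-style default size
        if not data:
            raise EOFError  # partial data stays in rbuf for the caller to inspect
        rbuf += data
    result = bytes(rbuf[:size])
    del rbuf[:size]
    return result

async def receive_until(rstream: trio.abc.ReceiveStream, rbuf: bytearray, delim: bytes, *, max_size: int) -> bytes:
    next_find_idx = 0
    while True:
        idx = rbuf.find(delim, next_find_idx)
        if idx >= 0:
            if idx > max_size:
                raise ValueError("delimiter not found within max_size bytes")
            result = bytes(rbuf[:idx])
            del rbuf[:idx + len(delim)]
            return result
        if len(rbuf) > max_size:
            raise ValueError("delimiter not found within max_size bytes")
        # resume the next search just before the unscanned tail
        next_find_idx = max(0, len(rbuf) - len(delim) + 1)
        data = await rstream.receive_some()
        if not data:
            raise EOFError
        rbuf += data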
And the receive part of `LineChannel` would be implemented like:
class LineChannel(Channel[bytes]):
    def __init__(self, stream: Stream, *, max_size: int, eol=b"\r\n", initial_data=b""):
        self._stream = stream
        self._max_size = max_size
        self._eol = eol
        # If you already have a receive buffer, you can pass it in as initial_data
        self._rbuf = bytearray(initial_data)

    async def receive(self):
        # TODO: figure out EOF handling (discussed below)
        return await receive_until(self._stream, self._rbuf, self._eol, max_size=self._max_size)

    def detach(self):
        # Pass back out the buffer, so that others can use it
        stream, rbuf = self._stream, self._rbuf
        self._stream, self._rbuf = None, None
        return stream, rbuf
Another obvious option would be to encapsulate the stream+buffer together in an object, like a `BufferedStream`. When I tried to sketch it out this way, I ran into several small but annoying issues:

- Does `BufferedStream` expose a `send_all` method? Or do we need separate `BufferedReceiveStream` and `BufferedStream` objects?
- It's not enough to just have `BufferedStream.receive_exactly` and `BufferedStream.receive_until`. If someone wants to implement their own reader then they need access to the raw buffer (e.g., h11 needs `receive_until_re` to search for `b"\r?\n\r?\n"`). If you want to switch protocols, e.g. to `SSLStream`, you need to detach any trailing data so you can feed it into the new wrapper. So it's not exactly an abstraction in the strict sense, because all the internals need to be fully exposed.

So I'm thinking, if the abstraction is that leaky, then maybe it's better to just get rid of it.
EOF handling is something of an open question, and related to the above. In my draft, if `receive_exactly` or `receive_until` hit EOF before finishing, then they raise an exception. `receive_until` can also raise an exception if it's read `max_size` bytes without seeing the delimiter. In either case, the unread data is left behind in the `rbuf`. For some cases, this is just what you want: e.g. if you do `header = await receive_exactly(stream, rbuf, 4)`, and there are 2 bytes and then EOF, that's obviously some kind of corrupted stream and raising an exception is right. OTOH, if you go to read the header and get 0 bytes and then EOF, that might be fine. And with this design, the way you distinguish is to check `rbuf` afterwards and see how much data is in it. So I guess we would implement `LineChannel.receive` like:
async def receive(self):
    try:
        return await receive_until(self._stream, self._rbuf, self._eol, max_size=self._max_size)
    except EOFError as exc:
        if self._rbuf:
            raise trio.BrokenResourceError from exc
        else:
            raise trio.EndOfChannel
If we had a `BufferedStream` quasi-abstraction, then it could potentially track the EOF state as part of its internal state, and refuse to call `receive_some` again after seeing an EOF. OTOH, if we're just passing around a `bytearray`, we don't have a bool to store this info. I'm not sure if this is important in practice, or not?
There is at least one case where EOF is not inherently persistent: on a TTY, if someone hits control-D, then reading from the tty will report EOF, and also clear the EOF flag, so that future reads go back to reading new keyboard input. So, you generally don't want to rely on being able to call `receive_some()` repeatedly and keep getting EOF each time. But, for something like the `receive` code above that catches `EOFError`, one `receive_some` returning EOF is enough; it doesn't have any reason to store that flag anywhere.
For h11's receive buffer, it does have to store that flag, because someone else is feeding in data and could feed in an EOF at any time, so we have to remember it. Here the reason we get away with potentially not needing it is that we don't call `receive_some` until we're sure we want more data, so we don't get an EOF until we're ready to handle it. Maybe that's generally true? I'm not 100% sure.
I also thought about having a `NetstringChannel`, but I'm not sure anyone actually uses these in real life? Maybe it's better to keep it as an example for building your own protocol classes (#472).
(I split off #1125 for the discussion about renaming `send`/`receive`.)
> One nasty issue I don't have a good answer for: the send/receive/bidi split makes everything super annoying to type, in the mypy sense. It's super annoying to have ReceiveLineChannel, SendLineChannel, LineChannel, ad infinitum. Tempting to just have LineChannel and tell people that if they're wrapping a ReceiveStream then that's fine, just don't call send, it will raise an error.
Another possibility: just have `SendLineChannel`, `ReceiveLineChannel`, and then `StapledChannel(SendLineChannel(stream), ReceiveLineChannel(stream))`. We'd probably want some shorthand, though, like:
# But how do we handle different constructor args?
LineChannel = StapledChannel.new_type(SendLineChannel, ReceiveLineChannel)
or
channel = make_line_channel(stream, ...)
Or maybe I should give up and add this to the very short list of cases where implementation inheritance is OK...
On the receive side we do need a buffer, obviously; equally obviously that buffer needs to be part of the object's interface. In fact we need to be able to share the buffer between "line" objects: some protocols (IMAP?) send a CRLF-delimited line that states "the next bit consists of N opaque bytes". Thus yes it should be accessible, but still be part of the object so that the easy case Just Works. I'd simply use an optional argument to the constructor.
This also applies to a (buffered) sender. (The unbuffered case is easy …)
Yes, mypy and similar is a challenge, but frankly I'd rather add some restriction-propagation logic to mypy to recognize "when you call `LineChannel(read_stream)` then this `LineChannel` object is in fact a `ReadonlyLineChannel`", than impose that kind of cognitive load on our users. In fact, I'd assume that this kind of thing happens often enough that adding some generic way to teach mypy about object type equivalence should be on their road map, if not actually possible, anyway:

FooChannel(foo: ReadStream) -> ReadonlyChannel[Foo]
FooChannel(foo: WriteStream) -> WriteonlyChannel[Foo]
FooChannel(foo: Stream) -> Channel[Foo]

(where "Foo" is the somewhat-generic type of thing you'd transmit/receive).
All that "read+write => bidirectional" stapling stuff should be restricted to the few cases where you really have two disparate streams you need to link up. I can think of only one relevant case, namely setting up two pipes for bidirectional communication, and even that is somewhat obsolete when you have `socketpair`.
> On the receive side we do need a buffer, obviously; equally obviously that buffer needs to be part of the object's interface. In fact we need to be able to share the buffer between "line" objects: some protocols (IMAP?) send a CRLF-delimited line that states "the next bit consists of N opaque bytes". Thus yes it should be accessible, but still be part of the object so that the easy case Just Works. I'd simply use an optional argument to the constructor.
Part of which object? Which constructor?
We don't want to add buffer management to the `Stream` interface, because there are many different classes that implement that interface, and we don't want them all to be forced to add duplicate buffer management code. So the question above is exactly whether to create a new type of object just to hold the buffer.
This is for the lower-level tools like `receive_exactly` and `receive_until`. For the `Channel` objects, in the sketch above they're already holding the buffer internally, and the `detach` method is intended to handle those IMAP cases. (However, there is an interesting wrinkle: in the API above, the user can't directly access the buffer belonging to a live `Channel` object – they have to destroy the `Channel` object to get back the stream state, then call `receive_exactly`, then make a new `Channel` object. I don't know if that's the best choice.)
> Yes, mypy and similar is a challenge, but frankly I'd rather add some restriction-propagation logic to mypy to recognize "when you call `LineChannel(read_stream)` then this `LineChannel` object is in fact a `ReadonlyLineChannel`", than to impose that kind of cognitive load on our users. In fact I'd assume that this kind of thing happens often enough that adding some generic way to teach mypy about object type equivalence should be on their road map, if not actually possible, anyway:
Hmm. I suppose they do have `@overload`. I wonder what happens today if you do something like:
@overload
def line_channel(s: Stream, ...) -> LineChannel: ...

@overload
def line_channel(s: ReceiveStream, ...) -> LineReceiveChannel: ...

@overload
def line_channel(s: SendStream, ...) -> LineSendChannel: ...

def line_channel(s, ...):
    # ... actual implementation ...
I think you do still need separate `LineSendChannel`, `LineReceiveChannel`, `LineChannel` types, because `LineChannel` has some unique functionality that isn't in the abstract `Channel` interface. And I suspect this won't work on class constructors right now...
> We don't want to add buffer management to the Stream interface, because there are many different classes that implement that interface and we don't want them all to be forced to add duplicate buffer management code.
Right. On the other hand, we have channels that definitely need a shareable buffer for the underlying stream (`LineChannel`), which needs to be somewhat accessible to the implementation (`PatternDelimitedChannel`).
So, well, interpose a `BufferedReadStream` with some low(ish)-level accessors to parse the buffer content and to read more data into it. (Same thing for a writer.) A `make_buffered` constructor that returns its argument if that already is a buffered stream is simple enough. I don't think we'd need any sort of interim detaching mechanism for that buffer. A "I can use/modify the buffer" lock, or even a ConflictManager-protected accessor, should be sufficient.
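A minimal sketch of that idea, with all names (`BufferedReadStream`, `make_buffered`, `fill`) hypothetical, not an agreed design:

class BufferedReadStream:
    """Thin wrapper owning the receive buffer; the buffer is deliberately public."""

    def __init__(self, stream, initial_data=b""):
        self.stream = stream
        self.buffer = bytearray(initial_data)

    async def fill(self) -> bool:
        # Pull one more chunk into the buffer; returns False at EOF.
        data = await self.stream.receive_some()
        self.buffer += data
        return bool(data)

def make_buffered(stream):
    # No-op for already-buffered streams, so wrappers can call it blindly.
    if isinstance(stream, BufferedReadStream):
        return stream
    return BufferedReadStream(stream)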
Slightly tangential: what would be a good trionic pattern that switches between a line reader and an N-bytes reader? E.g., for HTTP/SIP you want to parse by lines until Content-Length, then you want N bytes. So I want to mix:
line = stream.readline()
...
# until I get content-length, then
stream.readbytes(N)
In the SO line reader example, the stream bytes are sent into the generator, so after reading a few lines, with some data pending in the `buf` variable, what would be the trionic way to switch into N-bytes reader mode, bearing in mind that some bytes are in the generator?
I'm thinking of sending a tuple like

gen.send((more_data, False))  # continue with line reader mode
# or
gen.send((None, N))  # means: give up to N bytes from buf
# If N > len(buf), buf is safely flushed and the regular
# stream methods can be used for the rest of the data
(Actual use case: I'm looking to do a trio port (from gevent) of GreenSWITCH (https://github.com/EvoluxBR/greenswitch) which switches between line reader and N bytes reader. The application protocol is that of the PBX software FreeSWITCH which behaves like HTTP.)
@space88man I have a half-written blog post about adapting @njsmith's TerminatedFrameReceiver for that – in my case, to parse RESP3. In short, I ended up with the following:
from typing import Optional

import trio

_RECEIVE_SIZE = 4096  # pretty arbitrary


class TerminatedFrameReceiver:
    def __init__(
        self,
        buffer: bytes = b"",
        stream: Optional[trio.abc.ReceiveStream] = None,
        terminator: bytes = b"\r\n",
        max_frame_length: int = 16384,
    ):
        assert isinstance(buffer, bytes)
        assert not stream or isinstance(stream, trio.abc.ReceiveStream)

        self.stream = stream
        self.terminator = terminator
        self.max_frame_length = max_frame_length

        self._buf = bytearray(buffer)
        self._next_find_idx = 0

    def __bool__(self):
        return bool(self._buf)

    async def receive(self):
        while True:
            terminator_idx = self._buf.find(self.terminator, self._next_find_idx)
            if terminator_idx < 0:
                self._next_find_idx = max(0, len(self._buf) - len(self.terminator) + 1)
                await self._receive()
            else:
                return self._frame(terminator_idx + len(self.terminator))

    async def receive_exactly(self, n: int) -> bytes:
        while len(self._buf) < n:
            await self._receive()
        return self._frame(n)

    async def _receive(self):
        if len(self._buf) > self.max_frame_length:
            raise ValueError("frame too long")

        more_data = await self.stream.receive_some(_RECEIVE_SIZE) if self.stream is not None else b""
        if more_data == b"":
            if self._buf:
                raise ValueError("incomplete frame")
            raise trio.EndOfChannel

        self._buf += more_data

    def _frame(self, idx: int) -> bytes:
        frame = bytes(self._buf[:idx])  # copy out as immutable bytes
        del self._buf[:idx]
        self._next_find_idx = 0
        return frame
Essentially, I refactored `receive` to separate out `_receive` and `_frame`, then implemented `receive_exactly` off of the buffer. In short, this becomes a "buffered stream", I guess.
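This answers the mode-switching question above: header lines come from `receive()` and the body from `receive_exactly()`, sharing one buffer. A hedged usage sketch (the function and the protocol details are illustrative, not from the blog post):

async def read_event(receiver: TerminatedFrameReceiver) -> bytes:
    # Parse header lines until the blank line, then read Content-Length bytes.
    headers = {}
    while True:
        line = await receiver.receive()      # frame includes the b"\r\n"
        if line == receiver.terminator:      # bare terminator = blank line
            break
        name, _, value = line.partition(b":")
        headers[name.strip().lower()] = value.strip()
    length = int(headers.get(b"content-length", b"0"))
    return await receiver.receive_exactly(length)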
Just to add a data point (or anecdata point): for me, the `receive_until` function (and `receive_exactly`) would be extremely useful. I've been using the code on StackOverflow posted by @njsmith (thanks!) with two modifications:

- instead of a `Stream`-like `LineReader` class, I reshuffled it into a single `receive_until` function
- I gave it the signature suggested above, in the hope that one day I can do a straight swap to the Trio-supplied version

Having said that, I don't see wrapping those functions up in a `Channel`-like class to be especially useful. Once you have the underlying functions, there's not a great deal of difference between:
# API 1 : use receive_until directly
buffer = bytearray()
while True:
    msg = await receive_until(stream, buffer, b'x', max_size=MAX_SIZE)
    await do_something(msg)
Vs:
# API 2 : Channel interface
msg_stream = LineChannel(stream, max_size=MAX_SIZE, eol=b'x')
while True:
    msg = await msg_stream.receive()
    await do_something(msg)
I also considered wrapping `receive_until` in a simple generator function, but even that didn't seem a great deal simpler. In any case, I wanted the `buffer` accessible at the top level so I could log any unused bytes in it when an error occurred (including an error not related to reading).
I can imagine you might want a wrapper `LineChannel` class if you want to call `receive_until` in several places in your code without duplicating all the parameters, or if you have a general-purpose function that accepts a `ReceiveChannel` and you want to reuse it for delimited streams. I'm not sure how common those uses are in practice. Otherwise, I think it's easier just to recommend using the underlying functions directly. Certainly, if you want to mix different types of call, as in the HTTP example given above, mixing the underlying functions seems simpler to me than using `TerminatedFrameReceiver`, which is a bit like a `ReceiveChannel` (technically it satisfies the requirements) but not quite.
Very interesting discussion about sans-io approach!
I'm more of an applied engineer and much less of an architect, so maybe I don't understand the problem from all sides. However, do I understand it right that those sans-io problems could be circumvented by simply making the sans-io interface async by default? In that case, timeouts would not be an issue, and no AsyncIterator would have to be reinvented.
This is regarding comment https://github.com/python-trio/trio/issues/796#issuecomment-504746835
Most networking libraries provide some standard way to implement basic protocol building blocks like "split a stream into lines", "read exactly N bytes", or "split a stream into length-prefixed frames", e.g.:
- asyncio: `StreamReader.readline`, `StreamReader.readexactly`, `StreamReader.readuntil`
- The classes in `twisted.protocols.basic`
- The stdlib socket module's `makefile` method, which lets you get access to the full Python file API, including `readline` and friends
- Tornado `IOStream`'s `read_until`
We don't have anything like this currently, as I was reminded by this StackOverflow question from @basak.
Note: if you're just looking for a quick way to read lines from a trio Stream, then click on that SO link, it has an example.
Use cases
- Lots of protocols are line-based: Twisted's `LineReceiver` and `LineOnlyReceiver` have subclasses implementing HTTP, IMAP, POP3, SMTP, Ident, Finger, FTP, Memcache, IRC, ... you get the idea.
- Binary protocols usually use length-prefixed framing (e.g. `Int16Receiver`), though sometimes it involves lines, e.g. newline-terminated JSON, or the log parser in linehaul.
- For interacting with command-oriented peers like subprocesses, `readline` and `read_until` are pretty useful. This particular case can also benefit from more sophisticated tools, like TTY emulation and pexpect-style pattern matching.

Considerations
- Our approach shouldn't involve adding new methods to `Stream`, because the point of the `Stream` interface is to allow for lots of different implementations, and we don't want to force everyone who implements `Stream` to have to reimplement their own version of the standard frame-splitting algorithms. So this should be some helper function that acts on a `Stream`, or a wrapper class that has-a `Stream`, something like that.
- For "real" protocols like HTTP, you definitely can implement them on top of explicit (async) blocking I/O operations like `readline` and `read_exactly`, but these days I'm pretty convinced that you will be happier using Sans I/O. Some of the arguments for sans-io design are kind of pure and theoretical, like "better modularity" and "higher reusability", but having done this twice now (with h11 and wsproto), I really don't feel like it's an eat-your-vegetables thing – the benefits are super practical: like, you can actually understand your protocol code, and test it, and people with totally different use cases show up to fix bugs for you. It's just a more pleasant way to do things.
- OTOH, while trio is generally kind of opinionated and we should give confused users helpful nudges in the best direction we can, we don't want to be elitist. If someone's used to hacking together simple protocols using `readline`, and is comfortable doing that, we don't want to put up barriers to their using trio. And if the sans-I/O approach is harder to get started with, then for some people that will legitimately outweigh the long-term benefits.
- There might be one way to have our cake and eat it too: if we can make the sans-I/O version so simple and easy to get started with that even beginners and folks used to `readline` don't find it a barrier. If we can pull this off, it'd be pretty sweet, because then we can teach the better approach from the beginning, and when they move on to implementing more complex protocols, or integrating existing libraries like h11/h2/wsproto, they're already prepared to do it right.
- Alternatively, if we can't... there is really not a lot of harm in having a `lines_from_stream` generator, or whatever. But anything more than that is going to require exposing some kind of buffering to the user, which is the core of the sans-I/O pattern, so let's think about sans-I/O for a bit.

Can we make sans-I/O accessible and easy?
The core parts of implementing a high-quality streaming line reader, a streaming length-prefixed string reader, or an HTTP parser are actually all kind of the same. h11 internally has a robust implementation of everything here except for specifying delimiters as a regex, and I need to add that anyway to fix https://github.com/python-hyper/h11/issues/7. So I have a plan already to pull that out into a standalone library.
And the APIs to a sans-I/O line reader, length-prefixed string reader, HTTP parser, or websocket parser for that matter, are also all kind of the same: you wrap them around a `Stream`, and then call a `receive` method which tries to pull some "event" out of the internal buffer, refilling the buffer as necessary.

In fact, if you had sans-I/O versions of any of these, that all followed the same interface conventions, you could even have a single generic wrapper that binds them to a Trio stream and implements the `ReceiveChannel` interface! Where the objects being received are lines, or `h11.Event` objects, or whatever.

So if you really just wanted a way to receive and send lines on a `Stream`, that might be:
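(The code sample that originally followed here didn't survive; as a stand-in, here's a hedged sketch of what that generic wrapper could look like. `NEED_DATA`, `LineProtocol`, and `encode` are hypothetical names for the conventions just described, not a real API.)

import trio

NEED_DATA = object()  # hypothetical sentinel, h11-style

class SansIOChannel(trio.abc.Channel):
    """Bind any sans-I/O protocol with feed_bytes()/next_event() to a Stream."""

    def __init__(self, stream, protocol):
        self._stream = stream
        self._protocol = protocol

    async def receive(self):
        while True:
            event = self._protocol.next_event()
            if event is not NEED_DATA:
                return event
            data = await self._stream.receive_some()
            if data == b"":
                # simplification: a real wrapper would let the protocol turn
                # buffered leftovers into an error before ending the channel
                raise trio.EndOfChannel
            self._protocol.feed_bytes(data)

    async def send(self, value):
        # encode() is assumed: the protocol turns one event into wire bytes
        await self._stream.send_all(self._protocol.encode(value))

    async def aclose(self):
        await self._stream.aclose()

# Hypothetical usage with a sans-I/O LineProtocol:
#     lines = SansIOChannel(stream, LineProtocol(eol=b"\r\n"))
#     async for line in lines:
#         ...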
That's maybe a little bit more complicated than I'd want to use in a tutorial, but it's pretty close? Maybe we can slim it down a little more?
This approach is also flexible enough to handle more complex cases, like protocols that switch between line-oriented and bulk data (HTTP), or that enable TLS half-way through (SMTP's STARTTLS command), which in Twisted's `LineReceiver` requires some special hooks. You can detach the sans-I/O wrapper from the underlying stream and then wrap it again in a different protocol, so long as you have some way to hand off the buffer between them.
readline
twice and then read N bytes, or something like that. So maybe we'd also want something that wraps aReceiveStream
and providesread_line
,read_exactly
,read_until
, based on the same buffering code described above but without the fancy sans-I/O event layer in between?