Originally driven by a bug found in piker where we'd get an inf recursion error due to BroadcastReceiver.receive() being recursively called when consumer tasks are awoken but no value is ready to nowait receive.
This new rework takes an approach closer to the interface and internals of trio.MemoryReceiveChannel particularly in terms of,
implementing a BroadcastReceiver.receive_nowait() and using it within the async .receive()
failing over to an internal ._receive_from_underlying() when the _nowait() call raises trio.WouldBlock
adding BroadcastState.statistics() for debugging and testing
dropping recursion from .receive()
adds an internal BroadcastReceiver._raise_on_lag: bool which can be set to avoid Lagged errors from being raised in the case where a user knows they want/need a cheap or nasty pattern
MsgStream as only stream type!
this fully drops the separation between MsgStream and ReceiveMsgStream merging the two types into the former; the only thing differing was .send() which should really be a choice not to use since (at least eventually) the underlying impl will always be a bidir stream.
this allows us to also monkey patch BroadcastReciever delivered from MsgStream.subscribe() with a .send() which delegates to the underlying stream making it very simple to support multi-task usage of any fan-out stream 🏄🏼
TODO:
[x] drop the ipython WIP commit (aac8d34) from this history
[x] add the above details to the commit msg for e3a8e45
[ ] ~hopefully a (set of) test(s) to demonstrate the original recursion error?~ => don't think it's worth it, if it shows up again then yes..
originally was with > 2 consumers and a set of tasks are cancelled simultaneously?
Originally driven by a bug found in
piker
where we'd get an inf recursion error due toBroadcastReceiver.receive()
being recursively called when consumer tasks are awoken but no value is ready tonowait
receive.trio.MemoryReceiveChannel
particularly in terms of,BroadcastReceiver.receive_nowait()
and using it within the async.receive()
._receive_from_underlying()
when the_nowait()
call raisestrio.WouldBlock
BroadcastState.statistics()
for debugging and testing.receive()
adds an internal
BroadcastReceiver._raise_on_lag: bool
which can be set to avoidLagged
errors from being raised in the case where a user knows they want/need a cheap or nasty patternMsgStream
as only stream type!MsgStream
andReceiveMsgStream
merging the two types into the former; the only thing differing was.send()
which should really be a choice not to use since (at least eventually) the underlying impl will always be a bidir stream.BroadcastReciever
delivered fromMsgStream.subscribe()
with a.send()
which delegates to the underlying stream making it very simple to support multi-task usage of any fan-out stream 🏄🏼TODO:
ipython
WIP commit (aac8d34) from this history