Open 417db2c9-5b99-40d8-8a5d-508110ab7596 opened 7 years ago
When there is a huge amount of writer.write
calls followed by await writer.drain()
on a non-paused channel, and there are no other coroutine switches, await writer.drain()
immediately returns without a switch. This is because asyncio.stream.FlowControlMixin._drain_helper
do not yield
or yield from
on a non-paused stream.
Use-case: AMQP basic.publish method, for which the broker (rabbitmq) do not send any replies back. Trying to publish 4k messages results in the following warnings (PYTHONASYNCIODEBUG env variable is set):
Executing <Handle <TaskWakeupMethWrapper object at 0x1106fde28>(<Future finis...events.py:275>) created at /Users/malinoff/Projects/ideas/amqproto/amqproto/channel.py:85> took 2.371 seconds
2.371 seconds is the time spent on 4k basic_publish
calls.
You can find the test itself on github: https://github.com/malinoff/amqproto/blob/master/tests/stress/test_4k_msgs.py#L11-L12
An easy fix would be to replace return (https://github.com/python/cpython/blob/master/Lib/asyncio/streams.py#L206) with yield (and but the code below under the else clause; I'm willing to prepare a pull request), but maybe I'm missing something and such behavior is intentional?
Andrew, what are your thoughts on this one?
I've changed the implementation significantly since August 2017, futures are not involved anymore so please ignore that part. However, such drain behavior is still present - but I don't think anymore that yielding to the event loop is an easy fix.
I've tried to do so in my lib, and it showed significant slowdowns (around 4-5k publishes per second). It's not acceptable. I also found this message from Guido https://github.com/python/asyncio/issues/263#issuecomment-142702725.
What really helped is a counter that tracks send calls without waiting for replies, and a user-provided limit; when the counter reaches the limit, an explicit yield (via await asyncio.sleep(0)) is performed. This helped to achieve around 15-16k publishes per second (3-4 times faster. Here's the code: https://github.com/malinoff/amqproto/blob/6568204b539ecf820af2da11bddcca9ce7323ac5/amqproto/adapters/asyncio_adapter.py#L53-L71
Now I'm thinking that such behavior should only be documented - so library authors can deal with it before they face this in production. But if you have other thoughts, I'd be glad to hear.
Documenting this would be a great first step.
Note: these values reflect the state of the issue at the time it was migrated and might not reflect the current state.
Show more details
GitHub fields: ```python assignee = None closed_at = None created_at =
labels = ['3.7', '3.8', 'expert-asyncio']
title = 'asyncio.stream.FlowControlMixin._drain_helper may lead to a blocking behavior'
updated_at =
user = 'https://github.com/malinoff'
```
bugs.python.org fields:
```python
activity =
actor = 'yselivanov'
assignee = 'none'
closed = False
closed_date = None
closer = None
components = ['asyncio']
creation =
creator = 'malinoff'
dependencies = []
files = []
hgrepos = []
issue_num = 31096
keywords = []
message_count = 4.0
messages = ['299610', '317706', '317726', '317886']
nosy_count = 4.0
nosy_names = ['asvetlov', 'yselivanov', 'wumpus', 'malinoff']
pr_nums = []
priority = 'normal'
resolution = None
stage = None
status = 'open'
superseder = None
type = None
url = 'https://bugs.python.org/issue31096'
versions = ['Python 3.7', 'Python 3.8']
```