socketry / async-http

MIT License

Back-pressure streaming #6

Closed: ioquatix closed this 5 years ago

ioquatix commented 6 years ago

Right now, if a stream sends chunks of data faster than they are consumed, they are buffered in an unbounded queue. This can lead to excessive memory usage and performance issues.

https://github.com/socketry/async-http/blob/dd31bac812de504eb1c2b708b417627476f6ca8f/lib/async/http/body/writable.rb#L31

We should implement a queue that buffers only a limited amount of data or a limited number of chunks. That way, servers sending large amounts of data will be throttled to the rate at which it is consumed.
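
As a minimal sketch of the idea, here is back-pressure using Ruby's stdlib `Thread::SizedQueue` as a stand-in for a limited queue (an assumption for illustration; async-http uses Async tasks, not threads). The producer blocks whenever two chunks are already buffered, so it can only run as fast as the slow consumer drains the queue.

```ruby
# Stand-in for a limited queue: stdlib Thread::SizedQueue, not Async.
queue = Thread::SizedQueue.new(2) # buffer at most 2 chunks
max_buffered = 0
received = []

producer = Thread.new do
  5.times do |i|
    queue.push("chunk-#{i}") # blocks while 2 chunks are already buffered
    max_buffered = [max_buffered, queue.size].max
  end
  queue.push(nil) # end-of-stream marker
end

consumer = Thread.new do
  while (chunk = queue.pop)
    sleep 0.01 # simulate a slow consumer
    received << chunk
  end
end

[producer, consumer].each(&:join)
p received
p max_buffered
```

Memory use stays bounded by the queue capacity no matter how many chunks the producer has to send.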

I suggest we allow the queue implementation to be provided as an argument, so that it can be changed depending on requirements. In some cases, buffering indefinitely might make sense; in others, you might not want buffering at all. Ideally, the queue can handle limits like "up to X chunks" or "up to Y bytes".
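
A sketch of the "up to Y bytes" variant, assuming a queue only needs `#push` and `#pop`. `ByteLimitedQueue` and `ToyBody` are hypothetical names for illustration, not async-http classes; `ToyBody` only shows a body taking its queue as a `queue:` keyword argument, defaulting to an unbounded `Thread::Queue`.

```ruby
# Hypothetical queue that limits buffered *bytes* rather than chunks.
class ByteLimitedQueue
  attr_reader :peak_bytes # highest number of bytes ever buffered (demo only)

  def initialize(max_bytes)
    @max_bytes = max_bytes
    @items = []
    @bytes = 0
    @peak_bytes = 0
    @mutex = Mutex.new
    @changed = ConditionVariable.new
  end

  # Blocks until the chunk fits. An empty queue always accepts a chunk, so a
  # single oversized chunk cannot block forever. nil (end of stream) is 0 bytes.
  def push(chunk)
    size = (chunk.nil? ? 0 : chunk.bytesize)
    @mutex.synchronize do
      @changed.wait(@mutex) while !@items.empty? && @bytes + size > @max_bytes
      @items << chunk
      @bytes += size
      @peak_bytes = [@peak_bytes, @bytes].max
      @changed.broadcast
    end
    self
  end

  def pop
    @mutex.synchronize do
      @changed.wait(@mutex) while @items.empty?
      chunk = @items.shift
      @bytes -= (chunk.nil? ? 0 : chunk.bytesize)
      @changed.broadcast
      chunk
    end
  end
end

# Hypothetical body: any object with #push/#pop sets the buffering policy.
class ToyBody
  def initialize(queue: Thread::Queue.new) # unbounded by default
    @queue = queue
  end

  def write(chunk)
    @queue.push(chunk)
  end

  def close
    @queue.push(nil) # nil marks end of stream
  end

  def read
    @queue.pop
  end
end

queue = ByteLimitedQueue.new(10)
body = ToyBody.new(queue: queue)
received = []

writer = Thread.new do
  %w[hello world again].each { |chunk| body.write(chunk) } # 5 bytes each
  body.close
end

reader = Thread.new do
  while (chunk = body.read)
    received << chunk
  end
end

[writer, reader].each(&:join)
p received
```

Passing `Thread::SizedQueue.new(8)` instead would give the "up to X chunks" policy without changing the body.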

cc @janko-m

ioquatix commented 6 years ago

For HTTP/2 this is more problematic, because all HTTP/2 streams share the same connection, so blocking a single read-and-enqueue operation will block every stream. Blocking might not be a terrible approach, but it might be possible to use flow control instead (e.g. signal "the internal queue is almost full, please stop sending so quickly") so that other streams continue to work. This is tricky to get right.
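
To illustrate the flow-control alternative, here is a sketch of credit-based, HTTP/2-style per-stream accounting. `StreamWindow` and `FlowControlError` are hypothetical names. Real HTTP/2 carries the credit in WINDOW_UPDATE frames, tracks a connection-level window as well as per-stream windows, and starts each at 65,535 bytes by default; none of that is modelled here. The point is that the connection reader never blocks: once a stream's window is exhausted the peer must stop sending on that stream until the application consumes some data, while other streams continue.

```ruby
class FlowControlError < StandardError; end

# Hypothetical per-stream window; not a real HTTP/2 implementation.
class StreamWindow
  attr_reader :available # bytes the peer may still send on this stream

  def initialize(initial_window)
    @available = initial_window
    @buffered = [] # received but not yet consumed by the application
  end

  # Connection reader: accept a DATA payload without ever blocking. A
  # well-behaved peer never exceeds the window, so overflow is an error.
  def receive(data)
    raise FlowControlError, "window exceeded" if data.bytesize > @available
    @available -= data.bytesize
    @buffered << data
    self
  end

  # Application: take the next chunk and hand its size back to the peer as
  # new credit (the job of a WINDOW_UPDATE frame in real HTTP/2).
  def consume
    data = @buffered.shift
    @available += data.bytesize if data
    data
  end
end

window = StreamWindow.new(4)
window.receive("ab").receive("cd")
window.available # 0: the peer must pause this stream only

overflow =
  begin
    window.receive("e")
    false
  rescue FlowControlError
    true
  end

first = window.consume # "ab", returning 2 bytes of credit
window.available       # 2
```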

ioquatix commented 6 years ago

Also related to https://github.com/socketry/falcon/issues/7

ioquatix commented 5 years ago

Okay, I just released async v1.12.0.

It includes Async::LimitedQueue, which can be used as a drop-in replacement for Async::Queue but limits the queue to a specified number of items (e.g. 8).

This can now be dropped into Async::HTTP::Body::Writable:

body = Async::HTTP::Body::Writable.new(queue: Async::LimitedQueue.new(8))

ioquatix commented 5 years ago

Released in async-http v0.37.7.

ioquatix commented 5 years ago

Consider this somewhat experimental; it has only had limited testing.

julik commented 5 years ago

Given that Writable.new already accepts length as its first and only positional argument, is there support for the queue: keyword argument too? I've bumped the dependency but don't see the API change in Writable#initialize...

ioquatix commented 5 years ago

This is the API:

https://github.com/socketry/async-http/blob/master/lib/async/http/body/writable.rb#L35

julik commented 5 years ago

Seen it; the RubyGems index needed updating. It works, but it seems I need the "volkswagen switch" again to run tests, and it is not clear what will happen if the reading side of the Writable body just sits there doing nothing. There seems to be no way for the writing side to notice that the reading side has not accepted any input for a while and forcibly disconnect or close the body in response. Imagine a very slow client that accepts nothing for several minutes: you can end up with resource exhaustion, at least if the OS does not give you an EPIPE in time.

ioquatix commented 5 years ago

Make sure you run the reader and writer in separate tasks; otherwise, it may deadlock.
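
A minimal sketch of why, with stdlib threads standing in for Async tasks (an assumption; Async tasks are fibers, but a full bounded queue blocks its writer in the same way). In a single thread, a push into a full queue has nothing to wait for, because the only code that could drain it is stuck behind the push. The sketch uses the non-blocking form of `SizedQueue#push`, which raises `ThreadError`, to show where a blocking push would hang. With the reader in its own thread, the writer makes progress.

```ruby
queue = Thread::SizedQueue.new(2)

# Same thread as the (absent) reader: fill the queue, then try one more push.
queue.push(:a)
queue.push(:b)
same_thread_blocked =
  begin
    queue.push(:c, true) # non_block = true: raise ThreadError instead of waiting
    false
  rescue ThreadError
    true
  end
queue.clear

# Separate threads: the reader drains the queue, so the writer keeps going.
writer = Thread.new do
  5.times { |i| queue.push(i) }
  queue.push(:done)
end

reader = Thread.new do
  items = []
  while (item = queue.pop) != :done
    items << item
  end
  items
end

writer.join
items = reader.value
p same_thread_blocked
p items
```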

Disconnecting slow clients might be beyond the scope of the queue implementation.

ioquatix commented 5 years ago

Maybe a more specific queue that includes a timeout for the reading end would work. Alternatively, you could make a subclass of Async::HTTP::Body::Writable that includes a timeout and terminates the output, which should force the client connection to close.
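
A sketch of the "timeout and terminate" idea, under stated assumptions: `TimeoutBody` and `StalledReaderError` are hypothetical names, not async-http APIs, and the sketch uses a thread mutex and condition variable rather than Async primitives. A write that cannot get room in the buffer within `timeout` seconds closes the body and raises, which is the point where a server could drop the slow client's connection.

```ruby
class StalledReaderError < StandardError; end

# Hypothetical bounded body whose writes give up on a stalled reader.
class TimeoutBody
  def initialize(limit:, timeout:)
    @limit = limit
    @timeout = timeout
    @items = []
    @closed = false
    @mutex = Mutex.new
    @changed = ConditionVariable.new
  end

  def closed?
    @mutex.synchronize { @closed }
  end

  def write(chunk)
    @mutex.synchronize do
      deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + @timeout
      while @items.size >= @limit && !@closed
        remaining = deadline - Process.clock_gettime(Process::CLOCK_MONOTONIC)
        if remaining <= 0
          @closed = true     # terminate the output
          @changed.broadcast # wake any reader so it sees the close
          raise StalledReaderError, "reader made no progress for #{@timeout}s"
        end
        @changed.wait(@mutex, remaining)
      end
      raise IOError, "body closed" if @closed
      @items << chunk
      @changed.broadcast
    end
  end

  # Returns buffered chunks, then nil once the body is closed and empty.
  def read
    @mutex.synchronize do
      @changed.wait(@mutex) while @items.empty? && !@closed
      chunk = @items.shift
      @changed.broadcast
      chunk
    end
  end
end

body = TimeoutBody.new(limit: 2, timeout: 0.1)
error = nil
begin
  10.times { |i| body.write("chunk-#{i}") } # no reader: the third write stalls
rescue StalledReaderError => e
  error = e
end

drained = []
while (chunk = body.read)
  drained << chunk
end
p error
p drained
```

Chunks buffered before the timeout can still be drained after the close, and `read` returns `nil` once the body is empty.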