clj-commons / aleph

Asynchronous streaming communication for Clojure - web server, web client, and raw TCP/UDP
http://aleph.io
MIT License

Add write backpressure by checking Channel writability #691

Open KingMob opened 1 year ago

KingMob commented 1 year ago

Background

From @DerGuteMoritz on Slack:

Question: Does Aleph apply backpressure on outbound traffic anywhere? I know that it does so for inbound traffic (by toggling autoread) but so far I haven't seen anything for outbound traffic. Looks like writes are just queued without bounds 🤔

This can of course be implemented in user code but it seems like a dangerous default..! Then again, Netty also punts this (see e.g. https://github.com/netty/netty/pull/6662).

My response:

Well, both TCP and HTTP/2 advertise windows, but how does that manifest in Netty/Aleph? As you've noted, it doesn't directly; Netty just keeps queueing up whatever you feed it.

See:

Backpressure/throttling has to be implemented in consumer code, and broadly, it seems like there are two options:

  1. All writes return a future/promise that won't be resolved until finished, but we don't necessarily want to chain callbacks onto those, because it's (a) difficult to relate to underlying congestion windows without counting bytes ourselves, and (b) tricky to coordinate with multiple Aleph/Manifold threads.

  2. We listen for changes in the Channel's writability state, since Netty uses that itself to throttle/block, and hold off on any flushes until it's writable.

In the HTTP/2 code, I already added a writable? AtomicBoolean for disabling the user handler from writing after an error, but we could use something similar for backpressure. We can check Channel writability, and if it's not writable, set a flag to call .flush() when .channelWritabilityChanged() is fired.

The only catch with that is, we want to block the .write()-calling code, not just the .flush(). It does no good to respect Netty writability if some user handler is still consuming memory without bound.
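To make the "block the writer, not just the flush" point concrete, here is a minimal sketch in plain Java. It does not use Netty or Aleph: WatermarkGate, write, and flushed are hypothetical names, and the gate only imitates how Netty's high/low write-buffer water marks flip a Channel's writability. Instead of blocking a thread, write returns a future that the producer is expected to wait on before sending more, roughly how a Manifold deferred would be used.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;

/**
 * Hypothetical toy model, NOT Netty's API. It imitates water-mark based
 * writability: unwritable once pending bytes exceed the high mark, writable
 * again once they drop below the low mark.
 */
final class WatermarkGate {
    private final int lowWaterMark;
    private final int highWaterMark;
    private int pendingBytes = 0;
    private boolean writable = true;
    private final Queue<CompletableFuture<Void>> waiters = new ArrayDeque<>();

    WatermarkGate(int lowWaterMark, int highWaterMark) {
        this.lowWaterMark = lowWaterMark;
        this.highWaterMark = highWaterMark;
    }

    synchronized boolean isWritable() {
        return writable;
    }

    /**
     * Accepts the bytes into the (simulated) outbound buffer. The returned
     * future is already complete while the gate is writable; otherwise it
     * completes only after enough bytes have been flushed. A well-behaved
     * producer waits for it before issuing its next write.
     */
    synchronized CompletableFuture<Void> write(int bytes) {
        pendingBytes += bytes;
        if (pendingBytes > highWaterMark) {
            writable = false;
        }
        if (writable) {
            return CompletableFuture.completedFuture(null);
        }
        CompletableFuture<Void> waiter = new CompletableFuture<>();
        waiters.add(waiter);
        return waiter;
    }

    /** Simulates the transport draining bytes from the outbound buffer. */
    void flushed(int bytes) {
        List<CompletableFuture<Void>> released = new ArrayList<>();
        synchronized (this) {
            pendingBytes = Math.max(0, pendingBytes - bytes);
            if (!writable && pendingBytes < lowWaterMark) {
                writable = true;
                released.addAll(waiters);
                waiters.clear();
            }
        }
        // Complete outside the lock so producer callbacks never run under it.
        for (CompletableFuture<Void> waiter : released) {
            waiter.complete(null);
        }
    }
}
```

With water marks of 4 and 8, a 5-byte write completes immediately, a second 5-byte write stays pending, and it is released only after flushes bring the pending count below 4. Memory stays bounded only if producers actually wait on the returned future, which is the catch described above.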

Read backpressure

Internally, the aleph.netty/put! fn toggles auto-read off until the put is resolved, which blocks Netty reads and propagates backpressure up the inbound pipeline and out over the network to any sources. This works reasonably well, but it only indirectly handles write backpressure, and it doesn't help for non-inbound sources of writes. E.g., if a GET triggers a stream of a 1GB video that overwhelms the client, telling the client to slow down fixes nothing.

See http://normanmaurer.me/presentations/2014-twitter-meetup-netty/slides.html#26.0

Tentative plan

Something along the lines of http://normanmaurer.me/presentations/2014-twitter-meetup-netty/slides.html#9.0

  1. Add an AtomicBoolean(false) to the main handlers. Maybe something like needs-to-write?
  2. In the main pathway, check the Channel's isWritable() before writing. If it's false, set needs-to-write? to true.
  3. In the channelWritabilityChanged handler, check whether (1) the AtomicBoolean is true and (2) Channel.isWritable() is true. If both hold, write/flush and set needs-to-write? back to false.
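The three steps above can be sketched as a small single-threaded model in plain Java. ToyChannel and BackpressureHandler are hypothetical stand-ins, not Netty or Aleph classes; only the isWritable / channelWritabilityChanged naming mirrors Netty. The pending queue is an assumption added so that deferred writes have somewhere to wait.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical stand-in for a Channel: models only the writability flag. */
final class ToyChannel {
    private boolean writable = true;
    final List<String> flushed = new ArrayList<>();

    boolean isWritable() {
        return writable;
    }

    void writeAndFlush(String msg) {
        flushed.add(msg);
    }

    /** Flips writability and notifies the handler, as the event loop would. */
    void setWritable(boolean value, BackpressureHandler handler) {
        if (writable != value) {
            writable = value;
            handler.channelWritabilityChanged(this);
        }
    }
}

final class BackpressureHandler {
    // Step 1: the flag from the plan, initially false.
    private final AtomicBoolean needsToWrite = new AtomicBoolean(false);
    // Assumption: deferred messages are held here until the channel recovers.
    private final Queue<String> pending = new ArrayDeque<>();

    // Step 2: check writability before writing; otherwise defer and set the flag.
    void write(ToyChannel ch, String msg) {
        if (ch.isWritable() && !needsToWrite.get()) {
            ch.writeAndFlush(msg);
        } else {
            pending.add(msg);
            needsToWrite.set(true);
        }
    }

    // Step 3: when writable again and the flag is set, write/flush and reset it.
    void channelWritabilityChanged(ToyChannel ch) {
        if (ch.isWritable() && needsToWrite.compareAndSet(true, false)) {
            String msg;
            while (ch.isWritable() && (msg = pending.poll()) != null) {
                ch.writeAndFlush(msg);
            }
            if (!pending.isEmpty()) {
                // Channel filled up again mid-drain; wait for the next event.
                needsToWrite.set(true);
            }
        }
    }
}
```

Checking the flag in write keeps deferred messages ahead of newer ones, so ordering is preserved. Note that the pending queue here is unbounded, so this sketch alone only delays flushes; it still needs a way to slow down whatever is calling write.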

Questions: