ssbc / muxrpc

lightweight multiplexed rpc
MIT License
99 stars 13 forks source link

Do substreams have backpressure? #61

Open hackergrrl opened 4 years ago

hackergrrl commented 4 years ago

I couldn't tell by looking at the source, but I wrote this and it seems like the source gets read entirely regardless of the sink's reading capacity:

const MRPC = require('muxrpc')
const pull = require('pull-stream')
const toPull = require('stream-to-pull-stream')
const net = require('net')

const manifest = {
  stuff: 'source'
}

let data = [1,2,3,4,5,6,7,8,9]
const api = {
  stuff() {
    return function (abort, cb) {
      if (!data.length) {
        console.log('source pulled: end')
        return cb(true)
      }
      const d = data.shift()
      console.log('source pulled data:', d)
      cb(null, d)
    }
  }
}

const client = MRPC(manifest, null) ()
const server = MRPC(null, manifest) (api)

net.createServer(stream => {
  stream = toPull.duplex(stream)
  pull(stream, server.createStream(), stream)
}).listen(8080)

const stream = toPull.duplex(net.connect(8080))

const onClose = () => {
  console.log('rpc closed')
}

pull(stream, client.createStream(onClose), stream)

pull(
  client.stuff(),
  function (read) {
    read(null, function next (end, data) {
      console.log('got', end, data)
      if (end) return
      setTimeout(() => {
        read(null, next)
      }, 500)
    })
  }
)

gives

source pulled data: 1
source pulled data: 2
source pulled data: 3
source pulled data: 4
source pulled data: 5
source pulled data: 6
source pulled data: 7
source pulled data: 8
source pulled data: 9
source pulled: end
source pulled: end
got undefined 1
got null 2
got null 3
got null 4
got null 5
got null 6
got null 7
got null 8
got null 9
got true undefined
hackergrrl commented 4 years ago

@dominictarr @christianbundy Any ideas on how we might go about implementing this?

One idea is to have an ACK for each message, which would make it easy to signal backpressure, since the sink would just withhold its ACK until it's ready for more. However, it means an extra half round-trip for each stream chunk, and a change to the protocol.

dominictarr commented 4 years ago

I did a rewrite on the v7 branch that was gonna implement this, but I when I deployed it it ramped up too slowly and broke stuff. I rolled back to v6 and never got around to finishing it. I think the v7 version is a big improvement too, but it needs some research into ways to detect when to change the rate of flow.

on v7 it sent "credit" default 64k, and when that drained half way it would send more. but for a high latency connection this slows it down a lot... it's kinda like a train, it has lots of momentum. if you wanna go fast, it takes longer to stop. ideally, you'd have a way of measuring a the bandwidth and latency of a connection, so that you can start going at the most sensible speed. then decrease or increase from there.

dominictarr commented 4 years ago

hmm, I think that could be added to the control stream (on the 7 branch)

dominictarr commented 4 years ago

oh yeah, ACK is very pull-stream style. that would work to create back pressure, but except over very low latency connections it would be VERY SLOW.

for example, I just did a speed test for (on 3g here) Screenshot_2020-08-22 Speedtest by Ookla - The Global Broadband Speed Test

1Mbps (bits) so 125k/s (bytes) that means band width lets say a packet is 1kish, so 100ish packets per second. but ping is 0.086 seconds, or 11 per second. so if it was one ack per packet you'd only be able to send 11k. that's 10 times slower!

so sending 10 packets per ack would be better... but it's a little bit more tricky than that because you want more data to be flowing while the ack is travelling.

dominictarr commented 4 years ago

of course those values are only a particular connection. we need to detect what they should be on each connection, and they might change over time too

hackergrrl commented 4 years ago

Ha, so it's basically re-implementing TCP flow control.

Does this mean sbot doesn't have backpressure on its source RPCs? Doesn't this cause problems for e.g. createHistoryStream on a large dataset?

staltz commented 4 years ago

@noffle I remember @cryptix saying exactly that: createHistoryStream is problematic because you get all messages as a firehose

clehner commented 4 years ago

Relevant thread: %xcRxdizHSOvF7uV9kVd/SKjrt3Fij37eiQFhMuId29M=.sha256

dominictarr commented 4 years ago

correct it's tcp on top of tcp. legacy replication was particularily bad because it opened potentially thousands of streams, most of which don't send any data, so maintaining back pressure for all of that would add a lot of over head. on the other hand, EBT replication is over a single stream, so far less overhead.

In that thread @cryptix links to yamux... reading their "spec" they barely describe back pressure works... so it sounds similar to the naive method that v7 currently uses...

dominictarr commented 4 years ago

now having some time for this to process in my subconscious I've realized that it just needs to add a ping signal in the control stream... not that hard...