holepunchto / hyperswarm

A distributed networking stack for connecting peers.
https://docs.holepunch.to
MIT License

undiffused 'appends' in network #108

Open: utanapishtim opened this issue 2 years ago

utanapishtim commented 2 years ago

I am trying to benchmark how long it takes to gossip an 'append' (a new core length) through a swarm, varying maxPeers per swarm and the total number of swarms in the network. Once the total number of swarms passes a certain point, I start to observe that not every swarm receives the 'append'. A script to reproduce is below:

// test.js
import { once } from 'events'
import testnet from '@hyperswarm/testnet'
import DHT from '@hyperswarm/dht'
import Hyperswarm from 'hyperswarm'
import Hypercore from 'hypercore'
import ram from 'random-access-memory'
import { pipeline } from 'streamx'

const maxPeers = 2
const network = await testnet(2)
const { bootstrap } = network

const core = new Hypercore(ram)
await core.ready()

const swarm = new Hyperswarm({ maxPeers, dht: new DHT({ bootstrap }) })
swarm.on('connection', (conn) => pipeline(conn, core.replicate(false), conn))
swarm.join(core.key, { server: true, client: false })
await swarm.flush()

const mirrors = Array.from(Array(55)).map(() => new Hypercore(ram, { keyPair: { publicKey: core.key } }))
let count = mirrors.length
await Promise.all(mirrors.map((mirror) => mirror.ready()))
const recvs = Promise.all(mirrors.map((mirror, i) => {
  return once(mirror, 'append').then(
    () => {
      count--
      console.log(`(mirror-${i}) recv'd gossip, ${count} outstanding`)
    },
    (e) => {
      count--
      console.error(`(mirror-${i}) errored, ${count} outstanding\n${e}`)
    }
  )
}))

const swarms = mirrors.map((reader, i) => {
  const swarm = new Hyperswarm({ maxPeers, dht: new DHT({ bootstrap }) })
  swarm.on('connection', (conn) => pipeline(conn, reader.replicate(true), conn))
  swarm.join(core.key, { server: true, client: true })
  return swarm
})

Promise.all(swarms.map((swarm, i) => {
  const done = mirrors[i].findingPeers()
  return swarm.flush().then(done, done)
})).then(() => console.log('readers flushed'))

const start = Date.now()
core.append(Buffer.from('append'))
await recvs
const end = Date.now()
console.log(`It took ${end - start} (ms) to gossip append through ${mirrors.length} mirrors!`)
await Promise.all(swarms.concat(swarm).map((swarm) => swarm.destroy()))
for (const node of network) await node.destroy()

mafintosh commented 2 years ago

You have a bug in your second pipeline: you always set the initiator flag to true. It should be true or false depending on whether the connection is a client connection (server: true, client: true means it's a mix of both). You can check this by inspecting conn.isInitiator, but a much easier (and more efficient) way to replicate is:

swarm.on('connection', conn => core.replicate(conn))

That does it all for you - use that in general.

mafintosh commented 2 years ago

Fixing that seems to fix the test case for me. Here is the full script, which passes on my end:

import { once } from 'events'
import testnet from '@hyperswarm/testnet'
import DHT from '@hyperswarm/dht'
import Hyperswarm from 'hyperswarm'
import Hypercore from 'hypercore'
import ram from 'random-access-memory'

const maxPeers = 2
const network = await testnet(2)
const { bootstrap } = network

const core = new Hypercore(ram)
await core.ready()

// Writer: join on the discovery key and only accept incoming connections
const swarm = new Hyperswarm({ maxPeers, dht: new DHT({ bootstrap }) })
swarm.on('connection', (conn) => core.replicate(conn))
swarm.join(core.discoveryKey, { server: true, client: false })
await swarm.flush()

// Read-only mirrors that only know the writer's public key
const mirrors = Array.from(Array(1000)).map(() => new Hypercore(ram, core.key))

let count = mirrors.length
await Promise.all(mirrors.map((mirror) => mirror.ready()))
const recvs = Promise.all(mirrors.map((mirror, i) => {
  return once(mirror, 'append').then(
    () => {
      count--
      console.log(`(mirror-${i}) recv'd gossip, ${count} outstanding`)
    },
    (e) => {
      count--
      console.error(`(mirror-${i}) errored, ${count} outstanding\n${e}`)
    }
  )
}))

const swarms = mirrors.map((reader, i) => {
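  const swarm = new Hyperswarm({ maxPeers, dht: new DHT({ bootstrap }) })
  // Passing conn lets Hypercore take the initiator role from the connection itself
  swarm.on('connection', (conn) => reader.replicate(conn))
  // Mirrors both announce and look up peers, so their connections are a mix of roles
  swarm.join(core.discoveryKey, { server: true, client: true })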
  return swarm
})

Promise.all(swarms.map((swarm, i) => {
  const done = mirrors[i].findingPeers()
  return swarm.flush().then(done, done)
})).then(() => console.log('readers flushed'))

const start = Date.now()
core.append(Buffer.from('append'))
await recvs
const end = Date.now()
console.log(`It took ${end - start} (ms) to gossip append through ${mirrors.length} mirrors!`)

await Promise.all(swarms.concat(swarm).map((swarm) => swarm.destroy()))

for (const node of network) await node.destroy()