tradle / multi-hyperbee

A LevelUP compatible leaderless multi-master database with eventual consistency, using hyperbee + CRDT + HLC. Similarly CockroachDB achieves replication on top of RocksDB, but here it is a pure P2P **streaming** database, with zero central management. LevelDB compatibility allows to use Dynalite on top to achieve DynamoDB compatibility with sophisticated auto-updated secondary indexes, and fairly complex queries. Work on Dynalite is almost completed to remove HTTP server, to make this combination perfect for serverless.
36 stars 5 forks source link

Work with TCP server? #8

Closed sce9sc closed 3 years ago

sce9sc commented 3 years ago

Very impressive I tried the example also using it with plain TCP server and it does not work . Will it work only with HyperSwarm??

sce9sc commented 3 years ago

I also tried piping 2 replicating streams . like so

await db2.addPeer(diffFeed.key.toString('hex'))
await db1.addPeer(diffFeed2.key.toString('hex')
let stream1 = await db1.replicate(false, {stream:{},info:{client:false},live: true})
let stream = await db2.replicate(true, {stream:{},info:{client:true},live: true})
stream.pipe(stream1).pipe(stream)

Which actually worked . but only for a few seconds until I got

Error: ETIMEDOUT

I tracked it down to ProtocolStream where the handlers.timeout was undefined

module.exports = class ProtocolStream extends Duplex {
  constructor (initiator, handlers = {}) {
    super()

    if (typeof initiator !== 'boolean') throw new Error('Must specify initiator boolean in replication stream')

    this.initiator = initiator
    this.handlers = handlers
    this.channelizer = new Channelizer(this, {
      encrypted: handlers.encrypted,
      noise: handlers.noise,
      keyPair: handlers.keyPair
    })
    this.state = new SHP(initiator, this.channelizer)
    this.live = !!handlers.live
    this.timeout = null
    this.keepAlive = null
    this.prefinalize = new Nanoguard()
    this.bytesSent = 0
    this.bytesReceived = 0
    this.extensions = StreamExtension.createLocal(this)
    this.remoteExtensions = this.extensions.remote()

    this._utp = null
    this._changes = 0

    this.once('finish', this.push.bind(this, null))
    this.on('pipe', this._onpipe)

         if (handlers.timeout !== false && handlers.timeout !== 0) {
             const timeout = handlers.timeout || 20000
             this.setTimeout(timeout, () => this.destroy(new Error('ETIMEDOUT')))
             this.setKeepAlive(Math.ceil(timeout / 2))
         }

  }

I hope this helps

pgmemk commented 3 years ago

@sce9sc Thank you for your feedback 😊. It is really great to have someone interested in our project. Right now I am working on another urgent project but I'll definitely look into it in a few days.

pgmemk commented 3 years ago

The reason your example does not work is

It seemed confusing, that's why we changed the API so that you don't have to think about it.

On machine 1:

    let peer2 = await db1.addPeer(key2)
    let dbStream = await db1.replicate(isInitiator, {stream: streamBetweenPeers, live: true})

On machine 2:

    let peer1 = await db2.addPeer(key1)    
    let dbStream = await db2.replicate(isInitiator, {stream: streamBetweenPeers, live: true})

streamBetweenPeers - is either socket or the clone of the same multi-hyperbee's diff hyperbee on the other device. HyperSwarm is one way to establish a stream between peers