datdotorg / datdot-service

datdot service
https://playproject.io/datdot-service
MIT License
15 stars 9 forks source link

standard for discovering related hypercores #7

Open serapath opened 4 years ago

serapath commented 4 years ago

@todo


serapath commented 4 years ago

kappa-core / multifeed / multiplex

multifeed - mux

MULTIPLEXER

// Annotated sketch of the per-connection multiplexer: it negotiates, over extension
// messages, which feeds two peers replicate and then folds each feed's replication
// stream into the main stream.
//
// Handshake, as implemented below:
//   A: offerFeeds(keys)                  -> EXT_MANIFEST         (A remembers what it offered)
//   B: on EXT_MANIFEST, emits 'manifest' -> requestFeeds(keys)   (B remembers what it requested)
//   A: on EXT_REQUEST_FEEDS, keeps only offered keys -> EXT_REPLICATE_FEEDS + replicate
//   B: on EXT_REPLICATE_FEEDS, keeps only requested keys -> replicate
//
// @param isInitiator  NOTE(review): not stored in this excerpt, yet `self._initiator` is read
//                     when replicating — presumably assigned in elided code; confirm.
// @param key          protocol encryption key; also the key of the shared virtual feed
// @param opts         optional; reads `_id`, `userData`, `live`, `download`, `upload`, `encrypt`
//
// NOTE(review): `self`, `stream`, `extension`, `uniq`, `once`, `debug`, `self._handshakeExt`,
// `self.stream`, MULTIFEED, PROTOCOL_VERSION and the EXT_* constants are not defined in this
// excerpt — presumably provided by elided code; confirm.
function Multiplexer (isInitiator, key, opts) { // `key` - protocol encryption key
  opts = opts || {} // `opts._id` below would throw when no options are passed

  self._id = opts._id || Math.floor(Math.random() * 10000).toString(16)

  var LOCALOFFER          = [] // keys we offered to the remote
  var REQUESTED_FEEDS     = [] // keys we asked the remote for
  var REMOTE_OFFER        = [] // keys the remote offered to us
  var ACTIVE_FEED_STREAMS = {} // hex key -> replication stream currently folded into the main stream

  // Open a virtual feed that has the key set to the shared key
  self._feed = stream.open(key, function onopen () {
    // ...
    self._handshakeExt.send({ client: MULTIFEED, version: PROTOCOL_VERSION, userData: opts.userData })
  })

  // PROTOCOL:

  // PEER A send OFFER
  // 1. CREATE & EMIT 'manifest', called by `multifeed: mux.ready(...)` AND `multifeed: _addFeed(...)`
  self.offerFeeds = function (keys, opts) { // opts = custom data for 'want' selections
    // 2. REMEMBER LOCAL OFFER — must be the same list the request handler filters against
    LOCALOFFER = [...LOCALOFFER, ...keys]
    self._manifestExt.send({ keys })
  }

  // PEER B receive OFFER
  self._manifestExt = extension(EXT_MANIFEST, msg => { // 3.
    REMOTE_OFFER = [...REMOTE_OFFER, ...msg.keys] // 4. ADD REMOTE OFFER
    self.emit('manifest', msg, self.requestFeeds) // => triggers REQUEST
  })

  // PEER B send REQUEST
  // 5. Sends your wishlist to remote AND `mux.on('manifest', function onManifest(m) { mux.requestFeeds(m.keys) }`
  // for classical multifeed `ACCEPT_ALL` behaviour both must call `want(remoteHas)`
  self.requestFeeds = function (keys) {
    REQUESTED_FEEDS = [...REQUESTED_FEEDS, ...keys] // 6. REMEMBER REQUESTED FEEDS
    self._requestFeedsExt.send(keys)
  }

  // PEER A - receive REQUEST
  self._requestFeedsExt = extension(EXT_REQUEST_FEEDS, keys => { // 5a. by other PEER
    // Drop requests for feeds we never offered, and collapse duplicates.
    var filtered = uniq(keys.filter(key => LOCALOFFER.indexOf(key) !== -1))
    self._replicateFeedsExt.send(filtered) // Tell remote which keys we will replicate
    self._replicateFeeds(filtered) // Start replicating as promised
  })

  // PEER B - receive REPLICATION OFFER
  self._replicateFeedsExt = extension(EXT_REPLICATE_FEEDS, keys => { // feeds
    // Only replicate feeds we actually asked for.
    var filtered = keys.filter(key => REQUESTED_FEEDS.indexOf(key) !== -1)
    // Start replicating as requested.
    self._replicateFeeds(filtered, () => self.stream.emit('remote-feeds'))
  })

  // Initializes new replication streams for feeds and joins their streams into
  // the main stream.
  // Emits 'replicate' with the keys and a one-shot callback; the listener resolves the keys
  // to feeds and passes them to that callback. Returns `keys` unchanged.
  self._replicateFeeds = function (keys, cb) {
    // The request handler calls this without a callback.
    if (typeof cb !== 'function') cb = function noop () {}

    self.emit('replicate', keys, once(startFeedReplication))

    return keys

    // PEER A + B
    function startFeedReplication (feeds) {
      var pending = feeds.length

      // only the feeds passed to `feeds` option will be replicated (sent or received)
      // hypercore-protocol has built in protection against receiving unexpected/not asked for data.

      feeds.forEach(feed => {
        var hexKey = feed.key.toString('hex')

        // prevent a feed from being folded into the main stream twice.
        if (typeof ACTIVE_FEED_STREAMS[hexKey] !== 'undefined') {
          if (!--pending) cb()
          return
        }

        var fStream = feed.replicate(self._initiator, Object.assign({}, { // REPLICATE FEED
          live: opts.live, download: opts.download, upload: opts.upload, encrypt: opts.encrypt, stream: self.stream
        }))

        ACTIVE_FEED_STREAMS[hexKey] = fStream // Store reference to this particular feed stream

        function cleanup () { // delete feed stream reference
          delete ACTIVE_FEED_STREAMS[hexKey]
        }
        fStream.once('end', cleanup)
        fStream.once('error', cleanup)

        if (!--pending) cb()
      })

      // Bail on replication entirely if there were no feeds to add, and none are pending or active.
      if (feeds.length === 0 && Object.keys(ACTIVE_FEED_STREAMS).length === 0) {
        debug('[REPLICATION] terminating mux: no feeds to sync')
        self._feed.close()
        process.nextTick(cb)
      }
    }
  }
}

multifeed - index

LOCAL PEER STORAGE

// List of connected peers: `replicate` pushes each new `mux` onto it and `_addFeed`
// iterates it to offer newly added feeds.
self._streams = [] // all peers in the form of `mux` objects

LOCAL FEED STORAGE

// Illustrative shape of the local feed table: one entry per stored feed.
// NOTE(review): `_addFeed` writes `self._feeds[name] = feed` and keeps the hex-key lookup in
// `self._feedKeyToFeed`, so the `feedkeyN` keys shown here are presumably feed names — confirm.
self._feeds = {
  [feedkey1]: feed1,
  [feedkey2]: feed2,
  [feedkey3]: feed3,
}

on READY offers all its FEED KEYS to PEERS

// Once the mux reports ready, offer every locally stored feed to the peer by hex key.
mux.ready(function onMuxReady () {
  var hexKeys = []
  values(self._feeds).forEach(function (localFeed) {
    hexKeys.push(localFeed.key.toString('hex'))
  })
  mux.offerFeeds(hexKeys)
})

stores locally and offers new FEED to PEERS

/**
 * Stores a feed locally and offers it to every connected peer that does not know it yet.
 *
 * Registers the feed under `name` in `_feeds` and under its hex key in `_feedKeyToFeed`,
 * lifts the feed's listener limit, emits 'feed', then announces the key on each live mux.
 *
 * @param {object} feed - feed exposing `key` (converted with `toString('hex')`) and `setMaxListeners`
 * @param {string} name - key under which the feed is stored in `_feeds`
 */
Multifeed.prototype._addFeed = function (feed, name) {
  var self = this // `self` was otherwise never bound inside this prototype method
  var hexKey = feed.key.toString('hex')

  self._feeds[name] = feed
  self._feedKeyToFeed[hexKey] = feed
  feed.setMaxListeners(Infinity)
  self.emit('feed', feed, name)

  // forward live feed announcements
  if (!self._streams.length) return // no-op if no live-connections
  // Tell each remote that we have a new key available unless it's already being replicated
  self._streams.forEach(function (mux) {
    if (mux.knownFeeds().indexOf(hexKey) === -1) mux.offerFeeds([hexKey])
  })
}

add new peer (mux)

/**
 * Adds a new peer: creates a mux session for the connection, wires up the key exchange
 * and replication handlers, and returns the mux's stream.
 *
 * @param {boolean} isInitiator - forwarded unchanged to `multiplexer`
 * @param {object} [opts] - forwarded to `multiplexer`, with `_id` overridden by this instance's `_id`
 * @returns the `stream` property of the created mux
 */
Multifeed.prototype.replicate = function (isInitiator, opts) {
  var self = this // the original mixed an unbound `self` with `this`

  // All multifeeds get a random or passed in `_id`
  // When "ready" they make a feed using a default or passed encryptionKey and set it as `_root` feed

  // MAKE SESSION
  var mux = multiplexer(isInitiator, self._root.key, Object.assign({}, opts, { _id: self._id }))
  /* on ready */ self._streams.push(mux)

  // KEY EXCHANGE listener: request every key the remote offers
  mux.on('manifest', function onManifest (m) { mux.requestFeeds(m.keys) })

  // REPLICATION REQUEST: make sure the feeds exist locally, then hand them to the mux.
  // NOTE(review): a rejection from `addMissingKeys` surfaces as an unhandled rejection because
  // the emitter ignores the returned promise — decide how it should be reported.
  mux.on('replicate', async function onReplicate (keys, done) {
    await addMissingKeys(keys)
    // 1. make sure all keys are proper feedkeys
    // 2. check non of the given feeds already exist
    // for all new keys:
    //   1. make a storage with name "myKey" is "self._feeds.length"
    //   2. make new hypercore for the new feed
    //   3. `self._addFeed(feed, myKey)

    // => looks up all existing feeds based on given keys
    var feeds = keys
      .map(function (hexKey) { return self._feedKeyToFeed[hexKey] })
      .filter(Boolean) // skip keys for which no feed is stored

    // => calls `mux` callback to replicate those feeds
    done(feeds) // replicates
  })

  return mux.stream
}
serapath commented 4 years ago

proposal draft: see https://github.com/playproject-io/datdot-research/issues/17#issuecomment-602563335

okdistribute commented 4 years ago

@serapath it's multifeed that's doing that, not cabal, just to be clear :)

serapath commented 4 years ago

@okdistribute when checking, I found cabal-core to use kappa-core and kappa-core to use multifeed, so it felt like, underneath, that's what's happening on the level of exchanging hypercores. Can you recommend where or what I should be looking at additionally?

okdistribute commented 4 years ago

Sorry, I think I wasn't clear: the data structure that handles the feeds in cabal is multifeed, and in hyperdrive it's corestore. So any app built on multifeed or corestore could work with datdot, if you target those data structures rather than a particular application like hyperdrive or cabal.