libp2p / js-libp2p

The JavaScript Implementation of libp2p networking stack.
https://libp2p.github.io/js-libp2p/

pubsub.unsubscribe should comply with interface-ipfs-core spec, since it links to it in the docs #300

Closed noot closed 5 years ago

noot commented 5 years ago

Type: Bug
Severity: high
Description: Cannot unsubscribe from a pubsub topic after subscribing. Following the IPFS core API here: https://github.com/ipfs/interface-ipfs-core/blob/master/SPEC/PUBSUB.md

Steps to reproduce the error:

const libp2p = require("libp2p")
const TCP = require("libp2p-tcp")
const PeerInfo = require("peer-info")
const waterfall = require("async/waterfall")
const defaultsDeep = require("@nodeutils/defaults-deep")
const Mplex = require("libp2p-mplex")
const SECIO = require('libp2p-secio')
const Bootstrap = require('libp2p-bootstrap')

const bootstrappers = [
    '/ip4/127.0.0.1/tcp/43107/ipfs/Qmbm3gwdCZM9GQJg1fpsooZDJ6ma56AFPxqYup53NhvstB'
]

class MyBundle extends libp2p {
    constructor(_options) {
        const defaults = {
            modules: {
                transport: [TCP],
                streamMuxer: [Mplex],
                connEncryption: [SECIO],
                peerDiscovery: [Bootstrap]
            },
            config: {
                peerDiscovery: {
                    bootstrap: {
                        interval: 2000,
                        enabled: true,
                        list: bootstrappers
                    },
                },
                EXPERIMENTAL: {
                    pubsub: true
                }
            }
        }

        super(defaultsDeep(_options, defaults))
    }
}

function createNode(callback) {
    let node // declared locally so it does not leak as an implicit global

    waterfall([
        (cb) => PeerInfo.create(cb),
        (peerInfo, cb) => {
            peerInfo.multiaddrs.add('/ip4/0.0.0.0/tcp/0')
            node = new MyBundle({peerInfo: peerInfo})
            node.start(cb)
        }
    ], (err) => callback(err, node))
}

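createNode((err, node) => {
    if (err) {
        return console.error('failed to create node', err)
    }

    const topic = 'fruit-of-the-day'
    // msg is a pubsub message object; the payload Buffer is in msg.data
    const receiveMsg = (msg) => console.log(msg.data.toString())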

    node.pubsub.subscribe(topic, receiveMsg, (err) => {
      if (err) {
        return console.error(`failed to subscribe to ${topic}`, err)
      }

      console.log(`subscribed to ${topic}`)

      // unsubscribe a second later
      setTimeout(() => {
        node.pubsub.unsubscribe(topic, receiveMsg, (err) => {
          if (err) {
            return console.error(`failed to unsubscribe from ${topic}`, err)
          }

          console.log(`unsubscribed from ${topic}`)
        })
      }, 1000)
    })
})

This code is taken from the IPFS core API spec and the libp2p pubsub tutorial. Running it shows that we successfully subscribe to the topic; one second later we should also see the unsubscribe message, but it is never printed.

Any help/feedback is much appreciated!

jacobheun commented 5 years ago

Currently libp2p ignores the callback passed to unsubscribe, so it is never invoked and you never see the result. The unsubscribe is currently synchronous, so the callback isn't strictly needed, but we should add it since we're linking to the interface documents.
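
To illustrate the point about a synchronous unsubscribe that still accepts a callback, here is a minimal sketch. It is not the change made in js-libp2p: a plain Node.js `EventEmitter` stands in for the pubsub layer, and `unsubscribeWithCallback` is a hypothetical helper named for this example only.

```javascript
const { EventEmitter } = require('events')

// Hypothetical helper (not part of js-libp2p): removes the handler
// synchronously, then invokes an optional Node-style callback.
function unsubscribeWithCallback (emitter, topic, handler, callback) {
  let error = null
  try {
    emitter.removeListener(topic, handler) // synchronous removal
  } catch (err) {
    error = err
  }
  if (typeof callback === 'function') {
    // Deferred so the callback always fires asynchronously
    setImmediate(() => callback(error))
  }
}

// Stand-in for the pubsub layer (assumption: a plain EventEmitter)
const emitter = new EventEmitter()
const topic = 'fruit-of-the-day'
const received = []
const receiveMsg = (msg) => received.push(msg)

emitter.on(topic, receiveMsg)
emitter.emit(topic, 'pomegranate') // delivered to receiveMsg

unsubscribeWithCallback(emitter, topic, receiveMsg, (err) => {
  if (err) {
    return console.error(`failed to unsubscribe from ${topic}`, err)
  }
  console.log(`unsubscribed from ${topic}`)
})

emitter.emit(topic, 'banana') // not delivered: handler is already removed
```

Because the removal happens before the callback is scheduled, the second message is dropped even though the callback has not run yet, which matches the behaviour described above: the unsubscribe takes effect whether or not the callback is ever called.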

The unsubscribe itself should still be happening. If you add an interval-based message broadcast from a second node, you should see the messages stop arriving once the unsubscribe timeout fires.

setInterval(() => {
  node2.pubsub.publish(topic,
    Buffer.from('pomegranate'),
    () => {}
  )
}, 500)

noot commented 5 years ago

@jacobheun cool, looks like unsubscribe is working. thanks!

jacobheun commented 5 years ago

Resolved by #302 via 679d446daa0b43c5a1d77c8f300b34c32c678022