libp2p / go-libp2p-pubsub

The PubSub implementation for go-libp2p
https://github.com/libp2p/specs/tree/master/pubsub

Question: is pubsub discovery enabled by default (or some parts of it) #349

Closed iulianpascalau closed 4 years ago

iulianpascalau commented 4 years ago

I know it shouldn't be enabled, since the only option I set is:

optsPS := []pubsub.Option{
    pubsub.WithMessageSigning(withMessageSigning),
}

The question came up because of one of my tests, which uses 8 hosts with attached pubsub instances that were connected manually: x connects with y and z, z with a and b, and so on. About 14 seconds into the test, the connections changed: some connections appeared between peers that had not been connected manually. When I commented out all pubsub usage (publishing on topics, initialization, and so on), the connections did not change; they remained as initially set up.

We are using github.com/libp2p/go-libp2p-pubsub v0.3.1. Running in debug mode, I saw that the disc instance is instantiated with &discover{} and that the call to Start returns at the first if. I cannot figure out where the extra connections come from whenever I turn on the pubsub instance. The issue reproduces even when no messages are broadcast on the network. However, the extra connections do not appear if none of the pubsub instances has joined a topic.
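One way to pin down which connections are "extra" is to compare each host's observed peer list against the peers it was dialed to manually. The sketch below is a minimal, library-free illustration: the helper name unexpectedPeers and the single-letter peer labels are hypothetical, and in a real test the observed list would presumably be built from host.Network().Peers() converted to strings.

```go
package main

import (
	"fmt"
	"sort"
)

// unexpectedPeers returns every peer that appears in observed but not in
// expected, sorted so the output is stable between runs.
// (Hypothetical helper, not part of libp2p.)
func unexpectedPeers(expected, observed []string) []string {
	want := make(map[string]struct{}, len(expected))
	for _, id := range expected {
		want[id] = struct{}{}
	}

	extra := make([]string, 0)
	for _, id := range observed {
		if _, ok := want[id]; !ok {
			extra = append(extra, id)
		}
	}
	sort.Strings(extra)
	return extra
}

func main() {
	// Assumed labels: x was manually connected to y and z only.
	expected := []string{"y", "z"}
	// Later, x also reports a connection to a.
	observed := []string{"y", "z", "a"}

	fmt.Println(unexpectedPeers(expected, observed)) // prints "[a]"
}
```

Running such a check once per second for every host would show when the first unexpected connection appears and between which peers.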

vyzo commented 4 years ago

No, neither discovery nor peer exchange is enabled by default. Pubsub shouldn't be opening any connections on its own with no options.

iulianpascalau commented 4 years ago

I will write a new, clean test for this and post the results here. Thanks.

iulianpascalau commented 4 years ago

Successfully reproduced the problem. It appears to be related to subscription cancellation and topic closing. This is the minimal piece of code I could come up with that reproduces the problem (just uncomment the 2 lines right after the TODO):

package main

import (
    "context"
    "crypto/ecdsa"
    "crypto/rand"
    "fmt"
    "time"

    "github.com/btcsuite/btcd/btcec"
    "github.com/libp2p/go-libp2p"
    libp2pCrypto "github.com/libp2p/go-libp2p-core/crypto"
    "github.com/libp2p/go-libp2p-core/host"
    "github.com/libp2p/go-libp2p-core/peer"
    pubsub "github.com/libp2p/go-libp2p-pubsub"
)

type messenger struct {
    host    host.Host
    pb      *pubsub.PubSub
    topic   *pubsub.Topic
    subscr  *pubsub.Subscription
}

func newMessenger() *messenger {
    address := "/ip4/0.0.0.0/tcp/0"
    opts := []libp2p.Option{
        libp2p.ListenAddrStrings(address),
        libp2p.Identity(createP2PPrivKey()),
        libp2p.DefaultMuxers,
        libp2p.DefaultSecurity,
        libp2p.DefaultTransports,
        //we need the disable relay option in order to save the node's bandwidth as much as possible
        libp2p.DisableRelay(),
        libp2p.NATPortMap(),
    }

    h, _ := libp2p.New(context.Background(), opts...)

    optsPS := []pubsub.Option{
        pubsub.WithMessageSigning(true),
    }
    pb, _ := pubsub.NewGossipSub(context.Background(), h, optsPS...)

    return &messenger{
        host: h,
        pb:   pb,
    }
}

func createP2PPrivKey() *libp2pCrypto.Secp256k1PrivateKey {
    prvKey, _ := ecdsa.GenerateKey(btcec.S256(), rand.Reader)
    return (*libp2pCrypto.Secp256k1PrivateKey)(prvKey)
}

func (m *messenger) connectTo(target *messenger){
    addr := peer.AddrInfo{
        ID:    target.host.ID(),
        Addrs: target.host.Addrs(),
    }

    err := m.host.Connect(context.Background(), addr)
    if err != nil{
        fmt.Println("error connecting to peer: " + err.Error())
    }
}

func (m *messenger) joinTopic(topic string){
    m.topic, _ = m.pb.Join(topic)
    m.subscr, _ = m.topic.Subscribe()

    go func(){
       for{
           msg, err := m.subscr.Next(context.Background())
           if err != nil{
               return
           }

           fmt.Printf("%s: got message %s\n", m.host.ID().Pretty(), string(msg.Data))
       }
    }()

}

func main(){
    fmt.Println("creating 8 hosts connected statically...")
    peers := create8ConnectedPeers()

    defer func(){
       for _, p := range peers{
           _ = p.host.Close()
       }
    }()

    fmt.Println()

    for _, p := range peers{
        p.joinTopic("test")
    }

    go func(){
       time.Sleep(time.Second * 2)
       //TODO uncomment these 2 lines to make the pubsub create connections
       //peers[3].subscr.Cancel()
       //_ = peers[3].topic.Close()
    }()

    for i := 0; i < 10; i++{
        printConnections(peers)
        fmt.Println()
        time.Sleep(time.Second)
    }
}

func printConnections(peers []*messenger){
    for _, p := range peers{
        fmt.Printf(" %s is connected to %d peers\n", p.host.ID().Pretty(), len(p.host.Network().Peers()))
    }
}

// create8ConnectedPeers assembles a network as follows:
//
//                             0------------------- 1
//                             |                    |
//        2 ------------------ 3 ------------------ 4
//        |                    |                    |
//        5                    6                    7
func create8ConnectedPeers() []*messenger{
    peers := make([]*messenger, 0)
    for i := 0; i < 8; i++{
        p := newMessenger()
        fmt.Printf("%d - created peer %s\n", i, p.host.ID().Pretty())

        peers = append(peers, p)
    }

    connections := map[int][]int{
        0: {1, 3},
        1: {4},
        2: {5, 3},
        3: {4, 6},
        4: {7},
    }

    createConnections(peers, connections)

    return peers
}

func createConnections(peers []*messenger, connections map[int][]int) {
    for pid, connectTo := range connections {
        connectPeerToOthers(peers, pid, connectTo)
    }
}

func connectPeerToOthers(peers []*messenger, idx int, connectToIdxes []int) {
    for _, connectToIdx := range connectToIdxes {
        peers[idx].connectTo(peers[connectToIdx])
    }
}
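To read the output of printConnections, it helps to know how many peers each host should report if only the manual dials exist. The following sketch derives that from the same connections map used in create8ConnectedPeers. The helper name expectedDegrees is hypothetical, and the code assumes every index is in range and no pair is dialed twice.

```go
package main

import "fmt"

// expectedDegrees returns, for n peers, the number of neighbours each peer
// should have once every dial listed in connections has succeeded.
// A connection is shared by both ends, so each dial increments two counters.
// (Hypothetical helper; assumes valid indexes and no duplicate dials.)
func expectedDegrees(n int, connections map[int][]int) []int {
	degrees := make([]int, n)
	for from, targets := range connections {
		for _, to := range targets {
			degrees[from]++
			degrees[to]++
		}
	}
	return degrees
}

func main() {
	// Same dial map as in create8ConnectedPeers.
	connections := map[int][]int{
		0: {1, 3},
		1: {4},
		2: {5, 3},
		3: {4, 6},
		4: {7},
	}

	fmt.Println(expectedDegrees(8, connections)) // prints "[2 2 2 4 3 1 1 1]"
}
```

Any host that reports more peers than its entry in this list has gained a connection that was not created manually.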

vyzo commented 4 years ago

that smells like a bug; @aschmahmann can you take a look at this?

aschmahmann commented 4 years ago

@vyzo I'm not really sure how discovery could even apply here since there's no discovery mechanism even constructed (i.e. there's no DHT, no rendezvous server, etc.).

I'm not up to speed with how peer exchange works in gossipsub, but I suspect the cause is that doPX=true is passed when performing a prune (see the link below). When we leave the mesh, we send a message saying "hey, I'm leaving, but try using these guys":

https://github.com/libp2p/go-libp2p-pubsub/blob/e18b4edd9af3163d06d50ba09212ea30e4b8f906/gossipsub.go#L1008

Changing this to false makes the "extra" connections go away.
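The effect described here can be illustrated with a toy model of the receiving side. This is only a sketch, not the library's code: the prune and node types, the handlePrune method, and the peer labels are all hypothetical, and the real router may apply further checks before dialing that this model omits.

```go
package main

import "fmt"

// prune is a toy stand-in for a PRUNE control message.
// px lists the peers suggested via peer exchange; it is empty when PX is off.
type prune struct {
	topic string
	px    []string
}

// node is a toy peer that only tracks which peers it is connected to.
type node struct {
	conns map[string]bool
}

// handlePrune dials every suggested peer the node is not yet connected to
// and returns the peers it newly dialed.
func (n *node) handlePrune(p prune) []string {
	var dialed []string
	for _, id := range p.px {
		if !n.conns[id] {
			n.conns[id] = true
			dialed = append(dialed, id)
		}
	}
	return dialed
}

func main() {
	// Peer 0 in the diagram was manually connected to peers 1 and 3 only.
	n := &node{conns: map[string]bool{"peer1": true, "peer3": true}}

	// Peer 3 leaves the topic and suggests its other neighbours.
	dialed := n.handlePrune(prune{topic: "test", px: []string{"peer2", "peer4", "peer6"}})

	fmt.Println(dialed, len(n.conns)) // prints "[peer2 peer4 peer6] 5"
}
```

In this model, a single leave by a well-connected peer is enough to create connections between hosts that were never dialed manually, which matches what the reproduction shows.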

vyzo commented 4 years ago

Hrm, that's a bug; the prune should use the router's doPX variable instead. It only happens on leave.
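The difference between the reported behaviour and the intended one can be sketched on the sending side as follows. This is a simplified model under stated assumptions, not the actual gossipsub.go code: the router and prune types and the leaveBuggy, leaveFixed, and makePrune functions are hypothetical names; only the doPX flag comes from the discussion above.

```go
package main

import "fmt"

// prune is a toy stand-in for a PRUNE control message.
type prune struct {
	topic string
	px    []string // peers offered via peer exchange
}

// router is a toy model of a pubsub router. doPX mirrors the router-level
// peer exchange setting, which is off unless explicitly enabled.
type router struct {
	doPX bool
	mesh map[string][]string // topic -> mesh peers
}

// makePrune builds a prune for topic and attaches the mesh peers only when
// doPX is true.
func (r *router) makePrune(topic string, doPX bool) prune {
	p := prune{topic: topic}
	if doPX {
		p.px = append(p.px, r.mesh[topic]...)
	}
	return p
}

// leaveBuggy models the reported behaviour: peer exchange is forced on.
func (r *router) leaveBuggy(topic string) prune {
	return r.makePrune(topic, true)
}

// leaveFixed models the intended behaviour: the router's own setting is used.
func (r *router) leaveFixed(topic string) prune {
	return r.makePrune(topic, r.doPX)
}

func main() {
	r := &router{
		doPX: false, // peer exchange not enabled
		mesh: map[string][]string{"test": {"peer0", "peer2"}},
	}

	fmt.Println(len(r.leaveBuggy("test").px)) // prints "2"
	fmt.Println(len(r.leaveFixed("test").px)) // prints "0"
}
```

With peer exchange disabled, the fixed variant sends a prune that suggests no peers, so a leave no longer gives the remaining hosts anyone new to dial.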

vyzo commented 4 years ago

fix in #353.