libp2p / go-libp2p-pubsub

The PubSub implementation for go-libp2p
https://github.com/libp2p/specs/tree/master/pubsub
Other
328 stars 186 forks source link

basic pubsub test - fail to make it pass #142

Open nikkolasg opened 5 years ago

nikkolasg commented 5 years ago

Hi, I wanted to play with pubsub a bit, with a very basic scenario:

I would expect that peer 2 would receive the message and also transmit it to peer 3 but in my code only peer 2 does.

I fail to find the relevant missing piece of info in the library... Is my scenario a valid one or is this expected to fail according to the specs ? And if it is a valid one, would you know why it does not work with my code below ?

I'd be happy to add lots of comments and make a PR to show such a basic example.

PS: I added the DisableRelay option since otherwise that does not work, as shown in the issue https://github.com/libp2p/go-libp2p-examples/issues/21

package main

import (
    "bytes"
    "context"
    "fmt"
    "time"

    libp2p "github.com/libp2p/go-libp2p"
    host "github.com/libp2p/go-libp2p-host"
    pstore "github.com/libp2p/go-libp2p-peerstore"
    pubsub "github.com/libp2p/go-libp2p-pubsub"
)

const gossipSubID = "/meshsub/1.0.0"

func main() {

    //golog.SetAllLoggers(gologging.DEBUG) // Change to DEBUG for extra info
    h1 := newHost(2001)
    h2 := newHost(2002)
    h3 := newHost(2003)
    fmt.Printf("host 1: \n\t-Addr:%s\n\t-ID: %s\n", h1.Addrs()[0], h1.ID().Pretty())
    fmt.Printf("host 2: \n\t-Addr:%s\n\t-ID: %s\n", h2.Addrs()[0], h2.ID().Pretty())
    fmt.Printf("host 3: \n\t-Addr:%s\n\t-ID: %s\n", h3.Addrs()[0], h3.ID().Pretty())

    time.Sleep(100 * time.Millisecond)

    // add h1 to h2's store
    h2.Peerstore().AddAddr(h1.ID(), h1.Addrs()[0], pstore.PermanentAddrTTL)
    // add h2 to h1's store
    h1.Peerstore().AddAddr(h2.ID(), h2.Addrs()[0], pstore.PermanentAddrTTL)
    // add h3 to h2's store
    h2.Peerstore().AddAddr(h3.ID(), h3.Addrs()[0], pstore.PermanentAddrTTL)
    // add h2 to h3's store
    h3.Peerstore().AddAddr(h3.ID(), h3.Addrs()[0], pstore.PermanentAddrTTL)

    // ---- gossip sub part
    topic := "random"
    opts := pubsub.WithMessageSigning(false)
    g1, err := pubsub.NewGossipSub(context.Background(), h1, opts)
    requireNil(err)
    g2, err := pubsub.NewGossipSub(context.Background(), h2, opts)
    requireNil(err)
    g3, err := pubsub.NewGossipSub(context.Background(), h3, opts)
    requireNil(err)
    s2, err := g2.Subscribe(topic)
    requireNil(err)
    s3, err := g3.Subscribe(topic)
    requireNil(err)
    time.Sleep(1 * time.Second)

        // 1 connect to 2 and 2 connect to 3
    err = h1.Connect(context.Background(), h2.Peerstore().PeerInfo(h2.ID()))
    requireNil(err)
    err = h2.Connect(context.Background(), h3.Peerstore().PeerInfo(h3.ID()))
    requireNil(err)

    // publish and read
    msg := []byte("Hello Word")
    requireNil(g1.Publish(topic, msg))

    pbMsg, err := s2.Next(context.Background())
    requireNil(err)
    checkEqual(msg, pbMsg.Data)
    fmt.Println(" GOSSIPING WORKS #1")

    pbMsg, err = s3.Next(context.Background())
    requireNil(err)
    checkEqual(msg, pbMsg.Data)
    fmt.Println(" GOSSIPING WORKS #2")
}

func checkEqual(exp, rcvd []byte) {
    if !bytes.Equal(exp, rcvd) {
        panic("not equal")
    }
}

func requireNil(err error) {
    if err != nil {
        panic(err)
    }
}

func newHost(port int) host.Host {
    opts := []libp2p.Option{
        libp2p.ListenAddrStrings(fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", port)),
        libp2p.DisableRelay(),
    }
    basicHost, err := libp2p.New(context.Background(), opts...)
    if err != nil {
        panic(err)
    }
    return basicHost
}
vyzo commented 5 years ago

Your scenario is valid. You need to wait a bit for overlay initialization (1-2s for the heartbeat).

nikkolasg commented 5 years ago

Great, indeed it works when I put 2 seconds, thanks ! I'm working on a commented version now.

Also there's still one weird behavior I would not have expected: if I connect the nodes together before creating the pubsub and subscribing to the topic, it does not work - simply hangs. Connecting the nodes after subscribing works....

vyzo commented 5 years ago

the delay should fix this weird behaviour too.

vyzo commented 5 years ago

Note that if you open the connections before creating the PubSub object, then it will not work as there will be no connection notification events and so the peers will not be discovered.

daviddias commented 4 years ago

@nikkolasg this is a nice example and something that others have requested (https://github.com/libp2p/go-libp2p-pubsub/issues/160 & https://github.com/libp2p/go-libp2p-pubsub/issues/235), mind if I take it to add it to the repo?