waku-org / go-waku

Go implementation of Waku v2 protocol
https://waku.org/
Other
120 stars 43 forks source link

bug: message won't be sent over from node1 to node2 with sharded topic subscription #1086

Closed romanzac closed 6 months ago

romanzac commented 7 months ago

Description During TestStaticShardingLimits execution 1024 pubsub topics were subscribed on relay at node1 and node2. Relay instance1 was used to publish one message on randomly selected pubsub topic out of those 1024 subscribed. Subscription on relay at node2 is used to check the message was received. It looks message was not received. I have added relay.IsSubscribed() check on both relays, and this check is passing.

Similar scenario is working at TestWakuRelayStaticSharding with the difference message is sent from relay2 to relay1.

To Reproduce 1) Checkout https://github.com/waku-org/go-waku/pull/1060/commits/e1210c7ce3b5d3eb3d3ae1284855d465cfe4e1df 2) Apply patch from https://github.com/waku-org/go-waku/pull/1084 3) cd go-waku/waku/v2/node 4) go test -run TestStaticShardingLimits

Expected behavior Message could be received on node2 after publishing on node1.

Log

--- FAIL: TestStaticShardingLimits (33.97s)
    utils.go:402:
            Error Trace:    /Users/roman/sources/waku-org/go-waku/tests/utils.go:402
                                        /usr/local/go/src/runtime/asm_arm64.s:1172
            Error:          Message timeout
            Test:           TestStaticShardingLimits
FAIL
richard-ramos commented 7 months ago

I modified the code slightly:

diff --git a/waku/v2/node/wakunode2_test.go b/waku/v2/node/wakunode2_test.go
index eee80e80..7cb94d38 100644
--- a/waku/v2/node/wakunode2_test.go
+++ b/waku/v2/node/wakunode2_test.go
@@ -4,7 +4,6 @@ import (
    "bytes"
    "context"
    "fmt"
-   wenr "github.com/waku-org/go-waku/waku/v2/protocol/enr"
    "math/big"
    "math/rand"
    "net"
@@ -12,6 +11,8 @@ import (
    "testing"
    "time"

+   wenr "github.com/waku-org/go-waku/waku/v2/protocol/enr"
+
    "github.com/ethereum/go-ethereum/crypto"
    "github.com/ethereum/go-ethereum/p2p/enode"
    "github.com/prometheus/client_golang/prometheus"
@@ -418,7 +419,7 @@ func TestStaticShardingMultipleTopics(t *testing.T) {
 }

 func TestStaticShardingLimits(t *testing.T) {
-   ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
+   ctx, cancel := context.WithTimeout(context.Background(), 1000*time.Second)
    defer cancel()

    testClusterID := uint16(21)
@@ -507,6 +508,8 @@ func TestStaticShardingLimits(t *testing.T) {
    require.True(t, r1.IsSubscribed(shardedPubSubTopics[randomShard]))
    require.True(t, r2.IsSubscribed(shardedPubSubTopics[randomShard]))

+   time.Sleep(120 * time.Second)
+
    // Publish on node1
    _, err = r1.Publish(ctx, msg1, relay.WithPubSubTopic(shardedPubSubTopics[randomShard]))
    require.NoError(t, err)

And noticed that with this sleep, the test case is succesful:

2024-04-10T09:57:14.708-0400    INFO    gowaku.node2.peer-manager       peermanager/peer_manager.go:441 peer already found in peerstore, but re-adding it as ENR sequence is higher than locally stored   {"peer": "16Uiu2HAmCFRE1yenDPuRK91gMTbJm9iEuMWFZwG5Qmxc9Fk7n9vZ", "newENRSeq": 1712757407949, "storedENRSeq": 1712757407346}
2024-04-10T09:57:14.709-0400    INFO    gowaku.node2.peer-manager       peermanager/peer_manager.go:474 adding peer to peerstore        {"peer": "16Uiu2HAmCFRE1yenDPuRK91gMTbJm9iEuMWFZwG5Qmxc9Fk7n9vZ"}
2024-04-10T09:58:00.945-0400    INFO    gowaku.node2.connection-notifier        node/connectedness.go:58        peer connected  {"peer": "16Uiu2HAmCFRE1yenDPuRK91gMTbJm9iEuMWFZwG5Qmxc9Fk7n9vZ", "direction": "Outbound"}
2024-04-10T09:58:00.945-0400    INFO    gowaku.node2.connection-notifier        node/connectedness.go:58        peer connected  {"peer": "16Uiu2HAm5uw5LNkbVeJizxDs5PEszEHcSYHgQHBgpBYmSEjNEHb8", "direction": "Inbound"}
2024-04-10T09:59:16.642-0400    WARN    pubsub  go-libp2p-pubsub@v0.10.0/tag_tracer.go:157      error bumping delivery tag: no decaying tag registered for topic /waku/2/rs/21/920
2024-04-10T09:59:17.643-0400    INFO    gowaku  tests/utils.go:400      Received        {"msg": "payload:\"test message\" content_topic:\"/test/2/my-app/sharded\" timestamp:1712757436641541548"}

However, do note that it took 45s for the peer to be discovered and connected (check the delta between line 2 and 3 of the logs). Not sure what is the correct behavior, tho. I imagine that the time it took depends on how long running the connectivity loop takes between iterations.

cc: @chaitanyaprem

romanzac commented 7 months ago

Thanks for trying out. I was not able to receive message even after modifications you have proposed. What I have tried myself is to check if discovery works well before subscriptions have started. And it looks, it works well after waiting just 3 seconds. Please checkout https://github.com/waku-org/go-waku/pull/1060/commits/b562771c6965504bf897cb30a65a9883912367f4

go test -run TestStaticShardingLimits | grep Node
2024-04-11T10:07:55.156+0800    INFO    gowaku  node/wakunode2_test.go:468  Node1 has   {"peer ID": "16Uiu2HAm3jhTi59Qkzzj86ydBMeSJ2Ku7E72QtMcFJLGozU2SeBP"}
2024-04-11T10:07:55.156+0800    INFO    gowaku  node/wakunode2_test.go:469  Node2 has   {"peer ID": "16Uiu2HAmD4VrEwiK5mCvZga96o87hoHF4XWGcsgYTkwJ5YCzuNtN"}
2024-04-11T10:07:55.156+0800    INFO    gowaku  node/wakunode2_test.go:473  Peers known to Node1    {"ID": "16Uiu2HAm3jhTi59Qkzzj86ydBMeSJ2Ku7E72QtMcFJLGozU2SeBP"}
2024-04-11T10:07:55.156+0800    INFO    gowaku  node/wakunode2_test.go:473  Peers known to Node1    {"ID": "16Uiu2HAmD4VrEwiK5mCvZga96o87hoHF4XWGcsgYTkwJ5YCzuNtN"}
2024-04-11T10:07:55.156+0800    INFO    gowaku  node/wakunode2_test.go:478  Peers known to Node2    {"ID": "16Uiu2HAmD4VrEwiK5mCvZga96o87hoHF4XWGcsgYTkwJ5YCzuNtN"}
2024-04-11T10:07:55.156+0800    INFO    gowaku  node/wakunode2_test.go:478  Peers known to Node2    {"ID": "16Uiu2HAm3jhTi59Qkzzj86ydBMeSJ2Ku7E72QtMcFJLGozU2SeBP"}

I would say some problems occur after many subscriptions are made. Something breaks around peerstorge ? Not sure where to look.

romanzac commented 7 months ago

When I zoom out a bit, I admit that two nodes working at the same machine and having bursts in topic subscriptions is not very standard situation. If you @richard-ramos and @chaitanyaprem think we should not keep resolving this issue, I will rewrite the test to use one node only and we can close this issue or park it. What do you think?

romanzac commented 7 months ago

I've changed the test to subscribe to just one topic at each node and message still won't get through. Please checkout https://github.com/waku-org/go-waku/pull/1060/commits/8d7e31bde9eaca3eca389ebcf15263b072781960

romanzac commented 6 months ago

Information collected from debugging (single topic subscription):

chaitanyaprem commented 6 months ago

Information collected from debugging:

* 40% of the time the message is not delivered (23 runs)

* Message is not present in the target node’s subscription channel

* No apparent differences between failed and passed test visible in the log

Is this happening with single topic subscription?

suspicious messages: unable to create decaying delivery tag failed to fetch peer score

First message is fine, have seen this happen when we re-initialize relay. Second message, would be nice to know the exact log statement.

romanzac commented 6 months ago

Information collected from debugging:

* 40% of the time the message is not delivered (23 runs)

* Message is not present in the target node’s subscription channel

* No apparent differences between failed and passed test visible in the log

Is this happening with single topic subscription?

suspicious messages: unable to create decaying delivery tag failed to fetch peer score

First message is fine, have seen this happen when we re-initialize relay. Second message, would be nice to know the exact log statement.

Yes, single topic only.

Is it actually for success case: 2024-05-10T15:52:36.852+0800 WARN gowaku.node2.peer-manager peermanager/peer_manager.go:134 failed to fetch peer score {"error": "item not found", "peer": "16Uiu2HAmNL9KcHxCdjuhvizBp3Nw48dC3or4K4GBsdN1bc8xxye1"}

Full logs: failed.log success.log

One more success log without "failed to fetch peer score": success2.log

One more failed log with "peer already found in peerstore, but re-adding": failed2.log

chaitanyaprem commented 6 months ago
  • both success case and failed case shows two times "peer connected” and “peer disconnected” events.

This seems to be happening as metadata protocol is disconnecting the peer as there are no shards specified. This seems ok, because there is a subscribe to shard later in the test. But i am wondering why peers are not connecting again to each other. Taking a look at how to address this.

chaitanyaprem commented 6 months ago
  • both success case and failed case shows two times "peer connected” and “peer disconnected” events.

This seems to be happening as metadata protocol is disconnecting the peer as there are no shards specified. This seems ok, because there is a subscribe to shard later in the test. But i am wondering why peers are not connecting again to each other. Taking a look at how to address this.

You can make the test successful is for both nodes to subscribe to atleast one of the pubsubTopics before the peers discover each other. Let me know if it works.

romanzac commented 6 months ago

@chaitanyaprem Yes, I confirm. One node knows the topic before discovery, the test works.

func TestStaticShardingLimits(t *testing.T) {

    log := utils.Logger()

    if os.Getenv("RUN_FLAKY_TESTS") != "true" {

        log.Info("Skipping", zap.String("test", t.Name()),
            zap.String("reason", "RUN_FLAKY_TESTS environment variable is not set to true"))
        t.SkipNow()
    }

    ctx, cancel := context.WithTimeout(context.Background(), 300*time.Second)
    defer cancel()

    testClusterID := uint16(21)

    var shardedPubSubTopics []string
    contentTopic1 := "/test/2/my-app/sharded"

    // Node1 with Relay
    hostAddr1, err := net.ResolveTCPAddr("tcp", "0.0.0.0:0")
    require.NoError(t, err)
    discv5UDPPort1, err := tests.FindFreeUDPPort(t, "0.0.0.0", 3)
    require.NoError(t, err)
    wakuNode1, err := New(
        WithHostAddress(hostAddr1),
        WithWakuRelay(),
        WithClusterID(testClusterID),
        WithDiscoveryV5(uint(discv5UDPPort1), nil, true),
    )
    require.NoError(t, err)
    err = wakuNode1.Start(ctx)
    require.NoError(t, err)
    defer wakuNode1.Stop()

    r1 := wakuNode1.Relay()

    // Subscribe topics related to static sharding
    for i := 0; i < 1; i++ {
        shardedPubSubTopics = append(shardedPubSubTopics, fmt.Sprintf("/waku/2/rs/%d/%d", testClusterID, i))
        _, err = r1.Subscribe(ctx, protocol.NewContentFilter(shardedPubSubTopics[i], contentTopic1))
        require.NoError(t, err)
        time.Sleep(10 * time.Millisecond)
    }

    // Node2 with Relay
    hostAddr2, err := net.ResolveTCPAddr("tcp", "0.0.0.0:0")
    require.NoError(t, err)
    discv5UDPPort2, err := tests.FindFreeUDPPort(t, "0.0.0.0", 3)
    require.NoError(t, err)
    wakuNode2, err := New(
        WithHostAddress(hostAddr2),
        WithWakuRelay(),
        WithClusterID(testClusterID),
        WithDiscoveryV5(uint(discv5UDPPort2), []*enode.Node{wakuNode1.localNode.Node()}, true),
    )
    require.NoError(t, err)
    err = wakuNode2.Start(ctx)
    require.NoError(t, err)
    defer wakuNode2.Stop()

    err = wakuNode1.DiscV5().Start(ctx)
    require.NoError(t, err)
    err = wakuNode2.DiscV5().Start(ctx)
    require.NoError(t, err)

    r2 := wakuNode2.Relay()

    // Let discovery and ENR updates to finish
    time.Sleep(3 * time.Second)

    // Subscribe topics related to static sharding
    for i := 0; i < 1; i++ {
        _, err = r2.Subscribe(ctx, protocol.NewContentFilter(shardedPubSubTopics[i], contentTopic1))
        require.NoError(t, err)
        time.Sleep(10 * time.Millisecond)
    }

    // Let ENR updates to finish
    time.Sleep(3 * time.Second)

    // Check ENR value after 1024 subscriptions
    shardsENR, err := wenr.RelaySharding(wakuNode1.ENR().Record())
    require.NoError(t, err)
    require.Equal(t, testClusterID, shardsENR.ClusterID)
    require.Equal(t, 1, len(shardsENR.ShardIDs))

    // Prepare message
    msg1 := tests.CreateWakuMessage(contentTopic1, utils.GetUnixEpoch(), "test message")

    // Select shard to publish
    randomShard := rand.Intn(1)

    // Check both nodes are subscribed
    require.True(t, r1.IsSubscribed(shardedPubSubTopics[randomShard]))
    require.True(t, r2.IsSubscribed(shardedPubSubTopics[randomShard]))

    time.Sleep(1 * time.Second)

    // Publish on node1
    _, err = r1.Publish(ctx, msg1, relay.WithPubSubTopic(shardedPubSubTopics[randomShard]))
    require.NoError(t, err)

    time.Sleep(1 * time.Second)

    s2, err := r2.GetSubscriptionWithPubsubTopic(shardedPubSubTopics[randomShard], contentTopic1)
    require.NoError(t, err)

    var wg sync.WaitGroup

    // Retrieve on node2
    tests.WaitForMsg(t, 2*time.Second, &wg, s2.Ch)

}