libp2p / go-libp2p

libp2p implementation in Go
MIT License
6.04k stars 1.07k forks source link

EnableHolePunching not working! #2878

Closed iGwkang closed 2 months ago

iGwkang commented 3 months ago

I hope to get help. I found similar issues, but they did not solve my problem: https://github.com/libp2p/go-libp2p/issues/2761 https://github.com/libp2p/go-libp2p/issues/2630

Below is the sample code I wrote and the running steps and results.

package main

import (
    "context"
    "flag"
    "fmt"
    "log"
    "time"

    "github.com/libp2p/go-libp2p"
    dht "github.com/libp2p/go-libp2p-kad-dht"
    "github.com/libp2p/go-libp2p/core/network"
    "github.com/libp2p/go-libp2p/core/peer"
    drouting "github.com/libp2p/go-libp2p/p2p/discovery/routing"
    dutil "github.com/libp2p/go-libp2p/p2p/discovery/util"
    "github.com/libp2p/go-libp2p/p2p/protocol/circuitv2/relay"
    ma "github.com/multiformats/go-multiaddr"
)

// Command-line flags for the sample program.
var (
    // op selects the role of this process; main acts on "relay" and "conn".
    op = flag.String("op", "", "relay and bootstrap server")
    // relayAddr is the relay's multiaddr string, consumed by runNode.
    relayAddr = flag.String("relay", "", "relay Addr")
)

// main parses the command-line flags and starts the role named by -op:
// "relay" runs the relay/bootstrap server and "conn" runs a client node.
// Any other value does nothing and the program exits.
func main() {
    flag.Parse()

    switch *op {
    case "relay":
        runRelayAndBootstrapServer()
    case "conn":
        runNode()
    }
}

// runRelayAndBootstrapServer creates one libp2p host listening on TCP port
// 9876, starts a circuit relay service and a server-mode DHT on it, prints the
// bootstrap address line, and then blocks forever. If any setup step fails the
// error is logged and the function returns.
func runRelayAndBootstrapServer() {
    h, err := libp2p.New(libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/9876"))
    if err != nil {
        log.Printf("Failed to create relay1: %v", err)
        return
    }

    // Start the relay service on the host; the returned handle is not used.
    if _, err := relay.New(h); err != nil {
        log.Printf("Failed to instantiate the relay: %v", err)
        return
    }

    // Start the DHT in server mode on the same host.
    if _, err := dht.New(context.Background(), h, dht.Mode(dht.ModeServer)); err != nil {
        log.Printf("Failed to create DHT: %v", err)
        return
    }

    // if _, err := autonat.New(h); err != nil {
    //  log.Printf("Failed to create AutoNAT: %v", err)
    //  return
    // }

    // The printed address uses the wildcard listen IP, not a dialable one.
    id := h.ID().String()
    fmt.Printf("[*] Your Bootstrap ID Is: /ip4/%s/tcp/%v/p2p/%s\n", "0.0.0.0", 9876, id)

    // Keep the process alive so the host keeps serving.
    select {}
}

// runNode starts a client node that uses the relay given by -relay both as a
// static relay and as its DHT bootstrap peer. It advertises itself under the
// rendezvous string "test" and opens a "/customprotocol" stream to every other
// peer found under that rendezvous, writing to each stream continuously.
//
// Setup failures are reported and the function returns. Failures from DHT
// bootstrap and from starting peer discovery panic. On success it never
// returns.
func runNode() {
    // Validate the relay address up front: a missing or malformed -relay value
    // would otherwise leave the AddrInfo nil and crash on the dereferences below.
    relayMaddr, err := ma.NewMultiaddr(*relayAddr)
    if err != nil {
        log.Printf("Invalid relay address %q: %v", *relayAddr, err)
        return
    }
    relayInfo, err := peer.AddrInfoFromP2pAddr(relayMaddr)
    if err != nil {
        log.Printf("Relay address %q does not identify a peer: %v", *relayAddr, err)
        return
    }

    // NOTE(review): with ForceReachabilityPrivate left commented out, the node
    // presumably has to discover that it is private before it reserves a slot
    // on the static relay and advertises a circuit address — confirm against
    // the go-libp2p autorelay documentation.
    conn1, err := libp2p.New(
        libp2p.EnableRelay(),
        libp2p.NATPortMap(),
        libp2p.EnableHolePunching(),
        // libp2p.ForceReachabilityPrivate(),
        libp2p.EnableAutoRelayWithStaticRelays([]peer.AddrInfo{*relayInfo}),
    )
    if err != nil {
        fmt.Println("Failed to create unreachable1: ", err)
        return
    }

    // Inbound streams are drained until the remote side closes or errors.
    conn1.SetStreamHandler("/customprotocol", func(s network.Stream) {
        defer s.Close()
        log.Println("Awesome! We're now communicating via the relay!")
        buf := make([]byte, 1024*64)
        for {
            if _, err := s.Read(buf); err != nil {
                log.Println("Failed to read from stream:", err)
                return
            }
        }
    })

    // err = conn1.Connect(context.Background(), *relayInfo)
    // if err != nil {
    //  log.Printf("Failed to connect to relay1: %v", err)
    //  return
    // }
    // fmt.Println("Successfully connected to relay1!")

    ctx := context.Background()

    // Check the constructor error: on failure kademliaDHT is unusable and the
    // Bootstrap call below must not run.
    kademliaDHT, err := dht.New(ctx, conn1, dht.BootstrapPeers(*relayInfo))
    if err != nil {
        fmt.Println("Failed to create DHT: ", err)
        return
    }

    fmt.Println("Bootstrapping the DHT")
    if err = kademliaDHT.Bootstrap(ctx); err != nil {
        panic(err)
    }

    // Wait a bit to let bootstrapping finish (really bootstrap should block
    // until it's ready, but that isn't the case yet.)
    time.Sleep(1 * time.Second)

    // We use a rendezvous point "meet me here" to announce our location.
    // This is like telling your friends to meet you at the Eiffel Tower.
    fmt.Println("Announcing ourselves...")
    routingDiscovery := drouting.NewRoutingDiscovery(kademliaDHT)
    dutil.Advertise(ctx, routingDiscovery, "test")
    fmt.Println("Successfully announced!")

    fmt.Println("Searching for other peers...")
    peerChan, err := routingDiscovery.FindPeers(ctx, "test")
    if err != nil {
        panic(err)
    }

    // The loop variable is deliberately not called "peer", which would shadow
    // the imported peer package inside the loop body.
    for found := range peerChan {
        if found.ID == conn1.ID() {
            continue // skip our own advertisement
        }
        fmt.Println("Found And Connecting to peer:", found)

        s, err := conn1.NewStream(ctx, found.ID, "/customprotocol")
        if err != nil {
            fmt.Println("Connection failed:", err)
            continue
        }
        fmt.Println("Connected to:", found)

        // Write a 64 KiB buffer every 70ms until the stream fails.
        go func(s network.Stream) {
            defer s.Close()
            buf := make([]byte, 1024*64)
            for {
                if _, err := s.Write(buf); err != nil {
                    log.Println("Unexpected error here. Failed to write: ", err)
                    return
                }
                time.Sleep(70 * time.Millisecond)
            }
        }(s)
    }

    // Keep the process alive so the writer goroutines and the stream handler
    // keep running after discovery finishes.
    select {}
}
go build -o relay .

Host A and Host B, terminal Output:

Connection failed: failed to dial: failed to dial 12D3KooWGvYWA3R5TSr8d6ozuja1Nhy1URd3hQGmVyziSs5PNF4A: all dials failed
  * [/ip6/::1/tcp/7334] dial tcp6 [::1]:7334: connectex: No connection could be made because the target machine actively refused it.
  * [/ip6/::1/udp/62311/quic-v1] context deadline exceeded
  * [/ip6/240e:3b7:3248:7a1:e423:e7e:6217:a354/udp/62311/quic-v1] timeout: no recent network activity
  * [/ip4/127.0.0.1/tcp/7333] dial tcp4 127.0.0.1:7333: connectex: No connection could be made because the target machine actively refused it.
  * [/ip4/10.0.0.16/udp/62309/quic-v1] timeout: no recent network activity
  * [/ip4/10.0.0.16/udp/62310/quic-v1/webtransport/certhash/uEiAzQuWpYHXEkiVhAYOohxId2kLmyXS5m3TcFdPPRX2uxw/certhash/uEiA0Ixeot0LRUq6TydJo-NRHlGAOcSHufpALjXNI-ib-Fg] timeout: no recent network activity
  * [/ip4/127.0.0.1/udp/62309/quic-v1] timeout: no recent network activity
  * [/ip6/::1/udp/62312/quic-v1/webtransport/certhash/uEiAzQuWpYHXEkiVhAYOohxId2kLmyXS5m3TcFdPPRX2uxw/certhash/uEiA0Ixeot0LRUq6TydJo-NRHlGAOcSHufpALjXNI-ib-Fg] context deadline exceeded
  * [/ip4/127.0.0.1/udp/62310/quic-v1/webtransport/certhash/uEiAzQuWpYHXEkiVhAYOohxId2kLmyXS5m3TcFdPPRX2uxw/certhash/uEiA0Ixeot0LRUq6TydJo-NRHlGAOcSHufpALjXNI-ib-Fg] context deadline exceeded
  * [/ip4/10.0.0.16/tcp/7333] dial tcp4 0.0.0.0:20934->10.0.0.16:7333: i/o timeout
  * [/ip6/240e:3b7:3248:7a1:e423:e7e:6217:a354/udp/62312/quic-v1/webtransport/certhash/uEiAzQuWpYHXEkiVhAYOohxId2kLmyXS5m3TcFdPPRX2uxw/certhash/uEiA0Ixeot0LRUq6TydJo-NRHlGAOcSHufpALjXNI-ib-Fg] timeout: no recent network activity

The dialed addresses in the output do not include any addresses obtained through hole punching; they are all the peer's own local addresses.

How should I modify this code to make HolePunching work ?

iGwkang commented 2 months ago

ping... @MarcoPolo @wlynxg Can you help me? Thanks!

MarcoPolo commented 2 months ago

Take a look at our tests for hole punching at (WIP as there is some failure between rust<->go nodes): https://github.com/libp2p/test-plans/pull/381. Specifically this folder https://github.com/libp2p/test-plans/tree/marco/go-holepunch-interop/hole-punch-interop. go-libp2p<->go-libp2p passes the tests. Compare your code with what's in there. The test is using a rust-libp2p relay, but that shouldn't matter, and it should be relatively easy to swap out a go-libp2p relay as well.

iGwkang commented 2 months ago

@MarcoPolo Thanks for your answers. I still failed.

HostA's console:

D:\work\go\src\p2p_test>set TRANSPORT=quic

D:\work\go\src\p2p_test>set MODE=listen

D:\work\go\src\p2p_test>p2p_test.exe -relay /ip4/x.x.x.x/udp/9876/quic-v1/p2p/QmZ65Rt73qZ9Ery88GeXMVQscEQJUfdShKDJfjQAYksPuo
2024/07/21 01:59:45 Attempting to connect to relay QmZ65Rt73qZ9Ery88GeXMVQscEQJUfdShKDJfjQAYksPuo. Try #0
2024/07/21 01:59:45 Connected to relay QmZ65Rt73qZ9Ery88GeXMVQscEQJUfdShKDJfjQAYksPuo
2024/07/21 01:59:45 Listening on [/ip4/10.0.0.16/udp/56685/quic-v1 /ip4/127.0.0.1/udp/56685/quic-v1 /ip4/x.x.x.x/udp/9876/quic-v1/p2p/QmZ65Rt73qZ9Ery88GeXMVQscEQJUfdShKDJfjQAYksPuo/p2p-circuit]
listen client ID: 12D3KooWPFFQADd5Rt51phP3zYWPDZMEKr4X8MryYkbksFjFGByy

HostB's console:

C:\Users\jjbeleive1\Downloads>set TRANSPORT=quic

C:\Users\jjbeleive1\Downloads>set MODE=dial

C:\Users\jjbeleive1\Downloads>p2p_test.exe -relay /ip4/x.x.x.x/udp/9876/quic-v1/p2p/QmZ65Rt73qZ9Ery88GeXMVQscEQJUfdShKDJfjQAYksPuo -peer 12D3KooWPFFQADd5Rt51phP3zYWPDZMEKr4X8MryYkbksFjFGByy
2024/07/21 02:00:18 Attempting to connect to relay QmZ65Rt73qZ9Ery88GeXMVQscEQJUfdShKDJfjQAYksPuo. Try #0
2024/07/21 02:00:18 Connected to relay QmZ65Rt73qZ9Ery88GeXMVQscEQJUfdShKDJfjQAYksPuo
2024/07/21 02:00:18 Connected to relayed peer 12D3KooWPFFQADd5Rt51phP3zYWPDZMEKr4X8MryYkbksFjFGByy
2024/07/21 02:01:18 failed to open stream: context deadline exceeded

How can I verify that hole punching is working properly? And if hole punching fails, should I fall back to the relay?

MarcoPolo commented 2 months ago

Can you provide a reproducible test case that fails? Use the hole-punch interop as your base.

wlynxg commented 2 months ago

You will definitely fail to drill because you haven't interacted with enough nodes.

iGwkang commented 2 months ago

Can you provide a reproducible test case that fails? Use the hole-punch interop as your base.

@MarcoPolo I'm using the sample code from the link you gave. https://github.com/libp2p/test-plans/pull/381/files#diff-585dcceb99c2bf95200b714a700fa5a84f32530f4e5cd00b21acda99c57415df

I commented out the Redis-related code and passed in the relay server and peer ID via command line parameters. You can look at my previous reply. https://github.com/libp2p/go-libp2p/issues/2878#issuecomment-2241248884

package main

import (
    "bytes"
    "context"
    "crypto/rand"
    "encoding/json"
    "flag"
    "fmt"
    "io"
    "log"
    "os"
    "os/signal"
    "time"

    "github.com/libp2p/go-libp2p"
    "github.com/libp2p/go-libp2p/core/event"
    "github.com/libp2p/go-libp2p/core/host"
    "github.com/libp2p/go-libp2p/core/network"
    "github.com/libp2p/go-libp2p/core/peer"
    "github.com/libp2p/go-libp2p/p2p/protocol/identify"
    "github.com/libp2p/go-libp2p/p2p/protocol/ping"
    libp2pquic "github.com/libp2p/go-libp2p/p2p/transport/quic"
    "github.com/libp2p/go-libp2p/p2p/transport/tcp"
    "github.com/multiformats/go-multiaddr"
)

// Command-line flags for the hole-punch test client.
var (
    // listenClientPeerID is the listening client's peer ID; dial mode decodes it.
    listenClientPeerID = flag.String("peer", "", "")
    // relayAddr is the relay's multiaddr string; main extracts the relay's
    // peer.AddrInfo from it.
    relayAddr = flag.String("relay", "", "")
)

// const redisAddr = "redis:6379"

// resultInfo is the JSON document that dial mode prints after a successful
// ping exchange.
type resultInfo struct {
    // RttToHolePunchedPeerMillis is the time in milliseconds from just before
    // the ping bytes are written until the echoed reply has been verified.
    RttToHolePunchedPeerMillis int `json:"rtt_to_holepunched_peer_millis"`
}

// main runs one side of the hole-punch test. TRANSPORT ("tcp" or "quic") and
// MODE ("listen" or "dial") are read from the environment; the relay multiaddr
// and, for dial mode, the listener's peer ID come from the -relay and -peer
// flags.
//
// In listen mode the host connects to the relay, emits a private-reachability
// event, prints its peer ID, and waits for an interrupt signal. In dial mode
// it connects to the listener through the relay's /p2p-circuit address, opens
// a stream on the ping protocol ID, sends 32 random bytes, checks that the same
// bytes come back, and prints the round-trip time as JSON. Every failure is
// fatal.
func main() {
    flag.Parse()
    tpt := os.Getenv("TRANSPORT")
    switch tpt {
    case "tcp", "quic":
    default:
        log.Fatal("invalid transport")
    }
    mode := os.Getenv("MODE")
    switch mode {
    case "listen", "dial":
    default:
        log.Fatal("invalid mode")
    }
    // rClient := redis.NewClient(&redis.Options{
    //  Addr:     redisAddr,
    //  Password: "",
    //  DB:       0,
    // })
    // defer rClient.Close()
    testTimeout := 3 * time.Minute
    ctx, cancel := context.WithTimeout(context.Background(), testTimeout)
    defer cancel()
    // waitForRedis(ctx, rClient)

    // var err error
    // var resultParts []string
    // switch tpt {
    // case "tcp":
    //  resultParts, err = rClient.BLPop(ctx, testTimeout, "RELAY_TCP_ADDRESS").Result()
    // case "quic":
    //  resultParts, err = rClient.BLPop(ctx, testTimeout, "RELAY_QUIC_ADDRESS").Result()
    // }
    // if err != nil {
    //  log.Fatal("Failed to wait for listener to be ready")
    // }

    // Parse with NewMultiaddr instead of StringCast: StringCast panics on a
    // missing or malformed -relay value. The distinct local name also avoids
    // shadowing the package-level relayAddr flag.
    relayMaddr, err := multiaddr.NewMultiaddr(*relayAddr)
    if err != nil {
        log.Fatalf("invalid -relay address %q: %v", *relayAddr, err)
    }

    ai, err := peer.AddrInfoFromP2pAddr(relayMaddr)
    if err != nil {
        log.Fatal(err)
    }

    // The static relay is configured once here for both modes, so listen mode
    // does not need to append the same option a second time.
    opts := []libp2p.Option{
        libp2p.EnableAutoRelayWithStaticRelays([]peer.AddrInfo{*ai}),
        libp2p.EnableHolePunching(),
        libp2p.ForceReachabilityPrivate(),
        libp2p.NATPortMap(),
    }
    switch tpt {
    case "tcp":
        opts = append(opts, libp2p.Transport(tcp.NewTCPTransport), libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0"))
    case "quic":
        opts = append(opts, libp2p.Transport(libp2pquic.NewTransport), libp2p.ListenAddrStrings("/ip4/0.0.0.0/udp/0/quic-v1"))
    }

    identify.ActivationThresh = 1 // We only have one relay, so we should activate immediately
    h, err := libp2p.New(opts...)
    if err != nil {
        log.Fatal(err)
    }

    waitToConnectToRelay(ctx, h, *ai)

    switch mode {
    case "listen":
        // Listen on the relay
        e, err := h.EventBus().Emitter(new(event.EvtLocalReachabilityChanged))
        if err != nil {
            log.Fatal(err)
        }
        err = e.Emit(event.EvtLocalReachabilityChanged{Reachability: network.ReachabilityPrivate})
        if err != nil {
            log.Fatal(err)
        }

        // Poll for up to two seconds until the host reports at least one
        // address, logging the address set on each pass.
        timeoutTime := time.Now().Add(2 * time.Second)
        for time.Now().Before(timeoutTime) {
            log.Printf("Listening on %s", h.Addrs())
            if len(h.Addrs()) > 0 {
                break
            }

            time.Sleep(500 * time.Millisecond)
        }
        time.Sleep(time.Second) // ? sometimes the relay doesn't have the reservation yet?
        fmt.Println("listen client ID:", h.ID())

        // _, err = rClient.RPush(ctx, listenClientPeerID, h.ID().String()).Result()
        // if err != nil {
        //  log.Fatal(err)
        // }

        // Stay up until interrupted so the dialer can reach this node.
        c := make(chan os.Signal, 1)
        signal.Notify(c, os.Interrupt)
        <-c
    case "dial":

        // Block on getting the relay's peer ID
        // parts, err := rClient.BLPop(ctx, 30*time.Second, listenClientPeerID).Result()
        // if err != nil {
        //  log.Fatal(err)
        // }
        pid, err := peer.Decode(*listenClientPeerID)
        if err != nil {
            log.Fatal(err)
        }
        circuitAddr := relayMaddr.Encapsulate(multiaddr.StringCast("/p2p-circuit/"))
        err = h.Connect(ctx, peer.AddrInfo{
            ID:    pid,
            Addrs: []multiaddr.Multiaddr{circuitAddr},
        })
        if err != nil {
            log.Fatal(err)
        }

        log.Printf("Connected to relayed peer %s", pid)

        // Wait for a direct conn
        s, err := h.NewStream(ctx, pid, ping.ID)
        if err != nil {
            log.Fatal(err)
        }
        defer s.Close()
        // Send a ping message. Implementing this ourselves since the ping protocol allows for pings over relay.
        buf := [32]byte{}
        if _, err := rand.Read(buf[:]); err != nil {
            log.Fatal(err)
        }
        start := time.Now()
        _, err = s.Write(buf[:])
        if err != nil {
            log.Fatal(err)
        }
        log.Printf("Is conn limited? %v. %s", s.Conn().Stat().Limited, s.Conn().RemoteMultiaddr())
        // A single Read may return fewer than 32 bytes; ReadFull keeps reading
        // until the whole echo has arrived, so a short read is not misreported
        // as a mismatch.
        retBuf := [32]byte{}
        if _, err := io.ReadFull(s, retBuf[:]); err != nil {
            log.Fatal(err)
        }
        if !bytes.Equal(buf[:], retBuf[:]) {
            log.Fatal("Ping failed. Bytes did not match.")
        }
        result := resultInfo{
            RttToHolePunchedPeerMillis: int(time.Since(start).Milliseconds()),
        }
        b, err := json.Marshal(result)
        if err != nil {
            log.Fatal(err)
        }
        fmt.Println(string(b))
    }
}

// func waitForRedis(ctx context.Context, rClient *redis.Client) {
//  for {
//      if ctx.Err() != nil {
//          log.Fatal("timeout waiting for redis")
//      }

//      // Wait for redis to be ready
//      _, err := rClient.Ping(ctx).Result()
//      if err == nil {
//          break
//      }
//      time.Sleep(100 * time.Millisecond)
//  }
// }

// waitToConnectToRelay blocks until h has connected to relayInfo, retrying
// every 500ms. Each failed attempt is logged together with its error. If ctx
// ends before a connection succeeds, the process exits via log.Fatal.
func waitToConnectToRelay(ctx context.Context, h host.Host, relayInfo peer.AddrInfo) {
    const retryInterval = 500 * time.Millisecond

    for try := 0; ; try++ {
        // Check the context first so no attempt is announced after it expired.
        if ctx.Err() != nil {
            log.Fatal("timeout waiting for relay")
        }
        log.Printf("Attempting to connect to relay %s. Try #%d", relayInfo.ID, try)

        err := h.Connect(ctx, relayInfo)
        if err == nil {
            log.Printf("Connected to relay %s", relayInfo.ID)
            return
        }
        // Surface the reason for the failure instead of retrying silently.
        log.Printf("Failed to connect to relay %s: %v", relayInfo.ID, err)

        // Wait before retrying, but stop waiting as soon as ctx ends.
        select {
        case <-ctx.Done():
            log.Fatal("timeout waiting for relay")
        case <-time.After(retryInterval):
        }
    }
}
iGwkang commented 2 months ago

You will definitely fail to drill because you haven't interacted with enough nodes.

@wlynxg How many nodes are needed at least? I have one relay node and two peer nodes. Both peer nodes are behind NAT.

iGwkang commented 2 months ago

@MarcoPolo I have been working on this problem for over a week. Could you please take 5 minutes to run this code and see if the behavior is the same as mine? https://github.com/libp2p/go-libp2p/issues/2878#issuecomment-2241447611 Thank you so much ❤