libp2p / go-libp2p

libp2p implementation in Go
MIT License
5.89k stars 1.05k forks source link

webtransport: Listener can not receive from a stream created with OpenStream #2343

Closed oblique closed 1 year ago

oblique commented 1 year ago

I was using WebTransport implementation as an echo server to write some test cases for https://github.com/libp2p/rust-libp2p/pull/4015 and I found the following bug:

A WebTransport listener can not receive data from a stream opened by OpenStream.

The following code reproduces the problem:

package main

import (
    "context"
    "crypto/rand"
    "fmt"

    webtransport "github.com/libp2p/go-libp2p/p2p/transport/webtransport"
    "github.com/libp2p/go-libp2p/p2p/transport/quicreuse"
    "github.com/libp2p/go-libp2p/core/crypto"
    "github.com/libp2p/go-libp2p/core/peer"
    "github.com/multiformats/go-multiaddr"
    "golang.org/x/exp/slices"
)

func startListener() (multiaddr.Multiaddr, peer.ID) {
    priv, pub, err := crypto.GenerateEd25519Key(rand.Reader)
    if err != nil {
        panic(err)
    }

    peerId, err := peer.IDFromPublicKey(pub)
    if err != nil {
        panic(err)
    }

    connManager, err := quicreuse.NewConnManager([32]byte{})
    if err != nil {
        panic(err)
    }

    transport, err := webtransport.New(priv, nil, connManager, nil, nil);
    if err != nil {
        panic(err)
    }

    listener, err := transport.Listen(multiaddr.StringCast("/ip4/127.0.0.1/udp/0/quic-v1/webtransport"))
    if err != nil {
        panic(err)
    }
    fmt.Printf("Listener: Listening\n")

    ma := listener.Multiaddr()

    go func() {
        conn, err := listener.Accept()
        if err != nil {
            panic(err)
        }
        fmt.Printf("Listener: Accepted connection\n")

        fmt.Printf("Listener: Opening stream\n")
        stream, err := conn.OpenStream(context.Background())
        if err != nil {
            panic(err)
        }
        fmt.Printf("Listener: Stream opened\n")

        fmt.Printf("Listener: Receiving\n")
        recvBuf := make([]byte, 8)
        stream.Read(recvBuf)
        fmt.Printf("Listener: Received: %v\n", recvBuf)

        stream.Write(recvBuf)
        fmt.Printf("Listener: Send: %v\n", recvBuf)
    }()

    return ma, peerId
}

func connectAndTest(ma multiaddr.Multiaddr, peerId peer.ID) {
    priv, _, err := crypto.GenerateEd25519Key(rand.Reader)
    if err != nil {
        panic(err)
    }

    connManager, err := quicreuse.NewConnManager([32]byte{})
    if err != nil {
        panic(err)
    }

    transport, err := webtransport.New(priv, nil, connManager, nil, nil);
    if err != nil {
        panic(err)
    }

    fmt.Printf("Dialer: Dialing\n")
    conn, err := transport.Dial(context.Background(), ma, peerId)
    if err != nil {
        panic(err)
    }
    fmt.Printf("Dialer: Dialed\n")

    fmt.Printf("Dialer: Accepting stream\n")
    stream, err := conn.AcceptStream()
    if err != nil {
        panic(err)
    }
    fmt.Printf("Dialer: Accepted stream\n")

    fmt.Printf("Dialer: Sending\n")
    sendBuf := make([]byte, 8)
    rand.Read(sendBuf)
    stream.Write(sendBuf)
    fmt.Printf("Dialer: Send: %v\n", sendBuf)

    recvBuf := make([]byte, 8)
    stream.Read(recvBuf)
    fmt.Printf("Dialer: Received: %v\n", recvBuf)

    if !slices.Equal(sendBuf, recvBuf) {
        panic("sendBuf is not equal to recvBuf")
    }
}

func main() {
    ma, peerId := startListener()
    connectAndTest(ma, peerId)
}

Output:

Listener: Listening
Dialer: Dialing
Dialer: Dialed
Dialer: Accepting stream
Listener: Accepted connection
Listener: Opening stream
Listener: Stream opened
Listener: Receiving
Version Information

github.com/libp2p/rust-libp2p/transports/webtransport-websys/echo-server2
cloud.google.com/go v0.37.0
dmitri.shuralyov.com/app/changes v0.0.0-20180602232624-0a106ad413e3
dmitri.shuralyov.com/html/belt v0.0.0-20180602232347-f7d459c86be0
dmitri.shuralyov.com/service/change v0.0.0-20181023043359-a85b471d5412
dmitri.shuralyov.com/state v0.0.0-20180228185332-28bcc343414c
git.apache.org/thrift.git v0.0.0-20180902110319-2566ecd5d999
github.com/AndreasBriese/bbloom v0.0.0-20190825152654-46b345b51c96
github.com/BurntSushi/toml v0.3.1
github.com/alecthomas/kingpin/v2 v2.3.1
github.com/alecthomas/units v0.0.0-20211218093645-b94a6e3cc137
github.com/anmitsu/go-shlex v0.0.0-20161002113705-648efa622239
github.com/benbjohnson/clock v1.3.0
github.com/beorn7/perks v1.0.1
github.com/bradfitz/go-smtpd v0.0.0-20170404230938-deb6d6237625
github.com/buger/jsonparser v0.0.0-20181115193947-bf1c66bbce23
github.com/cespare/xxhash v1.1.0
github.com/cespare/xxhash/v2 v2.2.0
github.com/chzyer/readline v1.5.1
github.com/client9/misspell v0.3.4
github.com/containerd/cgroups v1.1.0
github.com/coreos/go-systemd v0.0.0-20181012123002-c6f51f82210d
github.com/coreos/go-systemd/v22 v22.5.0
github.com/davecgh/go-spew v1.1.1
github.com/davidlazar/go-crypto v0.0.0-20200604182044-b73af7476f6c
github.com/decred/dcrd/crypto/blake256 v1.0.0
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.1.0
github.com/dgraph-io/badger v1.6.2
github.com/dgraph-io/ristretto v0.0.2
github.com/docker/go-units v0.5.0
github.com/dustin/go-humanize v1.0.0
github.com/elastic/gosigar v0.14.2
github.com/flynn/go-shlex v0.0.0-20150515145356-3f9db97f8568
github.com/flynn/noise v1.0.0
github.com/francoispqt/gojay v1.2.13
github.com/fsnotify/fsnotify v1.5.4
github.com/ghodss/yaml v1.0.0
github.com/gliderlabs/ssh v0.1.1
github.com/go-errors/errors v1.0.1
github.com/go-kit/log v0.2.1
github.com/go-logfmt/logfmt v0.5.1
github.com/go-logr/logr v1.2.3
github.com/go-task/slim-sprig v0.0.0-20230315185526-52ccab3ef572
github.com/godbus/dbus/v5 v5.1.0
github.com/gogo/protobuf v1.3.2
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b
github.com/golang/lint v0.0.0-20180702182130-06c8688daad7
github.com/golang/mock v1.6.0
github.com/golang/protobuf v1.5.3
github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db
github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c
github.com/google/go-cmp v0.5.9
github.com/google/go-github v17.0.0+incompatible
github.com/google/go-querystring v1.0.0
github.com/google/gopacket v1.1.19
github.com/google/martian v2.1.0+incompatible
github.com/google/pprof v0.0.0-20230405160723-4a4c7d95572b
github.com/google/uuid v1.3.0
github.com/googleapis/gax-go v2.0.0+incompatible
github.com/googleapis/gax-go/v2 v2.0.3
github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1
github.com/gregjones/httpcache v0.0.0-20180305231024-9cad4c3443a7
github.com/grpc-ecosystem/grpc-gateway v1.5.0
github.com/hashicorp/golang-lru/v2 v2.0.2
github.com/huin/goupnp v1.1.0
github.com/ianlancetaylor/demangle v0.0.0-20220517205856-0058ec4f073c
github.com/ipfs/go-cid v0.4.1
github.com/ipfs/go-datastore v0.6.0
github.com/ipfs/go-ds-badger v0.3.0
github.com/ipfs/go-ds-leveldb v0.5.0
github.com/ipfs/go-log/v2 v2.5.1
github.com/jackpal/go-nat-pmp v1.0.2
github.com/jbenet/go-temp-err-catcher v0.1.0
github.com/jbenet/goprocess v0.1.4
github.com/jellevandenhooff/dkim v0.0.0-20150330215556-f50fe3d243e1
github.com/jpillora/backoff v1.0.0
github.com/json-iterator/go v1.1.12
github.com/jstemmer/go-junit-report v0.0.0-20190106144839-af01ea7f8024
github.com/julienschmidt/httprouter v1.3.0
github.com/kisielk/gotool v1.0.0
github.com/klauspost/compress v1.16.4
github.com/klauspost/cpuid/v2 v2.2.4
github.com/koron/go-ssdp v0.0.4
github.com/kr/pretty v0.2.1
github.com/kr/pty v1.1.3
github.com/kr/text v0.1.0
github.com/libp2p/go-buffer-pool v0.1.0
github.com/libp2p/go-cidranger v1.1.0
github.com/libp2p/go-flow-metrics v0.1.0
github.com/libp2p/go-libp2p v0.27.5
github.com/libp2p/go-libp2p-asn-util v0.3.0
github.com/libp2p/go-libp2p-testing v0.12.0
github.com/libp2p/go-mplex v0.7.0
github.com/libp2p/go-msgio v0.3.0
github.com/libp2p/go-nat v0.1.0
github.com/libp2p/go-netroute v0.2.1
github.com/libp2p/go-reuseport v0.2.0
github.com/libp2p/go-yamux/v4 v4.0.0
github.com/libp2p/zeroconf/v2 v2.2.0
github.com/lunixbochs/vtclean v1.0.0
github.com/mailru/easyjson v0.0.0-20190312143242-1de009706dbe
github.com/marten-seemann/tcp v0.0.0-20210406111302-dfbc87cc63fd
github.com/mattn/go-isatty v0.0.18
github.com/matttproud/golang_protobuf_extensions v1.0.4
github.com/microcosm-cc/bluemonday v1.0.1
github.com/miekg/dns v1.1.53
github.com/mikioh/tcpinfo v0.0.0-20190314235526-30a79bb1804b
github.com/mikioh/tcpopt v0.0.0-20190314235656-172688c1accc
github.com/minio/blake2b-simd v0.0.0-20160723061019-3f5f724cb5b1
github.com/minio/sha256-simd v1.0.0
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd
github.com/modern-go/reflect2 v1.0.2
github.com/mr-tron/base58 v1.2.0
github.com/multiformats/go-base32 v0.1.0
github.com/multiformats/go-base36 v0.2.0
github.com/multiformats/go-multiaddr v0.9.0
github.com/multiformats/go-multiaddr-dns v0.3.1
github.com/multiformats/go-multiaddr-fmt v0.1.0
github.com/multiformats/go-multibase v0.2.0
github.com/multiformats/go-multicodec v0.8.1
github.com/multiformats/go-multihash v0.2.1
github.com/multiformats/go-multistream v0.4.1
github.com/multiformats/go-varint v0.0.7
github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f
github.com/neelance/astrewrite v0.0.0-20160511093645-99348263ae86
github.com/neelance/sourcemap v0.0.0-20151028013722-8c68805598ab
github.com/onsi/ginkgo v1.16.5
github.com/onsi/ginkgo/v2 v2.9.2
github.com/onsi/gomega v1.27.4
github.com/opencontainers/runtime-spec v1.0.2
github.com/openzipkin/zipkin-go v0.1.1
github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58
github.com/pkg/errors v0.9.1
github.com/pmezard/go-difflib v1.0.0
github.com/prometheus/client_golang v1.14.0
github.com/prometheus/client_model v0.3.0
github.com/prometheus/common v0.42.0
github.com/prometheus/procfs v0.9.0
github.com/quic-go/qpack v0.4.0
github.com/quic-go/qtls-go1-18 v0.2.0
github.com/quic-go/qtls-go1-19 v0.3.2
github.com/quic-go/qtls-go1-20 v0.2.2
github.com/quic-go/quic-go v0.33.0
github.com/quic-go/webtransport-go v0.5.3
github.com/raulk/go-watchdog v1.3.0
github.com/russross/blackfriday v1.5.2
github.com/sergi/go-diff v1.0.0
github.com/shurcooL/component v0.0.0-20170202220835-f88ec8f54cc4
github.com/shurcooL/events v0.0.0-20181021180414-410e4ca65f48
github.com/shurcooL/github_flavored_markdown v0.0.0-20181002035957-2122de532470
github.com/shurcooL/go v0.0.0-20180423040247-9e1955d9fb6e
github.com/shurcooL/go-goon v0.0.0-20170922171312-37c2f522c041
github.com/shurcooL/gofontwoff v0.0.0-20180329035133-29b52fc0a18d
github.com/shurcooL/gopherjslib v0.0.0-20160914041154-feb6d3990c2c
github.com/shurcooL/highlight_diff v0.0.0-20170515013008-09bb4053de1b
github.com/shurcooL/highlight_go v0.0.0-20181028180052-98c3abbbae20
github.com/shurcooL/home v0.0.0-20181020052607-80b7ffcb30f9
github.com/shurcooL/htmlg v0.0.0-20170918183704-d01228ac9e50
github.com/shurcooL/httperror v0.0.0-20170206035902-86b7830d14cc
github.com/shurcooL/httpfs v0.0.0-20171119174359-809beceb2371
github.com/shurcooL/httpgzip v0.0.0-20180522190206-b1c53ac65af9
github.com/shurcooL/issues v0.0.0-20181008053335-6292fdc1e191
github.com/shurcooL/issuesapp v0.0.0-20180602232740-048589ce2241
github.com/shurcooL/notifications v0.0.0-20181007000457-627ab5aea122
github.com/shurcooL/octicon v0.0.0-20181028054416-fa4f57f9efb2
github.com/shurcooL/reactions v0.0.0-20181006231557-f2e0b4ca5b82
github.com/shurcooL/sanitized_anchor_name v0.0.0-20170918181015-86672fcb3f95
github.com/shurcooL/users v0.0.0-20180125191416-49c67e49c537
github.com/shurcooL/webdavfs v0.0.0-20170829043945-18c3829fa133
github.com/sourcegraph/annotate v0.0.0-20160123013949-f4cad6c6324d
github.com/sourcegraph/syntaxhighlight v0.0.0-20170531221838-bd320f5d308e
github.com/spaolacci/murmur3 v1.1.0
github.com/stretchr/objx v0.1.0
github.com/stretchr/testify v1.8.2
github.com/syndtr/goleveldb v1.0.0
github.com/tarm/serial v0.0.0-20180830185346-98f6abe2eb07
github.com/viant/assertly v0.4.8
github.com/viant/toolbox v0.24.0
github.com/xhit/go-str2duration v1.2.0
github.com/yuin/goldmark v1.4.13
go.opencensus.io v0.18.0
go.uber.org/atomic v1.10.0
go.uber.org/dig v1.16.1
go.uber.org/fx v1.19.2
go.uber.org/goleak v1.1.12
go.uber.org/multierr v1.11.0
go.uber.org/zap v1.24.0
go4.org v0.0.0-20180809161055-417644f6feb5
golang.org/x/build v0.0.0-20190111050920-041ab4dc3f9d
golang.org/x/crypto v0.7.0
golang.org/x/exp v0.0.0-20230321023759-10a507213a29
golang.org/x/lint v0.0.0-20200302205851-738671d3881b
golang.org/x/mod v0.10.0
golang.org/x/net v0.10.0
golang.org/x/oauth2 v0.5.0
golang.org/x/perf v0.0.0-20180704124530-6e6d33e29852
golang.org/x/sync v0.1.0
golang.org/x/sys v0.8.0
golang.org/x/term v0.8.0
golang.org/x/text v0.9.0
golang.org/x/time v0.0.0-20181108054448-85acf8d2951c
golang.org/x/tools v0.7.0
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1
google.golang.org/api v0.1.0
google.golang.org/appengine v1.6.7
google.golang.org/genproto v0.0.0-20190306203927-b5d61aea6440
google.golang.org/grpc v1.19.0
google.golang.org/protobuf v1.30.0
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c
gopkg.in/inf.v0 v0.9.1
gopkg.in/yaml.v2 v2.4.0
gopkg.in/yaml.v3 v3.0.1
grpc.go4.org v0.0.0-20170609214715-11d0a25b4919
honnef.co/go/tools v0.0.0-20190106161140-3f1c8253044a
lukechampine.com/blake3 v1.1.7
nhooyr.io/websocket v1.8.7
sourcegraph.com/sourcegraph/go-diff v0.5.0
sourcegraph.com/sqs/pbtypes v0.0.0-20180604144634-d3ebe8f20ae4
oblique commented 1 year ago

For completeness I provide an example that listener accepts a stream, which works.

package main

import (
    "context"
    "crypto/rand"
    "fmt"

    webtransport "github.com/libp2p/go-libp2p/p2p/transport/webtransport"
    "github.com/libp2p/go-libp2p/p2p/transport/quicreuse"
    "github.com/libp2p/go-libp2p/core/crypto"
    "github.com/libp2p/go-libp2p/core/peer"
    "github.com/multiformats/go-multiaddr"
    "golang.org/x/exp/slices"
)

func startListener() (multiaddr.Multiaddr, peer.ID) {
    priv, pub, err := crypto.GenerateEd25519Key(rand.Reader)
    if err != nil {
        panic(err)
    }

    peerId, err := peer.IDFromPublicKey(pub)
    if err != nil {
        panic(err)
    }

    connManager, err := quicreuse.NewConnManager([32]byte{})
    if err != nil {
        panic(err)
    }

    transport, err := webtransport.New(priv, nil, connManager, nil, nil);
    if err != nil {
        panic(err)
    }

    listener, err := transport.Listen(multiaddr.StringCast("/ip4/127.0.0.1/udp/0/quic-v1/webtransport"))
    if err != nil {
        panic(err)
    }
    fmt.Printf("Listener: Listening\n")

    ma := listener.Multiaddr()

    go func() {
        conn, err := listener.Accept()
        if err != nil {
            panic(err)
        }
        fmt.Printf("Listener: Accepted connection\n")

        fmt.Printf("Listener: Accepting stream\n")
        stream, err := conn.AcceptStream()
        if err != nil {
            panic(err)
        }
        fmt.Printf("Listener: Accepted stream\n")

        fmt.Printf("Listener: Receiving\n")
        recvBuf := make([]byte, 8)
        stream.Read(recvBuf)
        fmt.Printf("Listener: Received: %v\n", recvBuf)

        stream.Write(recvBuf)
        fmt.Printf("Listener: Send: %v\n", recvBuf)
    }()

    return ma, peerId
}

func connectAndTest(ma multiaddr.Multiaddr, peerId peer.ID) {
    priv, _, err := crypto.GenerateEd25519Key(rand.Reader)
    if err != nil {
        panic(err)
    }

    connManager, err := quicreuse.NewConnManager([32]byte{})
    if err != nil {
        panic(err)
    }

    transport, err := webtransport.New(priv, nil, connManager, nil, nil);
    if err != nil {
        panic(err)
    }

    fmt.Printf("Dialer: Dialing\n")
    conn, err := transport.Dial(context.Background(), ma, peerId)
    if err != nil {
        panic(err)
    }
    fmt.Printf("Dialer: Dialed\n")

    // This works
    fmt.Printf("Dialer: Opening stream\n")
    stream, err := conn.OpenStream(context.Background())
    if err != nil {
        panic(err)
    }

    fmt.Printf("Dialer: Stream opened\n")

    sendBuf := make([]byte, 8)
    rand.Read(sendBuf)
    stream.Write(sendBuf)
    fmt.Printf("Dialer: Send: %v\n", sendBuf)

    recvBuf := make([]byte, 8)
    stream.Read(recvBuf)
    fmt.Printf("Dialer: Received: %v\n", recvBuf)

    if !slices.Equal(sendBuf, recvBuf) {
        panic("sendBuf is not equal to recvBuf")
    }
}

func main() {
    ma, peerId := startListener()
    connectAndTest(ma, peerId)
}

Output:

Listener: Listening
Dialer: Dialing
Dialer: Dialed
Dialer: Opening stream
Dialer: Stream opened
Dialer: Send: [3 149 155 96 203 87 215 219]
Listener: Accepted connection
Listener: Accepting stream
Listener: Accepted stream
Listener: Receiving
Listener: Received: [3 149 155 96 203 87 215 219]
Listener: Send: [3 149 155 96 203 87 215 219]
Dialer: Received: [3 149 155 96 203 87 215 219]
marten-seemann commented 1 year ago

Works as expected. Opening a stream is a local operation, until you send data on that stream. You never do that.

oblique commented 1 year ago

Thanks a lot! I didn't know this detail