Closed: neekolas closed this issue 2 years ago.
I would be happy to put together a PR that proposes an alternate version of the Waku Filter Spec,
Looking at the spec, I don't see any change necessary. The spec is at a higher level than the issue described here, which is libp2p stream management. Or am I missing something?
Looking at the spec, I don't see any change necessary.
You're right that it isn't strictly necessary to change the spec. We could have the replies mirror the existing FilterRequest.
We might be able to clean up the API a little with this new model (have a leaner FilterResponse proto just for the replies, and possibly get rid of the RequestID altogether). But those are squarely in the nice-to-have column, and maybe have some downsides I'm not thinking of. I'd be fine with leaving the spec as-is.
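To make that single-stream model concrete, here is a rough client-side sketch in TypeScript of subscribing and then reading pushes over the same stream. This is illustrative only, not the current spec or js-waku's implementation: it assumes a js-libp2p version where dialProtocol() returns the stream directly plus the usual it-pipe / it-length-prefixed helpers, and the request/push bytes are handled by the caller.

import { pipe } from 'it-pipe'
import * as lp from 'it-length-prefixed'

// Current filter codec; a single-stream variant would presumably get its own id.
const FILTER_PROTOCOL = '/vac/waku/filter/2.0.0-beta1'

async function subscribeOverSingleStream (
  libp2p: any,                              // typed loosely to avoid pinning a js-libp2p version
  peerId: any,
  filterRequestBytes: Uint8Array,           // encoded FilterRequest protobuf (caller-supplied)
  onPush: (bytes: Uint8Array) => void       // called for every message pushed by the serving node
): Promise<void> {
  const stream = await libp2p.dialProtocol(peerId, FILTER_PROTOCOL)

  // Send the subscription, then only half-close: our write side ends while the
  // read side stays open so the serving node can push replies on the same stream.
  await pipe([filterRequestBytes], lp.encode(), stream.sink)

  // Keep consuming length-prefixed pushes for as long as the stream stays open.
  await pipe(stream.source, lp.decode(), async (source: any) => {
    for await (const chunk of source) {
      onPush(chunk.slice())                 // chunk type depends on the it-length-prefixed version
    }
  })
}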
Thanks for this issue, @neekolas! Indeed, the spec currently allows for short-lived connections, with the serving node initiating connections for FilterResponses, and current implementations follow this route. This is not a spec requirement though, so an implementation that keeps the stream open will fit the spec. We could add a MUST-requirement for environments (such as browsers) where this is a necessity.
I've therefore added this issue to the ongoing work to improve the filter protocol and spec: https://github.com/vacp2p/rfc/issues/469
You're also right, in such single-stream environments the RequestID will not be necessary, but we will lose the more general benefit of allowing nodes (both client and serving) to have short connection windows by simply initiating new connections for each communication.
In general I think there are quite a number of things that need addressing in the filter protocol, and supporting browser-based clients will be a priority. The wire protocol can certainly be simplified and improved. Watch this space! :)
@jm-clius @D4nte I think the easiest solution here is to mimic the behavior of go-waku, which uses Connect() instead of Dial() when pushing messages to subscribers. This reuses an existing connection from the peer store rather than attempting to open a new one. I'm not sure what the equivalent function is in nwaku, but I would imagine there is similar functionality available.
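For illustration, here is roughly what "reuse an existing connection before dialing" looks like, sketched in TypeScript against js-libp2p (the real change would live in nwaku/go-waku; getConnections() and Connection.newStream() are assumed to exist in the js-libp2p version used):

const FILTER_PROTOCOL = '/vac/waku/filter/2.0.0-beta1'

async function openPushStream (libp2p: any, peerId: any) {
  // Prefer a new stream on an already-open connection. This is the path that
  // works for browser peers, which can never accept an inbound dial.
  const existing = libp2p.getConnections(peerId)
  if (existing.length > 0) {
    return await existing[0].newStream(FILTER_PROTOCOL)
  }
  // Only fall back to dialing when no connection exists; for a browser client
  // without a reachable listen address this fallback cannot succeed.
  return await libp2p.dialProtocol(peerId, FILTER_PROTOCOL)
}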
@neekolas I agree with the simplicity of this solution, which is less formal than using the connection as an identifier in the filter protocol. This should be the default behaviour in nim-libp2p already: when you dial() but a connection already exists, the switch will simply reuse the existing connection. The filter protocol does not actively close this connection as part of its normal operation, but it also does not attempt to keep the connection alive (in which case a subsequent dial will simply negotiate a new connection). Perhaps then a combination of filter with keep-alive pings to maintain the connection?
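A minimal keep-alive sketch of that idea, assuming a js-libp2p version that exposes libp2p.ping() directly (newer releases move this into a ping service) and an arbitrary interval:

function startKeepAlive (libp2p: any, peerId: any, periodMs = 10_000): ReturnType<typeof setInterval> {
  return setInterval(() => {
    // A periodic ping keeps the connection from being reaped by idle timeouts.
    libp2p.ping(peerId).catch((err: Error) => {
      // A failed ping usually means the connection is gone; the next filter
      // interaction would have to dial again.
      console.debug('keep-alive ping failed', err)
    })
  }, periodMs)
}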
@jm-clius interesting. I'm not very familiar with the nim libp2p internals. I'm mostly basing my understanding on the fact that the test suite for https://github.com/xmtp-labs/hq/issues/493 only works when the client exposes a listen_address in its configuration. That test client should have an open Relay connection, which does its own heartbeat, so there should be an already open connection.
It's entirely possible something else is going wrong that is preventing the test case from working. Are you able to confirm that the switch should not be dialing under these conditions?
@neekolas, indeed, I don't have access to the GH with the test suite, but from what you're describing the existing connection should simply be reused when calling "dial" in nim-libp2p.
I've set up what I think is a small-scale setup similar to yours, consisting of three nwaku nodes: two relay nodes communicating with each other and one filter client node subscribing to receive messages from one of them.
A log snippet from the client:
# Client initiates connection to subscribe
DBG 2022-05-18 11:15:47.860+02:00 post_waku_v2_filter_v1_subscription topics="filter api" tid=1541 file=filter_api.nim:56
INF 2022-05-18 11:15:47.860+02:00 subscribe content topics="wakunode" tid=1541 file=wakunode2.nim:312 filter="(contentFilters: @[(contentTopic: \"my_content\")], pubSubTopic: \"\", subscribe: true)"
INF 2022-05-18 11:15:47.860+02:00 Dialing peer from manager topics="wakupeers" tid=1541 file=peer_manager.nim:51 wireAddr=/ip4/0.0.0.0/tcp/8000/wss peerId=16U*9PCrQZ
DBG 2022-05-18 11:15:47.860+02:00 Dialing peer topics="libp2p dialer" tid=1541 file=dialer.nim:53 peerId=16U*9PCrQZ
DBG 2022-05-18 11:15:48.061+02:00 created new pubsub peer topics="libp2p pubsub" tid=1541 file=pubsub.nim:296 peerId=16U*9PCrQZ
DBG 2022-05-18 11:15:48.061+02:00 Incoming WakuRelay connection topics="wakurelay" tid=1541 file=waku_relay.nim:33
DBG 2022-05-18 11:15:48.061+02:00 starting pubsub read loop topics="libp2p pubsubpeer" tid=1541 file=pubsubpeer.nim:116 conn=16U*9PCrQZ:6284b944e881f3b036e69bac peer=16U*9PCrQZ closed=false
DBG 2022-05-18 11:15:48.162+02:00 decodeMsg: decoded identify topics="libp2p identify" tid=1541 file=identify.nim:142 pubkey=Some(s...1a08)) addresses=/ip4/0.0.0.0/tcp/60000,/ip4/0.0.0.0/tcp/8000/wss protocols=/ipfs/id/1.0.0,/vac/waku/relay/2.0.0,/ipfs/ping/1.0.0,/vac/waku/swap/2.0.0-beta1,/vac/waku/store/2.0.0-beta4,/vac/waku/filter/2.0.0-beta1 observable_address=/ip4/127.0.0.1/tcp/39346/wss proto_version=ipfs/0.1.0 agent_version=nim-libp2p/0.0.1 signedPeerRecord=Some
DBG 2022-05-18 11:15:48.162+02:00 starting pubsub read loop topics="libp2p pubsubpeer" tid=1541 file=pubsubpeer.nim:116 conn=16U*9PCrQZ:6284b944e881f3b036e69bae peer=16U*9PCrQZ closed=false
DBG 2022-05-18 11:15:48.162+02:00 Dial successful topics="libp2p dialer" tid=1541 file=dialer.nim:111 conn=16U*9PCrQZ:6284b943e881f3b036e69baa peerId=16U*9PCrQZ
# Client receives push on existing connection
INF 2022-05-18 11:16:08.831+02:00 filter message received topics="wakufilter" tid=1541 file=waku_filter.nim:177
INF 2022-05-18 11:16:08.831+02:00 push received topics="wakunode" tid=1541 file=wakunode2.nim:463
At 11:15:47.860 the client subscribes to the filter service node. There is no existing connection, so you see a bunch of "dial" logs. If a connection existed before (e.g. for relay) it should simply be reused.
At 11:16:08.831 the filter service node pushes a message to the client node. This involves "dialing" the client, but since there is an existing connection it is simply reused and the client receives the push on this open connection.
UPDATE: a bit later the connection times out without a keep-alive and the service node indeed creates a new connection when dialing the client with a new push:
DBG 2022-05-18 11:37:07.994+02:00 Accepted an incoming connection topics="libp2p switch" tid=1541 file=switch.nim:198 conn=:6284be43e881f3b036e69bb2
DBG 2022-05-18 11:37:07.995+02:00 About to accept incoming connection topics="libp2p switch" tid=1541 file=switch.nim:172
DBG 2022-05-18 11:37:08.041+02:00 created new pubsub peer topics="libp2p pubsub" tid=1541 file=pubsub.nim:296 peerId=16U*9PCrQZ
DBG 2022-05-18 11:37:08.142+02:00 Incoming WakuRelay connection topics="wakurelay" tid=1541 file=waku_relay.nim:33
DBG 2022-05-18 11:37:08.142+02:00 starting pubsub read loop topics="libp2p pubsubpeer" tid=1541 file=pubsubpeer.nim:116 conn=16U*9PCrQZ:6284be44e881f3b036e69bb7 peer=16U*9PCrQZ closed=false
DBG 2022-05-18 11:37:08.143+02:00 starting pubsub read loop topics="libp2p pubsubpeer" tid=1541 file=pubsubpeer.nim:116 conn=16U*9PCrQZ:6284be44e881f3b036e69bb5 peer=16U*9PCrQZ closed=false
DBG 2022-05-18 11:37:08.143+02:00 decodeMsg: decoded identify topics="libp2p identify" tid=1541 file=identify.nim:142 pubkey=Some(s...1a08)) addresses=/ip4/0.0.0.0/tcp/60000,/ip4/0.0.0.0/tcp/8000/wss protocols=/ipfs/id/1.0.0,/vac/waku/relay/2.0.0,/ipfs/ping/1.0.0,/vac/waku/swap/2.0.0-beta1,/vac/waku/store/2.0.0-beta4,/vac/waku/filter/2.0.0-beta1 observable_address=/ip4/127.0.0.1/tcp/60001 proto_version=ipfs/0.1.0 agent_version=nim-libp2p/0.0.1 signedPeerRecord=Some
INF 2022-05-18 11:37:08.292+02:00 filter message received topics="wakufilter" tid=1541 file=waku_filter.nim:177
INF 2022-05-18 11:37:08.292+02:00 push received topics="wakunode" tid=1541 file=wakunode2.nim:463
Of course, there may indeed be something happening under the hood that breaks your test case and requires the two-way communication, but as long as the connection does not time out or break, it should be reused (at least in principle). :)
@jm-clius really appreciate you helping verify the behavior. I think this means that the js-waku filter protocol should be safe to use, given that js-waku already has a keepalive built in. /cc @D4nte
Unfortunately I came to the same conclusion as @neekolas: there seems to be an issue with the filter protocol as nwaku is not able to push a message back to js-waku. We are able to make it work when we make js-waku listen locally on NodeJS but this is not a valid workaround for the browser.
Please see logs attached (using nwaku v0.9):
This is still a valid issue with v0.11
It works fine when using Node.js and the Node.js client listens on a port (not an option in the browser):
nwaku_Waku_Filter_unsubscribes.log nwaku_Waku_Filter_handles_multiple_messages.log nwaku_Waku_Filter_creates_a_subscription.log
However, it fails to send the messages to js-waku when Node.js does not have a port open (as in the browser). I also had issues when using filter in the browser with the prod fleet, which I assume is for the same reason:
nwaku_Waku_Filter_unsubscribes.log nwaku_Waku_Filter_handles_multiple_messages.log nwaku_Waku_Filter_creates_a_subscription.log
TRC 2022-08-20 19:01:09.959+10:00 Found matching contentTopic topics="wakufilter" tid=917558 file=waku_filter.nim:255 filter="(contentTopic: \"/test/1/waku-filter\")" msg="(payload: @[70, 105, 108, 116, 101, 114, 105, 110, 103, 32, 119, 111, 114, 107, 115, 33], contentTopic: \"/test/1/waku-filter\", version: 0, timestamp: 1660986069912000000, proof: (proof: [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0], merkleRoot: [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0], epoch: [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0], shareX: [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0], shareY: [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0], nullifier: [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]))"
ERR 2022-08-20 19:01:09.959+10:00 failed to push messages to remote peer topics="wakufilter" tid=917558 file=waku_filter.nim:265
Feel free to push an nwaku branch with more trace logs that I can enable and test locally.
Or you can do it locally; here are the changes I made in js-waku to "make it fail": https://github.com/status-im/js-waku/compare/master...filter
Thanks, @fryorcraken. Will take a look. Were you using --keepalive during the test, in case this is related to timeouts?
Not using keep alive. Not convinced it's relevant, as the test runs within 20s. Will try with this parameter.
Right, so the issue is indeed that the connection is closed after the filter request is received and therefore cannot be reused later when a message is pushed to the client node.
However, I can't yet say why the connection closes or whether it is closed from nwaku's side (or it may be that there's a mismatch in libp2p channel management between the nim and js libp2p impls).
I've recreated the test with two nwaku nodes (using websocket connections). In this case you can see that the connection remains open, even if the channel is closed:
TRC 2022-08-22 16:47:54.059+02:00 handle: got request topics="libp2p multistream" tid=3154 file=multistream.nim:139 conn=16U*8BQug3:6303971aadce07293d70b5f8 ms=/vac/waku/filter/2.0.0-beta1
TRC 2022-08-22 16:47:54.059+02:00 found handler topics="libp2p multistream" tid=3154 file=multistream.nim:168 conn=16U*8BQug3:6303971aadce07293d70b5f8 protocol=/vac/waku/filter/2.0.0-beta1
TRC 2022-08-22 16:47:54.059+02:00 writing mplex message topics="libp2p mplexcoder" tid=3154 file=coder.nim:80 conn=16U*8BQug3:63039719adce07293d70b5f2 id=3 msgType=MsgIn data=30 encoded=32
TRC 2022-08-22 16:47:54.059+02:00 Sending data to remote topics="websock ws-session" tid=3154 file=session.nim:112 opcode=Binary masked=false dataSize=50
TRC 2022-08-22 16:47:54.103+02:00 Decoded new frame topics="websock ws-session" tid=3154 file=session.nim:281 len=61 fin=true mask=true opcode=Binary
TRC 2022-08-22 16:47:54.104+02:00 Setting binary flag topics="websock ws-session" tid=3154 file=session.nim:348 consumed=0 len=61 first=true remainder=61 fin=true opcode=Binary masked=true
TRC 2022-08-22 16:47:54.104+02:00 Reading bytes from frame stream topics="websock ws-session" tid=3154 file=session.nim:352 len=61 consumed=0 first=true remainder=61 fin=true opcode=Binary masked=true
TRC 2022-08-22 16:47:54.105+02:00 Read data from frame topics="websock ws-session" tid=3154 file=session.nim:358 read=2 consumed=2 len=61 first=true remainder=59 fin=true opcode=Binary masked=true
TRC 2022-08-22 16:47:54.105+02:00 readFrame topics="libp2p noise" tid=3154 file=noise.nim:308 sconn=16U*8BQug3:63039719adce07293d70b5f1 size=59
TRC 2022-08-22 16:47:54.105+02:00 Setting binary flag topics="websock ws-session" tid=3154 file=session.nim:348 consumed=2 len=61 first=true remainder=59 fin=true opcode=Binary masked=true
TRC 2022-08-22 16:47:54.106+02:00 Reading bytes from frame stream topics="websock ws-session" tid=3154 file=session.nim:352 len=61 consumed=2 first=true remainder=59 fin=true opcode=Binary masked=true
TRC 2022-08-22 16:47:54.106+02:00 Read data from frame topics="websock ws-session" tid=3154 file=session.nim:358 read=59 consumed=61 len=61 first=true remainder=0 fin=true opcode=Binary masked=true
TRC 2022-08-22 16:47:54.107+02:00 Read all frames, breaking topics="websock ws-session" tid=3154 file=session.nim:367 consumed=61 len=61 first=false remainder=0 fin=true opcode=Binary masked=true
TRC 2022-08-22 16:47:54.107+02:00 decryptWithAd topics="libp2p noise" tid=3154 file=noise.nim:168 tagIn=67eabc960a5f...2a5b48e25709 tagOut=67eabc960a5f...2a5b48e25709 nonce=18
TRC 2022-08-22 16:47:54.107+02:00 read header varint topics="libp2p mplexcoder" tid=3154 file=coder.nim:47 varint=26 conn=16U*8BQug3:63039719adce07293d70b5f2
TRC 2022-08-22 16:47:54.108+02:00 read data topics="libp2p mplexcoder" tid=3154 file=coder.nim:50 dataLen=41 data=280a14326233...6f6e74656e74 conn=16U*8BQug3:63039719adce07293d70b5f2
TRC 2022-08-22 16:47:54.108+02:00 read message from connection topics="libp2p mplex" tid=3154 file=mplex.nim:141 m=16U*8BQug3:63039719adce07293d70b5f2 data=280a14326233...6f6e74656e74 msgType=MsgOut id=3 initiator=false size=41
TRC 2022-08-22 16:47:54.108+02:00 Processing channel message topics="libp2p mplex" tid=3154 file=mplex.nim:160 m=16U*8BQug3:63039719adce07293d70b5f2 channel=16U*8BQug3:6303971aadce07293d70b5f8:6303971a6d3506293d00b438 data=280a14326233...6f6e74656e74 msgType=MsgOut id=3 initiator=false size=41
TRC 2022-08-22 16:47:54.108+02:00 pushing data to channel topics="libp2p mplex" tid=3154 file=mplex.nim:177 m=16U*8BQug3:63039719adce07293d70b5f2 channel=16U*8BQug3:6303971aadce07293d70b5f8:6303971a6d3506293d00b438 len=41 msgType=MsgOut id=3 initiator=false size=41
TRC 2022-08-22 16:47:54.109+02:00 Pushing data topics="libp2p bufferstream" tid=3154 file=bufferstream.nim:88 s=16U*8BQug3:6303971aadce07293d70b5f8 data=41
TRC 2022-08-22 16:47:54.109+02:00 pushed data to channel topics="libp2p mplex" tid=3154 file=mplex.nim:179 m=16U*8BQug3:63039719adce07293d70b5f2 channel=16U*8BQug3:6303971aadce07293d70b5f8:6303971a6d3506293d00b438 len=41 msgType=MsgOut id=3 initiator=false size=41
TRC 2022-08-22 16:47:54.109+02:00 waiting for data topics="libp2p mplex" tid=3154 file=mplex.nim:130 m=16U*8BQug3:63039719adce07293d70b5f2
TRC 2022-08-22 16:47:54.110+02:00 Reading new frame topics="websock ws-session" tid=3154 file=frame.nim:163
TRC 2022-08-22 16:47:54.110+02:00 add leftovers topics="libp2p bufferstream" tid=3154 file=bufferstream.nim:158 s=16U*8BQug3:6303971aadce07293d70b5f8 len=40
TRC 2022-08-22 16:47:54.110+02:00 readOnce topics="libp2p mplexchannel" tid=3154 file=lpchannel.nim:172 s=16U*8BQug3:6303971aadce07293d70b5f8:6303971a6d3506293d00b438 bytes=1
TRC 2022-08-22 16:47:54.111+02:00 readOnce topics="libp2p mplexchannel" tid=3154 file=lpchannel.nim:172 s=16U*8BQug3:6303971aadce07293d70b5f8:6303971a6d3506293d00b438 bytes=40
INF 2022-08-22 16:47:54.111+02:00 filter message received topics="wakufilter" tid=3154 file=waku_filter.nim:187
TRC 2022-08-22 16:47:54.111+02:00 Closing channel topics="libp2p mplexchannel" tid=3154 file=lpchannel.nim:130 s=16U*8BQug3:6303971aadce07293d70b5f8:6303971a6d3506293d00b438 conn=16U*8BQug3:63039719adce07293d70b5f2 len=0
TRC 2022-08-22 16:47:54.111+02:00 writing mplex message topics="libp2p mplexcoder" tid=3154 file=coder.nim:80 conn=16U*8BQug3:63039719adce07293d70b5f2 id=3 msgType=CloseIn data=0 encoded=2
TRC 2022-08-22 16:47:54.112+02:00 Sending data to remote topics="websock ws-session" tid=3154 file=session.nim:112 opcode=Binary masked=false dataSize=20
TRC 2022-08-22 16:47:54.112+02:00 Closing with EOF topics="libp2p lpstream" tid=3154 file=lpstream.nim:295 s=6303971aadce07293d70b5f8
TRC 2022-08-22 16:47:54.112+02:00 Already closed topics="libp2p mplexchannel" tid=3154 file=lpchannel.nim:126 s=16U*8BQug3:6303971aadce07293d70b5f8:6303971a6d3506293d00b438
TRC 2022-08-22 16:47:54.112+02:00 Closed channel topics="libp2p mplexchannel" tid=3154 file=lpchannel.nim:146 s=16U*8BQug3:6303971aadce07293d70b5f8:6303971a6d3506293d00b438 len=0
TRC 2022-08-22 16:47:54.441+02:00 running heartbeat tid=3154 file=behavior.nim:682 instance=140626653904968
From the above, filter message received indicates the FilterRequest, and normal processing continues after Closed channel with heartbeats, etc.
However, when the same filter request is sent from a js-waku node we see that the connection is closed after the message is received:
INF 2022-08-20 19:01:09.909+10:00 filter message received topics="wakufilter" tid=917558 file=waku_filter.nim:187
TRC 2022-08-20 19:01:09.909+10:00 Closing channel topics="libp2p mplexchannel" tid=917558 file=lpchannel.nim:130 s=12D*biCKqW:6300a2d5848bd6003074c1eb:3 conn=12D*biCKqW:6300a2d5848bd6003074c1e3 len=0
TRC 2022-08-20 19:01:09.909+10:00 writing mplex message topics="libp2p mplexcoder" tid=917558 file=coder.nim:80 conn=12D*biCKqW:6300a2d5848bd6003074c1e3 id=3 msgType=CloseIn data=0 encoded=2
TRC 2022-08-20 19:01:09.909+10:00 Sending data to remote topics="websock ws-session" tid=917558 file=session.nim:112 opcode=Binary masked=false dataSize=20
TRC 2022-08-20 19:01:09.909+10:00 Closed channel topics="libp2p mplexchannel" tid=917558 file=lpchannel.nim:146 s=12D*biCKqW:6300a2d5848bd6003074c1eb:3 len=0
TRC 2022-08-20 19:01:09.909+10:00 Closing with EOF topics="libp2p lpstream" tid=917558 file=lpstream.nim:295 s=6300a2d5848bd6003074c1eb
TRC 2022-08-20 19:01:09.909+10:00 Already closed topics="libp2p mplexchannel" tid=917558 file=lpchannel.nim:126 s=12D*biCKqW:6300a2d5848bd6003074c1eb:3
TRC 2022-08-20 19:01:09.909+10:00 EOF topics="libp2p bufferstream" tid=917558 file=bufferstream.nim:149 s=12D*biCKqW:6300a2d5848bd6003074c1eb
TRC 2022-08-20 19:01:09.909+10:00 readOnce topics="libp2p mplexchannel" tid=917558 file=lpchannel.nim:172 s=12D*biCKqW:6300a2d5848bd6003074c1eb:3 bytes=0
TRC 2022-08-20 19:01:09.909+10:00 Closing BufferStream topics="libp2p bufferstream" tid=917558 file=bufferstream.nim:177 s=12D*biCKqW:6300a2d5848bd6003074c1eb len=0
TRC 2022-08-20 19:01:09.909+10:00 Closed BufferStream topics="libp2p bufferstream" tid=917558 file=bufferstream.nim:213 s=12D*biCKqW:6300a2d5848bd6003074c1eb
TRC 2022-08-20 19:01:09.909+10:00 Closing connection topics="libp2p connection" tid=917558 file=connection.nim:93 s=12D*biCKqW:6300a2d5848bd6003074c1eb
TRC 2022-08-20 19:01:09.909+10:00 Closed connection topics="libp2p connection" tid=917558 file=connection.nim:103 s=12D*biCKqW:6300a2d5848bd6003074c1eb
TRC 2022-08-20 19:01:09.909+10:00 Closing stream topics="libp2p lpstream" tid=917558 file=lpstream.nim:263 s=6300a2d5848bd6003074c1eb objName=LPChannel dir=In
TRC 2022-08-20 19:01:09.909+10:00 Closed stream topics="libp2p lpstream" tid=917558 file=lpstream.nim:267 s=6300a2d5848bd6003074c1eb objName=LPChannel dir=In
Oh, wait! @fryorcraken, I think I have a hunch on what's going on. Could you try the test again, but with relay mounted on the filter client (not subscribed to any pubsub topics, so that it won't be used to receive messages)?
Afaik there's a known issue (limitation?) in nim-libp2p that requires peers connecting to each other to have the same protocols mounted (even if those protocols are not used) or else the entire connection gets killed (not just the protocol channels). In this case, the fact that relay is mounted on your nwaku node but not on the js-waku node could mean that the nwaku node closes the entire connection simply because it fails to negotiate relay with js-waku. @Menduist
Relay is always mounted on js-waku today.
Good to know though, as I intend to enable dismounting relay.
Do note that the subscribe seems to work. Nwaku fails to then forward the message (see logs)
Indeed. But afaict nwaku fails to forward the message because the connection is closed immediately after the (successful) subscribe and it's not clear to me what is triggering this connection closure.
Afaik there's a known issue in nim-libp2p that requires peers connecting to each other to have the same protocols mounted or else the entire connection gets killed
I'm not aware of this, but looking quickly at the code, it's possible that gossipsub kills a peer if it's not compatible with gossipsub, which is indeed an issue
And sorry about the unhelpful log messages, I can't make sense of what's happening here :/ (if it's nwaku closing the connection or js-waku)
I'm not aware of this, but looking quickly at the code, it's possible that gossipsub kills a peer if it's not compatible with gossipsub, which is indeed an issue
Yeah, IIRC it was something required by nimbus to prevent "leaking" connections. The behaviour was to always attempt negotiating gossipsub with a new peer if mounted (even if dialed for a different codec) and to kill all connections to the peer if negotiation fails. Can't find the conversation we had back then, but we ended up just mounting gossipsub/relay by default as a workaround.
And sorry about the unhelpful log messages, I can't make sense of what's happening here :/ (if it's nwaku closing the connection or js-waku)
Ahaha. I was hoping to ask you later this afternoon if you can :D.
This seems to happen just before the filter subscribe message is received and handled:
TRC 2022-08-20 19:01:09.908+10:00 Pushing EOF topics="libp2p bufferstream" tid=917558 file=bufferstream.nim:106 s=12D*biCKqW:6300a2d5848bd6003074c1eb
which I thought may be an indication of the remote end closing the connection?
This line means that the peer closed the stream, not the connection
Some comments:
Tried with --keep-alive: same behaviour. Here are the logs: nwaku_Waku_Filter_creates_a_subscription.log nwaku_Waku_Filter_handles_multiple_messages.log nwaku_Waku_Filter_handles_multiple_subscriptions.log nwaku_Waku_Filter_unsubscribes.log
Do note that the js-waku filters do work with go-waku, which is what XMTP (@neekolas) is using.
I am not able to pull go-waku logs at the moment (facing some issues running js-waku tests with go-waku). Let me know if that's something that would help and I can spend some time on that.
Also something else interesting, in js-libp2p, I seem to only be able to open outbound streams on a connection. I thought from my rust-libp2p days that I could open either from a given client.
I am a bit confused about libp2p lately; I remembered something different from my rust-libp2p days.
What I can see is that the local node opens outbound streams. When doing the subscribe request, the js node opens an outbound stream on the connection with nwaku, for the filter protocol.
The stream is a duplex, so it's used for js to send a subscribe message to nwaku, and then nwaku replies to this message (with some empty data?).
Is that the stream that is getting closed?
When opening a stream, I have no option for a TTL or anything like that; I can only set the protocol.
Also, what I observed in the various js-libp2p examples and protocol implementations is that there are generally two strategies to handle streams when data needs to be sent: (1) open a new stream for each request/response exchange, or (2) keep a single long-lived stream open and reuse it.
Filter and Store are implemented with strategy (1) (sketched below); js-libp2p-gossipsub is implemented with strategy (2), more or less.
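A minimal sketch of strategy (1) in TypeScript, assuming a js-libp2p version where dialProtocol() returns the stream and the usual it-pipe / it-length-prefixed helpers; this is not js-waku's actual code:

import { pipe } from 'it-pipe'
import * as lp from 'it-length-prefixed'

async function oneShotRequest (
  libp2p: any,
  peerId: any,
  protocol: string,
  requestBytes: Uint8Array
): Promise<Uint8Array | undefined> {
  const stream = await libp2p.dialProtocol(peerId, protocol)
  try {
    // Use the stream as a duplex: the request goes out, the reply comes back.
    return await pipe(
      [requestBytes],
      lp.encode(),
      stream,
      lp.decode(),
      async (source: any) => {
        for await (const chunk of source) {
          return chunk.slice()   // first length-prefixed frame is the reply
        }
        return undefined
      }
    )
  } finally {
    // Strategy (1): the stream is single-use and closed after the exchange.
    // Strategy (2), gossipsub-style, would keep one long-lived stream per peer.
    await stream.close()
  }
}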
Shouldn't nwaku try to open a new stream when trying to send a message to js-waku? Looks like it's missing the fact that the stream is closed. While it may be ideal to use the same stream, I don't think it should be mandatory.
Interestingly, I can see that the stream is closed as soon as it is used (even before the reference to the stream is dropped).
Currently investigating if I can stop the stream closing from my side but IMO this is a temporary fix and I would still expect the remote peer to open a new stream if none are currently available.
If I am able to keep the stream open, the remote peer (nwaku) could use this outbound stream to send more data, meaning that I would need to monitor incoming data on that outbound stream.
This is not the expected design for libp2p. The expected design is to register a protocol handler (for filter) on the libp2p object. Said handler then handles incoming stream data (an inbound stream, opened by the remote peer).
Register handler for filter protocol: https://github.com/status-im/js-waku/blob/1c2df434e0f6cbeb038147f8bc0421d58ef182b0/src/lib/waku_filter/index.ts#L71-L70
StreamHandler interface: https://github.com/libp2p/js-libp2p-interfaces/blob/c71f9db4f30fe4f80dba457af14b0d8d19c4ddf9/packages/interface-registrar/src/index.ts#L9
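As a rough illustration of that handler-based design (not js-waku's actual implementation), assuming a js-libp2p version where handle() invokes the handler with a { stream } argument and it-length-prefixed framing:

import { pipe } from 'it-pipe'
import * as lp from 'it-length-prefixed'

const FILTER_PROTOCOL = '/vac/waku/filter/2.0.0-beta1'

async function registerFilterHandler (
  libp2p: any,
  onPush: (bytes: Uint8Array) => void        // caller-supplied; decodes and dispatches the push
): Promise<void> {
  await libp2p.handle(FILTER_PROTOCOL, ({ stream }: { stream: any }) => {
    // Every push arrives on an inbound stream opened by the remote peer.
    void pipe(stream.source, lp.decode(), async (source: any) => {
      for await (const chunk of source) {
        onPush(chunk.slice())                // each frame is an encoded FilterRPC push
      }
    })
  })
}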
Just a note which makes this conversation a bit more tricky: streams can be half-closed.
So you can totally do:
await stream.sendSomeData()
await stream.close()
let res = await stream.read()
close will close your side of the pipe, but the peer can continue to send stuff.
Interesting, the js API seems to indicate that it is possible to specify whether it's closing for writing, reading or both:
From a behaviour perspective, it works with js when the js node is listening on a port, which to me indicates that nwaku tries to open a new connection instead of a new stream. Opening a new connection works when the js node is reachable on a given port (advertised multiaddr), but would not work for a browser, or for Node.js when it is not listening on a port.
[nevermind]
Note that we are using mplex in JS. I can see that streams are single use in JS mplex:
Note that it seems nwaku may indeed try to use the previously opened stream. I can see an incoming message on the filter stream.
I ran out of time to confirm. I was trying to dismount relay to confirm that the message is indeed coming in on the filter stream and that I am not looking at the relay stream, but I faced other issues.
If that's the case, it's not ideal: because the stream is considered closed, I have not been successful in processing incoming data on it. I can try again later.
Looking at JS logs it looks like
Note that we are using mplex in JS. I can see that streams are single use in JS mplex:
This is an incorrect interpretation of the code; it only ends the source if there is an error.
I tried to see if I can process data on the open outbound stream, but the stream gets closed as soon as (within 5 ms) the subscribe data is sent. I also attempted to hold the stream open, to no avail.
I am not sure why the stream gets closed; it seems that the closing request comes from the remote peer:
waku:filter subscribe to [ '/test/1/waku-filter' ] +0ms
libp2p:mplex:trace initiator stream 3 send {
id: 3,
type: 'MESSAGE_INITIATOR (2)',
data: '5d0a2436653962643465322d613733652d346631372d386462662d30373839613561636536373312350801121a2f77616b752f322f64656661756c742d77616b752f70726f746f1a150a132f746573742f312f77616b752d66696c746572'
} +0ms # THIS IS THE SUBSCRIBE MESSAGE
libp2p:mplex:trace initiator stream 3 send { id: 3, type: 'CLOSE_INITIATOR (4)' } +1ms
libp2p:mplex:stream:trace initiator stream 3 sink end - err: undefined +6ms # ALL DATA SENT, CLOSING SINK
waku:test wait +10ms
libp2p:mplex:trace incoming message {
id: 3,
type: 'MESSAGE_INITIATOR (2)',
data: '200a1e0801121a2f77616b752f322f64656661756c742d77616b752f70726f746f'
} +2ms # MESSAGE CONTAINING `waku/2/default-waku/proto`. Not sure why it's incoming on what seems to be the same stream?
libp2p:mplex:trace incoming message {
id: 4,
type: 'MESSAGE_RECEIVER (1)',
data: '132f6d756c746973747265616d2f312e302e300a'
} +0ms # /multistream/1.0.0
libp2p:mplex:trace incoming message {
id: 4,
type: 'MESSAGE_RECEIVER (1)',
data: '1c2f7661632f77616b752f72656c61792f322e302e302d62657461320a'
} +0ms # /vac/waku/relay/2.0.0-beta2
libp2p:mplex:trace initiator stream 4 send {
id: 4,
type: 'MESSAGE_INITIATOR (2)',
data: '200a1e0801121a2f77616b752f322f64656661756c742d77616b752f70726f746f'
} +1ms # /waku/2/default-waku/proto
libp2p:mplex:trace incoming message { id: 3, type: 'CLOSE_RECEIVER (3)' } +1ms # MESSAGE RECEIVED FROM REMOTE TO CLOSE READ
libp2p:mplex:stream:trace initiator stream 3 closeRead +4ms
libp2p:mplex:stream:trace initiator stream 3 source end - err: undefined +0ms # CLOSE READ DONE
libp2p:mplex initiator stream 3 ended 3 +8ms
waku:test push +501ms # SEND MESSAGE OVER LIGHT PUSH
From what I can see, the local node receives a message from the remote to close the read side of the filter stream.
I enabled timestamps to investigate when this CLOSE_RECEIVER message is initiated:
INF 2022-08-24 10:49:18.611+10:00 filter message received topics="wakufilter" tid=1301717 file=waku_filter.nim:187
TRC 2022-08-24 10:49:18.611+10:00 Closing channel topics="libp2p mplexchannel" tid=1301717 file=lpchannel.nim:130 s=12D*FrLCSF:6305758e361325e805a316c7:3 conn=12D*FrLCSF:6305758e361325e805a316bf len=0
TRC 2022-08-24 10:49:18.611+10:00 writing mplex message topics="libp2p mplexcoder" tid=1301717 file=coder.nim:80 conn=12D*FrLCSF:6305758e361325e805a316bf id=3 msgType=CloseIn data=0 encoded=2
TRC 2022-08-24 10:49:18.611+10:00 Sending data to remote topics="websock ws-session" tid=1301717 file=session.nim:112 opcode=Binary masked=false dataSize=20
TRC 2022-08-24 10:49:18.611+10:00 Closed channel topics="libp2p mplexchannel" tid=1301717 file=lpchannel.nim:146 s=12D*FrLCSF:6305758e361325e805a316c7:3 len=0
TRC 2022-08-24 10:49:18.611+10:00 Closing with EOF topics="libp2p lpstream" tid=1301717 file=lpstream.nim:295 s=6305758e361325e805a316c7
TRC 2022-08-24 10:49:18.611+10:00 Already closed topics="libp2p mplexchannel" tid=1301717 file=lpchannel.nim:126 s=12D*FrLCSF:6305758e361325e805a316c7:3
TRC 2022-08-24 10:49:18.611+10:00 EOF topics="libp2p bufferstream" tid=1301717 file=bufferstream.nim:149 s=12D*FrLCSF:6305758e361325e805a316c7
TRC 2022-08-24 10:49:18.611+10:00 readOnce topics="libp2p mplexchannel" tid=1301717 file=lpchannel.nim:172 s=12D*FrLCSF:6305758e361325e805a316c7:3 bytes=0
TRC 2022-08-24 10:49:18.611+10:00 Closing BufferStream topics="libp2p bufferstream" tid=1301717 file=bufferstream.nim:177 s=12D*FrLCSF:6305758e361325e805a316c7 len=0
TRC 2022-08-24 10:49:18.611+10:00 Closed BufferStream topics="libp2p bufferstream" tid=1301717 file=bufferstream.nim:213 s=12D*FrLCSF:6305758e361325e805a316c7
TRC 2022-08-24 10:49:18.611+10:00 Closing connection topics="libp2p connection" tid=1301717 file=connection.nim:93 s=12D*FrLCSF:6305758e361325e805a316c7
TRC 2022-08-24 10:49:18.611+10:00 Closed connection topics="libp2p connection" tid=1301717 file=connection.nim:103 s=12D*FrLCSF:6305758e361325e805a316c7
TRC 2022-08-24 10:49:18.611+10:00 Closing stream topics="libp2p lpstream" tid=1301717 file=lpstream.nim:263 s=6305758e361325e805a316c7 objName=LPChannel dir=In
TRC 2022-08-24 10:49:18.611+10:00 Closed stream topics="libp2p lpstream" tid=1301717 file=lpstream.nim:267 s=6305758e361325e805a316c7 objName=LPChannel dir=In
TRC 2022-08-24 10:49:18.611+10:00 Stream handler done tid=1301717 file=muxedupgrade.nim:223 conn=12D*FrLCSF:6305758e361325e805a316c7
2022-08-24T00:49:18.614Z libp2p:mplex:trace incoming message { id: 3, type: 'CLOSE_RECEIVER (3)' }
2022-08-24T00:49:18.614Z libp2p:mplex:stream:trace initiator stream 3 closeRead
2022-08-24T00:49:18.614Z libp2p:mplex:stream:trace initiator stream 3 source end - err: undefined
2022-08-24T00:49:18.614Z libp2p:mplex initiator stream 3 ended 3
It looks like nwaku closes the stream as soon as the filter message is received.
full logs:
Now, looking at logs when interacting with go-waku (where it works), the stream also gets closed. The difference I can see is that go-waku opens a new inbound stream to forward the message on filter:
Open outbound stream to send subscribe message:
2022-08-24T01:19:43.862Z waku:filter subscribe to [ '/test/1/waku-filter' ]
2022-08-24T01:19:43.863Z libp2p:mplex:trace initiator stream 3 send {
id: 3,
type: 'MESSAGE_INITIATOR (2)',
data: '5d0a2461373838326330312d313432342d343736372d623939332d30346532333638386631336112350801121a2f77616b752f322f64656661756c742d77616b752f70726f746f1a150a132f746573742f312f77616b752d66696c746572'
}
Some reset messages received, is that normal @richard-ramos?
2022-08-24T01:19:43.865Z libp2p:mplex:trace incoming message { id: 0, type: 'RESET_RECEIVER (5)' }
2022-08-24T01:19:43.871Z libp2p:mplex:stream:trace initiator stream 0 source end - err: Error: stream reset at Object.reset (file:///home/fryorcraken/src/status-im/js-waku/node_modules/@libp2p/mplex/src/stream.ts:140:27) at MplexStreamMuxer._handleIncoming (file:///home/fryorcraken/src/status-im/js-waku/node_modules/@libp2p/mplex/src/mplex.ts:314:16) at file:///home/fryorcraken/src/status-im/js-waku/node_modules/@libp2p/mplex/src/mplex.ts:208:20 at processTicksAndRejections (node:internal/process/task_queues:96:5) at async MplexStreamMuxer.sink (file:///home/fryorcraken/src/status-im/js-waku/node_modules/@libp2p/mplex/src/mplex.ts:202:9) { code: 'ERR_MPLEX_STREAM_RESET' }
2022-08-24T01:19:43.871Z libp2p:mplex:stream:trace initiator stream 0 sink end - err: Error: stream reset at Object.reset (file:///home/fryorcraken/src/status-im/js-waku/node_modules/@libp2p/mplex/src/stream.ts:140:27) at MplexStreamMuxer._handleIncoming (file:///home/fryorcraken/src/status-im/js-waku/node_modules/@libp2p/mplex/src/mplex.ts:314:16) at file:///home/fryorcraken/src/status-im/js-waku/node_modules/@libp2p/mplex/src/mplex.ts:208:20 at processTicksAndRejections (node:internal/process/task_queues:96:5) at async MplexStreamMuxer.sink (file:///home/fryorcraken/src/status-im/js-waku/node_modules/@libp2p/mplex/src/mplex.ts:202:9) { code: 'ERR_MPLEX_STREAM_RESET' }
2022-08-24T01:19:43.871Z libp2p:mplex initiator stream 0 ended 0
2022-08-24T01:19:43.871Z libp2p:mplex:stream:trace initiator stream 0 reset
Close read on the stream (like nwaku):
2022-08-24T01:19:43.872Z libp2p:mplex:trace incoming message { id: 3, type: 'CLOSE_RECEIVER (3)' }
2022-08-24T01:19:43.872Z libp2p:mplex:stream:trace initiator stream 3 closeRead
2022-08-24T01:19:43.872Z libp2p:mplex:stream:trace initiator stream 3 source end - err: undefined
2022-08-24T01:19:43.872Z libp2p:mplex initiator stream 3 ended 3
New inbound stream that pushes the message on filter:
2022-08-24T01:19:44.392Z libp2p:mplex:trace incoming message { id: 2, type: 'NEW_STREAM (0)', data: '2' }
2022-08-24T01:19:44.392Z libp2p:mplex new receiver stream 2 2
2022-08-24T01:19:44.393Z libp2p:mplex:trace incoming message {
id: 2,
type: 'MESSAGE_INITIATOR (2)',
data: '132f6d756c746973747265616d2f312e302e300a1d2f7661632f77616b752f66696c7465722f322e302e302d62657461310a640a2461373838326330312d313432342d343736372d623939332d3034653233363838663133611a3c0a3a0a1046696c746572696e6720776f726b732112132f746573742f312f77616b752d66696c7465725080e7f2d9fbca918e2e2152b8f62b5fc1d841'
} # /multistream/1.0.0 /vac/waku/filter/2.0.0-beta1 d $a7882c01-1424-4767-b993-04e23688f13a�< Filtering works!/test/1/waku-filterP�����ʑ�.!R��+_��A
2022-08-24T01:19:44.393Z libp2p:mplex:trace receiver stream 2 send {
id: 2,
type: 'MESSAGE_RECEIVER (1)',
data: '132f6d756c746973747265616d2f312e302e300a'
}
2022-08-24T01:19:44.393Z libp2p:mplex:trace incoming message { id: 2, type: 'CLOSE_INITIATOR (4)' }
2022-08-24T01:19:44.393Z libp2p:mplex:stream:trace receiver stream 2 closeRead
2022-08-24T01:19:44.393Z libp2p:mplex:stream:trace receiver stream 2 source end - err: undefined
full logs:
The issue comes from here: https://github.com/status-im/nwaku/blob/bc296196790425577e8eb00f3a5d9da0d2f958dd/waku/v2/node/peer_manager/peer_manager.nim#L293-L294
Because the peer doesn't have an address, nwaku doesn't even try to dial it (even though it can create streams, since it's already connected to the peer).
Fix:
diff --git a/waku/v2/node/peer_manager/peer_manager.nim b/waku/v2/node/peer_manager/peer_manager.nim
index e1e0c902..11fa0904 100644
--- a/waku/v2/node/peer_manager/peer_manager.nim
+++ b/waku/v2/node/peer_manager/peer_manager.nim
@@ -48,7 +48,7 @@ proc insertOrReplace(ps: PeerStorage,
proc dialPeer(pm: PeerManager, peerId: PeerID,
addrs: seq[MultiAddress], proto: string,
dialTimeout = defaultDialTimeout): Future[Option[Connection]] {.async.} =
- info "Dialing peer from manager", wireAddr = addrs[0], peerId = peerId
+ info "Dialing peer from manager", wireAddr = addrs, peerId = peerId
# Dial Peer
let dialFut = pm.switch.dial(peerId, addrs, proto)
@@ -290,7 +290,7 @@ proc dialPeer*(pm: PeerManager, peerId: PeerID, proto: string, dialTimeout = def
return none(Connection)
let addrs = pm.switch.peerStore[AddressBook][peerId]
- if addrs.len == 0:
- return none(Connection)
return await pm.dialPeer(peerId, addrs, proto, dialTimeout)
Right, thanks for this really good catch, @Menduist. Opening fix PR now.
Fix as implemented by @Menduist: https://github.com/status-im/nwaku/pull/1090
@fryorcraken I think this can be closed as the fundamental issue has been addressed? Any further unexpected behaviour can be reported in a new issue.
The Problem
Waku Filter conceptually seems like an ideal protocol for browser-based environments. Unfortunately, the way the 2.0 version of the filter is specified, it does not seem to be designed with browser-based light clients in mind.
Here's my understanding of how Waku Filter works currently:
1. The client sends a FilterRequest to a node that implements the Filter protocol, specifying some predicates for the filter (topic, contentTopics).
2. The Filter Node stores a mapping from the filter to the client.
3. When messages matching the filter arrive, the Filter Node initiates a connection back to the client and pushes them.
4. The client can send another FilterRequest to unsubscribe, which will delete the mapping. Alternatively, if replies to the client fail consecutively over a given time period the Filter Node may automatically delete the dead client from its records.
This falls apart for browser clients at the step where the Filter Node dials back to the client, as browsers are only capable of initiating outbound connections but not listening for inbound requests.
I discovered this when attempting to implement Waku Filter in js-waku. My implementation works fine in Node.js tests when the following configuration options are passed to the Waku Client, but fails otherwise. I haven't actually validated that this also fails in the browser, but my overall understanding of LibP2P tells me that it should. Feel free to correct me if I'm wrong there.
Proposed Solution
We would implement a V3 of the Waku Filter protocol that would keep the stream open upon receiving a FilterRPC request and send any messages that match the predicate back down the same stream.
I would be happy to put together a PR that proposes an alternate version of the Waku Filter Spec, and would be able to implement it in go-waku and js-waku if everyone was on board. I'd need someone from the Vac/Status side to take on nwaku support.
Alternate solutions