ChainSafe / js-libp2p-yamux

TypeScript implementation of Yamux

regression: js<->go integration test hangs with version 6.0.1 vs. 5.0.4 #71

Closed paralin closed 4 months ago

paralin commented 7 months ago

The update from 5.0.4 to 6.0.1 breaks my integration tests between the Go and JS yamux implementations:

https://github.com/aperturerobotics/starpc/actions/runs/7055030516/job/19204891175

It works in most cases but there's a specific test which hangs under 6.0.1 and works under 5.0.4.

I haven't been able to find the root cause yet, but I have confirmed that the regression was introduced between 5.0.4 and 6.0.1.

wemeetagain commented 5 months ago

Do you have any more details on this?

paralin commented 5 months ago

@wemeetagain Sure, can reproduce the issue like so:

  1. Clone https://github.com/aperturerobotics/starpc
  2. Try running the example: yarn install then yarn integration
  3. Check out the branch: git checkout update-yamux
  4. Run integration test again: yarn install then yarn integration

You can see that the updated version hangs. If I roll yamux back to 5.x, it works.

wemeetagain commented 5 months ago

Haven't looked much deeper into this, but I at least enabled yamux logs to see what's happening on the js side.

diff --git a/srpc/log.ts b/srpc/log.ts
index bbfe6ab..2af47bb 100644
--- a/srpc/log.ts
+++ b/srpc/log.ts
@@ -1,4 +1,5 @@
 import type { ComponentLogger, Logger } from '@libp2p/interface'
+import { defaultLogger } from '@libp2p/logger'

 // https://github.com/libp2p/js-libp2p/issues/2276
 // https://github.com/libp2p/js-libp2p/blob/bca8d6e689b47d85dda74082ed72e671139391de/packages/logger/src/index.ts#L86
@@ -21,5 +22,6 @@ export function createDisabledLogger(namespace: string): Logger {
 }

 export function createDisabledComponentLogger(): ComponentLogger {
+  return defaultLogger()
   return { forComponent: createDisabledLogger }
 }

$ DEBUG=libp2p:yamux:trace yarn integration
yarn run v1.22.19
$ npm run test:integration

> starpc@0.22.6 test:integration
> make integration

cd ./integration && bash ./integration.bash
Compiling ts...

  integration.js  404.2kb

Compiling go...
Starting server...
listening on :5000
~/Code/starpc ~/Code/starpc/integration
Starting client...
Connecting to ws://localhost:5000/demo
  libp2p:yamux:trace muxer created +0ms
  libp2p:yamux:trace muxer keepalive enabled interval=30000 +1ms
  libp2p:yamux:trace sending ping request pingId=0 +0ms
  libp2p:yamux:trace sending frame { type: 2, flag: 1, streamID: 0, length: 0 } +1ms
Running RpcStream test via WebSocket..
Calling RpcStream to open a RPC stream client...
Calling Echo via RPC stream...
  libp2p:yamux:trace new outgoing stream id=1 +5ms
  libp2p:yamux:trace sending frame { type: 1, flag: 1, streamID: 1, length: 0 } +1ms
  libp2p:yamux:trace sending frame { type: 0, flag: 0, streamID: 1, length: 4 } +3ms
  libp2p:yamux:trace sending frame { type: 0, flag: 0, streamID: 1, length: 26 } +1ms
  libp2p:yamux:trace sending frame { type: 0, flag: 0, streamID: 1, length: 4 } +0ms
  libp2p:yamux:trace sending frame { type: 0, flag: 0, streamID: 1, length: 12 } +0ms
  libp2p:yamux:trace sending frame { type: 0, flag: 0, streamID: 1, length: 4 } +0ms
  libp2p:yamux:trace sending frame { type: 0, flag: 0, streamID: 1, length: 52 } +0ms
  libp2p:yamux:trace received frame { type: 2, flag: 1, streamID: 0, length: 0 } +6ms
  libp2p:yamux:trace received ping request pingId=0 +0ms
  libp2p:yamux:trace sending ping response pingId=0 +0ms
  libp2p:yamux:trace sending frame { type: 2, flag: 2, streamID: 0, length: 0 } +0ms
  libp2p:yamux:trace received frame { type: 2, flag: 2, streamID: 0, length: 0 } +0ms
  libp2p:yamux:trace received ping response pingId=0 +0ms
  libp2p:yamux:trace received frame { type: 1, flag: 2, streamID: 1, length: 0 } +1ms

So JS sends a new stream open message (type 1, flag 1), then sends data (type 0, flag 0). There is some ping back and forth (type 2), we receive an acknowledgement of the stream being opened (type 1, flag 2), and then it hangs, right before we should be receiving data back.
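
For anyone reading the trace above, here is a minimal sketch that turns the logged headers into readable labels. The type and flag values come from the yamux specification (types: 0 data, 1 window update, 2 ping, 3 go away; flags: 0x1 SYN, 0x2 ACK, 0x4 FIN, 0x8 RST). The `describeFrame` helper and the `FrameHeader` interface are hypothetical names for illustration, not part of js-libp2p-yamux.

```typescript
// Frame type values as defined in the yamux specification.
const FRAME_TYPES: Record<number, string | undefined> = {
  0: 'Data',
  1: 'WindowUpdate',
  2: 'Ping',
  3: 'GoAway'
}

// Flag bits as defined in the yamux specification.
const FLAGS: Array<[number, string]> = [
  [0x1, 'SYN'],
  [0x2, 'ACK'],
  [0x4, 'FIN'],
  [0x8, 'RST']
]

// Shape of the header objects printed by the trace logger (assumed from the log).
interface FrameHeader {
  type: number
  flag: number
  streamID: number
  length: number
}

// Hypothetical helper: renders a logged header as a readable label.
function describeFrame (header: FrameHeader): string {
  const type = FRAME_TYPES[header.type] ?? `Unknown(${header.type})`
  const flags = FLAGS
    .filter(([bit]) => (header.flag & bit) !== 0)
    .map(([, name]) => name)
    .join('|')
  const flagText = flags === '' ? 'none' : flags
  return `${type} [${flagText}] stream=${header.streamID} length=${header.length}`
}

// The three stream 1 frames that matter in the log above.
console.log(describeFrame({ type: 1, flag: 1, streamID: 1, length: 0 }))
console.log(describeFrame({ type: 0, flag: 0, streamID: 1, length: 26 }))
console.log(describeFrame({ type: 1, flag: 2, streamID: 1, length: 0 }))
```

Read this way, the open is a window update carrying SYN, and the acknowledgement is a window update carrying ACK. No data frame for stream 1 is ever received.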

paralin commented 4 months ago

@wemeetagain Thanks for taking a look at this. I updated the update-yamux branch and switched to defaultLogger() by default. I'm still not sure what's going on here, though.

achingbrain commented 4 months ago

I can replicate this locally. The change in v6.x.x was for the muxer stream to yield Uint8ArrayLists instead of Uint8Arrays so we can delay copying any bytes until they hit the transport, or not copy bytes at all.

What I've noticed is that if I patch the muxer's sendFrame function to make it go back to just yielding Uint8Arrays, the starpc integration tests start working again:

diff --git a/src/muxer.ts b/src/muxer.ts
index df4e792..502dc7e 100644
--- a/src/muxer.ts
+++ b/src/muxer.ts
@@ -561,9 +561,8 @@ export class YamuxMuxer implements StreamMuxer {
       if (data === undefined) {
         throw new CodeError('invalid frame', ERR_INVALID_FRAME)
       }
-      this.source.push(
-        new Uint8ArrayList(encodeHeader(header), data)
-      )
+      this.source.push(encodeHeader(header))
+      this.source.push(data.subarray())
     } else {
       this.source.push(encodeHeader(header))
     }

Can you point to the bit of starpc that consumes the output from the muxer? I think it may not be handling Uint8ArrayLists properly.

paralin commented 4 months ago

Good catch. I'll have a look

paralin commented 4 months ago

@achingbrain Found the issue: yamux now emits Uint8ArrayList objects, and I was passing those to it-ws for the WebSocket. The WebSocket only supports ingesting Uint8Array (which is fine), but for some reason the type checks didn't flag this, so my code was piping Uint8ArrayLists to the WebSocket, which sent a buffer of all zeros without throwing an error.

The fix was to simply add a step to the pipe to merge together all the Uint8ArrayList elements before passing to it-ws: https://github.com/aperturerobotics/starpc/pull/121/files#diff-ebbf8f0716a15b0ffad17f77ecd8ee9566f1941b8a921087552cd13ce92e7e6eR24
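
As a rough illustration of that kind of pipe step (a sketch, not the code from the linked PR), the transform below accepts a stream that may contain either plain Uint8Arrays or list objects and always yields Uint8Arrays. `ByteList` is a hypothetical structural stand-in for the one Uint8ArrayList method used here, `subarray()`, and `toUint8Arrays` is a made-up name.

```typescript
// Hypothetical stand-in for the part of Uint8ArrayList this sketch relies on:
// subarray() with no arguments returns the list's bytes as one Uint8Array.
interface ByteList {
  subarray: () => Uint8Array
}

type Chunk = Uint8Array | ByteList

// Normalizes every chunk to a plain Uint8Array so a sink that only
// understands Uint8Array never sees a list object.
async function * toUint8Arrays (
  source: AsyncIterable<Chunk> | Iterable<Chunk>
): AsyncGenerator<Uint8Array> {
  for await (const chunk of source) {
    if (chunk instanceof Uint8Array) {
      // Already the right type, pass it through untouched.
      yield chunk
    } else {
      // Merge the list's backing arrays into a single Uint8Array.
      yield chunk.subarray()
    }
  }
}
```

Placed between the muxer output and the WebSocket sink, a step like this lets the sink receive only Uint8Arrays regardless of which yamux version produced the chunks.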

Thanks very much for the assistance here 👍🏽

achingbrain commented 4 months ago

Good stuff, glad you got it sorted.

One other thing to consider is when turning Uint8ArrayLists into Uint8Arrays you can either copy* all the backing Uint8Arrays into a new Uint8Array:

async function * copyTransform (source) {
  for await (const list of source) {
    yield list.subarray()
  }
}

or you can just yield the backing Uint8Arrays without copying them:

async function * noCopyTransform (source) {
  for await (const list of source) {
    yield * list
  }
}

The advantage of the second method is that it uses much less memory. The disadvantage is that it writes many small chunks of data to the transport, which can incur some overhead, so it's worth testing which is more efficient for your application. Generally, though, not copying the data is better.


* This only actually copies if the Uint8ArrayList contains multiple Uint8Arrays