libp2p / js-libp2p

The JavaScript Implementation of libp2p networking stack.
https://libp2p.io
Other
2.29k stars 438 forks source link

WebRTC transport apparently broken in the NodeJS environment #2425

Closed victorhahncastell closed 4 months ago

victorhahncastell commented 6 months ago

Severity:

Medium

Description:

The WebRTC transport does not seem to function correctly within the NodeJS environment. While it is obviously intended for the browser environment and not NodeJS, that means that in a usual setup any jest tests for a web project will misbehave in very strange ways:

Steps to reproduce the error:

Here's a minimal test case: 1) In a TypeScript file to be run e.g. with npx tsx, we create three libp2p nodes, a "server" (i.e. a node listening on a dialable address) and two "browser" nodes (i.e. not listening on any dialable address but simply on '/webrtc').

2) We then proceed to connect both "browsers" to the "server" and reserve circuit-relay-transport slots for the browsers at the server.

3) After that, we initiate a WebRTC connection between the two browsers. Through this connection, we send a message directly from browser1 to browser2.

4) We then shut down the server and send another message from browser1 to browser2.

5) Finally, we shut down the browser nodes to end the test.

Expected behavior: 1) Both messages sent through the WebRTC connection from browser1 to browser2 should arrive. 2) NodeJS should terminate at the end of the test.

Actual behaviour: 1) While the message sent from browser1 to browser2 while the server is still running arrives, the one send after the server was shut down does not. 2) NodeJS does not terminate even after all three nodes have been shut down.

Test code:

import { noise } from "@chainsafe/libp2p-noise";
import { yamux } from "@chainsafe/libp2p-yamux";
import { webRTC, webRTCDirect } from "@libp2p/webrtc";
import { webSockets } from "@libp2p/websockets";
import { createLibp2p } from "libp2p";
import { circuitRelayTransport, circuitRelayServer } from "@libp2p/circuit-relay-v2";
import { identify } from "@libp2p/identify";
import { Libp2pNode } from "libp2p/libp2p";
import * as filters from '@libp2p/websockets/filters'
import type { IncomingStreamData } from '@libp2p/interface/src/stream-handler'
import { Connection, Stream } from '@libp2p/interface/src/connection'
import { multiaddr, Multiaddr } from '@multiformats/multiaddr'
import { Buffer } from 'buffer';
import { lpStream, LengthPrefixedStream } from 'it-length-prefixed-stream'
import { Uint8ArrayList } from 'uint8arraylist'

async function main() {
  // create three nodes, one "server" (listening node) and two "browsers" (non-listening)
  const server = await createLibp2p({
    addresses: {listen: ['/ip4/127.0.0.1/tcp/31337/ws']},
    transports: [
      webSockets({filter: filters.all}),
      webRTC(),
      circuitRelayTransport(),
    ],
    connectionEncryption: [noise()],
    streamMuxers: [yamux()],
    services: {
      identify: identify(),
      relay: circuitRelayServer(),
    },
    connectionGater: {
      denyDialPeer: async () => false,
      denyDialMultiaddr: async() => false,
      filterMultiaddrForPeer: async() => true,
    },
    connectionManager: {minConnections: 0}
  }) as unknown as Libp2pNode;  // receiving unexpected type from createLibp2p
  server['name'] = "server";  // just for test output
  await server.start();
  await server.handle("/verity/1.0.0",
    (incomingStreamData: IncomingStreamData) => handleIncoming(incomingStreamData, server));

  const browser1 = await createLibp2p({
    addresses: {listen: ['/webrtc']},
    transports: [
      webSockets({filter: filters.all}),
      webRTC(),
      circuitRelayTransport(),
    ],
    connectionEncryption: [noise()],
    streamMuxers: [yamux()],
    services: {
      identify: identify(),
    },
    connectionGater: {
      denyDialPeer: async () => false,
      denyDialMultiaddr: async() => false,
      filterMultiaddrForPeer: async() => true,
    },
    connectionManager: {minConnections: 0}
  }) as unknown as Libp2pNode;  // receiving unexpected type from createLibp2p
  browser1['name'] = "browser1";  // just for test output
  await browser1.start();
  await browser1.handle("/verity/1.0.0",
    (incomingStreamData: IncomingStreamData) => handleIncoming(incomingStreamData, browser1));
  const b1crt: any = browser1.components.transportManager.
    getTransports().find( (transport) => 'reservationStore' in transport);
  assert(b1crt !== undefined);

  const browser2 = await createLibp2p({
    addresses: {listen: ['/webrtc']},
    transports: [
      webSockets({filter: filters.all}),
      webRTC(),
      circuitRelayTransport(),
    ],
    connectionEncryption: [noise()],
    streamMuxers: [yamux()],
    services: {
      identify: identify(),
    },
    connectionGater: {
      denyDialPeer: async () => false,
      denyDialMultiaddr: async() => false,
      filterMultiaddrForPeer: async() => true,
    },
    connectionManager: {minConnections: 0}
  }) as unknown as Libp2pNode;  // receiving unexpected type from createLibp2p
  await browser2.start();
  browser2['name'] = "browser2";  // just for test output
  await browser2.start();
  await browser2.handle("/verity/1.0.0",
    (incomingStreamData: IncomingStreamData) => handleIncoming(incomingStreamData, browser2));
  const b2crt: any = browser2.components.transportManager.
    getTransports().find( (transport) => 'reservationStore' in transport);
  assert(b2crt !== undefined);

  // connect both "browsers" to the server
  const b1ToServer: Connection = await browser1.dial(multiaddr('/ip4/127.0.0.1/tcp/31337/ws'));
  const b2ToServer: Connection = await browser2.dial(multiaddr('/ip4/127.0.0.1/tcp/31337/ws'));

  assert(server.getConnections().length == 2);
  assert(browser1.getConnections().length == 1);
  assert(browser2.getConnections().length == 1);

  // register circuit relay for both browsers
  b1crt.reservationStore.addRelay(b1ToServer.remotePeer, "configured");
  b2crt.reservationStore.addRelay(b2ToServer.remotePeer, "configured");
  await new Promise(resolve => setTimeout(resolve, 100));  // give it some time

  // both browsers should now have a dialable p2p-circuit/webRTC addr
  let browser1dialable: Multiaddr = undefined;
  let browser2dialable: Multiaddr = undefined;

  // check for a dialable p2p-circuit/webRTC addr at browser1
  for (const multiaddr of browser1.getMultiaddrs()) {
    const protos: string[] = multiaddr.protoNames();
     if (protos.includes("p2p") && protos.includes("p2p-circuit") && protos.includes("webrtc")) {
      browser1dialable = multiaddr;
    }
  }
  assert(browser1dialable !== undefined);
  console.log("browser1's dialable address is: " + browser1dialable)

  // check for a dialable p2p-circuit/webRTC addr at browser2
  for (const multiaddr of browser2.getMultiaddrs()) {
    const protos: string[] = multiaddr.protoNames();
     if (protos.includes("p2p") && protos.includes("p2p-circuit") && protos.includes("webrtc")) {
      browser2dialable = multiaddr;
    }
  }
  assert(browser2dialable !== undefined);
  console.log("browser2's dialable address is: " + browser2dialable)

  // connect browser1 to browser2
  const b1ToB2: Connection = await browser1.dial(browser2dialable);
  assert(b1ToB2.transient === false);
  assert(b1ToB2.status === "open");

  const b1b2rawstream: Stream = await b1ToB2.newStream("/verity/1.0.0");
  const b1tob2stream: LengthPrefixedStream = lpStream(b1b2rawstream);
  console.log("Sending message from browser1 to browser2");
  await b1tob2stream.write(Buffer.from("Message from browser1 while server still connected", 'utf8'));
  await new Promise(resolve => setTimeout(resolve, 100));  // give it some time

  console.log("Stopping server");
  await server.stop();

  console.log("Sending message from browser1 to browser2 after server stopped");
  b1tob2stream.write(Buffer.from("Message from browser1 after server disconnected", 'utf8'));
  await new Promise(resolve => setTimeout(resolve, 1000));  // give it some time

  console.log("teardown: stopping browser nodes");
  await browser1.stop();
  await browser2.stop();
  console.log("end of main, NodeJS should terminate now");
}

async function handleIncoming(incomingStreamData: IncomingStreamData, node) {
  console.log("handling incoming stream at " + node.name)
  const stream: LengthPrefixedStream = lpStream(incomingStreamData.stream);
  let msg: Uint8ArrayList;
  try {
    while (msg = await stream.read()) {
      const msgBuf: Buffer = Buffer.from(
        msg.subarray()  // Note: Here, subarray() re-assembles a message which
                        // may have been received in many fragments.
      );
      console.log(`Message received at ${node.name}: ${msgBuf.toString('utf8')}`);
    }
  } catch(error) {
    console.log("error reading stream at " + node.name + ": " + error);
  }
  console.log("stopping to handle stream at " + node.name)
}

function assert(assertion: boolean) {
  if (!assertion) throw Error("assertion failed");
}

main();

Test output:

$ npx tsx libp2pwebrtctest.ts 
browser1's dialable address is: /ip4/127.0.0.1/tcp/31337/ws/p2p/12D3KooWPWg5HMTu7K3Wmnz86hdHsFFSysvGD1c9zp9sQEeuCu9W/p2p-circuit/webrtc/p2p/12D3KooWA7xEr2TzwGFQVVwb5QzULbvo5YsqpGWugQoDi1FMH7KY
browser2's dialable address is: /ip4/127.0.0.1/tcp/31337/ws/p2p/12D3KooWPWg5HMTu7K3Wmnz86hdHsFFSysvGD1c9zp9sQEeuCu9W/p2p-circuit/webrtc/p2p/12D3KooWDMAEHtmZetCM1ZkC4n4gL7r8ExbNeBEteuk5VyZeZ6ED
handling incoming stream at browser2
Sending message from browser1 to browser2
Message received at browser2: Message from browser1 while server still connected
Stopping server
Sending message from browser1 to browser2 after server stopped
teardown: stopping browser nodes
error reading stream at browser2: Error: unexpected end of input
stopping to handle stream at browser2
end of main, NodeJS should terminate now

(Note there is only one line stating "Message received at browser2" while we'd expect there to be two.)

achingbrain commented 6 months ago

This is a simplified version of your script that fixes type errors and removes a lot of unnecessary listeners/services/setup:

import { noise } from "@chainsafe/libp2p-noise";
import { yamux } from "@chainsafe/libp2p-yamux";
import { webRTC } from "@libp2p/webrtc";
import { webSockets } from "@libp2p/websockets";
import { createLibp2p } from "libp2p";
import { circuitRelayTransport, circuitRelayServer } from "@libp2p/circuit-relay-v2";
import { identify } from "@libp2p/identify";
import * as filters from '@libp2p/websockets/filters'
import { multiaddr } from '@multiformats/multiaddr'
import { Buffer } from 'buffer';
import { lpStream } from 'it-length-prefixed-stream'
import { Uint8ArrayList } from 'uint8arraylist'
import type { LengthPrefixedStream } from 'it-length-prefixed-stream'
import type { Multiaddr } from '@multiformats/multiaddr'
import type { IncomingStreamData } from '@libp2p/interface/src/stream-handler'
import type { Connection, Libp2p, Stream } from '@libp2p/interface'

async function main() {
  // create three nodes, one "server" (listening node) and two "browsers" (non-listening)
  const server = await createLibp2p({
    addresses: {listen: ['/ip4/127.0.0.1/tcp/31337/ws']},
    transports: [
      webSockets({filter: filters.all})
    ],
    connectionEncryption: [noise()],
    streamMuxers: [yamux()],
    services: {
      identify: identify(),
      relay: circuitRelayServer(),
    },
  });

  // @ts-expect-error not a field
  server['name'] = "server";  // just for test output

  const browser1 = await createLibp2p({
    addresses: {listen: ['/webrtc']},
    transports: [
      webSockets({filter: filters.all}),
      webRTC(),
      circuitRelayTransport({
        discoverRelays: 1
      }),
    ],
    connectionEncryption: [noise()],
    streamMuxers: [yamux()],
    services: {
      identify: identify(),
    }
  });

  // @ts-expect-error not a field
  browser1['name'] = "browser1";  // just for test output

  await browser1.handle("/verity/1.0.0",
    (incomingStreamData: IncomingStreamData) => handleIncoming(incomingStreamData, browser1));

  const browser2 = await createLibp2p({
    addresses: {listen: ['/webrtc']},
    transports: [
      webSockets({filter: filters.all}),
      webRTC(),
      circuitRelayTransport({
        discoverRelays: 1
      }),
    ],
    connectionEncryption: [noise()],
    streamMuxers: [yamux()],
    services: {
      identify: identify(),
    }
  });

  // @ts-expect-error not a field
  browser2['name'] = "browser2";  // just for test output

  await browser2.start();
  await browser2.handle("/verity/1.0.0",
    (incomingStreamData: IncomingStreamData) => handleIncoming(incomingStreamData, browser2));

  // connect both "browsers" to the server
  await Promise.all([
    browser1.dial(multiaddr('/ip4/127.0.0.1/tcp/31337/ws')),
    browser2.dial(multiaddr('/ip4/127.0.0.1/tcp/31337/ws'))
  ]);

  assert(server.getConnections().length == 2);
  assert(browser1.getConnections().length == 1);
  assert(browser2.getConnections().length == 1);

  // both browsers should now have a dialable p2p-circuit/webRTC addr
  const browser1dialable: Multiaddr = await findWebRTCAddress(browser1);
  console.log("browser1's dialable address is: " + browser1dialable)

  const browser2dialable: Multiaddr = await findWebRTCAddress(browser2);
  console.log("browser2's dialable address is: " + browser2dialable)

  // connect browser1 to browser2
  const b1ToB2: Connection = await browser1.dial(browser2dialable);
  assert(b1ToB2.transient === false);
  assert(b1ToB2.status === "open");

  const b1b2rawstream: Stream = await b1ToB2.newStream("/verity/1.0.0");
  const b1tob2stream: LengthPrefixedStream = lpStream(b1b2rawstream);
  console.log("Sending message from browser1 to browser2");
  await b1tob2stream.write(Buffer.from("Message from browser1 while server still connected", 'utf8'));
  await new Promise(resolve => setTimeout(resolve, 100));  // give it some time

  console.log("Stopping server");
  await server.stop();

  console.log("Sending message from browser1 to browser2 after server stopped");
  await b1tob2stream.write(Buffer.from("Message from browser1 after server disconnected", 'utf8'));
  await new Promise(resolve => setTimeout(resolve, 1000));  // give it some time

  console.log("teardown: stopping browser nodes");
  await browser1.stop();
  await browser2.stop();
  console.log("end of main, NodeJS should terminate now");
}

async function handleIncoming(incomingStreamData: IncomingStreamData, node: any) {
  console.log("handling incoming stream at " + node.name)
  const stream: LengthPrefixedStream = lpStream(incomingStreamData.stream);
  let msg: Uint8ArrayList;
  try {
    while (msg = await stream.read()) {
      const msgBuf: Buffer = Buffer.from(
        msg.subarray()  // Note: Here, subarray() re-assembles a message which
                        // may have been received in many fragments.
      );
      console.log(`Message received at ${node.name}: ${msgBuf.toString('utf8')}`);
    }
  } catch(error) {
    console.log("error reading stream at " + node.name + ": " + error);
  }
  console.log("stopping to handle stream at " + node.name)
}

function assert(assertion: boolean) {
  if (!assertion) throw Error("assertion failed");
}

async function findWebRTCAddress (node: Libp2p): Promise<Multiaddr> {
  while (true) {
    // check for a dialable p2p-circuit/webRTC addr
    for (const multiaddr of node.getMultiaddrs()) {
      const protos: string[] = multiaddr.protoNames();
      if (protos.includes("p2p") && protos.includes("p2p-circuit") && protos.includes("webrtc")) {
        return multiaddr;
      }
    }

    // not found relay yet, wait a bit
    await new Promise<void>(resolve => {
      setTimeout(() => {
        resolve()
      }, 1000)
    })
  }
}

main();

Notably it awaits the promise from the second write to b1tob2stream which the original didn't. I could not get the original to run unmodified due to the aforesaid type errors.

When it runs I see:

browser1's dialable address is: /ip4/127.0.0.1/tcp/31337/ws/p2p/12D3KooWFeRc98AeVfVsMcL1Tw53xi3BVszepy2bb5gHoJnYrKcV/p2p-circuit/webrtc/p2p/12D3KooWRck7bKBRCtCAVRKeFCHH4jBJ4bzn9FVihPEn1wNZ1B4G
browser2's dialable address is: /ip4/127.0.0.1/tcp/31337/ws/p2p/12D3KooWFeRc98AeVfVsMcL1Tw53xi3BVszepy2bb5gHoJnYrKcV/p2p-circuit/webrtc/p2p/12D3KooWDixHLhHBCYMFWpZCGKkMUBf86zr1JBKZsgbjrWNAHkvi
handling incoming stream at browser2
Sending message from browser1 to browser2
Message received at browser2: Message from browser1 while server still connected
Stopping server
Sending message from browser1 to browser2 after server stopped
Message received at browser2: Message from browser1 after server disconnected
teardown: stopping browser nodes
error reading stream at browser2: Error: unexpected end of input
stopping to handle stream at browser2
end of main, NodeJS should terminate now

So the second message is received after the relay server is stopped, however the process still fails to exit.

Running with why-is-node-running reveals a thread safe callback reference in node-datachannel seems to be stopping the process from running - more info here: https://github.com/murat-dogan/node-datachannel/issues/215#issuecomment-1988027243

achingbrain commented 5 months ago

node-datachannel@0.5.4 has been released, this solves the process not exiting problem when I run the modified version of the script in https://github.com/libp2p/js-libp2p/issues/2425#issuecomment-1988106138

github-actions[bot] commented 5 months ago

Oops, seems like we needed more information for this issue, please comment with more details or this issue will be closed in 7 days.

victorhahncastell commented 5 months ago

Hi Alex, thank you very much for looking into this, and of course for the fixed release. I can confirm the simplified code works as expected, while my more convoluted example does not. Will try to pinpoint the cause of that, as obviously our application code is even more complex than the example.

victorhahncastell commented 5 months ago

@achingbrain It appears the second message ("Message from browser1 after server disconnected") fails to arrive when I also activate the webRTC transport on the server node. Can you verify this on this end?

  const server = await createLibp2p({
    addresses: {listen: ['/ip4/127.0.0.1/tcp/31337/ws']},
    transports: [
      webSockets({filter: filters.all}),
      webRTC(),
    ],
    connectionEncryption: [noise()],
    streamMuxers: [yamux()],
    services: {
      identify: identify(),
      relay: circuitRelayServer(),
    },
  });
github-actions[bot] commented 5 months ago

Oops, seems like we needed more information for this issue, please comment with more details or this issue will be closed in 7 days.

github-actions[bot] commented 5 months ago

This issue was closed because it is missing author input.

victorhahncastell commented 5 months ago

Please reopen; issue is neither completed nor was it stale.

achingbrain commented 4 months ago

It appears the second message ("Message from browser1 after server disconnected") fails to arrive when I also activate the webRTC transport on the server node.

I think I found the problem, we were using the cleanup function from node-datachannel when closing a WebRTC transport (e.g. when the relay was being shut down).

It's an escape hatch that destroys all C++ objects associated with RTCPeerConnections or RTCDataChannels so not really great for a server.

That said, the problem appears in the script above because the relay and the two clients all run in the same process - I don't know how realistic this is as a deployment target.

Either way we shouldn't be calling this function, node-datachannel should clean up after itself.

This will be fixed in https://github.com/libp2p/js-libp2p/pull/2498

victorhahncastell commented 3 months ago

@achingbrain I just noticed I never gave any feedback -- apologies for that. I can confirm the fix works as expected, so again, thank you very much for taking the time to look into this :+1: