libp2p / js-libp2p-mplex

JavaScript implementation of https://github.com/libp2p/mplex
https://libp2p.io
Other
51 stars 30 forks source link

Error: ERR_STREAM_RESET #244

Closed bakhshandeh closed 1 year ago

bakhshandeh commented 1 year ago

This happens when number of parallel pipes is more than 30.

Error: stream reset
    at Object.reset (node_modules/@libp2p/mplex/dist/src/stream.js:105:33)
    at MplexStreamMuxer._handleIncoming (node_modules/@libp2p/mplex/dist/src/mplex.js:253:24)
    at MplexStreamMuxer.sink (node_modules/@libp2p/mplex/dist/src/mplex.js:156:36)
    at processTicksAndRejections (node:internal/process/task_queues:96:5)

code: 'ERR_STREAM_RESET'

Sample code to reproduce the issue:

import { tcp } from "@libp2p/tcp";
import { mplex } from "@libp2p/mplex";
import { noise } from "@chainsafe/libp2p-noise";
import { createLibp2p } from "libp2p";
import { pipe } from "it-pipe";
import { toString as uint8ArrayToString } from "uint8arrays/to-string";
import { fromString as uint8ArrayFromString } from "uint8arrays/from-string";

const createNode = async () => {
  const node = await createLibp2p({
    addresses: {
      listen: ["/ip4/127.0.0.1/tcp/0"],
    },
    transports: [tcp()],
    streamMuxers: [mplex()],
    connectionEncryption: [noise()],
  });

  await node.start();
  return node;
};

(async () => {
  const listenerNode = await createNode();

  // Log a message when we receive a connection
  listenerNode.connectionManager.addEventListener("peer:connect", (evt) => {
    const connection = evt.detail;
    console.log("Received dial from:", connection.remotePeer.toString());
  });

  // echo protocol
  await listenerNode.handle("/echo/1.0.0", async ({ stream }) => {
    await pipe(stream.source, stream.sink);
    stream.close();
  });

  // ================= DIALER ============

  const dialerNode = await createNode();
  async function callProcess() {
    for (let i = 0; i < 50; i++) {
      console.log(`i: ${i}`);
      // Dial the listener node
      const stream = await dialerNode.dialProtocol(
        listenerNode.getMultiaddrs()[0],
        "/echo/1.0.0"
      );
      await pipe(
        // Source data
        [uint8ArrayFromString(`hey [${i}]`)],
        // Write to the stream, and pass its output to the next function
        stream,
        // Sink function
        async function(source) {
          // For each chunk of data
          for await (const data of source) {
            // Output the data
            console.log("Received echo:", uint8ArrayToString(data.subarray()));
          }
        }
      );
      // stream.close();
    }
  }

  // parallel streams
  const PARALLEL_CALLS = 40; // 30 works fine
  await Promise.all(new Array(PARALLEL_CALLS).fill(0).map((i) => callProcess()));
  console.log("all done");
})();

"dependencies": {
    "@chainsafe/libp2p-gossipsub": "^5.3.0",
    "@chainsafe/libp2p-noise": "^10.0.1",
    "@libp2p/bootstrap": "^5.0.0",
    "@libp2p/kad-dht": "^6.0.0",
    "@libp2p/mdns": "^5.1.1",
    "@libp2p/mplex": "^7.1.1",
    "@libp2p/pubsub-peer-discovery": "^7.0.0",
    "@libp2p/tcp": "^6.0.0",
    "@libp2p/websockets": "^5.0.2",
    "@nodeutils/defaults-deep": "^1.1.0",
    "it-concat": "^2.0.0",
    "it-drain": "^2.0.0",
    "it-last": "^2.0.0",
    "it-length-prefixed": "^3.1.0",
    "it-pair": "^1.0.0",
    "it-pipe": "^1.1.0",
    "it-pushable": "^1.4.0",
    "libp2p": "^0.41.0",
    "multiformats": "^9.6.4"
  }```
achingbrain commented 1 year ago

Please see LIMITS.md - protocol handlers default to 32 inbound streams.

You can increase this limit when you register your handler:

// echo protocol
await listenerNode.handle("/echo/1.0.0", async ({ stream }) => {
  await pipe(stream.source, stream.sink);
  stream.close();
}, {
  maxInboundStreams: 100,
  maxOutboundStreams: 100
);