Closed bellbind closed 1 year ago
I made a library, closable-stream.js,
which wraps a libp2p (mplex) stream so that it can send/accept read-closing messages:
// closable-stream.js
// wrapping libp2p stream (mplex/stream)
// - stream.source: AsyncIterable<Uint8Array>
// - stream.sink: (Iterable<Uint8Array> | AsyncIterable<Uint8Array>) => Promise<undefined>
// - stream.close, stream.closeRead, stream.closeWrite, stream.abort, stream.reset
const newQueue = () => {
const [gets, polls] = [[], []];
const next = () => new Promise(
get => polls.length > 0 ? polls.shift()(get) : gets.push(get));
const poll = () => new Promise(
poll => gets.length > 0 ? poll(gets.shift()) : polls.push(poll));
const push = value => poll().then(get => get({value, done: false}));
const close = () => poll().then(get => get({done: true}));
return {[Symbol.asyncIterator]() {return this;}, next, push, close};
}
const payload = (u8a, type = 0) => {
const ret = new Uint8Array(u8a.length + 1);
ret[0] = type;
ret.set(u8a, 1);
return ret;
}
export const newClosableStream = stream => {
const eventTarget = new EventTarget();
let sinkFinished = false, sourceFinished = false;
// send to remote
const writeQueue = newQueue();
const writing = async () => {
return stream.sink(async function* () {
let closed = false, finished = false;
while (!closed || !finished) {
const {done, value: {type, value}} = await writeQueue.next();
if (type === "data") {
yield payload(value, 0);
} else if (type === "close") {
yield Uint8Array.from([1]);
closed = true;
} else if (type === "finished") {
yield Uint8Array.from([2]);
finished = true;
}
}
stream.closeWrite();
//console.info("[stream.closeWrite()]");
}());
};
const writingPromise = writing().catch(error => {
eventTarget.dispatchEvent(new CustomEvent("error", {detail: error}));
});
// receive from remote
const readQueue = newQueue();
let remoteClosed = false;
const reading = async () => {
for await (const bl of stream.source) {
if (sourceFinished) break;
const u8a = bl.slice();
//console.log("type", u8a[0], u8a);
if (u8a[0] === 0) readQueue.push({type: "data", value: u8a.slice(1)});
if (u8a[0] === 1) remoteClosed = true;
if (u8a[0] === 2) readQueue.push({type: "finished"});
}
readQueue.push({type: "finished"});
stream.closeRead();
//console.info("[stream.closeRead()]");
};
const readingPromise = reading().catch(error => {
// (ipfs-0.65.0) may spawn `Error: Socket read timeout`
eventTarget.dispatchEvent(new CustomEvent("error", {detail: error}));
});
// wrapped stream.source
const source = (async function* () {
for (;;) {
const {done, value: {type, value}} = await readQueue.next();
if (type === "data") yield value;
if (type === "finished") break;
}
writeQueue.push({type: "close"});
sourceFinished = true;
})();
// wrapped stream.sink
const sink = async iter => {
for await (const value of iter) {
if (remoteClosed) break;
writeQueue.push({type: "data", value});
}
writeQueue.push({type: "finished"});
sinkFinished = true;
};
// send close to read;
const closeRead = async () => {
writeQueue.push({type: "close"});
sourceFinished = true;
};
const closeWrite = async () => {
writeQueue.push({type: "finished"});
sinkFinished = true;
};
// wrapped stream
return Object.assign(eventTarget, {
source, sink, closeRead, closeWrite,
close() {return Promise.all([closeRead(), closeWrite()]);},
reset() {return stream.reset();},
abort(...args) {return stream.abort(...args);},
});
};
then, using it in close-from-dialer.mjs
as:
// close-from-dialer.mjs
import * as fs from "node:fs";
import * as IPFS from "ipfs-core";
import {newClosableStream} from "./closable-stream.js";
// setup two IPFS nodes
const repo1 = "./test-repo1", repo2 = "./test-repo2";
fs.rmSync(repo1, {recursive: true, force: true});
fs.rmSync(repo2, {recursive: true, force: true});
const config = {
Addresses: {
Swarm: [
"/ip4/0.0.0.0/tcp/0",
],
},
};
const node1 = await IPFS.create({
repo: repo1,
config,
});
const id1 = await node1.id();
console.info("[node1 id]", id1.id.toJSON());
console.info("[node1 address]", id1.addresses[0].toJSON());
const node2 = await IPFS.create({
repo: repo2,
config,
});
const id2 = await node2.id();
console.info("[node2 id]", id2.id.toJSON());
console.info("[node2 address]", id2.addresses[0].toJSON());
// connect and ping as p2p-circuit
await node2.swarm.connect(id1.addresses[0].toJSON());
await node2.libp2p.ping(`/p2p/${id2.id.toJSON()}/p2p-circuit/p2p/${id1.id.toJSON()}`);
// handler: serve inifinitely
const protocol = "/example-protocol/1.0";
{
const handler = ({connection, stream}) => {
const cStream = newClosableStream(stream);
let count = 0;
cStream.sink((async function* () {
// while (true) {
for (let i = 0; i < 20; i++) {
// infinite stream
yield new TextEncoder().encode(`count: ${++count}`);
console.log(`[Served] ${count}`);
await new Promise(f => setTimeout(f, 50));
}
})());
};
await node1.libp2p.handle(protocol, handler);
}
// dialer: accept several messages, then stop
{
const stream = newClosableStream(await node2.libp2p.dialProtocol(`/p2p/${id1.id.toJSON()}`, protocol));
stream.addEventListener("error", async ev => {
console.log(ev.detail);
});
let i = 0;
for await (const bl of stream.source) {
console.log(new TextDecoder().decode(bl.slice().slice()));
if (++i === 10) break;
}
await stream.close();
}
const sec = 45;
console.log(`[wait ${sec}sec for spawn Socket read timeout]`);
await new Promise(f => setTimeout(f, sec * 1000)); // > Socket read timeout
// reconnect after socket read timeout
console.log("[connect and ping again]");
await node2.swarm.connect(id1.addresses[0].toJSON());
await node2.libp2p.ping(`/p2p/${id2.id.toJSON()}/p2p-circuit/p2p/${id1.id.toJSON()}`);
{// dial again
const stream = newClosableStream(await node2.libp2p.dialProtocol(`/p2p/${id1.id.toJSON()}`, protocol));
stream.addEventListener("error", async ev => {
console.log(ev.detail);
});
let i = 0;
for await (const bl of stream.source) {
console.log(new TextDecoder().decode(bl.slice().slice()));
if (++i === 10) break;
}
await stream.close();
}
console.log("[stop nodes]");
await node1.stop();
await node2.stop();
console.log("[stopped]");
Its output is:
$ node close-from-dialer.mjs
generating Ed25519 keypair...
to get started, enter:
jsipfs cat /ipfs/QmRaaUwTNfwgFZpeUy8qrZwrp2dY4kCKmmB5xEqvH3vtD1/readme
Swarm listening on /ip4/127.0.0.1/tcp/56864/p2p/12D3KooWHTdhfWpErfkxi2aHKcUsvGuuWPqrhLFf8b1qSgSVq6Ur
Swarm listening on /ip4/192.168.10.3/tcp/56864/p2p/12D3KooWHTdhfWpErfkxi2aHKcUsvGuuWPqrhLFf8b1qSgSVq6Ur
[node1 id] 12D3KooWHTdhfWpErfkxi2aHKcUsvGuuWPqrhLFf8b1qSgSVq6Ur
[node1 address] /ip4/127.0.0.1/tcp/56864/p2p/12D3KooWHTdhfWpErfkxi2aHKcUsvGuuWPqrhLFf8b1qSgSVq6Ur
generating Ed25519 keypair...
to get started, enter:
jsipfs cat /ipfs/QmRaaUwTNfwgFZpeUy8qrZwrp2dY4kCKmmB5xEqvH3vtD1/readme
Swarm listening on /ip4/127.0.0.1/tcp/56866/p2p/12D3KooWEWjditYEazptimMu4hcb1ndEGbEr54JMAhsZdxCkuybL
Swarm listening on /ip4/192.168.10.3/tcp/56866/p2p/12D3KooWEWjditYEazptimMu4hcb1ndEGbEr54JMAhsZdxCkuybL
[node2 id] 12D3KooWEWjditYEazptimMu4hcb1ndEGbEr54JMAhsZdxCkuybL
[node2 address] /ip4/127.0.0.1/tcp/56866/p2p/12D3KooWEWjditYEazptimMu4hcb1ndEGbEr54JMAhsZdxCkuybL
[Served] 1
count: 1
[Served] 2
count: 2
[Served] 3
count: 3
[Served] 4
count: 4
[Served] 5
count: 5
[Served] 6
count: 6
[Served] 7
count: 7
[Served] 8
count: 8
[Served] 9
count: 9
[Served] 10
count: 10
[wait 45sec for spawn Socket read timeout]
[connect and ping again]
[Served] 1
count: 1
[Served] 2
count: 2
[Served] 3
count: 3
[Served] 4
count: 4
[Served] 5
count: 5
[Served] 6
count: 6
[Served] 7
count: 7
[Served] 8
count: 8
[Served] 9
count: 9
[Served] 10
count: 10
[stop nodes]
[stopped]
js-ipfs is being deprecated in favor of Helia. You can https://github.com/ipfs/js-ipfs/issues/4336 and read the migration guide.
Please feel free to reopen with any comments by 2023-06-02. We will do a final pass on reopened issues afterward (see https://github.com/ipfs/js-ipfs/issues/4336).
Assigning to @achingbrain to answer whether this issue is already resolved in Helia, or if this issue needs to be migrated to that repo!
The behaviour here is really down to the stream muxer.
Yamux and Mplex both only half-close streams, that is, when a node closes a stream, it's only closing its end for writing and the remote's end for reading.
If you abort the stream instead, the stream will be closed for reading/writing at both ends and a STREAM_RESET error will be thrown on the remote as the result of the stream.sink
invocation.
Where this is not the case an issue should be filed against the muxer in question.
package.json
as:

Severity:
Description:
I wrote a library using P2P request/response messaging on IPFS nodes (
node.libp2p.handle()/dialPrococol()
). I tried to test infinite responses, such as the Web browser's text/event-stream
.A dial side calls
stream.close()
after several message reads, but the handler side never stops the stream.sink
async generator; sink
continues to consume yield message;
in stream.sink((async function* () {...})()).

Steps to reproduce the error:
Code:
Result: