holepunchto / hypercore

Hypercore is a secure, distributed append-only log.
https://docs.holepunch.to
MIT License

protocol stream `.end()` not working as expected #366

Open gmaclennan opened 1 year ago

gmaclennan commented 1 year ago

The end event on the protocol stream returned by core.replicate() does not seem to propagate as expected.

Use case:

I want to gracefully end a replication stream without destroying it.

Expected behaviour:

const s = core.replicate(true)
s.on('end', () => console.log('end'))
s.end()

I would expect the above code to output end. It doesn't. There is a scenario where it does work: if the core that is replicating is a read-only peer, and you call s.end() after first calling core.update(). There does not seem to be a way to end the replication stream for the writer core.

Minimal reproduction:

import Hypercore from 'hypercore'
import RAM from 'random-access-memory'
import test from 'tape'

test('can end replication stream from writer', async t => {
  t.plan(2)
  ;(async () => {
    const core1 = new Hypercore(RAM)
    await core1.ready()
    const core2 = new Hypercore(RAM, core1.key)

    const s1 = core1.replicate(true)
    const s2 = core2.replicate(false)

    s1.on('end', () => t.pass('s1 end'))
    s2.on('end', () => t.pass('s2 end'))

    s1.pipe(s2).pipe(s1)

    await core1.update()
    s1.end()
  })()
})

test('can end replication stream from reader', t => {
  t.plan(2)
  ;(async () => {
    const core1 = new Hypercore(RAM)
    await core1.ready()
    const core2 = new Hypercore(RAM, core1.key)

    const s1 = core1.replicate(true)
    const s2 = core2.replicate(false)

    s1.on('end', () => t.pass('s1 end'))
    s2.on('end', () => t.pass('s2 end'))

    s1.pipe(s2).pipe(s1)

    await core2.update()
    s2.end()
  })()
})

LuKks commented 1 year ago

When you do stream.end() this sends the signal to the other side, meaning the other side gets the 'end' event but not you, unless they also send stream.end() back, for example:

const s = core.replicate(true)
s.on('end', () => s.end())
// ...
s.end()

I think this is handled automatically, so you should only need to call s.end() and wait for the 'close' event (unsure about timeouts here).

Except if you listen to the event like s.on('end', cb) then it's your job to do as above and manually call s.end(). In your Expected behaviour example you were registering the event and just logging, so you're not sending back s.end().
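
To make the half-open behaviour described here concrete, below is a minimal sketch using Node's core `stream.Duplex`. It is not Hypercore, streamx, or secret-stream code, so the exact events on a real replication stream may differ. `createPair` and `runEndDemo` are hypothetical helpers written for this illustration only. Calling `a.end()` makes 'end' fire on `b`; `a` only sees its own 'end' once `b` ends back.

```javascript
const { Duplex } = require('node:stream')

// Hypothetical helper: two in-memory Duplex streams wired back to back.
// Whatever is written to one side is pushed to the other side's readable half.
// Backpressure is ignored to keep the sketch short.
function createPair () {
  const a = new Duplex({
    read () {},
    write (chunk, encoding, callback) { b.push(chunk); callback() },
    final (callback) { b.push(null); callback() } // a.end() ends b's readable half
  })
  const b = new Duplex({
    read () {},
    write (chunk, encoding, callback) { a.push(chunk); callback() },
    final (callback) { a.push(null); callback() } // b.end() ends a's readable half
  })
  return [a, b]
}

// Ends side `a` and records which events fire on both sides.
// With `echo: true`, side `b` answers 'end' by calling end() itself.
async function runEndDemo ({ echo }) {
  const [a, b] = createPair()
  const events = []
  for (const [name, stream] of [['a', a], ['b', b]]) {
    stream.on('end', () => events.push(`${name} end`))
    stream.on('close', () => events.push(`${name} close`))
    stream.resume() // 'end' only fires once the readable half is consumed
  }
  if (echo) b.on('end', () => b.end())
  a.end()
  await new Promise(resolve => setTimeout(resolve, 10))
  return events
}

runEndDemo({ echo: false }).then(events => console.log('no echo:', events))
runEndDemo({ echo: true }).then(events => console.log('echo:', events))
```

Without the echo only `b` reports 'end' and neither side closes. With the echo both sides report 'end' and 'close'.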

gmaclennan commented 1 year ago

Thanks @LuKks that makes sense, however this is not the behaviour I am seeing: s.end() does not cause any end event on the other side. You can see from running the tests in my comment that the only event that fires is s2.on('end') when you call s2.end(), in the second test. No other end events fire.

gmaclennan commented 1 year ago

Ok did some digging and got the following results. It seems like s.end() results in inconsistent behaviour if called in the same tick, or the next tick, but is consistent (but not quite in the way that you suggest) if I call it after setTimeout(0):

import Hypercore from 'hypercore'
import RAM from 'random-access-memory'

const core1 = new Hypercore(RAM)
await core1.ready()
const core2 = new Hypercore(RAM, core1.key)

const s1 = core1.replicate(true)
const s2 = core2.replicate(false)

s1.on('end', () => console.log('s1 end'))
s1.on('close', () => console.log('s1 close'))
s2.on('end', () => console.log('s2 end'))
s2.on('close', () => console.log('s2 close'))

s1.pipe(s2).pipe(s1)

s2.end()

This code logs nothing, either when calling s1.end() or s2.end().

If I await process.nextTick I get different results depending on which core I await and which stream I end:

await new Promise(res => process.nextTick(res))
s1.end()
// s1 end
// s1 close
// s2 close

await new Promise(res => process.nextTick(res))
s2.end()
// s2 end
// s1 end
// s1 close

If I await setTimeout(0) then I get consistent results, but not quite what I would expect based on your comment.

await new Promise(res => setTimeout(res, 0))
s1.end()
// s1 end
// s1 close
// s2 close

await new Promise(res => setTimeout(res, 0))
s2.end()
// s2 end
// s2 close
// s1 close
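
For reference, the three timings compared above (same tick, process.nextTick, setTimeout(0)) run in a fixed order in Node. The sketch below uses only Node built-ins and says nothing about Hypercore internals; `timingOrder` is a hypothetical name for this illustration.

```javascript
// Records when each kind of callback runs relative to the others.
function timingOrder () {
  return new Promise(resolve => {
    const order = []
    // Timers run in a later event-loop phase, after the nextTick queue drains.
    setTimeout(() => { order.push('setTimeout(0)'); resolve(order) }, 0)
    // nextTick callbacks run right after the current synchronous code finishes.
    process.nextTick(() => order.push('nextTick'))
    // Synchronous code runs first.
    order.push('same tick')
  })
}

timingOrder().then(order => console.log(order))
// [ 'same tick', 'nextTick', 'setTimeout(0)' ]
```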

LuKks commented 1 year ago

Try commenting out the lines where you listen to 'end' events, and try again without waiting the next tick

gmaclennan commented 1 year ago

Done: nothing logged (i.e. no 'close' events are logged either)

gmaclennan commented 1 year ago

Possibly related? https://github.com/mafintosh/streamx/issues/72

LuKks commented 1 year ago

I was about to say that it sounds like a bug in secret-stream, because it registers the 'end' listener a bit late, which is why it works if you wait a tick. You should open a PR on secret-stream that adds a single simple test case reproducing the bug.
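
As a rough illustration of the suspected failure mode, here is a sketch using a plain Node `EventEmitter`. It is not secret-stream's actual code: `LateForwarder` and `lateListenerDemo` are hypothetical names, and the one-tick delay is an assumption made for the example. An event emitted before a listener is attached is simply lost, so waiting a tick changes the outcome.

```javascript
const { EventEmitter } = require('node:events')

// Hypothetical wrapper: forwards 'end' from an inner emitter, but only
// attaches its listener on the next tick (the assumed "registers late" case).
class LateForwarder extends EventEmitter {
  constructor (inner) {
    super()
    process.nextTick(() => {
      inner.on('end', () => this.emit('end'))
    })
  }
}

// Returns true if the outer 'end' event was observed.
async function lateListenerDemo (waitATick) {
  const inner = new EventEmitter()
  const outer = new LateForwarder(inner)
  let seen = false
  outer.on('end', () => { seen = true })

  if (waitATick) await new Promise(resolve => process.nextTick(resolve))
  inner.emit('end') // lost if the forwarder has not attached its listener yet

  await new Promise(resolve => setImmediate(resolve))
  return seen
}

lateListenerDemo(false).then(seen => console.log('same tick:', seen)) // false
lateListenerDemo(true).then(seen => console.log('after a tick:', seen)) // true
```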

LuKks commented 1 year ago

Meanwhile, instead of commenting it out do this:

s1.on('end', () => s1.end())
s2.on('end', () => s2.end())

And try again without waiting any tick

mafintosh commented 1 year ago

Unsure if you can end these streams atm. We tend to use the replicate(stream) api everywhere with a swarm stream, and the stream returned works a bit differently, but we'll look into it.