poelstra / ts-stream

Type-safe object streams with seamless support for backpressure, ending, and error handling
MIT License
65 stars 13 forks source link

isEnded() should already be true when end() is sent, not when end() is resolved #12

Closed rogierschouten closed 9 years ago

rogierschouten commented 9 years ago

The use case for issue #3 was to determine if we could still call end(). The way it was implemented leaves a race condition: if isEnded() returns false, it may be that end() was called but it has not returned yet. Please change isEnded() to return true when end() was called.

According to comments:

/**
     * Determine whether stream has completely ended (i.e. end handler has been
     * called and its return Thenable, if any, is resolved).
     * @return true when stream has ended, false otherwise
     */
    isEnded(): boolean;
poelstra commented 9 years ago

I had a use case for ended() that needed this behaviour, and was thinking about having isEndingOrEnded()-kind-of-method in addition.

You just mentioned isEndSent() might be a better name for that, so I'll probably go for that.

On the other hand, when is the control flow in your program such that the writer doesn't know when it did or did not send end() yet? Do you have a simple example?

rogierschouten commented 9 years ago

I'm making a database abstraction layer.

The writer needs to send end() in different code paths: 1) when its data is completely written 2) when the database driver sends an error 3) when an error is returned from the write() e.g. an abort error 4) when the pool is aborted and therefore all connections are cancelled and will or will not emit anything anymore depending on the database driver

This means that it is handy to be able to check that none of the other code paths has ended the stream yet. It is easiest and least error-prone if streams maintained this.

poelstra commented 9 years ago

Note that any isEndSent() would only really work on the 'source' stream, in your case. That is, the end() of subsequent streams is only called when the last value has been acknowledged through the whole chain. (It is possible to call end() half-way of the chain by some of your layers, but then any writes further upstream will be rejected with WriteAfterEnd, of course.)

If it's as diverse as in your example, it may be a better option to implement it as e.g.:

// somewhere where the source stream is created
let endStream: (t?: Thenable<void>) => void;
let streamEnded = new Promise((resolve) => {
    endStream = resolve;
});
streamEnded.done(() => myStream.end(), (err) => myStream.end(err));

// then in all of the other codepaths:
endStream(); // or
endStream(Promise.reject(new Error("boom"));

This makes use of the fact that a promise's fate can be sealed just once, but it won't solve the problem of having some drivers still call write() on the stream if another part has ended it.

I'd be happy to implement isEndSent(), but it seems that the use-case smells a bit, so maybe this is not the right way to solve it :) Maybe discuss this IRL tomorrow.

rogierschouten commented 9 years ago

Well I'd expect to use it only on a block where I'd also send end(), so that fits. The behavior as you describe it is also exactly as I would expect it. Looks like a rather large bit of code to be repeating across every database adapter.

poelstra commented 9 years ago

Implemented in 0.6.0: there are now isEnding(), isEnded() and isEndingOrEnded() (the latter being what you want, probably).