alanshaw / stream-to-it

🚰 Convert Node.js streams to streaming iterables

transform does not pass through thrown errors #14

Closed Bluebie closed 3 years ago

Bluebie commented 3 years ago

import { pipeline } from 'streaming-iterables'
import { transform } from 'stream-to-it'
import { PassThrough } from 'stream'

const iter = pipeline(
  async function * generateError () {
    yield 'welcome and...'
    throw new Error('foo')
    yield 'hello world' // never reached: the generator has already thrown
  },
  transform(new PassThrough())
)

console.log('starting...')
try {
  for await (const val of iter) {
    console.log(val)
  }
} catch (err) {
  console.log('oh no!', err)
}
console.log('finished')

My expectation was that transform, wrapping a stream such as createBrotliCompress (or the PassThrough in the example above), would pass through errors thrown by the source generator, that is, the rejected promise returned by the generator's next() method. Instead, the error appears to be silently ignored.

This makes it quite tricky to handle validation errors or failure states inside a source async iterable. Normally such errors flow through any number of chained generators and can be caught with a try {} catch {} around the for await loop, as in the example above.

Maybe an adequate fix would be to change https://github.com/alanshaw/stream-to-it/blob/master/sink.js#L78 to writable.destroy(err)? I haven't tested whether that would solve my troubles, but it seems likely to be enough. Calling destroy without an Error argument just ends the stream and clears out any underlying resources; it doesn't cause the stream to emit an error.

Test script above currently produces:

starting...
finished

Ideally it should produce something like:

starting...
welcome and...
oh no! Error: foo
    at generateError (file:///Users/bluebie/sti.mjs:8:11)
    at generateError.next (<anonymous>)
    at file:///Users/bluebie/sti.mjs:16:20
finished

Bluebie commented 3 years ago

For now I've been able to work around this issue well enough by implementing my own transform factory function:

const toDuplex = require('stream-to-it/duplex')

// Wrap a Node.js transform stream as an async generator transform that
// rethrows errors raised by the source iterable.
const toTransform = (transform) => async function * (source) {
  const duplex = toDuplex(transform)
  // In a transform the sink and source are connected, so an error in the sink
  // should be thrown in the source also. Catch the sink error to avoid
  // unhandled rejections, record it, and yield from the source.
  let error
  const sinkPromise = duplex.sink(source).catch(err => { error = err })

  for await (const val of duplex.source) {
    if (error) {
      // The sink has failed: tear down the stream and stop reading.
      if (transform.destroy) transform.destroy()
      if (duplex.source.return) duplex.source.return()
      throw error
    }
    yield val
  }

  // The source may end before the sink promise settles, so wait for it and
  // rethrow any error it recorded.
  await sinkPromise
  if (error) throw error
}

But it would be great to have a robust tested solution in the main package.

alanshaw commented 3 years ago

Hmm, seems unexpected. Any chance you can send a PR with a test for this or even a fix?

Bluebie commented 3 years ago

Yeah, I reckon I can whip something up.

Bluebie commented 3 years ago

#15 minimal fix

alanshaw commented 3 years ago

Thank you - released in v0.2.4