poelstra / ts-stream

Type-safe object streams with seamless support for backpressure, ending, and error handling
MIT License

Error handling different for Stream.from() and new Stream<>() #39

Closed HaKr closed 5 years ago

HaKr commented 5 years ago

I was struggling a little with the error handling of a pipeline. Based on the examples, I finally got it working: in the test code, that is, but not in the real-life version. The only difference is that the test code uses Stream.from(a: string[]), whereas the real code opens files.

The code below consolidates the main parts of my code and shows how the error handling differs when only the source stream changes. The file test.dat contains five lines, identical to the array elements passed to Stream.from().

When the file-based stream is used, Node signals a (node:12648) UnhandledPromiseRejectionWarning and none of the result().catch() or aborted().catch() callbacks are invoked.

import * as fs from 'fs'
import * as nodestream from 'stream'
import Stream, { ReadableStream, FileSink } from 'ts-stream'

class NodeBufferToLines extends nodestream.Writable
{
    protected remainder: string = ''

    constructor( protected readonly target: Stream<string> )
    {
        super()
    }

    _write( chunk: Buffer, _encoding: string, callback: ( error?: Error | null ) => void ): void
    {
        const lines = ( this.remainder + chunk.toString() ).split( '\n' )
        this.remainder = lines.pop() || ''
        lines.forEach( line =>
        {
            this.target.write( line.length < 1 ? ' ' : line )
        } )
        callback()
    }

    _final( callback: ( error?: Error | null ) => void ): void
    {
        this.target.write( this.remainder )
        this.target.end()
        callback()
    }
}

const transformer = ( src: ReadableStream<string>, writable: Stream<string> ) => 
{
    writable.aborted().catch( ( err ) => { console.log( 'transformer writable aborted catch' ); src.abort( err ) } );
    src.aborted().catch( ( err ) => { console.log( 'magic transformer aborted catch' ); writable.abort( err ) } );
    src.forEach(
        ( value: string ) =>
        {
            if ( value.includes( '@' ) ) throw new Error( 'transformer error: ' + value )
            writable.write( value )
        },
        ( error?: Error ) => { console.log( 'transformer foreach ender ' + ( error ? 'failed' : 'ok' ) ); writable.end() }
    )

    src.result().catch( err => { console.log( 'transformer src result catch', err ) } )
}

const srcFile = './test.dat'
const sourcers = [Stream.from( ['one', 'two', 'three', '@', 'five'] ), createFileReader( srcFile )]
sourcers.forEach( ( sourcer, index ) =>
{
    const niceIndex = index + 1
    const outFile = `./test${niceIndex}.out`
    let fileResult = 'created'

    const magicStream = sourcer.transform( transformer )

    const fileSink = magicStream.pipe( new FileSink( outFile ) )

    fileSink.aborted().catch( err => { console.log( `fileSink ${niceIndex} abort catch `, err ); fileResult = 'removed'; fs.unlink( outFile, _ => { } ) } )
    fileSink.result().then( _ => console.log( `fileSink ready ${outFile} ${fileResult}.` ) )
} )

export function createFileReader( fileName: string )
{
    const src = fs.createReadStream( fileName )
    const result = new Stream<string>()
    src.pipe( new NodeBufferToLines( result ) )
    result.result().catch( err => { console.log( 'createFileReader result catch', err ); } ).then( _ => src.close() )
    result.aborted().catch( err => { console.log( 'createFileReader aborted catch', err ) } )

    return result
}

rogierschouten commented 5 years ago

Could it be that you forgot to await/return the result of writable.write?

rogierschouten commented 5 years ago

Same for writable.end
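To illustrate why awaiting or returning the write result matters, here is a minimal sketch. It deliberately does not use ts-stream: `MiniStream`, `forwardAll`, and the `demo*` names are hypothetical stand-ins, and the only assumption taken from this thread is that write() returns a promise that rejects when the downstream reader throws. The same reasoning would apply to the promise returned by end().

```typescript
// Hypothetical stand-in for a stream whose write() settles according to what
// the downstream reader does. This is NOT ts-stream's implementation.
type Reader<T> = (value: T) => void | Promise<void>;

class MiniStream<T> {
    constructor(private readonly reader: Reader<T>) {}

    // Resolves once the reader has processed the value; rejects if it throws.
    async write(value: T): Promise<void> {
        await this.reader(value);
    }
}

// Forwards values one at a time. Awaiting each write means a downstream
// failure rejects the promise returned by forwardAll, so the caller can
// handle it instead of it becoming an unhandled rejection.
async function forwardAll(values: string[], sink: MiniStream<string>): Promise<void> {
    for (const value of values) {
        await sink.write(value);
    }
}

const demoSeen: string[] = [];
const demoSink = new MiniStream<string>((value) => {
    if (value.includes("@")) throw new Error("transformer error: " + value);
    demoSeen.push(value);
});

// The rejection surfaces here, where it can be turned into a result.
const demoOutcome: Promise<string> = forwardAll(["one", "two", "@", "five"], demoSink)
    .then(() => "ok")
    .catch((err: Error) => "failed: " + err.message);
```

If the `await` inside `forwardAll` were dropped, the rejected promise from `write` would have no handler attached, which is the kind of situation that produces an UnhandledPromiseRejectionWarning in Node.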

HaKr commented 5 years ago

I did indeed come up with

    this.target.write( line.length < 1 ? ' ' : line )
        .catch( _ => { console.log( 'caught error', _ ), this.target.end( _ ) } )
    } )

Still trying to figure out why in this case the throw bubbles up into this Node stream, whereas in the Stream.from() variant it triggers the src.aborted().catch().

How would I return the result? (The code above awaits it?)

poelstra commented 5 years ago

@HaKr this behavior is by design: the docs for Stream.from() refer to Stream.writeEach() for error handling, which says:

If writing of a value fails (either by the `writer` throwing an error,
returning a rejection, or the write call failing), the stream is aborted
and ended with that error.

In your case (as Rogier mentioned), you didn't handle any error from write or end, which causes the unhandled rejection to show up.

It is up to you how to handle the error in that case. Maybe you just want to 'ignore' it and retry with a different value (without aborting the stream), or maybe you want to end the stream with or without an error, or you want to abort it.

If you want behavior similar to (and as easy as) that of Stream.from(), your best bet is probably to call Stream.abort(yourError).

Be sure to always properly end() the stream too; just calling abort() is not enough. This ensures any other stream elements can properly dispose of any resources (such as file handles).

Maybe take a look at https://github.com/poelstra/ts-stream/blob/v2.0.0/src/test/test-idiomatic.ts#L141-L178 for some more inspiration on creating a source yourself, and handling the sometimes tricky cases of proper cleanup.
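The advice above (abort on a failed write, then always end) could be sketched roughly as follows. This is not taken from the linked test file. `WritableLike` is an assumed minimal shape covering only the write(), end(), and abort() calls as they are used in this thread, and `writeLines` and `RecordingTarget` are hypothetical names introduced for illustration.

```typescript
// Assumed minimal shape of the stream methods used in this thread; this is
// not ts-stream's full type definition.
interface WritableLike<T> {
    write(value: T): Promise<void>;
    end(error?: Error): Promise<void>;
    abort(reason: Error): void;
}

function toError(err: unknown): Error {
    return err instanceof Error ? err : new Error(String(err));
}

// Hypothetical helper: writes lines in order. On the first failed write it
// aborts the target, and in every case it ends the target so that downstream
// elements get a chance to release their resources. Returns the first
// failure, or undefined if everything succeeded.
async function writeLines(target: WritableLike<string>, lines: string[]): Promise<Error | undefined> {
    let failure: Error | undefined;
    try {
        for (const line of lines) {
            await target.write(line);
        }
    } catch (err) {
        failure = toError(err);
        target.abort(failure);
    }
    try {
        // Always end, passing along the failure if there was one.
        await target.end(failure);
    } catch (err) {
        // Keep the first failure if end() rejects as well.
        failure = failure || toError(err);
    }
    return failure;
}

// Fake target that records calls, used only to show the call order.
class RecordingTarget implements WritableLike<string> {
    readonly calls: string[] = [];
    async write(value: string): Promise<void> {
        if (value.includes("@")) throw new Error("bad value: " + value);
        this.calls.push("write:" + value);
    }
    async end(error?: Error): Promise<void> {
        this.calls.push(error ? "end:" + error.message : "end");
    }
    abort(reason: Error): void {
        this.calls.push("abort:" + reason.message);
    }
}

const recordingTarget = new RecordingTarget();
const writeResult = writeLines(recordingTarget, ["one", "@", "three"]);
```

Whether to abort, to end with an error, or to skip the bad value and continue remains a design choice for the source; the sketch only shows the abort-then-end variant.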