nodejs / node

Node.js JavaScript runtime ✨🐢🚀✨
https://nodejs.org
Other
106.91k stars 29.16k forks source link

Streams: finished Change in behavior between 22.2.0 and 22.9.0. Throws Exception (Unhandled Rejection) #55152

Open markddrake opened 3 days ago

markddrake commented 3 days ago

Version

22.9.0 (Docker Latest)

Platform

Linux (Docker Latest)

root@0cd652c8fcb1:/# uname -a
Linux 0cd652c8fcb1 5.15.0-84-generic #93-Ubuntu SMP Tue Sep 5 17:16:10 UTC 2023 x86_64 GNU/Linux

Subsystem

STREAMS/PROMISES FINISHED

What steps will reproduce the bug?

Run the following code on 22.2 and 22.9 and note changed behavior - Unhandled Rejection


const { PassThrough} = require('stream');
const { pipeline, finished,  } = require('stream/promises');
const fs = require('fs');

/**
 * PassThrough subclass used by the reproduction: it forwards every chunk
 * unchanged, counts the chunks, and closes the stream handed to its
 * constructor once more than 100 chunks have been seen.
 */
class MyTransform extends PassThrough {

  /**
   * @param is - the stream that _transform() will close after the 100th chunk
   */
  constructor(is) {
    super()
    this.is = is
    this.counter = 0
  }

  // Declared async, but the body contains no await: completion is signalled
  // through callback(), and the promise the method returns is not used here.
  async _transform(data,enc,callback) {
    this.counter++
    this.push(data)
    if (this.counter > 100) {
      // Runs for every chunk from the 101st onwards, before callback() is invoked.
      this.is.close()
    }
    callback()
  }

}

/**
 * Reproduction driver: pipes input.txt through MyTransform into output.txt
 * with pipeline(), while separately tracking each of the three streams with
 * finished(). A pipeline() failure is caught and logged here, not rethrown.
 */
async function runPipeline() {

  const is = fs.createReadStream('input.txt')
  // Each stream gets its own 'error' listener that logs the stream's class name and the error.
  is.on('error',(err) => {
    console.log(is.constructor.name,err)
  })
  const t = new MyTransform(is)
  t.on('error',(err) => {
    console.log(t.constructor.name,err)
  })
  const os =  fs.createWriteStream('output.txt')
  os.on('error',(err) => {
    console.log(os.constructor.name,err)
  })

  const streams = [is,t,os]
  // One finished() promise per stream, created before pipeline() is awaited.
  // Nothing consumes these promises until Promise.allSettled() in the catch
  // block below.
  // NOTE(review): presumably this gap is why a rejection that happens first
  // reaches the 'unhandledRejection' hook - confirm against the Node.js docs.
  const activeStreams = streams.map((s) => {
    return finished(s)
  })
  console.log(activeStreams)
  try {
    await pipeline(...streams);
    console.log(t.counter)
  } catch (err) {
    // The "1" / "2" markers bracket the allSettled() wait so the promise
    // states printed before and after it can be compared in the output.
    console.log(1)
    console.log(activeStreams)
    await Promise.allSettled(activeStreams)
    console.log(2)
    console.log(activeStreams)
    console.error('Pipeline error:', err);
  }
}

// Process-level hook: logs the rejection reason (e) and the rejected promise (p)
// whenever an 'unhandledRejection' event is emitted.
process.on('unhandledRejection', (e,p) => {
  console.log("Unhandled",e,p)
})

// Entry point: prints 'success' when runPipeline() fulfils, otherwise logs the rejection.
runPipeline().then(() => {console.log('success')}).catch((e) => {console.log(e)})

How often does it reproduce? Is there a required condition?

Always

What is the expected behavior? Why is that the expected behavior?

The 22.2.0 behavior appears to be correct to me.

C:\Development\YADAMU\src\scratch\streams>docker cp input.txt NODE-22-2:/tmp

C:\Development\YADAMU\src\scratch\streams>docker cp test1.js NODE-22-2:/tmp

C:\Development\YADAMU\src\scratch\streams>docker exec -it NODE-22-2 bash
root@c873fb41f508:/# cd tmp
root@c873fb41f508:/tmp# node -v
v22.2.0
root@c873fb41f508:/tmp# node test1.js
[ Promise { <pending> }, Promise { <pending> }, Promise { <pending> } ]
MyTransform Error [ERR_STREAM_PREMATURE_CLOSE]: Premature close
    at ReadStream.onclose (node:internal/streams/end-of-stream:153:30)
    at ReadStream.emit (node:events:532:35)
    at emitCloseNT (node:internal/streams/destroy:148:10)
    at process.processTicksAndRejections (node:internal/process/task_queues:81:21) {
  code: 'ERR_STREAM_PREMATURE_CLOSE'
}
1
[
  Promise {
    <rejected> Error [ERR_STREAM_PREMATURE_CLOSE]: Premature close
        at ReadStream.onclose (node:internal/streams/end-of-stream:153:30)
        at ReadStream.emit (node:events:532:35)
        at emitCloseNT (node:internal/streams/destroy:148:10)
        at process.processTicksAndRejections (node:internal/process/task_queues:81:21) {
      code: 'ERR_STREAM_PREMATURE_CLOSE'
    }
  },
  Promise {
    <rejected> Error [ERR_STREAM_PREMATURE_CLOSE]: Premature close
        at ReadStream.onclose (node:internal/streams/end-of-stream:153:30)
        at ReadStream.emit (node:events:532:35)
        at emitCloseNT (node:internal/streams/destroy:148:10)
        at process.processTicksAndRejections (node:internal/process/task_queues:81:21) {
      code: 'ERR_STREAM_PREMATURE_CLOSE'
    }
  },
  Promise { <pending> }
]
WriteStream Error [ERR_STREAM_PREMATURE_CLOSE]: Premature close
    at ReadStream.onclose (node:internal/streams/end-of-stream:153:30)
    at ReadStream.emit (node:events:532:35)
    at emitCloseNT (node:internal/streams/destroy:148:10)
    at process.processTicksAndRejections (node:internal/process/task_queues:81:21) {
  code: 'ERR_STREAM_PREMATURE_CLOSE'
}
2
[
  Promise {
    <rejected> Error [ERR_STREAM_PREMATURE_CLOSE]: Premature close
        at ReadStream.onclose (node:internal/streams/end-of-stream:153:30)
        at ReadStream.emit (node:events:532:35)
        at emitCloseNT (node:internal/streams/destroy:148:10)
        at process.processTicksAndRejections (node:internal/process/task_queues:81:21) {
      code: 'ERR_STREAM_PREMATURE_CLOSE'
    }
  },
  Promise {
    <rejected> Error [ERR_STREAM_PREMATURE_CLOSE]: Premature close
        at ReadStream.onclose (node:internal/streams/end-of-stream:153:30)
        at ReadStream.emit (node:events:532:35)
        at emitCloseNT (node:internal/streams/destroy:148:10)
        at process.processTicksAndRejections (node:internal/process/task_queues:81:21) {
      code: 'ERR_STREAM_PREMATURE_CLOSE'
    }
  },
  Promise {
    <rejected> Error [ERR_STREAM_PREMATURE_CLOSE]: Premature close
        at ReadStream.onclose (node:internal/streams/end-of-stream:153:30)
        at ReadStream.emit (node:events:532:35)
        at emitCloseNT (node:internal/streams/destroy:148:10)
        at process.processTicksAndRejections (node:internal/process/task_queues:81:21) {
      code: 'ERR_STREAM_PREMATURE_CLOSE'
    }
  }
]
Pipeline error: Error [ERR_STREAM_PREMATURE_CLOSE]: Premature close
    at MyTransform.<anonymous> (node:internal/streams/pipeline:417:14)
    at MyTransform.emit (node:events:532:35)
    at emitCloseNT (node:internal/streams/destroy:148:10)
    at emitErrorCloseNT (node:internal/streams/destroy:130:3)
    at process.processTicksAndRejections (node:internal/process/task_queues:82:21) {
  code: 'ERR_STREAM_PREMATURE_CLOSE'
}
success
root@c873fb41f508:/tmp#
root@c873fb41f508:/tmp#
root@c873fb41f508:/tmp#
exit

What do you see instead?

root@0cd652c8fcb1:/tmp# node test1.js
[ Promise { <pending> }, Promise { <pending> }, Promise { <pending> } ]
MyTransform Error [ERR_STREAM_PREMATURE_CLOSE]: Premature close
    at ReadStream.onclose (node:internal/streams/end-of-stream:153:30)
    at ReadStream.emit (node:events:531:35)
    at emitCloseNT (node:internal/streams/destroy:148:10)
    at process.processTicksAndRejections (node:internal/process/task_queues:89:21) {
  code: 'ERR_STREAM_PREMATURE_CLOSE'
}
Unhandled Error [ERR_STREAM_PREMATURE_CLOSE]: Premature close
    at ReadStream.onclose (node:internal/streams/end-of-stream:153:30)
    at ReadStream.emit (node:events:531:35)
    at emitCloseNT (node:internal/streams/destroy:148:10)
    at process.processTicksAndRejections (node:internal/process/task_queues:89:21) {
  code: 'ERR_STREAM_PREMATURE_CLOSE'
} Promise {
  <rejected> Error [ERR_STREAM_PREMATURE_CLOSE]: Premature close
      at ReadStream.onclose (node:internal/streams/end-of-stream:153:30)
      at ReadStream.emit (node:events:531:35)
      at emitCloseNT (node:internal/streams/destroy:148:10)
      at process.processTicksAndRejections (node:internal/process/task_queues:89:21) {
    code: 'ERR_STREAM_PREMATURE_CLOSE'
  }
}
Unhandled Error [ERR_STREAM_PREMATURE_CLOSE]: Premature close
    at ReadStream.onclose (node:internal/streams/end-of-stream:153:30)
    at ReadStream.emit (node:events:531:35)
    at emitCloseNT (node:internal/streams/destroy:148:10)
    at process.processTicksAndRejections (node:internal/process/task_queues:89:21) {
  code: 'ERR_STREAM_PREMATURE_CLOSE'
} Promise {
  <rejected> Error [ERR_STREAM_PREMATURE_CLOSE]: Premature close
      at ReadStream.onclose (node:internal/streams/end-of-stream:153:30)
      at ReadStream.emit (node:events:531:35)
      at emitCloseNT (node:internal/streams/destroy:148:10)
      at process.processTicksAndRejections (node:internal/process/task_queues:89:21) {
    code: 'ERR_STREAM_PREMATURE_CLOSE'
  }
}
WriteStream Error [ERR_STREAM_PREMATURE_CLOSE]: Premature close
    at ReadStream.onclose (node:internal/streams/end-of-stream:153:30)
    at ReadStream.emit (node:events:531:35)
    at emitCloseNT (node:internal/streams/destroy:148:10)
    at process.processTicksAndRejections (node:internal/process/task_queues:89:21) {
  code: 'ERR_STREAM_PREMATURE_CLOSE'
}
1
[
  Promise {
    <rejected> Error [ERR_STREAM_PREMATURE_CLOSE]: Premature close
        at ReadStream.onclose (node:internal/streams/end-of-stream:153:30)
        at ReadStream.emit (node:events:531:35)
        at emitCloseNT (node:internal/streams/destroy:148:10)
        at process.processTicksAndRejections (node:internal/process/task_queues:89:21) {
      code: 'ERR_STREAM_PREMATURE_CLOSE'
    }
  },
  Promise {
    <rejected> Error [ERR_STREAM_PREMATURE_CLOSE]: Premature close
        at ReadStream.onclose (node:internal/streams/end-of-stream:153:30)
        at ReadStream.emit (node:events:531:35)
        at emitCloseNT (node:internal/streams/destroy:148:10)
        at process.processTicksAndRejections (node:internal/process/task_queues:89:21) {
      code: 'ERR_STREAM_PREMATURE_CLOSE'
    }
  },
  Promise {
    <rejected> Error [ERR_STREAM_PREMATURE_CLOSE]: Premature close
        at ReadStream.onclose (node:internal/streams/end-of-stream:153:30)
        at ReadStream.emit (node:events:531:35)
        at emitCloseNT (node:internal/streams/destroy:148:10)
        at process.processTicksAndRejections (node:internal/process/task_queues:89:21) {
      code: 'ERR_STREAM_PREMATURE_CLOSE'
    }
  }
]
2
[
  Promise {
    <rejected> Error [ERR_STREAM_PREMATURE_CLOSE]: Premature close
        at ReadStream.onclose (node:internal/streams/end-of-stream:153:30)
        at ReadStream.emit (node:events:531:35)
        at emitCloseNT (node:internal/streams/destroy:148:10)
        at process.processTicksAndRejections (node:internal/process/task_queues:89:21) {
      code: 'ERR_STREAM_PREMATURE_CLOSE'
    }
  },
  Promise {
    <rejected> Error [ERR_STREAM_PREMATURE_CLOSE]: Premature close
        at ReadStream.onclose (node:internal/streams/end-of-stream:153:30)
        at ReadStream.emit (node:events:531:35)
        at emitCloseNT (node:internal/streams/destroy:148:10)
        at process.processTicksAndRejections (node:internal/process/task_queues:89:21) {
      code: 'ERR_STREAM_PREMATURE_CLOSE'
    }
  },
  Promise {
    <rejected> Error [ERR_STREAM_PREMATURE_CLOSE]: Premature close
        at ReadStream.onclose (node:internal/streams/end-of-stream:153:30)
        at ReadStream.emit (node:events:531:35)
        at emitCloseNT (node:internal/streams/destroy:148:10)
        at process.processTicksAndRejections (node:internal/process/task_queues:89:21) {
      code: 'ERR_STREAM_PREMATURE_CLOSE'
    }
  }
]
Pipeline error: Error [ERR_STREAM_PREMATURE_CLOSE]: Premature close
    at ReadStream.onclose (node:internal/streams/end-of-stream:153:30)
    at ReadStream.emit (node:events:531:35)
    at emitCloseNT (node:internal/streams/destroy:148:10)
    at process.processTicksAndRejections (node:internal/process/task_queues:89:21) {
  code: 'ERR_STREAM_PREMATURE_CLOSE'
}
success
(node:13) PromiseRejectionHandledWarning: Promise rejection was handled asynchronously (rejection id: 1)
(Use `node --trace-warnings ...` to show where the warning was created)
(node:13) PromiseRejectionHandledWarning: Promise rejection was handled asynchronously (rejection id: 2)
root@0cd652c8fcb1:/tmp#
exit

Additional information

No response

markddrake commented 3 days ago

Sorry - I missed the commands and version from the 22.9 test case.


C:\Development\YADAMU\src\scratch\streams>docker cp input.txt NODE-LATEST:/tmp

C:\Development\YADAMU\src\scratch\streams>docker cp test1.js NODE-LATEST:/tmp

C:\Development\YADAMU\src\scratch\streams>docker exec -it NODE-LATEST bash
root@0cd652c8fcb1:/# node -v
v22.9.0
root@0cd652c8fcb1:/# cd tmp
root@0cd652c8fcb1:/tmp# node test1.js
``
markddrake commented 3 days ago

I have tried wrapping both the `return finished(s)` call and the `Promise.allSettled` call in try/catch, but I still get the unhandled rejection.


const { PassThrough} = require('stream');
const { pipeline, finished,  } = require('stream/promises');
const fs = require('fs');

/**
 * PassThrough subclass used by the reproduction: it forwards every chunk
 * unchanged, counts the chunks, and closes the stream handed to its
 * constructor once more than 100 chunks have been seen.
 */
class MyTransform extends PassThrough {

  /**
   * @param is - the stream that _transform() will close after the 100th chunk
   */
  constructor(is) {
    super()
    this.is = is
    this.counter = 0
  }

  // Declared async, but the body contains no await: completion is signalled
  // through callback(), and the promise the method returns is not used here.
  async _transform(data,enc,callback) {
    this.counter++
    this.push(data)
    if (this.counter > 100) {
      // Runs for every chunk from the 101st onwards, before callback() is invoked.
      this.is.close()
    }
    callback()
  }

}

/**
 * Variant of the reproduction driver with try/catch added around the
 * finished() call and around Promise.allSettled(). It pipes input.txt through
 * MyTransform into output.txt with pipeline() while separately tracking each
 * stream with finished(). A pipeline() failure is caught and logged here,
 * not rethrown.
 */
async function runPipeline() {

  const is = fs.createReadStream('input.txt')
  // Each stream gets its own 'error' listener that logs the stream's class name and the error.
  is.on('error',(err) => {
    console.log(is.constructor.name,err)
  })
  const t = new MyTransform(is)
  t.on('error',(err) => {
    console.log(t.constructor.name,err)
  })
  const os =  fs.createWriteStream('output.txt')
  os.on('error',(err) => {
    console.log(os.constructor.name,err)
  })

  const streams = [is,t,os]
  // One finished() promise per stream, created before pipeline() is awaited.
  const activeStreams = streams.map((s) => {
    // The promise is returned without being awaited, and this arrow function
    // is not async, so this try/catch only sees a synchronous throw from
    // finished(); a later rejection of the returned promise never reaches it.
    // If the catch branch did run, the callback would return undefined for
    // that stream.
    try {
      return finished(s)
    } catch (e) {
      console.log('WTF-FINISHED',e)
    }
  })
  console.log(activeStreams)
  try {
    await pipeline(...streams);
    console.log(t.counter)
  } catch (err) {
    // The "1" / "2" markers bracket the allSettled() wait so the promise
    // states printed before and after it can be compared in the output.
    console.log(1)
    console.log(activeStreams)    
    // Promise.allSettled() fulfils even when its inputs reject, so this
    // catch is not expected to run for rejected finished() promises.
    try {
      await Promise.allSettled(activeStreams)
    } catch (e) {
      console.log('WTF-ALL-S',e)
    }
    console.log(2)
    console.log(activeStreams)
    console.error('Pipeline error:', err);
  }
}

// Process-level hook: logs the rejection reason (e) and the rejected promise (p)
// whenever an 'unhandledRejection' event is emitted.
process.on('unhandledRejection', (e,p) => {
  console.log("Unhandled",e,p)
})

// Entry point: prints 'success' when runPipeline() fulfils, otherwise logs the rejection.
runPipeline().then(() => {console.log('success')}).catch((e) => {console.log(e)})

The only workaround I have is to flag the error as handled in the stream's 'error' handler and then ignore it in the unhandled-rejection handler. It appears to me that `finished` must be throwing the exception, but I do not seem to be able to catch it, and I do not think it should be throwing anyway.