scramjetorg / scramjet

Public tracker for Scramjet Cloud Platform, a platform that brings data from many environments together.
https://www.scramjet.org
MIT License

StringStream.whenEnd produces UnhandledPromiseRejectionWarning on error #103

Closed: mathieubergeron closed this issue 3 years ago

mathieubergeron commented 3 years ago

Basics

Hi! I just discovered Scramjet, and it looks awesome! I previously tried highland but ran into issues with how Readable/Writable streams are (or have to be) handled on errors: streams were not always closed properly when an error occurred. Moreover, highland no longer seems to be maintained. I think Scramjet is a very nice alternative.

So, I'm running small experiments to understand how Scramjet handles Node.js Readable/Writable streams. More specifically, I want to verify that streams are always properly closed, even on error.

But first, here is a simple test case just to make sure everything works fine:

import { StringStream } from 'scramjet';
import { createReadStream, createWriteStream } from 'fs';

const inputStream = createReadStream('/tmp/in.txt', { encoding: 'utf-8' });
const outputStream = createWriteStream('/tmp/out.txt', { encoding: 'utf-8' });

inputStream.on('close', () => console.log('inputStream close'));
outputStream.on('close', () => console.log('outputStream close'));

StringStream.from(inputStream)
  .setOptions({ maxParallel: 1 })
  .lines()
  .map((line) => {
    console.log(`map -> ${line}`);
    return `${line} - ok\n`;
  })
  .tee(outputStream)
  .whenEnd()
  .then(() => {
    console.log('Done!');
  })
  .catch((err) => {
    console.log(`Error: ${err}`);
  });

Note: I'm not sure whether using tee (or copy) instead of pipe is the proper way to write the result to a file. Ultimately, though, I want to return a Promise that resolves on end, or rejects as soon as an error occurs. On error, the stream should also stop processing new chunks. pipe does not allow me to do that.
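For comparison, the contract described above (resolve on end, reject on the first error, stop reading) is roughly what Node's built-in stream.pipeline provides, without Scramjet at all. The following is a minimal sketch under stated assumptions, not a Scramjet API: it assumes Node 14 or later (for async generator stages in pipeline) and UTF-8 text input, and processLines, splitLines and mapLines are hypothetical helper names.

```javascript
// Minimal sketch with Node.js built-ins only (no Scramjet). Assumes Node 14+
// and UTF-8 text input; processLines, splitLines and mapLines are hypothetical names.
const { pipeline } = require('stream');
const { promisify } = require('util');

const pipelineAsync = promisify(pipeline);

// Splits incoming string chunks into lines, e.g. chunks from
// createReadStream(path, { encoding: 'utf-8' }).
async function* splitLines(source) {
  let buffered = '';
  for await (const chunk of source) {
    buffered += chunk;
    const lines = buffered.split('\n');
    buffered = lines.pop(); // keep the trailing partial line for the next chunk
    for (const line of lines) yield line;
  }
  if (buffered.length > 0) yield buffered;
}

// Applies mapLine to each line; an exception thrown by mapLine aborts the pipeline.
function mapLines(mapLine) {
  return async function* (source) {
    for await (const line of source) {
      yield mapLine(line);
    }
  };
}

// Resolves once `output` has finished, or rejects with the first error from any
// stage. On error, pipeline() destroys every stream it was given, so reading
// stops and the output stream is destroyed (which closes file streams).
function processLines(input, output, mapLine) {
  return pipelineAsync(input, splitLines, mapLines(mapLine), output);
}

// Usage with the files from the report:
// processLines(
//   createReadStream('/tmp/in.txt', { encoding: 'utf-8' }),
//   createWriteStream('/tmp/out.txt', { encoding: 'utf-8' }),
//   (line) => `${line} - ok\n`,
// ).then(() => console.log('Done!'), (err) => console.log(`Caught: ${err}`));
```

The design choice here is to let pipeline own the cleanup: because every stage is registered with it, a throw in mapLine both rejects the returned promise and tears down the input and output streams, instead of leaving the output open.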

The standard output of that code is:

map -> 1
map -> 2
map -> 3
map -> 4
map -> 5
map -> 6
map -> 7
map -> 8
map -> 9
map -> 10
map -> 11
map -> 12
map -> 13
map -> 14
map -> 15
map -> 16
map -> 
Done!
inputStream close
outputStream close

Observations/Remarks:

Other than that, everything seems great.

Describe the bug

Then I tried to simulate an error:

import { StringStream } from 'scramjet';
import { createReadStream, createWriteStream } from 'fs';

const inputStream = createReadStream('/tmp/in.txt', { encoding: 'utf-8' });
const outputStream = createWriteStream('/tmp/out.txt', { encoding: 'utf-8' });

inputStream.on('close', () => console.log('inputStream close'));
outputStream.on('close', () => console.log('outputStream close'));

let lineCounter = 0;

StringStream.from(inputStream)
  .setOptions({ maxParallel: 1 })
  .lines()
  .map((line) => {
    console.log(`map -> ${line}`);

    // Simulate error at line 5.
    lineCounter++;
    if (lineCounter === 5) {
      throw new Error('Simulated error');
    }

    return `${line} - ok\n`;
  })
  .tee(outputStream)
  .whenEnd()
  .then(() => {
    console.log('Done!');
  })
  .catch((err) => {
    console.log(`Catched: ${err}`);
  });

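And ran into two (potentially related) issues: an UnhandledPromiseRejectionWarning is printed even though the promise chain ends with .catch, and the output stream is never closed ("outputStream close" is never logged).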

Here is the output:

map -> 1
map -> 2
map -> 3
map -> 4
map -> 5
Catched: Error: Simulated error
(node:19178) UnhandledPromiseRejectionWarning: Error: The "chunk" argument must be of type string or an instance of Buffer or Uint8Array. Received type boolean (true)
    at /home/mbergero/dev/speech-practice/dyno/node_modules/scramjet-core/lib/util/mk-transform.js:73:51
    at processTicksAndRejections (internal/process/task_queues.js:93:5)
  caused by:
    at WriteStream.Writable.write (internal/streams/writable.js:285:13)
    at StringStream.ondata (internal/streams/readable.js:719:22)
    at StringStream.emit (events.js:315:20)
    at StringStream.EventEmitter.emit (domain.js:467:12)
    at addChunk (internal/streams/readable.js:309:12)
    at readableAddChunk (internal/streams/readable.js:284:9)
    at StringStream.Readable.push (internal/streams/readable.js:223:10)
    at StringStream.Transform.push (internal/streams/transform.js:166:32)
    at /home/mbergero/dev/speech-practice/dyno/node_modules/scramjet-core/lib/util/mk-transform.js:71:38
    at processTicksAndRejections (internal/process/task_queues.js:93:5)
  --- raised in StringStream(2) constructed ---
    at new PromiseTransformStream (/home/mbergero/dev/speech-practice/dyno/node_modules/scramjet-core/lib/util/promise-transform-stream.js:65:27)
    at new DataStream (/home/mbergero/dev/speech-practice/dyno/node_modules/scramjet-core/lib/data-stream.js:43:9)
    at new StringStream (/home/mbergero/dev/speech-practice/dyno/node_modules/scramjet-core/lib/string-stream.js:34:9)
    at StringStream.map (/home/mbergero/dev/speech-practice/dyno/node_modules/scramjet-core/lib/data-stream.js:197:26)
    at Object.<anonymous> (/home/mbergero/dev/speech-practice/dyno/scramjet-error-handling.ts:15:4)
    at Module._compile (internal/modules/cjs/loader.js:1063:30)
    at Module.m._compile (/opt/nodejs/node-v14.15.4-linux-x64/lib/node_modules/ts-node/src/index.ts:1056:23)
    at Module._extensions..js (internal/modules/cjs/loader.js:1092:10)
    at Object.require.extensions.<computed> [as .ts] (/opt/nodejs/node-v14.15.4-linux-x64/lib/node_modules/ts-node/src/index.ts:1059:12)
    at Module.load (internal/modules/cjs/loader.js:928:32)
(Use `node --trace-warnings ...` to show where the warning was created)
(node:19178) UnhandledPromiseRejectionWarning: Unhandled promise rejection. This error originated either by throwing inside of an async function without a catch block, or by rejecting a promise which was not handled with .catch(). To terminate the node process on unhandled promise rejection, use the CLI flag `--unhandled-rejections=strict` (see https://nodejs.org/api/cli.html#cli_unhandled_rejections_mode). (rejection id: 5)
(node:19178) [DEP0018] DeprecationWarning: Unhandled promise rejections are deprecated. In the future, promise rejections that are not handled will terminate the Node.js process with a non-zero exit code.

inputStream close

To Reproduce

Would you prefer me to create a public repo with that code? That would not be a problem.

Expected behavior

I guess my first question actually is: am I wrong in thinking that closing the output stream on error is Scramjet's responsibility? If I am wrong, how should I handle it properly?
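Whichever side the responsibility falls on, one hedged option is to destroy the streams yourself when the promise returned by whenEnd() rejects. The helper below, destroyOnReject, is hypothetical and relies only on Node's standard stream destroy(); it is an assumption that this cooperates cleanly with Scramjet's tee internals, which has not been verified here. Note that destroy() discards any data still buffered in a stream instead of flushing it.

```javascript
// Hypothetical helper (not part of Scramjet): waits for `done`, e.g. the promise
// returned by whenEnd(), and destroys the given streams if it rejects.
async function destroyOnReject(done, ...streams) {
  try {
    return await done;
  } catch (err) {
    for (const stream of streams) {
      // destroy() releases the underlying resource (for fs streams, the file
      // descriptor) and emits 'close'; data still buffered is discarded.
      if (!stream.destroyed) stream.destroy();
    }
    throw err; // keep the original error for the caller's .catch()
  }
}

// Usage with the report's code (sketch):
// destroyOnReject(
//   StringStream.from(inputStream) /* ...map(), tee(outputStream) */ .whenEnd(),
//   inputStream,
//   outputStream,
// ).then(() => console.log('Done!'), (err) => console.log(`Caught: ${err}`));
```

This only guarantees that "outputStream close" eventually fires; it does not address the separate unhandled rejection, which the trace shows originating inside scramjet-core's mk-transform.js.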

Test case

If possible, please provide an automated test case to include, better yet in a forked scramjet repo in test/cases.

Let me know if you'd like me to do that.

System

Thanks a lot! Let me know if I can do anything to help.

MichalCz commented 3 years ago

Hi Mathieu,

Thanks for the extensive report. I will be looking into this over the weekend.

You're probably right that this happens due to an error in tee, but at the same time I'm not sure it's the best way to achieve what you're trying to do here...

I'll propose a temporary solution and go after the tee'd error.

Best, M.

MichalCz commented 3 years ago

Hi, I managed to identify the issue. The raise method seems to return true, which does stop the stream, but in a strange way: true gets pushed downstream as a chunk, and writing that chunk then fails. I'm looking into solutions.
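The final error in the log can be reproduced with plain Node.js: a Writable that is not in object mode (such as fs.WriteStream) only accepts string, Buffer or Uint8Array chunks, so a boolean true reaching it through the tee'd pipe makes write() throw. A minimal sketch, assuming Node 14 or later (where this check throws synchronously, as in the trace); tryWrite is a hypothetical helper.

```javascript
const { Writable } = require('stream');

// A byte-mode Writable (objectMode defaults to false), like fs.WriteStream.
const sink = new Writable({
  write(chunk, encoding, callback) {
    callback();
  },
});

// Hypothetical helper: attempts a write and reports the outcome.
function tryWrite(chunk) {
  try {
    sink.write(chunk);
    return 'accepted';
  } catch (err) {
    // Node validates the chunk type synchronously and throws a TypeError.
    return err.code;
  }
}

console.log(tryWrite('1 - ok\n')); // accepted
console.log(tryWrite(true)); // ERR_INVALID_ARG_TYPE
```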

MichalCz commented 3 years ago

OK, so I found the problem and a fix, but the fix breaks almost all current tests and cannot be supported by the current interface.

Sadly, there is no elegant workaround, but this one works:

https://github.com/scramjetorg/scramjet-issue-tests/blob/main/103-whenend/index-workaround.js

In April/May we will start work on a new API for Scramjet v5; the v4 interface will be supported as-is through a compatibility layer.

mathieubergeron commented 3 years ago

Thank you for your investigation! I'm looking forward to v5 then :)