scramjetorg / scramjet

Public tracker for Scramjet Cloud Platform, a platform that brings data from many environments together.
https://www.scramjet.org
MIT License
253 stars, 20 forks

It would be nice to have a way to stop processing records on first error. #69

Closed: DOOMDANY closed this issue 4 years ago

DOOMDANY commented 4 years ago

Consider a scenario using the following code:

const { DataStream } = require("scramjet");

await DataStream
    .from(productsStream)
    .setOptions({ maxParallel: 1 })
    .map(doSomething)
    .batch(25)
    .map(doSomethingElse)
    .catch(e => {
        // Try to stop the stream as soon as any error occurs.
        e.stream.end();
    })
    .run();

When any error occurs, the stream is closed, but for some reason scramjet keeps processing data. Is there a way to stop processing the data and return/throw/raise the error?

MichalCz commented 4 years ago

Hi @danielplascencia,

The batch method doesn't pass the maxParallel setting on to the later stages, because it breaks the chain of transforms. If you move the setOptions call lower in the chain, processing should stop immediately (mind you, this will have an impact on performance).

The reason this happens is that scramjet runs operations in parallel, and because all operations are treated as asynchronous (or, more specifically, as chained promises), a number of them are scheduled in advance.
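To illustrate the scheduling effect described above, here is a minimal sketch in plain JavaScript. It does not use scramjet and is not how scramjet is implemented internally; `processAll`, `worker` and `sleep` are hypothetical names made up for this example. It only shows that operations which were already started in parallel cannot be un-started when an earlier one fails, while a limit of one operation at a time stops right at the failing item.

```javascript
// Plain-JavaScript illustration; all names here are hypothetical, not scramjet APIs.
const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

// Runs `worker` over `items` with at most `maxParallel` operations in flight.
// Resolves with the list of items that were started and the first error seen.
async function processAll(items, worker, maxParallel) {
    const started = [];
    let firstError = null;
    let next = 0;

    async function lane() {
        // Stop picking up new items once an error has been recorded.
        while (firstError === null && next < items.length) {
            const item = items[next++];
            started.push(item);
            try {
                await worker(item);
            } catch (e) {
                if (firstError === null) firstError = e;
            }
        }
    }

    // Each lane starts its first item immediately, before any result is known.
    await Promise.all(Array.from({ length: maxParallel }, () => lane()));
    return { started, firstError };
}

// Item 2 fails; every other item succeeds after a short delay.
async function worker(item) {
    await sleep(5);
    if (item === 2) throw new Error(`failed on ${item}`);
}

async function demo() {
    const items = [1, 2, 3, 4, 5, 6];

    const sequential = await processAll(items, worker, 1);
    console.log("limit 1 started:", sequential.started); // [ 1, 2 ]

    const parallel = await processAll(items, worker, 4);
    // Items 3 and 4 were already running when item 2 failed.
    console.log("limit 4 started:", parallel.started);
}

demo();
```

With a limit of one, nothing after the failing item is ever started. With a limit of four, items 3 and 4 are in flight before the failure on item 2 is known, which mirrors the behaviour reported in this issue.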

Sadly, I know of some implementations that already rely on the fact that some options are not conveyed, so this change would need a major release. Since that release is supposed to bring a complete rewrite of the framework, it's not going to happen within weeks...

stale[bot] commented 4 years ago

This issue has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs. Thank you for your contributions.