Problem

Existing stream-based iterators (stream-processors) such as Map/Filter/Reduce/For/etc. can't stop the stream producer from producing more elements, even though there's nothing left to process.
Even with the :err outport and error propagation from the handler node, we'll have to wait until the end of the stream. (And it's possible to imagine a use-case where the stream itself is infinite and is processed until some condition is met.)
Example: a client sends a request to insert 1M records. On the server we get an array, turn it into a stream, and process it. On record №389, say, we see it's invalid and throw an error. The expected behavior is to stop processing the rest of the collection immediately, but we don't. Even though the client gets an error and assumes the request should be repeated (e.g. with different input), the server actually keeps processing the rest of the previous stream and updating the DB.
Even if For, for example, is implemented so that after the first error it simply skips all remaining elements until a new stream starts, we still have to wait for that stream to end. So in the worst case this affects correctness, in the best case performance.
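To make the failure mode concrete, here is a minimal Go sketch (Go is used only to illustrate the semantics, it is not Nevalang code), assuming today's one-directional stream behaves like an unguarded channel: after the first error the consumer can only drain and discard, it cannot stop the producer.

```go
package main

import (
	"errors"
	"fmt"
)

var errInvalid = errors.New("invalid record")

func main() {
	records := make(chan int)

	// Producer: unconditionally emits the whole stream.
	go func() {
		for i := 0; i < 1_000_000; i++ {
			records <- i
		}
		close(records)
	}()

	var firstErr error
	for r := range records {
		if firstErr != nil {
			continue // "skip" mode: still consuming, just discarding
		}
		if r == 389 { // pretend record #389 fails validation
			firstErr = errInvalid
		}
	}
	fmt.Println("stream fully drained, err:", firstErr)
}
```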
Proposal
Introduce an interface IStreamProducer that describes an abstract stream producer, expected to implement the "iterator protocol" - a pattern where the producer emits stream elements one by one, each after receiving a signal from the stream consumer. That is, it's bi-directional communication. One might say it's "call-return" implemented through "send-receive".

Usage example:
```
p IStreamProducer<int> // producer node implementing the iterator protocol
c For<int>{XXX}        // consumer that handles each element
---
[:sig, c:res] -> p:sig // initial signal, or consumer's result, requests the next element
c:err -> p:err         // a consumer error stops the producer immediately
p:res -> c             // producer emits elements to the consumer one by one
```
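For comparison, here is a minimal Go sketch of the proposed protocol's semantics (again, an illustration in Go, not Nevalang; all names are hypothetical): the producer emits the next element only after receiving a signal, and a value on the error channel stops it immediately.

```go
package main

import "fmt"

// produce emits up to n ints, one per signal received on sig.
// A value on errCh (or a closed sig channel) stops it immediately.
func produce(n int, sig <-chan struct{}, errCh <-chan error, res chan<- int) {
	defer close(res)
	for i := 0; i < n; i++ {
		select {
		case <-errCh: // consumer reported an error: stop producing
			return
		case _, ok := <-sig: // wait for the consumer to request the next element
			if !ok {
				return
			}
			res <- i
		}
	}
}

func main() {
	sig := make(chan struct{})
	errCh := make(chan error, 1)
	res := make(chan int)

	go produce(1_000_000, sig, errCh, res)

	sig <- struct{}{} // initial signal starts the stream
	for v := range res {
		if v == 389 { // consumer hits an invalid element
			errCh <- fmt.Errorf("record %d is invalid", v)
			break
		}
		sig <- struct{}{} // request the next element
	}
	fmt.Println("stopped early, producer is no longer running")
}
```

Note the consumer never has to drain anything: one send on the error channel is enough for the producer to stop on its own.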
Problem 1 - How to successfully break?
We don't always want to stop because something went wrong; maybe we just did what we wanted and aren't interested in the rest of the stream. There are different ways of extending the producer, but we need to think carefully: maybe, union, a specific error like streams.ErrBreak (the Go-like way), or maybe even introducing Result<T>?
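As one data point, here is how the sentinel-error option could look, sketched in Go (the exact shape of streams.ErrBreak in Nevalang is an open question; everything below is a hypothetical illustration): the consumer returns the sentinel to stop the stream, and the driver treats it as success rather than failure.

```go
package main

import (
	"errors"
	"fmt"
)

// ErrBreak is a hypothetical sentinel: "stop producing, nothing is wrong".
var ErrBreak = errors.New("streams: break")

// forEach drives the producer and stops on the first non-nil error;
// ErrBreak is swallowed, any other error is propagated to the caller.
func forEach(items []int, handle func(int) error) error {
	for _, it := range items {
		if err := handle(it); err != nil {
			if errors.Is(err, ErrBreak) {
				return nil // successful early exit
			}
			return err // real failure
		}
	}
	return nil
}

func main() {
	err := forEach([]int{1, 2, 3, 4, 5}, func(v int) error {
		fmt.Println("got", v)
		if v == 3 {
			return ErrBreak // we found what we wanted, stop the stream
		}
		return nil
	})
	fmt.Println("err:", err) // err: <nil>
}
```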