scramjetorg / scramjet

Public tracker for Scramjet Cloud Platform, a platform that brings data from many environments together.
https://www.scramjet.org
MIT License
253 stars, 20 forks

Stateful stream transformation #111

Closed: prange closed this issue 2 years ago

prange commented 3 years ago

Hi,

I looked in the docs but couldn't find a suitable combinator for stateful stream transformation.

I would like to achieve something like this:

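// statefulMap is hypothetical; the callback returns the output together with the next state
const initialState: S = ...
const streamOfAs: DataStream<Input> = ...
const streamOfCs: DataStream<Output> = streamOfAs.statefulMap(initialState, (value: Input, state: S) => [Output, S])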

Is this within reach with the current API?
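
To pin down the semantics being asked for, here is a minimal sketch that does not use Scramjet at all. A plain array stands in for the stream, and `statefulMap` is the hypothetical combinator from the question, not an existing API. The step function receives each value together with the current state and returns the output along with the next state.

```typescript
// Hypothetical helper illustrating the requested semantics; not a Scramjet API.
// A plain array stands in for the stream.
type Step<I, S, O> = (value: I, state: S) => [O, S];

function statefulMap<I, S, O>(input: readonly I[], init: S, step: Step<I, S, O>): O[] {
    let state = init;
    const out: O[] = [];
    for (const value of input) {
        // Thread the state through: each call sees the state left by the previous one.
        const [output, next] = step(value, state);
        out.push(output);
        state = next;
    }
    return out;
}

// Usage: emit the running total of the numbers seen so far.
const runningTotals = statefulMap([1, 2, 3, 4], 0, (value, total): [number, number] => {
    const next = total + value;
    return [next, next];
});
// runningTotals is [1, 3, 6, 10]
```

The state never leaves the helper, so two independent calls cannot interfere with each other.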

MichalCz commented 3 years ago

Hi @prange,

You could use DataStream's use method:

Docs: DataStream..use

outerStream.use(stream => { // this is the same stream as outerStream
    const state: S = new State();
    const out = stream.map(value => someStateMapper(value, state));
    return out;
})

I think you could also try DataStream..reduceNow, but that depends on the use case.
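
The closure idea in the snippet above can be sketched as a small adapter that turns a `(value, state) => [output, state]` step into an ordinary one-argument mapper. The name `makeStatefulMapper` is an assumption made for illustration, and `Array.prototype.map` stands in for a stream's `map` so the example runs without Scramjet.

```typescript
// Hypothetical adapter, not part of Scramjet: wraps a (value, state) => [output, state]
// step into a plain one-argument mapper that keeps the state in a closure.
function makeStatefulMapper<I, S, O>(
    init: S,
    step: (value: I, state: S) => [O, S]
): (value: I) => O {
    let state = init;
    return (value: I): O => {
        const [output, next] = step(value, state);
        state = next; // remembered for the next call
        return output;
    };
}

// Usage with Array.prototype.map as a stand-in for a stream's map:
// prefix each line with its 1-based position.
const numberLines = makeStatefulMapper<string, number, string>(
    1,
    (line, n) => [`${n}: ${line}`, n + 1]
);
const numbered = ["alpha", "beta", "gamma"].map((line) => numberLines(line));
// numbered is ["1: alpha", "2: beta", "3: gamma"]
```

This pattern relies on the mapper being called once per item and in order. If the callback is asynchronous or items are processed concurrently, state updates could interleave, so each stream should get its own mapper instance and the ordering guarantees of the stream library should be checked first.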

@f1ames @jan-warchol @ErykSol - guys, while working on the new version 5 interface, take a look at this. :)

jan-warchol commented 3 years ago

@prange Could you provide a specific example? It would make it easier to reason about how this should work.

MichalCz commented 2 years ago

I'll close this as a stale issue. @prange, if you'd like to follow up, reopen this issue or reach out to me and @jan-warchol on Scramjet's Slack.