typelevel / fs2

Compositional, streaming I/O library for Scala
https://fs2.io

Process1 with stateful computation #309

Closed prange closed 9 years ago

prange commented 9 years ago

As far as I can tell, there are no combinators to perform a stateful computation where the output of a Process1 is computed from a state and an input. If I am not mistaken, scan outputs the same state that is fed into the "next" step, and zipWithState carries the original input and new state to the output.

I imagine either with some type carrying the state data (S):

stateful: ((S,A) => (S,B)) => S => Process1[A,B]

or with some Mealy machine

type Mealy[A,B] = (a:A)=>(Mealy[A,B],B)

mealy: Mealy[A,B] => Process1[A,B]

Am I mistaken, or is this easily achieved with the existing combinators?
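To make the two proposed shapes concrete, here is a minimal sketch that models them over a plain List instead of Process1, so it runs without scalaz-stream. The names StatefulSketch, step and runningSum are hypothetical, and the Mealy machine is written as a trait because a recursive type alias does not compile in Scala.

```scala
object StatefulSketch {
  // Hypothetical List-based stand-in for the proposed combinator:
  // thread a state S through the inputs and emit only the B values.
  def stateful[S, A, B](f: (S, A) => (S, B))(init: S)(in: List[A]): List[B] = {
    val (_, out) = in.foldLeft((init, Vector.empty[B])) { case ((s, acc), a) =>
      val (s2, b) = f(s, a)
      (s2, acc :+ b)
    }
    out.toList
  }

  // A Mealy machine: each step returns the next machine and one output.
  trait Mealy[A, B] { def step(a: A): (Mealy[A, B], B) }

  // Running the machine is `stateful` with the machine itself as the state.
  def mealy[A, B](m: Mealy[A, B])(in: List[A]): List[B] =
    stateful[Mealy[A, B], A, B]((cur, a) => cur.step(a))(m)(in)

  // Example machine (assumed for illustration): emits the sum seen so far.
  def runningSum(total: Int): Mealy[Int, Int] = new Mealy[Int, Int] {
    def step(a: Int): (Mealy[Int, Int], Int) = (runningSum(total + a), total + a)
  }
}
```

In this model the Mealy form is just the stateful form with the machine as its own state, which suggests that a single combinator could cover both.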

djspiewak commented 9 years ago

It's not easily achieved with the existing combinators. I would absolutely die of happiness if we had some sort of (A => State[S, B]) => S => Process1[A, B] combinator. I tried pretty hard to implement this here, but the result isn't generalized.

fthomas commented 9 years ago

I think you could write mapStateful in terms of zipWithScan1 by squeezing the S and O types into zipWithScan1's B type like this:

  def mapStateful2[S,I,O](s: S)(g: (S, I) => (S, O)): Process1[I,O] =
    zipWithScan1[I, (S, Option[O])]((s, None)) { case (a, (sn, _)) =>
      val (s2, o) = g(sn, a)
      (s2, Some(o))
    }.map(_._2._2).pipe(stripNone)

But this is neither easy nor pretty.

pchiusano commented 9 years ago

I like the mapStateful2 signature and would accept a PR for that. Not sure what to call it though: mapStateful is pretty weird. I think in the book we called it loop, but I'm not a big fan of that name either.


fthomas commented 9 years ago

The signature looks a bit like Traversable's mapAccum*:

mapAccumL :: Traversable t => (a -> b -> (a, c)) -> a -> t b -> (a, t c)

but without emitting the accumulator. Should we generalize to

def mapAccum[S,I,O](s: S)(g: (S, I) => (S, O)): Process1[I, (S, O)]

?

prange commented 9 years ago

+1 for mapAccum. @fthomas: Although I am in favour of generalization, I have a feeling that the S would be thrown away in most use cases. Using a "right" operator of type Process1[(I1,I2),I2] would give the best of both worlds:

mapAccum(a)((a,b)=>(a+b,a)) pipe right

pchlupacek commented 9 years ago

How about this signature:

def mapAccum[S,I,O](s: S)(g: (S, I) => (S, O)): Writer1[S, I, O]

// w/o `S`

mapAccum.stripW

// `S` signal

mapAccum.stripO

pchiusano commented 9 years ago

I like mapAccum, with the signature suggested by @fthomas. Except let's call it mapAccumulate, since I hate abbreviations. I prefer the version that returns pairs. You can always convert that to a Writer1 or just do .map(_._2) if you need to discard the state. @prange would you like to prepare a PR since you submitted the issue?

pchlupacek commented 9 years ago

@pchiusano I see what you mean. There are, however, situations where you want to emit an S even without receiving an I, which may not be possible if you only output tuples.

fthomas commented 9 years ago

@pchlupacek The only situation I can imagine where mapAccumulate could emit an S without receiving an I is when its input is empty and it just emits the initial S. If it were implemented like that, I wouldn't call it map... because it wouldn't be structure-preserving. Mapping over an empty Process should result in an empty Process.
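The structure-preservation point can be illustrated with Scala's standard collections, used here only as an analogy for the Process combinators. List.scanLeft emits its seed even for empty input, whereas map emits nothing. The object name EmptyInputSketch is hypothetical.

```scala
object EmptyInputSketch {
  val empty: List[Int] = Nil

  // scanLeft always emits its seed, so the output is one longer than the input.
  val scanned: List[Int] = empty.scanLeft(0)(_ + _)

  // map is structure-preserving: no inputs, no outputs.
  val mapped: List[Int] = empty.map(_ + 1)

  // On non-empty input the length difference is still exactly one.
  val scannedThree: List[Int] = List(1, 2, 3).scanLeft(0)(_ + _)
}
```

A combinator named map... would be expected to behave like the second case.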

pchlupacek commented 9 years ago

@fthomas Yes, you are right, that behaviour wouldn't be consistent with the name. It is actually like the difference between scan and scan1. Let's go with mapAccum as it is; what I described is likely a different combinator.

prange commented 9 years ago

I will prepare the PR, based on your implementation example above.

As I understand it, there is consensus for the following signature:

def mapAccumulate[S,I,O](s: S)(g: (S, I) => (S, O)): Process1[I, (S, O)]

If you are opposed, speak now or forever hold your tongue (well, at least wait for some time)
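For readers who want to see the agreed signature in action, here is a minimal sketch that models it over a standard Iterator rather than Process1. It is not the scalaz-stream implementation. Pairing each output with the updated state (rather than the previous one) is an assumption, since the thread does not settle that detail, and the names MapAccumulateSketch, indexed and outputsOnly are hypothetical.

```scala
object MapAccumulateSketch {
  // Hypothetical Iterator-based model of the agreed signature.
  // Assumption: each output is paired with the state *after* the step.
  def mapAccumulate[S, I, O](init: S)(g: (S, I) => (S, O))(in: Iterator[I]): Iterator[(S, O)] = {
    var state = init // local mutable state, updated once per consumed element
    in.map { i =>
      val (next, o) = g(state, i)
      state = next
      (next, o)
    }
  }

  // Label each word with the number of words seen before it.
  val indexed: List[(Int, String)] =
    mapAccumulate(0)((n: Int, w: String) => (n + 1, s"$n:$w"))(Iterator("a", "b", "c")).toList

  // Dropping the state afterwards is a plain projection onto the second element.
  val outputsOnly: List[String] = indexed.map(_._2)
}
```

Emitting pairs keeps the combinator general, and callers who only need the outputs pay a single extra map.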

fthomas commented 9 years ago

@prange My implementation of mapStateful2 was not meant to be taken seriously! Using zipWithScan1 there was a gross hack and I did it only to show that it can be implemented with the current combinators. Sorry!

I would implement mapAccumulate similar to how zipWithScan1 is implemented.

prange commented 9 years ago

@fthomas Well, it works like a charm. ;) But yeah, I will take a closer look. The proposed signature is also slightly different.

fthomas commented 9 years ago

@prange What is the status of this?