typelevel / fs2

Compositional, streaming I/O library for Scala
https://fs2.io

Odd behavior of chunkBy #86

Closed fthomas closed 10 years ago

fthomas commented 10 years ago

As I was writing more tests for process1 processes I noticed an odd behavior of chunkBy:

Process(1, 3, 8, 5, 2, 4, 7).chunkBy(_ < 4).toList
res2: List[Vector[Int]] = List(Vector(1, 3, 8), Vector(5, 2, 4), Vector(7))

The emitted chunks contain elements that satisfy and that do not satisfy the predicate. Is this behavior intended? Because, I would have expected the following result:

List(Vector(1, 3), Vector(8, 5), Vector(2), Vector(4, 7))

Since there is no test for chunkBy I do not know what the result should have been. So either the documentation should be extended or the implementation should be fixed.

Btw, here is a version of chunkBy that produces the result I had expected:

def chunkByFixed[I](f: I => Boolean): Process1[I, Vector[I]] = {
  def go(acc: Vector[I], last: Boolean): Process1[I, Vector[I]] =
    await1[I].flatMap { i =>
      val cur = f(i)
      if (cur == last) go(acc :+ i, cur)
      else emit(acc) fby go(Vector(i), cur)
    } orElse emit(acc)
  await1[I].flatMap(i => go(Vector(i), f(i)))
}
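
For readers without scalaz-stream at hand, the grouping that chunkByFixed is meant to perform can be sketched on plain Scala collections. This is only an illustration of the expected result from the post above, not library code; the names RunsSketch and groupRuns are hypothetical.

```scala
object RunsSketch {
  // Groups consecutive elements for which `f` returns the same value.
  // Hypothetical plain-collections analogue of the proposed chunkByFixed.
  def groupRuns[A](xs: Seq[A])(f: A => Boolean): List[Vector[A]] =
    xs.foldLeft(List.empty[Vector[A]]) {
      // Same predicate value as the current run: extend that run.
      case (cur :: finished, x) if f(cur.head) == f(x) => (cur :+ x) :: finished
      // First element, or the predicate value changed: start a new run.
      case (acc, x) => Vector(x) :: acc
    }.reverse
}
```

Calling RunsSketch.groupRuns(Seq(1, 3, 8, 5, 2, 4, 7))(_ < 4) yields List(Vector(1, 3), Vector(8, 5), Vector(2), Vector(4, 7)), the result the post expected.
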
runarorama commented 10 years ago

This is a strictness bug in the Await constructor. It is fixed in the "lazy" branch and the "await" branch pending a merge into trunk.

fthomas commented 10 years ago

I checked this in the "lazy" and "await/trampolined" branches and the result of Process(1, 3, 8, 5, 2, 4, 7).chunkBy(_ < 4).toList was always List(Vector(1, 3, 8), Vector(5, 2, 4), Vector(7)).

runarorama commented 10 years ago

Oh, sorry, I didn't read your message carefully enough. I thought this was a different problem.

The behavior of chunkBy looks correct. The purpose of this method is to split the stream on elements that don't match the predicate. The expected behavior is that it emits a chunk when an element makes the predicate false, as indicated by the documentation for the method.

For example, if you had the stream "foo bar baz", chunkBy(_ != " ") would turn that into ["foo ", "bar ", "baz"].
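
Both outputs reported in this thread (the numeric one and the "foo bar baz" one) can be reproduced by a small model on plain collections, assuming a chunk is closed when the predicate goes from true on the previous element to false on the current one. That rule is an assumption chosen to match the outputs shown here, not a copy of the library implementation; ChunkBySketch and chunkByModel are hypothetical names.

```scala
object ChunkBySketch {
  // Closes a chunk whenever the predicate flips from true to false.
  // The element that caused the flip stays in the chunk it closes.
  def chunkByModel[A](xs: Seq[A])(f: A => Boolean): List[Vector[A]] = {
    val start = (Vector.empty[Vector[A]], Vector.empty[A], false)
    val (chunks, rest, _) = xs.foldLeft(start) {
      case ((finished, acc, last), x) =>
        val cur = f(x)
        if (last && !cur) (finished :+ (acc :+ x), Vector.empty[A], false)
        else (finished, acc :+ x, cur)
    }
    // Sketch choice: emit the trailing chunk only when it is non-empty.
    if (rest.isEmpty) chunks.toList else (chunks :+ rest).toList
  }
}
```

With this model, chunkByModel(Seq(1, 3, 8, 5, 2, 4, 7))(_ < 4) gives List(Vector(1, 3, 8), Vector(5, 2, 4), Vector(7)), and chunkByModel("foo bar baz".toList)(_ != ' ').map(_.mkString) gives List("foo ", "bar ", "baz").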

pchlupacek commented 10 years ago

Runar,

do you think your laziness fix is tested enough for stack safety etc?

I mean there is a separate branch on switching the signature of await to

Await(rcv: Throwable \/ R => Trampoline[Process[F,A]], cleanup: Option[Throwable] => Trampoline[Process[F,Nothing]])

which is still in progress.

I am just curious what will happen if we merge your code into master, and whether the trampolining effort is worth it.

P.

fthomas commented 10 years ago

Ok, now chunkBy and its description make sense to me. Thanks for the clarification, Runar. I'll send a PR with a test for it and an example in the docstring.

Btw, what would be a proper name for the chunkByFixed that I proposed? Maybe partition, groupBy, or chunkWith? Would it be a useful addition to process1?

runarorama commented 10 years ago

Hi Pavel,

I do not know how stack-safe it is. Do you have any specific suspicions where stack overflows are possible? I can think of two places I might suspect trampolining is needed, but I can refute them pretty easily. I know of one company that is using the lazy branch right now and they haven't run into SOEs yet.

The current behavior of chunk |> filter is totally wrong, and has been for a long time.

Runar
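
As background on what trampolining buys in terms of stack safety, here is a minimal sketch using scala.util.control.TailCalls from the Scala standard library. It stands in for the Trampoline type mentioned in this thread and says nothing about where the library itself needs it; TrampolineSketch, isEven and isOdd are illustrative names.

```scala
import scala.util.control.TailCalls._

object TrampolineSketch {
  // Mutually recursive functions. Written as plain recursion they would
  // consume one stack frame per step; here each step returns a TailRec
  // value that is evaluated iteratively when `.result` is called.
  def isEven(n: Int): TailRec[Boolean] =
    if (n == 0) done(true) else tailcall(isOdd(n - 1))

  def isOdd(n: Int): TailRec[Boolean] =
    if (n == 0) done(false) else tailcall(isEven(n - 1))
}
```

TrampolineSketch.isEven(1000000).result completes without a StackOverflowError because the recursion depth is moved from the call stack to the heap.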

pchlupacek commented 10 years ago

Yeah, not sure on that.

Paul was bringing up some issues with this before, so maybe he has something more on this.

Anyhow, I also solved the left-associated flatMap performance issues in the trampoline branch (not sure if I can do it w/o trampolining), so maybe I am still going to take this path. I was more wondering whether we can't get that code of yours merged so we can have something in quickly.

Also, one idea is whether we shouldn't just drop the case modifier from Await and make its signature lazy.

I mean, instead of

case class Await[F[_],A,+O] private[stream](
  req: F[A], recv: A => Process[F,O],
  fallback1: () => Process[F,O] = () => halt,
  cleanup1: () => Process[F,O] = () => halt) extends Process[F,O]

just do

class Await[F[_],A,+O] private[stream](
  req: F[A], recv: A => Process[F,O],
  fallback1: => Process[F,O] = halt,
  cleanup1: => Process[F,O] = halt) extends Process[F,O] {
  lazy val fb = fallback1
  lazy val c = cleanup1
}

object Await { def apply() … def unapply() ... }

Maybe that will solve many issues w/o changing the signature heavily, and still be non-strict?

???

P.
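
The idea in this comment (a plain class with by-name parameters cached in lazy vals, plus a hand-written companion) can be sketched in a self-contained form. The types below are simplified stand-ins rather than the scalaz-stream definitions: Proc, Halt, Emit and the signatures of apply and unapply are assumptions made for illustration, since the comment leaves them elided.

```scala
object LazyAwaitSketch {
  sealed trait Proc[+O]
  case object Halt extends Proc[Nothing]
  final case class Emit[+O](head: O) extends Proc[O]

  // A plain class: case class parameters are vals, and val parameters
  // may not be call-by-name, so `case` has to go.
  final class Await[A, +O](
      val recv: A => Proc[O],
      fallback1: => Proc[O],
      cleanup1: => Proc[O]) extends Proc[O] {
    // Each by-name argument is evaluated at most once, on first access.
    lazy val fallback: Proc[O] = fallback1
    lazy val cleanup: Proc[O] = cleanup1
  }

  object Await {
    // Restores the `Await(...)` construction syntax without `new`.
    def apply[A, O](
        recv: A => Proc[O],
        fallback: => Proc[O] = Halt,
        cleanup: => Proc[O] = Halt): Await[A, O] =
      new Await[A, O](recv, fallback, cleanup)

    // Restores pattern matching. Note that this extractor forces both lazy vals.
    def unapply[A, O](a: Await[A, O]): Option[(A => Proc[O], Proc[O], Proc[O])] =
      Some((a.recv, a.fallback, a.cleanup))
  }
}
```

Constructing a value with Await(recv, expensiveFallback) does not evaluate expensiveFallback until fallback is first read; a real design might expose thunks from unapply instead, so that matching stays non-strict.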

runarorama commented 10 years ago

That works too, Pavel, although then wherever anyone is using Await, they will have to say new Await.

pchlupacek commented 10 years ago

Ok for this we can define Await.apply perhaps?

P.

runarorama commented 10 years ago

That works.

fthomas commented 10 years ago

WRT naming chunkByFixed, scalaz.Foldable has splitWith:

Vector(1, 3, 8, 5, 2, 4, 7).splitWith(_ < 4)
res3: List[List[Int]] = List(List(1, 3), List(8, 5), List(2), List(4, 7))