krasserm / streamz

A combinator library for integrating Functional Streams for Scala (FS2), Akka Streams and Apache Camel
Apache License 2.0

Creating race conditions when converting akka streams to fs2 streams is too easy #54

Closed Daenyth closed 4 years ago

Daenyth commented 6 years ago

The problem: Akka streams have materialized values, but the default .toStream() and .toSink() converters discard them. I've created race conditions by converting sinks whose materialized value is a Future and then not waiting on that Future.
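To make the race concrete, here is a minimal sketch that uses only the Scala standard library rather than Akka or streamz. `runSink`, `store` and `gate` are hypothetical stand-ins: `runSink` returns immediately, like a materialized sink, and its Future plays the role of the materialized value. The latch only makes the timing deterministic for the demonstration.

```scala
import java.util.concurrent.CountDownLatch
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object MatValueRace {
  implicit val ec: ExecutionContext = ExecutionContext.global

  // Hypothetical stand-in for running a sink: returns at once, and the Future
  // (the "materialized value") completes only when all writes are done.
  def runSink(elems: List[Int], store: AtomicInteger, gate: CountDownLatch): Future[Unit] =
    Future {
      gate.await() // simulates slow asynchronous I/O
      elems.foreach(e => store.addAndGet(e))
    }

  def demo(): (Int, Int) = {
    val store = new AtomicInteger(0)
    val gate  = new CountDownLatch(1)
    val done  = runSink(List(1, 2, 3), store, gate)

    // Ignoring `done` here is the bug: the writes have not happened yet.
    val seenWithoutWaiting = store.get()

    gate.countDown()
    Await.result(done, 5.seconds) // waiting on the Future removes the race
    val seenAfterWaiting = store.get()

    (seenWithoutWaiting, seenAfterWaiting)
  }
}
```

Reading `store` before the Future completes sees none of the writes; reading it after awaiting the Future sees all of them. Discarding the materialized Future leaves the caller in the first situation with no way to detect it.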

Ad-hoc solution: I created this:

implicit class RichAkkaSink[A, B](val sink: AkkaSink[A, Future[B]])
        extends AnyVal {

      /** Converts an akka sink with a success-status-indicating Future[B]
        * materialized result into an fs2 Pipe whose output stream fails if the Future fails.
        * The output stream emits the Future's value once at the end,
        * then terminates.
        */
      def toSinkWithStatusMat(
          implicit ec: ExecutionContext,
          m: Materializer
      ): Pipe[IO, A, B] =
        Fs2AkkaCompat.toSinkWithStatusMat(sink)

      /** The same as toSinkWithStatusMat, but ignoring the materialized value */
      def toSinkWithStatusMat_(
          implicit ec: ExecutionContext,
          m: Materializer
      ): Sink[IO, A] =
        in => in.through(Fs2AkkaCompat.toSinkWithStatusMat(sink)).void
    }

  /** Converts an akka sink with a success-status-indicating Future[B]
    * materialized result into an fs2 Pipe whose output stream fails if the Future fails.
    * The output stream emits the Future's value once at the end,
    * then terminates.
    */
  def toSinkWithStatusMat[A, B](
      akkaSink: AkkaSink[A, Future[B]]
  )(
      implicit ec: ExecutionContext,
      m: Materializer
  ): Pipe[IO, A, B] = {
    val mkPromise = Promise.empty[IO, Either[Throwable, B]]
    // `Sink` is just a function of Stream[F, A] => Stream[F, Unit], so we take a stream as input.
    in =>
      Stream.eval(mkPromise).flatMap { p =>
        // Akka streams produce a materialized value as a side effect of being run.
        // streamz-converters lets us register a `Future[B] => Unit` callback that is invoked when that materialized value is created.
        // This callback makes the akka materialized future store its result status in the Promise
        val captureMaterializedResult: Future[B] => Unit = _.onComplete {
          case Failure(ex)    => p.complete(Left(ex)).unsafeRunSync()
          case Success(value) => p.complete(Right(value)).unsafeRunSync()
        }
        // toSink is from streamz-converters; convert an akka sink to fs2 sink with a callback for the materialized values
        val fs2Sink: Sink[IO, A] = akkaSink.toSink(captureMaterializedResult)

        val fs2Stream: Stream[IO, Unit] = fs2Sink.apply(in)
        val materializedResultStream: Stream[IO, B] = Stream.eval {
          p.get // Async wait on the promise to be completed; => IO[Either[Throwable, B]]
          .rethrow // IO[Either[Throwable, B]] => IO[B]
        }
        // Run the akka sink for its effects, then run the stream that waits for the Promise's result
        fs2Stream.drain ++ materializedResultStream
      }
  }

The problem with my approach is that the user is still required to realize they need the alternate converter method.

A more comprehensive solution: I'd like to make a breaking API change before we make the 1.x release.

// currently
def toSink(onMaterialization: M => Unit = _ => ())

Proposed:

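package streamz

package object converter {
  // For convenience on wildcard imports. Function1 is contravariant in its
  // argument, so Any => Unit is accepted wherever an M => Unit is expected.
  val Discard: Any => Unit = _ => ()
}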

def toSink(onMaterialization: M => Unit)
def toSink()(implicit ev: M =:= akka.NotUsed)

This guarantees that a caller must either pass an explicit onMaterialization (Discard comes in with the wildcard import for convenience) or, if they pass nothing, that M must be NotUsed.

I haven't tested this yet, but I believe the general approach should work.
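The overload-plus-evidence idea can be sketched without any Akka or streamz dependency. Everything below is an assumption made for illustration: `NotUsed` is a local stand-in for akka.NotUsed, `Converter` is a hypothetical class, and the `String` return values merely label which overload was chosen.

```scala
import scala.collection.mutable.ListBuffer

object MatEvidenceSketch {
  // Local stand-in for akka.NotUsed so the sketch needs no Akka dependency.
  sealed trait NotUsed
  case object NotUsed extends NotUsed

  // Function1 is contravariant in its argument, so this fits any M => Unit.
  val Discard: Any => Unit = _ => ()

  // Hypothetical converter; `mat` plays the role of the materialized value.
  final class Converter[M](mat: M) {
    // The caller says explicitly what happens to the materialized value.
    def toSink(onMaterialization: M => Unit): String = {
      onMaterialization(mat)
      "sink with callback"
    }

    // Only compiles when M is NotUsed, i.e. there is nothing to lose.
    def toSink()(implicit ev: M =:= NotUsed): String =
      "sink without materialized value"
  }

  val seen = ListBuffer.empty[Int]

  val noMat: String     = new Converter[NotUsed](NotUsed).toSink()
  val discarded: String = new Converter[Int](42).toSink(Discard)
  val captured: String  = new Converter[Int](42).toSink((m: Int) => { seen += m; () })

  // new Converter[Int](42).toSink() // rejected at compile time: no evidence that Int =:= NotUsed
}
```

The point of the design is that silently dropping a meaningful materialized value becomes a compile error, while dropping it on purpose costs one extra word at the call site.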

If you're willing to take this, I'll send a PR implementing it

krasserm commented 6 years ago

I like the general idea of exposing the materialized value as an FS2 stream instead of providing a callback (which makes composition on the FS2 side hard or impossible). But for better symmetry with Akka's Sink|Source|Flow plus materialized value, what do you think about adding converters like toSinkMat, toSourceMat and toPipeMat that return a tuple containing the converted Sink|Stream|Pipe plus a Stream that emits the single materialized value? For example (in pseudocode):

val (fs2Sink, fs2MatStream) = akkaSink.toSinkMat()
val (fs2Source, fs2MatStream) = akkaSource.toSourceMat()
...

This would allow applications to compose the result tuple elements as they like in FS2. Also, when converting an Akka Source to an FS2 Stream, the Stream element type and the materialized value type usually differ and cannot be combined into a single Stream (unless you use something like Either[A, B] as the element type, but that is less symmetric with Akka Streams).
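The tuple shape can be modelled with the standard library alone. This is only a sketch under stated assumptions: `ModelSink` is a hypothetical stand-in for an Akka sink, and a `Future[M]` stands in for the one-element FS2 Stream that would carry the materialized value. It is not the streamz API.

```scala
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.duration._

object ToSinkMatSketch {
  // Hypothetical model of a sink: consuming the input yields a materialized value M.
  final case class ModelSink[A, M](run: List[A] => M)

  // Returns the converted consumer plus a handle that yields the materialized
  // value exactly once. The consumer is single-use, like one materialization.
  def toSinkMat[A, M](sink: ModelSink[A, M]): (List[A] => Unit, Future[M]) = {
    val p = Promise[M]()
    val consume: List[A] => Unit = in => { p.success(sink.run(in)); () }
    (consume, p.future)
  }

  val (consume, mat) = toSinkMat(ModelSink[Int, Int](_.sum))

  // Nothing has run yet, so the materialized value is not available.
  val completedBeforeRun: Boolean = mat.isCompleted

  consume(List(1, 2, 3))
  val result: Int = Await.result(mat, 1.second)
}
```

Because the two tuple elements are independent values, the caller decides how to combine them, for example by running the consumer first and then waiting on the second element.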

The onMaterialization parameter of existing converters toSink, toStream and toPipe could then be removed/deprecated. WDYT?

Daenyth commented 6 years ago

That's not a bad idea - we can have toFoo(implicit ev: Mat =:= NotUsed) and toFooMat returning the tuple. That sounds like a nice interface!

krasserm commented 6 years ago

Sounds good, would be great to see a PR! I'm not sure if toFoo(implicit ev: Mat =:= NotUsed) is too restrictive but we can discuss that in the PR later. Thanks for your useful proposal!

Daenyth commented 6 years ago

I don't think it's too bad - If there's a used Mat value, I think discarding it should be done explicitly, hence the Discard constant that I'd put in the package object to be imported with the implicits

Daenyth commented 6 years ago

I have a WIP proposal here: https://github.com/krasserm/streamz/compare/master...Daenyth:matvalue

Next, I'm playing around a bit to see if I can keep a single method name while still having differing return types based on Mat.
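One general Scala technique for a single method whose return type depends on a type parameter is a type class with a type member, selected by implicit priority. The sketch below shows that technique only; it is not taken from the linked branch and may differ from it. `NotUsed` is a local stand-in for akka.NotUsed, and `MatResult`, `Converter` and `convert` are hypothetical names, with a `String` standing in for the converted sink.

```scala
object MatDependentReturn {
  // Local stand-in for akka.NotUsed so the sketch needs no Akka dependency.
  sealed trait NotUsed
  case object NotUsed extends NotUsed

  // Hypothetical type class: chooses the return type of `convert` from M.
  trait MatResult[M] {
    type Out
    def build(sinkName: String, mat: M): Out
  }

  trait LowPriorityMatResult {
    // Fallback for any M: return the sink together with its materialized value.
    implicit def used[M]: MatResult[M] { type Out = (String, M) } =
      new MatResult[M] {
        type Out = (String, M)
        def build(sinkName: String, mat: M): (String, M) = (sinkName, mat)
      }
  }

  object MatResult extends LowPriorityMatResult {
    // Preferred when M is NotUsed: there is nothing to return besides the sink.
    implicit val notUsed: MatResult[NotUsed] { type Out = String } =
      new MatResult[NotUsed] {
        type Out = String
        def build(sinkName: String, mat: NotUsed): String = sinkName
      }
  }

  final class Converter[M](sinkName: String, mat: M) {
    // One method name; the result type is picked by the implicit instance.
    def convert(implicit r: MatResult[M]): r.Out = r.build(sinkName, mat)
  }

  val plain: String          = new Converter[NotUsed]("plain", NotUsed).convert
  val withMat: (String, Int) = new Converter[Int]("counting", 42).convert
}
```

The trade-off is that the return type is no longer visible in the method signature, which can make compiler errors and documentation harder to read than two separately named methods.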

krasserm commented 6 years ago

> I don't think it's too bad - If there's a used Mat value, I think discarding it should be done explicitly, hence the Discard constant that I'd put in the package object to be imported with the implicits

Understood. If discarding can be done explicitly, then it's fine. I thought users would be forced to use toFooMat. So please ignore my previous concern.