krasserm / streamz

A combinator library for integrating Functional Streams for Scala (FS2), Akka Streams and Apache Camel
Apache License 2.0
282 stars 44 forks source link

Replace deprecated actor publisher/subscriber #52

Closed notxcain closed 6 years ago

notxcain commented 6 years ago

This PR (ref #32) removes the usage of deprecated AkkaSink.actorSubscriber and AkkaSource.actorPublisher in favour of AkkaSink.queue and AkkaSource.queue correspondently.

@krasserm I was wrong about this change being non-breaking as it adds Concurrent constraint to some methods to be able to use Stream#interruptWhen.

@daenyth I would be very glad if you could review the implementation of publisherStream, it looks a bit clumsy to me, all tests pass though.

notxcain commented 6 years ago

@daenyth there’re no other breaking changes.

krasserm commented 6 years ago

LGTM, thanks for the PR! Will merge after subscriber cancellation is done in onFinalize.

Daenyth commented 6 years ago

Shift seems right to me

On Thu, Sep 27, 2018, 5:30 AM Denis Mikhaylov notifications@github.com wrote:

@notxcain commented on this pull request.

In streamz-converter/src/main/scala/streamz/converter/Converter.scala https://github.com/krasserm/streamz/pull/52#discussion_r220855131:

@@ -133,34 +130,31 @@ trait Converter { }).mapMaterializedValue(_ => NotUsed) }

  • private def subscriberStream[F[_], A](subscriber: ActorRef)(implicit context: ContextShift[F], F: Async[F]): Stream[F, A] = {
  • val pull = context.shift >> F.async((callback: Callback[Option[A]]) => subscriber ! Request(callback))
  • private def subscriberStream[F[_], A](subscriber: SinkQueueWithCancel[A])(implicit context: ContextShift[F], F: Async[F]): Stream[F, A] = {
  • val pull = context.shift >> F.liftIO(IO.fromFuture(IO(subscriber.pull())))

@Daenyth https://github.com/Daenyth @krasserm https://github.com/krasserm Do we really need context.shift here?

— You are receiving this because you were mentioned. Reply to this email directly, view it on GitHub https://github.com/krasserm/streamz/pull/52#pullrequestreview-159358983, or mute the thread https://github.com/notifications/unsubscribe-auth/AAA5NJV-rptt1flAHdHLibj76tYqyBKKks5ufJrQgaJpZM4W10NQ .

notxcain commented 6 years ago

@Daenyth should it be added to publish as well?

Daenyth commented 6 years ago

That seems reasonable. @SystemFw might be able to say better than me

notxcain commented 6 years ago

Have you seen this PR?

Daenyth commented 6 years ago

It was a separate repo before. I've used it, works fine AFAIK

krasserm commented 6 years ago

@notxcain @Daenyth what do you think of a complete re-implementation of the converters on top of https://github.com/functional-streams-for-scala/fs2/pull/1259 ?

notxcain commented 6 years ago

@krasserm I think it's very reasonable.

Daenyth commented 6 years ago

It's possible. Do you have benchmarks? It feels like it would be going through multiple layers of indirection when it doesn't necessarily need that. I also wonder about managing the Mat values - it seems like you'd have to convert akka-streams to reactive streams first, then from that to fs2.

notxcain commented 6 years ago

@Daenyth I think it's the same amount of indirection, we just replace Akka Queue level with Reactive Streams level. Benchmarks would be great to have, indeed.

notxcain commented 6 years ago

Well, it's not as straightforward as I thought it would be :)

krasserm commented 6 years ago

No benchmarks, unfortunately ...

Well, it's not as straightforward as I thought it would be :)

@notxcain can you elaborate? Would it still reduce complexity in the streamz converter module or just create new complexity somewhere else. I didn't look into the FS2 - RS integration yet but naively thought that building on top of it would off-load most of the integration complexity to FS2 as Akka Streams already has an RS interface i.e. is an RS implementation. Also, RS advertises that implementations are easy to connect (I never tried though), so could this even make converter module obsolete?

notxcain commented 6 years ago

@krasserm The hard part is converting Pipe to Flow and vice versa. Everything else seems to be easy. If there is time, I'll give it another shot tomorrow.

krasserm commented 6 years ago

@notxcain I think conversion of sources and sinks is actually more important. Would be really interesting to see an implementation of it!

Have you and/or @Daenyth ever used the Pipe <-> Flow conversion? I never did, just added it for completeness. Would you see it as a problem to drop support for that?

Daenyth commented 6 years ago

Yes, I've used it. I probably don't need it, but would prefer to keep it if possible.

It's avoidable by doing

val p: fs2.Pipe...
val as: Source[A, NU]
val snk: AkkaSink[A, NU]

as.toStream[IO].through(p).toSource().runWith(snk)

and vice-versa. But it's not as nice.

krasserm commented 6 years ago

@Daenyth sure, then let's keep it. Maybe it makes sense to let the existing converter implementation co-exist with another one based on FS2-RS and then see if we can deprecate the current implementation at some time.

krasserm commented 6 years ago

LGTM. @notxcain do you plan further changes in this PR or ready to merge?

notxcain commented 6 years ago

@krasserm I think that’s it.

krasserm commented 6 years ago

Thanks a lot @notxcain, great contribution!

notxcain commented 6 years ago

@krasserm my pleasure 😃