reactor / reactor-core

Non-Blocking Reactive Foundation for the JVM
http://projectreactor.io
Apache License 2.0

What is the alternative way to return a Subscriber that is connected to a Flux without using FluxProcessors? #3940

Closed david-masters closed 1 day ago

david-masters commented 1 day ago

Documentation Issue

A use case for Processors is to create a Processor, use it as a publisher for a downstream subscriber, and then return the processor as a subscriber. Is there a way to do this using sinks?
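For readers unfamiliar with the pattern described above, here is a minimal, dependency-free sketch of it. It uses the JDK's java.util.concurrent.Flow interfaces and SubmissionPublisher as a stand-in for Reactor's EmitterProcessor. That substitution is an assumption made only to keep the example self-contained. PassThrough, connect and demo are hypothetical names, not Reactor or RSocket APIs.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

class ProcessorAsSubscriber {

    /** Hypothetical pass-through processor: a Subscriber on one side, a Publisher on the other. */
    static final class PassThrough<T> extends SubmissionPublisher<T>
            implements Flow.Processor<T, T> {
        private Flow.Subscription upstream;

        @Override public void onSubscribe(Flow.Subscription s) {
            upstream = s;
            s.request(1);                 // pull the first item
        }
        @Override public void onNext(T item) {
            submit(item);                 // re-publish to downstream subscribers
            upstream.request(1);          // then pull the next item
        }
        @Override public void onError(Throwable t) { closeExceptionally(t); }
        @Override public void onComplete() { close(); }
    }

    /** Wires the downstream consumer first, then hands back only the Subscriber side. */
    static <T> Flow.Subscriber<T> connect(Flow.Subscriber<T> downstream) {
        PassThrough<T> processor = new PassThrough<>();
        processor.subscribe(downstream);
        return processor;
    }

    /** Pushes 1, 2, 3 through the returned Subscriber and reports what arrived downstream. */
    static List<Integer> demo() {
        List<Integer> received = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(1);
        Flow.Subscriber<Integer> collector = new Flow.Subscriber<>() {
            @Override public void onSubscribe(Flow.Subscription s) { s.request(Long.MAX_VALUE); }
            @Override public void onNext(Integer item) { received.add(item); }
            @Override public void onError(Throwable t) { done.countDown(); }
            @Override public void onComplete() { done.countDown(); }
        };
        Flow.Subscriber<Integer> entry = connect(collector);
        try (SubmissionPublisher<Integer> source = new SubmissionPublisher<>()) {
            source.subscribe(entry);      // the caller only ever sees a Subscriber
            for (int i = 1; i <= 3; i++) {
                source.submit(i);
            }
        }                                 // closing the source completes the chain
        try {
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return List.copyOf(received);
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

In this sketch demo() should return [1, 2, 3]. The point is the shape of connect: the downstream is attached up front, and the caller receives nothing but a Subscriber to feed.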

Improvement Suggestion

Additional context

Part of my specific use case is that I am using RSocket to create a reactive stream over the network, and using that reactive stream to create a Pekko Sink. Pekko sinks can be constructed from a reactive-streams Subscriber, but AFAIK you can't just create a Pekko Source (analogous to, but not the same as, a reactive-streams Publisher) and call Publisher.subscribe on it. Even if you could, a significant part of our project would need to be restructured, because we have a lot of code that expects a Pekko Sink rather than providing a Source to "subscribe" to.

This is the minimal (Scala) code I wrote that works flawlessly for my use case, but I can't figure out a way to do this without processors.

def serverSubscriber : Subscriber[Payload] = {
  val processor : Processor[Payload, Payload] = EmitterProcessor.create(1)
  RSocketServer.create(
    SocketAcceptor.forRequestStream(payload =>
      Flux.from(processor)
  )).bindNow(TcpServerTransport.create("localhost", 3141))
  processor
}

chemicL commented 1 day ago

Hey @david-masters !

As far as I can tell there is no intention to expose Sinks as Subscriber at all. If you search through the issues here you'll find some interesting discussions. The one that feels most relevant to your question is #2667. It ends with a suggestion not to wrap a sink in a Subscriber, but rather to provide a means of subscribing the sink to an existing Publisher, e.g.

Sinks.ManyWithUpstream<Integer> adapter = Sinks.unsafe().manyWithUpstream().multicastOnBackpressureBuffer();
Publisher<Integer> upstream = ...;
Disposable disposable = adapter.subscribeTo(upstream);

More interesting issues are linked in the 3.4.0 release notes.

Perhaps there is a better way to combine these APIs without resorting to the Sinks API or Processors at all.

I'm not familiar with Pekko and the relevant APIs, so perhaps you'll find more help on Stack Overflow or Gitter?

david-masters commented 22 hours ago

That's really unfortunate because I am working with code that explicitly expects a reactive-streams subscriber. Is there anything at all in reactor that supports processors?

chemicL commented 20 hours ago

I'm not well versed in Scala or Pekko, but perhaps a slight paradigm shift can help? As I understand it, you want to feed the SocketAcceptor with data coming from a Pekko Sink (obtained via a Pekko Source?). What if you could do something (pseudocode) like this:

  // sink is given
  RSocketServer.create(
    SocketAcceptor.forRequestStream(payload =>
      Flux.from(sink.asPublisher(fanout = false))
  )).bindNow(TcpServerTransport.create("localhost", 3141))

Is that an option for your use case?

david-masters commented 19 hours ago

Thanks for getting back to me though. I posted my question to Stack Overflow. Hopefully there's a way to solve this from a different angle that I wasn't thinking of.

david-masters commented 18 hours ago

@chemicL I didn't see your last comment when I just replied. I think what you are suggesting won't work. I'll try to explain why without getting too in the weeds. Pekko and Akka sources/flows/sinks work by creating a "graph" data structure that is later used to actually create the code that runs.

Once the "graph" is actually running, you can't change it, so you need to supply the subscribers/publishers up front when you create the graph. As far as I know, there's no code analogous to publisher.subscribeTo in Pekko, because Pekko tries to be as functional as possible, and a subscribeTo function that modifies its input is inherently imperative.

Edit: Actually I need to think about this a little more. I realized I misunderstood your comment and I have to test it out. I think I've already tried something like that and it didn't work, but I need to play around with it some more to double check.

david-masters commented 17 hours ago

Ok, I've adapted your example to something that I thought would work, but I'm struggling to figure out why it doesn't, and I'm not sure whether it is a Reactor issue, an RSocket issue, a Pekko issue, or something I did wrong.

def serverSink : Sink[Payload, NotUsed] = {
  val sink : Sink[Payload, Publisher[Payload]] = Sink.asPublisher(false)
  sink.mapMaterializedValue { pub =>
    RSocketServer.create(
      SocketAcceptor.forRequestStream(payload =>
        Flux.from(pub)
    )).bindNow(TcpServerTransport.create("localhost", 3141))
    NotUsed.notUsed()
  }
}

It appears fine, but for some reason, when the downstream client tries to pull from the stream, the upstream server doesn't seem to receive the request or push anything.

david-masters commented 16 hours ago

Ok, I have a solution, but it's a little hacky. For reasons I don't understand, the publisher from Pekko needs a subscriber other than the ones from RSocket. This works, but with the caveat that the sink will consume everything it can if there are no connected clients (which is actually fine in my use case).

def serverSink : Sink[Payload, NotUsed] = {
  val sink : Sink[Payload, Publisher[Payload]] = Sink.asPublisher(true)
  sink.mapMaterializedValue { pub =>
    pub.subscribe(new Subscriber[Payload] {
      override def onComplete(): Unit = ()
      override def onError(t: Throwable): Unit = ()
      override def onNext(t: Payload): Unit = ()
      override def onSubscribe(s: Subscription): Unit = s.request(Long.MaxValue)
    })
    RSocketServer.create(
      SocketAcceptor.forRequestStream(payload =>
        Flux.from(pub)
    )).bindNow(TcpServerTransport.create("localhost", 3141))
    NotUsed.notUsed()
  }
}
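The caveat mentioned above (an always-hungry subscriber consumes items before any client connects) can be illustrated with a small analogy in plain Java. This is only a sketch under assumptions: it uses the JDK's SubmissionPublisher, not Pekko's fan-out publisher or RSocket, whose buffering semantics may differ. Collector and demo are hypothetical names, and the "drain" here records items only so the effect is visible, whereas the subscriber in the snippet above discards them.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

class DrainingSubscriberDemo {

    /** Requests unbounded demand, records each item, and signals on termination. */
    static final class Collector implements Flow.Subscriber<Integer> {
        final List<Integer> items = new CopyOnWriteArrayList<>();
        final CountDownLatch done = new CountDownLatch(1);

        @Override public void onSubscribe(Flow.Subscription s) { s.request(Long.MAX_VALUE); }
        @Override public void onNext(Integer item) { items.add(item); }
        @Override public void onError(Throwable t) { done.countDown(); }
        @Override public void onComplete() { done.countDown(); }

        void awaitDone() {
            try {
                done.await(5, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
    }

    /** Returns two lists: what the drain saw, then what the late client saw. */
    static List<List<Integer>> demo() {
        Collector drain = new Collector();       // stands in for the no-op subscriber
        Collector lateClient = new Collector();  // stands in for a client connecting later
        try (SubmissionPublisher<Integer> source = new SubmissionPublisher<>()) {
            source.subscribe(drain);
            source.submit(1);                    // only the drain is subscribed
            source.submit(2);
            source.subscribe(lateClient);        // client joins after two items
            source.submit(3);                    // both subscribers receive this one
        }
        drain.awaitDone();
        lateClient.awaitDone();
        return List.of(List.copyOf(drain.items), List.copyOf(lateClient.items));
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

In this analogy the drain sees all three items while the late client sees only the last one, which mirrors the trade-off described above: data emitted while no client is connected is consumed and not replayed.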

Thanks so much for your help!