zio / interop-reactive-streams

Interoperability Layer Between ZIO and Reactive Streams
https://zio.dev/zio-interop-reactivestreams
Apache License 2.0
47 stars 27 forks

Add ZPipeline to Processor conversion #364

Open pragmaxim opened 1 year ago

pragmaxim commented 1 year ago

org.reactivestreams.Processor is the equivalent of ZPipeline or akka.stream.scaladsl.Flow, and when integrating between different reactive "implementations", it is possible to create one using org.reactivestreams.FlowAdapters.toFlowProcessor ... Akka-stream has a flow.toProcessor method, but I can't find a way to turn a ZPipeline into a Processor.

Any idea?
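
For readers less familiar with the Processor contract mentioned above, here is a minimal sketch of what a Processor is: a Subscriber on its input side and a Publisher on its output side, with a transformation in between, which is why it lines up with ZPipeline. It is not a ZPipeline conversion and not part of this library. It assumes the JDK's java.util.concurrent.Flow interfaces rather than org.reactivestreams, and MapProcessor is a hypothetical name chosen for illustration.

```scala
import java.util.concurrent.{Flow, SubmissionPublisher}

// Hypothetical helper (not from zio-interop-reactivestreams):
// a Processor that applies `f` to each element, roughly what
// ZPipeline.map(f) expresses on the ZIO side.
final class MapProcessor[In, Out](f: In => Out)
    extends SubmissionPublisher[Out]   // output side: Publisher[Out]
    with Flow.Processor[In, Out] {     // input side: Subscriber[In]

  private var subscription: Flow.Subscription = null

  override def onSubscribe(s: Flow.Subscription): Unit = {
    subscription = s
    s.request(1) // pull one element at a time from upstream
  }

  override def onNext(item: In): Unit = {
    submit(f(item))         // publish the transformed element downstream
    subscription.request(1) // then ask upstream for the next element
  }

  // Propagate upstream termination to downstream subscribers.
  override def onError(t: Throwable): Unit = closeExceptionally(t)
  override def onComplete(): Unit = close()
}
```

With reactive-streams on the classpath, org.reactivestreams.FlowAdapters.toProcessor should wrap such a Flow.Processor as an org.reactivestreams.Processor, and FlowAdapters.toFlowProcessor goes the other way.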

runtologist commented 11 months ago

Not tested, but shouldn't this work?

  def processorToZPipeline[In, Out](processor: Processor[In, Out]): ZPipeline[Scope, Throwable, In, Out] =
    ZPipeline.unwrap(
      for {
        signalErrorSink    <- processor.toZIOSink
        (signalError, sink) = signalErrorSink
        stream              = processor.toZIOStream()
        sinkPipe            = ZPipeline.fromSink(sink)
        sourcePipe          = ZPipeline.fromChannel(stream.channel)
      } yield sinkPipe >>> sourcePipe
    )

runtologist commented 11 months ago

Well, it doesn't.

mschuwalow commented 11 months ago

@runtologist are you looking at this one? If not -- I just hit a use case for this at work, so I can take care of it.