apache / pekko

Build highly concurrent, distributed, and resilient message-driven applications using Java/Scala
https://pekko.apache.org/
Apache License 2.0

Add a `Sink.extract` function which pre-materializes data from a `Sink` and lets you replay that entire data stream into an existing Sink #867

Open mdedetrich opened 8 months ago

mdedetrich commented 8 months ago

This is another commonplace problem that arises when using pekko-streams: you have a Sink that takes some parameter (let's say an id), but the data needed to produce that parameter is in the stream that is being sent to the Sink itself.

More concretely, let's assume we have a simple sink that stores some data in object storage, i.e. def putData(id: String): Sink[ByteString, _]. In order to call putData we need an id, however that id is contained within the data being sent to the sink itself, i.e.

val source = Source.single(
   ByteString("id:1;moreData")
)

source.to(putData(???))

In this case we want to extract the id (i.e. the ??? in the above snippet) from the actual data (i.e. "id:1"). Most importantly, even once we figure out a way to extract the id, we still want all of the data (i.e. "id:1;moreData") to be sent to the Sink.
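For illustration, putData could be a hypothetical stand-in along these lines, with Sink.ignore in place of a real object-storage client:

import org.apache.pekko.Done
import org.apache.pekko.stream.scaladsl.Sink
import org.apache.pekko.util.ByteString
import scala.concurrent.Future

// hypothetical: uploads the incoming bytes under the given object id
def putData(id: String): Sink[ByteString, Future[Done]] = Sink.ignore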

Now while there are tools to do this, i.e. Framing, they aren't exactly ideal. Framing usually only works when you have a very basic/primitive structure (let's say comma or newline delimited). If we have more complex data structures, let's say JSON, we do have options available, but solving the issue is still messy because honestly Framing is not really the right tool: it's not that we want to frame the incoming data in a certain way, but rather that we want to consume and buffer the data until we manage to extract some field (i.e. the id), and then, once that id field is extracted, send the buffered data along with the rest of the incoming data to a supplied Sink, i.e.

/**
 * Defers invoking the `create` function to create a sink until data from upstream has
 * been run through the `extractor` Flow and the `extractorFinalizer` Sink completes.
 * The extracted value is then provided as a parameter to `create`.
 *
 * The primary use of this function is when you have a pre-existing `Sink` that requires
 * input which is derived from the data itself, i.e. the Sink requires an id as a parameter,
 * however that id is contained within the upstream data that is being sent to the Sink.
 *
 * @param extractor A Flow which derives the value that is passed as
 *                  a parameter into the `create` function
 * @param extractorFinalizer A Sink which determines when the `extractor` Flow is complete.
 * @param inclusive Whether the data already sent to the `extractor` should be included
 *                  alongside the rest of the upstream data that is yet to be processed
 *                  when feeding the `create`d Sink.
 * @param create A function that creates the final Sink from the extracted value.
 */
def extract[T, E, M](
    extractor: Graph[FlowShape[T, E], _],
    extractorFinalizer: Graph[SinkShape[E], _],
    inclusive: Boolean,
    create: E => Sink[T, M]): Sink[T, M] =
  ???

def extract[T, E, M](
    extractor: Graph[FlowShape[T, E], _],
    extractorFinalizer: Graph[SinkShape[E], _],
    create: E => Sink[T, M]): Sink[T, M] =
  extract(extractor, extractorFinalizer, inclusive = true, create)

The design of this is still debatable; of particular note is how to model the extractor part. My initial thought was to have the extractor as a Sink, but I realized that practically creating a Sink from a Flow (which is how the extractors are typically going to be designed) is quite cumbersome, so I came up with this API instead. The usage would look like this:

val json: String =
  s"""
     |{
     |  "id": "someId",
     |  "size": 3,
     |  "rows": [
     |    {"doc": "doc1"},
     |    {"doc": "doc2"},
     |    {"doc": "doc3"}
     |  ]
     |}
  """.stripMargin
val source: Source[ByteString, NotUsed] = Source.single(ByteString(json))
val extractorFlow: Flow[ByteString, ByteString, NotUsed] = JsonReader.select("$.id")
source.to(Sink.extract(extractorFlow, Sink.head, id => putData(id.utf8String)))

With the above example, we would extract the value "someId" from the "id" field using JsonReader.select("$.id"), then pass someId into the putData sink argument, and the entire contents of the Source would be fed into the putData Sink since inclusive = true.

@He-Pin @pjfanning @jrudolph @raboof @jxnu-liguobin Thoughts?

He-Pin commented 8 months ago

And it seems you can do this with lazyFutureSink and prefixAndTail, or Sink.lazyInit, too.
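A minimal sketch of the prefixAndTail approach, assuming the id is fully contained in the first stream element (putData and parseId are hypothetical stand-ins):

import org.apache.pekko.Done
import org.apache.pekko.actor.ActorSystem
import org.apache.pekko.stream.scaladsl.{ Sink, Source }
import org.apache.pekko.util.ByteString
import scala.concurrent.Future

implicit val system: ActorSystem = ActorSystem("example")
import system.dispatcher

// hypothetical: stores the stream under the given id
def putData(id: String): Sink[ByteString, Future[Done]] = Sink.ignore

// hypothetical: pulls "1" out of "id:1;moreData"
def parseId(bytes: ByteString): String =
  bytes.utf8String.split(';').head.stripPrefix("id:")

val source = Source.single(ByteString("id:1;moreData"))

val done: Future[Done] =
  source
    .prefixAndTail(1) // (first element, rest of the stream)
    .runWith(Sink.head)
    .flatMap { case (prefix, tail) =>
      val id = parseId(prefix.head)
      // replay the prefix in front of the tail so putData sees everything
      (Source(prefix) ++ tail).runWith(putData(id))
    }

Note this only covers the case where the id can be derived from a fixed-size prefix; for an id buried deeper in the stream you would need the consume-and-buffer behaviour the proposed Sink.extract describes.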

IIRC, in reactor-core there is a switchOnFirst operator, which gives you the first element together with the Flux that still includes that first element. Do you think we should add that method too? I think it is much easier to use.
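For reference, reactor-core's Flux.switchOnFirst hands a transform function both the first signal and the Flux that still includes the first element; a hypothetical Pekko counterpart could be sketched as:

import org.apache.pekko.NotUsed
import org.apache.pekko.stream.scaladsl.{ Flow, Source }

// hypothetical signature, not an existing Pekko operator: `f` receives the first
// element plus a Source that replays it followed by the rest of the stream
def switchOnFirst[T, U](f: (T, Source[T, NotUsed]) => Source[U, NotUsed]): Flow[T, U, NotUsed] =
  Flow[T]
    .prefixAndTail(1)
    .flatMapConcat {
      case (Seq(head), tail) => f(head, Source.single(head) ++ tail)
      case (_, _)            => Source.empty[U] // empty upstream: nothing to switch on
    }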