type T = ...
type M = ...
val source: Source[T, M] = ...
val sink: Sink[T, M]
val runnableGraph = source.toMat(sink)(Keep.Left) //Keep.LeftはSource側のmat valueをとってSinkがわのmat valueを捨てる
val materalizedValue = runnableGraph.run()
ここまではよくある話ですが、Source.asSubscriberは
/**
* Creates a `Source` that is materialized as a [[org.reactivestreams.Subscriber]]
*/
def asSubscriber[T]: Source[T, Subscriber[T]] =
fromGraph(new SubscriberSource[T](DefaultAttributes.subscriberSource, shape("SubscriberSource")))
というシグネチャで(Subscriber[T]に注目)
val runnableGraph = Source.asSubscriber[T].toMat(sink, Keep.Left)
val subscriber = runnableGraph.run()
釈迦に説法ですが Source[T,M]は[T]型のelementをpushします。
で、もう一つSourceの型に[M]が入っているのでこれはMaterialized Valueの型[M]ですね。 Materialized Valueはrun()して初めて取り出せるわけです。
とりだすというのは以下のようにrunの返り値になることですね
ここまではよくある話ですが、Source.asSubscriberは
というシグネチャで(Subscriber[T]に注目)
という形でrunした結果取り出せるmaterialize valueがsubscriberです。 このsubscriberはReactive Streamの(Akka Streamより下位のAPIの)Subscriberです。
で、このSubscriberにElementを送ると
そのまま同じElementがSourceを通じてAkka Streamを流れます。
で、これをSink側とこのようなコードで組み合わせると
のような形になるわけです。このまま走らせると
上の
subscriber, publisher
のtupleがmaterialized valueになってしまいます。Akka Streamを走らせたのに、低レベルAPIであるReactive Streamの
subscriber, publisher
が返ってくるのがうれしくないですね。というわけでmaterialized valueをAkka StreamのSinkとSourceにアップグレードするのが以下です
そして最終的に出来上がるのが、下の図の3段目にあるやつです。さいしょのSink.asSubscriberとSource.asPublisherをrunした後に新たにSourceとSinkが作られるという不思議な仕組みになっています。
で、これどういうときに使うの???
さあwwww