Akka Stream #1

Feb 10, 2020

Topic: Streaming all the things with Akka Streams, by Johan Andrén
Link: https://www.youtube.com/watch?v=jTUkmvPMlUE

Reactive Streams

Example Scenario:

Source (fast producer, 10 msg/s) --async boundary--> Flow -> Sink (slow consumer, 1 msg/s)

Without back pressure this can end in an OutOfMemoryError: the elements the consumer can't keep up with pile up in an unbounded buffer.

Q: How can we guarantee that, in this scenario, the source is notified that the data has been consumed?
A: Asynchronous, non-blocking back pressure: demand is always signalled from downstream to upstream (Sink to Flow to Source). Example: if the Sink's buffer can hold only 2 elements, it only requests 2 elements at a time, which forces the fast producer to slow down.
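A minimal sketch of back pressure in action, assuming Akka 2.6 (where an implicit ActorSystem is enough to run a stream). `throttle` stands in for the slow consumer, and `BackPressureDemo` / `maxInFlight` are made-up names. The stream measures how far the producer ever gets ahead of the consumer: the gap stays bounded by the async boundary's small buffer (configured by `akka.stream.materializer.max-input-buffer-size`) instead of growing until memory runs out.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical demo object, not from the talk.
object BackPressureDemo {
  // Returns the largest number of elements ever emitted but not yet consumed.
  def maxInFlight(n: Int = 500): Int = {
    implicit val system: ActorSystem = ActorSystem("back-pressure")
    val produced = new AtomicInteger(0)
    try {
      val gap = Source(1 to n)
        .map { i => produced.incrementAndGet(); i } // fast producer: counts every element it emits
        .async                                      // async boundary: bounded buffer, demand flows upstream
        .throttle(1, 2.millis)                      // slow consumer: roughly 1 element per 2 ms
        .map(i => produced.get() - i)               // how far the producer is ahead of element i
        .runWith(Sink.fold[Int, Int](0)((acc, g) => math.max(acc, g)))
      Await.result(gap, 30.seconds)
    } finally system.terminate()
  }

  def main(args: Array[String]): Unit =
    println(s"producer was at most ${maxInFlight()} elements ahead of the consumer")
}
```

The exact gap depends on buffer settings and timing, but it stays small no matter how large `n` gets.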

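import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Flow, Sink, Source}

implicit val system = ActorSystem()
implicit val mat = ActorMaterializer()           // Akka 2.5 style; in 2.6 the implicit ActorSystem is enough

val source = Source(0 to 20000000)               // emits Int
val flow = Flow[Int].map(_.toString())           // Int -> String
val sink = Sink.foreach[String](println(_))      // consumes String
val runnable = source.via(flow).to(sink)         // nothing runs yet, this is only a blueprint
runnable.run()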


// shorthand: source, flow and sink chained inline
Source(0 to 200000)
  .map(_.toString)
  .runForeach(println)

Pancake metaphor:

battersource --> pan 1 -> pan 2 -> hungrysink

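- .async has a cost: every async boundary adds a buffer and message passing between actors, so it only pays off when the stages on each side do enough work.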

val fryingPan1: Flow[ScoopOfBatter, HalfCookedPancake, NotUsed] =
  Flow[ScoopOfBatter].map { batter => HalfCookedPancake() }
val fryingPan2: Flow[HalfCookedPancake, Pancake, NotUsed] =
  Flow[HalfCookedPancake].map { halfCooked => Pancake() }

// pipelining: each pan runs asynchronously, so both pans work at the same time
val pancakeChef: Flow[ScoopOfBatter, Pancake, NotUsed] =
  Flow[ScoopOfBatter].via(fryingPan1.async).via(fryingPan2.async)
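A self-contained version of the pancake pipeline, assuming Akka 2.6. The case classes are hypothetical (the talk doesn't show them); each one carries an `id` so the output order can be checked. `PancakePipeline` and `cook` are made-up names.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Flow, Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

object PancakePipeline {
  // Hypothetical domain types with an id added for tracking.
  final case class ScoopOfBatter(id: Int)
  final case class HalfCookedPancake(id: Int)
  final case class Pancake(id: Int)

  val fryingPan1: Flow[ScoopOfBatter, HalfCookedPancake, NotUsed] =
    Flow[ScoopOfBatter].map(batter => HalfCookedPancake(batter.id))
  val fryingPan2: Flow[HalfCookedPancake, Pancake, NotUsed] =
    Flow[HalfCookedPancake].map(halfCooked => Pancake(halfCooked.id))

  // Each .async puts a pan in its own actor: pan 2 finishes pancake n while pan 1 starts pancake n + 1.
  val pancakeChef: Flow[ScoopOfBatter, Pancake, NotUsed] =
    Flow[ScoopOfBatter].via(fryingPan1.async).via(fryingPan2.async)

  def cook(n: Int): Seq[Pancake] = {
    implicit val system: ActorSystem = ActorSystem("pancakes")
    try Await.result(Source(1 to n).map(ScoopOfBatter(_)).via(pancakeChef).runWith(Sink.seq[Pancake]), 10.seconds)
    finally system.terminate()
  }
}
```

A pipeline keeps the order of its elements: pancakes come out in the same order the batter went in.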

Fork/join instead? Easy to scale: add more frying pans to increase throughput. Bad when order matters (e.g. task A must complete before task B), because the merged output is not ordered.

val pancakeChef: Flow[ScoopOfBatter, Pancake, NotUsed] =
  Flow.fromGraph(GraphDSL.create() { implicit builder =>
    import GraphDSL.Implicits._

    val dispatchBatter = builder.add(Balance[ScoopOfBatter](2))
    val mergePancakes = builder.add(Merge[Pancake](2))

    // each branch is a separate "cook" using both pans
    dispatchBatter.out(0) ~> fryingPan1.async ~> fryingPan2.async ~> mergePancakes.in(0)
    dispatchBatter.out(1) ~> fryingPan1.async ~> fryingPan2.async ~> mergePancakes.in(1)

    FlowShape(dispatchBatter.in, mergePancakes.out)
  })
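A runnable sketch of the parallel (Balance + Merge) chef, assuming Akka 2.6 and the same hypothetical `id`-carrying case classes; `PancakeParallel` is a made-up name. Since `Merge` emits whichever branch is ready first, the check compares the pancakes sorted by id rather than in arrival order.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.FlowShape
import akka.stream.scaladsl.{Balance, Flow, GraphDSL, Merge, Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

object PancakeParallel {
  // Hypothetical domain types with an id added for tracking.
  final case class ScoopOfBatter(id: Int)
  final case class HalfCookedPancake(id: Int)
  final case class Pancake(id: Int)

  val fryingPan1: Flow[ScoopOfBatter, HalfCookedPancake, NotUsed] = Flow[ScoopOfBatter].map(b => HalfCookedPancake(b.id))
  val fryingPan2: Flow[HalfCookedPancake, Pancake, NotUsed] = Flow[HalfCookedPancake].map(h => Pancake(h.id))

  val pancakeChef: Flow[ScoopOfBatter, Pancake, NotUsed] =
    Flow.fromGraph(GraphDSL.create() { implicit builder =>
      import GraphDSL.Implicits._
      val dispatchBatter = builder.add(Balance[ScoopOfBatter](2)) // hands each scoop to a free branch
      val mergePancakes = builder.add(Merge[Pancake](2))          // emits in completion order
      dispatchBatter.out(0) ~> fryingPan1.async ~> fryingPan2.async ~> mergePancakes.in(0)
      dispatchBatter.out(1) ~> fryingPan1.async ~> fryingPan2.async ~> mergePancakes.in(1)
      FlowShape(dispatchBatter.in, mergePancakes.out)
    })

  def cook(n: Int): Seq[Pancake] = {
    implicit val system: ActorSystem = ActorSystem("pancakes-parallel")
    try Await.result(Source(1 to n).map(ScoopOfBatter(_)).via(pancakeChef).runWith(Sink.seq[Pancake]), 10.seconds)
    finally system.terminate()
  }
}
```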


pipeline vs parallel

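Alpakka: a collection of Akka Streams connectors (Kafka, AWS S3, JMS, files, ...), roughly a modern, reactive take on Apache Camel.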

GraphStage API: for writing custom operators, the "unique snowflake" use cases the built-in operators don't cover.
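A minimal custom operator, assuming Akka 2.6. `MapStage` is a hypothetical hand-written equivalent of the built-in `map`, for illustration only. It shows the core of the GraphStage API: an inlet, an outlet, and handlers where back pressure is explicit (the stage only pulls upstream after downstream has pulled).

```scala
import akka.actor.ActorSystem
import akka.stream.{Attributes, FlowShape, Inlet, Outlet}
import akka.stream.scaladsl.{Sink, Source}
import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler}
import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical example stage; in real code you'd just use .map.
final class MapStage[A, B](f: A => B) extends GraphStage[FlowShape[A, B]] {
  val in: Inlet[A] = Inlet[A]("MapStage.in")
  val out: Outlet[B] = Outlet[B]("MapStage.out")
  override val shape: FlowShape[A, B] = FlowShape(in, out)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) with InHandler with OutHandler {
      // upstream pushed an element (only possible after we pulled): transform it and pass it on
      override def onPush(): Unit = push(out, f(grab(in)))
      // downstream signalled demand: forward that demand upstream
      override def onPull(): Unit = pull(in)
      setHandlers(in, out, this)
    }
}

object GraphStageDemo {
  def run(): Seq[Int] = {
    implicit val system: ActorSystem = ActorSystem("graph-stage")
    try Await.result(Source(1 to 5).via(new MapStage[Int, Int](_ * 2)).runWith(Sink.seq[Int]), 5.seconds)
    finally system.terminate()
  }
}
```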


Default dispatcher: backed by a fork-join pool.


Samples: https://github.com/johanandren/akka-stream-samples/tree/jfokus-2017
Reactive Streams is a publish/subscribe API shared with other JVM libraries such as RxJava, Reactor and Vert.x.
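A small sketch of the Reactive Streams interop, assuming Akka 2.6. An Akka stream is exposed as a plain `org.reactivestreams.Publisher` and consumed again as a `Source`; a Publisher coming from RxJava or Reactor should plug into `Source.fromPublisher` the same way (not shown here). `ReactiveStreamsInterop` is a made-up name.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}
import org.reactivestreams.Publisher
import scala.concurrent.Await
import scala.concurrent.duration._

object ReactiveStreamsInterop {
  def run(): Seq[Int] = {
    implicit val system: ActorSystem = ActorSystem("rs-interop")
    try {
      // Akka stream -> standard Publisher (single subscriber, since fanout = false)
      val publisher: Publisher[Int] = Source(1 to 5).runWith(Sink.asPublisher[Int](fanout = false))
      // standard Publisher -> Akka Source; back pressure keeps working across the boundary
      Await.result(Source.fromPublisher(publisher).map(_ * 10).runWith(Sink.seq[Int]), 5.seconds)
    } finally system.terminate()
  }
}
```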

stream-graphs: https://doc.akka.io/docs/akka/2.5/stream/stream-graphs.html

Async boundary example:
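A minimal sketch of an async boundary, assuming Akka 2.6; `AsyncBoundaryDemo` is a made-up name. Everything before `.async` runs in one actor and everything after it in another, so the two halves run concurrently, yet the element order is preserved (unlike the Balance/Merge fan-out).

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

object AsyncBoundaryDemo {
  def run(): Seq[Int] = {
    implicit val system: ActorSystem = ActorSystem("async-boundary")
    try {
      val result = Source(1 to 100)
        .map(_ + 1)   // fused with the source: first actor
        .async        // boundary: everything above becomes its own island
        .map(_ * 2)   // second actor, running concurrently with the first
        .runWith(Sink.seq[Int])
      Await.result(result, 5.seconds)
    } finally system.terminate()
  }
}
```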


Reactive Streams semantics (as documented for the mapConcat operator):

emits when the mapping function returns an element or there are still remaining elements from the previously calculated collection

backpressures when downstream backpressures or there are still available elements from the previously calculated collection

completes when upstream completes and all remaining elements have been emitted

Operators come in these shapes: Source, Flow, Sink, fan-in, fan-out and BidiFlow.

mapConcat: https://doc.akka.io/docs/akka/current/stream/operators/Source-or-Flow/mapConcat.html
Transforms each element into zero or more elements, which are passed downstream individually.
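A short mapConcat example, assuming Akka 2.6; `MapConcatDemo` is a made-up name. Each line becomes zero or more words: the empty line produces nothing, the two-word line produces two elements.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

object MapConcatDemo {
  def run(): Seq[String] = {
    implicit val system: ActorSystem = ActorSystem("map-concat")
    try {
      val words = Source(List("hello reactive", "", "streams"))
        // one line in, zero or more words out, emitted one at a time
        .mapConcat(line => line.split(" ").toList.filter(_.nonEmpty))
        .runWith(Sink.seq[String])
      Await.result(words, 5.seconds)
    } finally system.terminate()
  }
}
```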

Stream composition:
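A sketch of how the pieces compose, assuming Akka 2.6 (Scala 2.13, so `Seq` is immutable); `CompositionDemo` and its members are made-up names. Source + Flow gives a Source, Flow + Sink gives a Sink, and Source + Sink gives a RunnableGraph that does nothing until `run()`.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Flow, Keep, RunnableGraph, Sink, Source}
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object CompositionDemo {
  // a reusable processing step
  val toStrings: Flow[Int, String, NotUsed] = Flow[Int].map(_.toString)

  // Source + Flow = Source
  val numbers: Source[String, NotUsed] = Source(1 to 3).via(toStrings)

  // Flow + Sink = Sink; Keep.right keeps the sink's materialized Future
  val tagAndCollect: Sink[String, Future[Seq[String]]] =
    Flow[String].map(s => s"#$s").toMat(Sink.seq[String])(Keep.right)

  // Source + Sink = RunnableGraph, still only a blueprint
  val graph: RunnableGraph[Future[Seq[String]]] = numbers.toMat(tagAndCollect)(Keep.right)

  def run(): Seq[String] = {
    implicit val system: ActorSystem = ActorSystem("composition")
    try Await.result(graph.run(), 5.seconds)
    finally system.terminate()
  }
}
```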


futures & promises: http://dist-prog-book.com/chapter/2/futures.html

Challenge: 1 input, N outputs. Solution: a 1-to-N fan-out (Broadcast).


Fan-out approach: https://doc.akka.io/docs/akka/current/stream/stream-graphs.html
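A minimal Broadcast sketch in the GraphDSL, assuming Akka 2.6; `BroadcastDemo` is a made-up name. Every element goes to both branches; each branch filters its own copy, and both sinks' materialized values are kept as a pair.

```scala
import akka.actor.ActorSystem
import akka.stream.ClosedShape
import akka.stream.scaladsl.{Broadcast, Flow, GraphDSL, RunnableGraph, Sink, Source}
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object BroadcastDemo {
  val graph: RunnableGraph[(Future[Seq[Int]], Future[Seq[Int]])] =
    RunnableGraph.fromGraph(GraphDSL.create(Sink.seq[Int], Sink.seq[Int])((_, _)) { implicit builder => (evens, odds) =>
      import GraphDSL.Implicits._
      val bcast = builder.add(Broadcast[Int](2)) // 1 input, 2 outputs
      Source(1 to 6) ~> bcast.in
      bcast.out(0) ~> Flow[Int].filter(_ % 2 == 0) ~> evens.in
      bcast.out(1) ~> Flow[Int].filter(_ % 2 != 0) ~> odds.in
      ClosedShape
    })

  def run(): (Seq[Int], Seq[Int]) = {
    implicit val system: ActorSystem = ActorSystem("broadcast")
    try {
      val (evensF, oddsF) = graph.run()
      (Await.result(evensF, 5.seconds), Await.result(oddsF, 5.seconds))
    } finally system.terminate()
  }
}
```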

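import java.nio.file.Paths
import akka.stream.IOResult
import akka.stream.scaladsl.{FileIO, RunnableGraph}
import scala.concurrent.Future

// assumes an implicit ActorSystem (plus a Materializer on Akka 2.5) and an implicit ExecutionContext for foreach
val sourcePath = Paths.get("src/main/resources/db/source.txt")
val sampleSource = FileIO.fromPath(sourcePath)   // Source[ByteString, Future[IOResult]]

val sinkPath = Paths.get("src/main/resources/db/sink.txt")
val sampleSink = FileIO.toPath(sinkPath)         // Sink[ByteString, Future[IOResult]]

// to() keeps the source's materialized value, so the IOResult describes the bytes read
val sampleRunnableGraph: RunnableGraph[Future[IOResult]] = sampleSource.to(sampleSink)

sampleRunnableGraph.run().foreach(result =>
  println(s"result status: ${result.status}, ${result.count} bytes read"))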

Good websocket examples: https://github.com/akka/akka-http/blob/v10.1.11/docs/src/test/scala/docs/http/scaladsl/server/WebSocketExampleSpec.scala

WebSocket explanation: https://www.ably.io/concepts/websockets

WebSocket specs: https://tools.ietf.org/html/rfc6455

Explanation of the via/viaMat/to/toMat operators:


source.to(sink)    equivalent to   source.toMat(sink)(Keep.left)
flow1.via(flow2)   equivalent to   flow1.viaMat(flow2)(Keep.left)
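A sketch of which materialized value each variant keeps, assuming Akka 2.6; `MaterializedValuesDemo` and its members are made-up names. `to` keeps the left (source) value, `toMat(...)(Keep.right)` keeps the sink's, and `Keep.both` pairs them.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Flow, Keep, RunnableGraph, Sink, Source}
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object MaterializedValuesDemo {
  val source: Source[Int, NotUsed] = Source(1 to 4)
  val sum: Sink[Int, Future[Int]] = Sink.fold[Int, Int](0)(_ + _)

  // to() == toMat(...)(Keep.left): the source's NotUsed wins, the sum is lost
  val keepsLeft: RunnableGraph[NotUsed] = source.to(sum)
  // toMat(...)(Keep.right): keep the sink's Future[Int]
  val keepsRight: RunnableGraph[Future[Int]] = source.toMat(sum)(Keep.right)
  // viaMat(...)(Keep.both): pair the source's and the flow's materialized values
  val doubled: Source[Int, (NotUsed, NotUsed)] = source.viaMat(Flow[Int].map(_ * 2))(Keep.both)

  def run(): Int = {
    implicit val system: ActorSystem = ActorSystem("mat-values")
    try Await.result(doubled.toMat(sum)(Keep.right).run(), 5.seconds)
    finally system.terminate()
  }
}
```

Here `run()` sums the doubled elements 2, 4, 6 and 8.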
Combining 2 sinks:

  1. connect the source to sink1 with alsoToMat
  2. keep sink1's materialized value with Keep.right
  3. map the materialized value (for FileIO.toPath, a Future[IOResult]), e.g. with mapMaterializedValue
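The three steps above as a runnable sketch, assuming Akka 2.6. A temp file replaces a real path, and the second sink (a chunk counter) is a made-up stand-in, as is the `TwoSinksDemo` name. Both sinks see every element, and both materialized values are kept.

```scala
import java.nio.file.{Files, Path}
import akka.actor.ActorSystem
import akka.stream.scaladsl.{FileIO, Keep, Sink, Source}
import akka.util.ByteString
import scala.concurrent.Await
import scala.concurrent.duration._

object TwoSinksDemo {
  // Returns (bytes written to the file, chunks counted by the second sink).
  def run(): (Long, Int) = {
    implicit val system: ActorSystem = ActorSystem("two-sinks")
    import system.dispatcher // ExecutionContext for Future.map
    val path: Path = Files.createTempFile("two-sinks", ".txt")
    try {
      val fileSink = FileIO.toPath(path)                                // sink1: Future[IOResult]
      val countSink = Sink.fold[Int, ByteString](0)((n, _) => n + 1)    // sink2: counts chunks

      val (bytesWritten, chunks) = Source(List("a\n", "bb\n", "ccc\n"))
        .map(s => ByteString(s))
        .alsoToMat(fileSink)(Keep.right)          // steps 1 + 2: attach sink1, keep its Future[IOResult]
        .mapMaterializedValue(_.map(_.count))     // step 3: Future[IOResult] -> Future[Long]
        .toMat(countSink)(Keep.both)              // sink2 gets every element too
        .run()
      (Await.result(bytesWritten, 5.seconds), Await.result(chunks, 5.seconds))
    } finally {
      Files.deleteIfExists(path)
      system.terminate()
    }
  }
}
```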
Good documentation for Akka Streams: https://svezfaz.github.io/akka/streams/reactive/2017/03/27/on-akka-streams-stages-1.html

Good slide show for quick Akka Stream explanation:

An HTTP server as a:
Flow[HttpRequest, HttpResponse, Any]

An HttpEntity as a:
Source[ByteString, Any]

A WebSocket connection as a:
Flow[ws.Message, ws.Message, Any]
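A sketch of the "everything is a Flow" view, assuming Akka 2.6 with akka-http 10.1/10.2 on the classpath; `HttpAsFlows`, `handler` and `echo` are made-up names. Both handlers are plain Flows, so they can be tested by feeding them a `Source.single` without binding a server; in a real server they would be handed to akka-http (e.g. the `handleWebSocketMessages` directive for the WebSocket one).

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.http.scaladsl.model.{HttpEntity, HttpRequest, HttpResponse, StatusCodes, Uri}
import akka.http.scaladsl.model.ws.{Message, TextMessage}
import akka.stream.scaladsl.{Flow, Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

object HttpAsFlows {
  // an HTTP request handler: requests in, responses out
  val handler: Flow[HttpRequest, HttpResponse, NotUsed] =
    Flow[HttpRequest].map(req => HttpResponse(StatusCodes.OK, entity = HttpEntity(s"you asked for ${req.uri.path}")))

  // a WebSocket handler: messages in, messages out (echoes strict text messages only)
  val echo: Flow[Message, Message, NotUsed] =
    Flow[Message].collect { case TextMessage.Strict(text) => TextMessage(s"echo: $text") }

  def run(): (HttpResponse, Message) = {
    implicit val system: ActorSystem = ActorSystem("http-as-flows")
    try {
      val response = Await.result(Source.single(HttpRequest(uri = Uri("/hello"))).via(handler).runWith(Sink.head), 5.seconds)
      val reply = Await.result(Source.single[Message](TextMessage("hi")).via(echo).runWith(Sink.head), 5.seconds)
      (response, reply)
    } finally system.terminate()
  }
}
```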