itkpi / trembita

Model complex data transformation pipelines easily
Apache License 2.0

Project: Trembita Current version: 0.8.5-SNAPSHOT Scala version: 2.11.12, 2.12.8



Description

Project Trembita is a functional data pipelining library. It lets you query and transform your data in a purely functional, typesafe and declarative way. Trembita allows you to build complicated transformation pipelines in which some parts are executed locally (sequentially or in parallel) and others in different environments (for instance on a Spark cluster, see below).

```scala
resolvers += "Sonatype OSS Snapshots" at "https://oss.sonatype.org/content/repositories/snapshots"

libraryDependencies ++= {
  val trembitaV = "0.8.5-SNAPSHOT"
  Seq(
    "ua.pp.itkpi" %% "trembita-kernel"    % trembitaV, // kernel
    "ua.pp.itkpi" %% "trembita-cassandra" % trembitaV, // cassandra
    "ua.pp.itkpi" %% "trembita-phantom"   % trembitaV, // phantom
    "ua.pp.itkpi" %% "trembita-slf4j"     % trembitaV  // slf4j, for logging
  )
}
```

Core features

Available Integrations

Processing modules

Data sources

Miscellaneous

```scala
implicit val sc: SparkContext     = ??? // requires an implicit SparkContext in scope
implicit val timeout: Timeout     = Timeout(5.minutes) // requires an implicit timeout for async operations
implicit val ec: ExecutionContext = ???

val cachedThreadPool = ExecutionContext.fromExecutor(Executors.newCachedThreadPool())

Input
  .sequentialF[SerializableFuture, Seq]
  .create(SerializableFuture.pure(Seq(1, 2, 3, 20, 40, 60)))
  .to[Spark] // will be executed on spark
  .map(_ + 1)
  .mapM { i: Int =>
    val n = SerializableFuture.start { i + 1 }(cachedThreadPool)
    val b = SerializableFuture
      .start {
        val x = 1 + 2
        x * 3
      }
      .flatTap(xx =>
        SerializableFuture.start {
          println(s"spark debug: $xx") // you won't see this in submit logs
        }
      )

    val result: SerializableFuture[Int] =
      n.bind { nx =>
        b.where(nx > _).fmap(_ + nx)
      }

    result.attempt
  }
  .mapK(serializableFutureToIO)
  .map(_.getOrElse(-100500))
  .mapM { i =>
    IO { scala.util.Random.nextInt(10) + i }
  } // will be executed locally in parallel
  .to[Parallel]
  .info(i => s"parallel debug: $i") // you will see it in console
  .map(_ + 1)
```

Trembita will do its best to transform an async lambda into a serializable format.
By default, a special macro detects all references to `ExecutionContext` within the lambda you pass into `mapM`.
All `ExecutionContext`s should be globally accessible (e.g. they need to be a `def` or `val` in some object).
If not, your code won't compile, with an appropriate error.
If everything is ok, the macro creates a helper object with references to all found `ExecutionContext`s, making them `@transient lazy val`s (a well-known technique), and rewrites your lambda so that all async transformations reference fields in that object.
You can find full example [here](./examples/src/main/scala/com/examples/spark/Main.scala).
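The `@transient lazy val` trick the macro relies on can be sketched in plain Scala (the names below are illustrative, not trembita's actual generated code): the non-serializable resource is excluded from the byte stream and re-created lazily on the deserializing (executor) side.

```scala
import java.io._

class Task(val payload: Int) extends Serializable {
  // Never written to the byte stream; recomputed on first access after deserialization.
  @transient lazy val resource: String = s"resource-for-$payload"
}

object SerializationDemo {
  // Java-serialization round trip, mimicking a driver -> executor hop.
  def roundTrip[A](a: A): A = {
    val bos = new ByteArrayOutputStream()
    val oos = new ObjectOutputStream(bos)
    oos.writeObject(a)
    oos.close()
    val ois = new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray))
    ois.readObject().asInstanceOf[A]
  }
}
```

After `roundTrip`, accessing `resource` on the copy rebuilds it lazily on the receiving side, which is why the generated helper object survives a `spark-submit`.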

Happy to say that using `cats.effect.IO` on spark is also supported =)
### FSM on Spark Datasets
You can now define stateful transformations on Spark Dataset using Finite State Machines.
It's implemented using `Dataset.mapWithState`.
Defining an FSM for Spark is as simple as defining one for a regular pipeline, except that state is preserved only per specific `key` (due to a `mapWithState` limitation).
To do so, use `fsmByKey`:
```scala
val pipeline: DataPipelineT[F, A, Spark] = ???
pipeline.fsmByKey(getKey = ???)(... /* your FSM definition here */)
```

Full example can be found here.
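The per-key semantics can be sketched in plain Scala (no Spark required; the names below are ours, not trembita's API): each distinct key gets its own independent state, so events with different keys never observe each other's state.

```scala
import scala.collection.mutable

object FsmSketch {
  // Fold events through a state machine, keeping a separate state per key.
  def fsmByKeySketch[K, A, S, B](events: Seq[A])(getKey: A => K)(initial: S)(
      step: (S, A) => (S, B)
  ): Seq[B] = {
    val states = mutable.Map.empty[K, S]
    events.map { a =>
      val key         = getKey(a)
      val (next, out) = step(states.getOrElse(key, initial), a)
      states(key) = next
      out
    }
  }
}
```

For example, a running sum per key over `Seq("a" -> 1, "b" -> 10, "a" -> 2)` yields `Seq("a" -> 1, "b" -> 10, "a" -> 3)`: the `"b"` event does not disturb the `"a"` state.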

Typesafe QL on RDD

See the full example here

Limitations

Examples

You can find a script to run the example on spark cluster within docker:

```sh
# in project root
sbt trembita-examples/assembly # prepare fat jar for spark-submit
sh examples/src/main/resources/spark/cluster/run.sh
```

To run Spark FSM example in docker use the following script:

```sh
# in project root
sbt trembita-examples/assembly # prepare fat jar for spark-submit
sh examples/src/main/resources/spark/cluster/run_fsm.sh
```

To run Spark QL example in docker use the following script:

```sh
# in project root
sbt trembita-examples/assembly # prepare fat jar for spark-submit
sh examples/src/main/resources/spark/cluster/run_ql.sh
```

Before running the QL example, please remove spire jars from the Spark classpath to avoid dependency conflicts.

Akka streams support

Trembita now supports running a part of your transformations on akka-streams. To use it, add the following dependency:

```scala
libraryDependencies += "ua.pp.itkpi" %% "trembita-akka-streams" % trembitaV
```

You can run an existing pipeline through an akka stream or create a pipeline from a source directly:

```scala
import akka.stream.scaladsl._
import trembita.akka_streams._

val fileLines =
  Input.fromSourceF[IO, ByteString, Future[IOResult]](IO {
    FileIO
      .fromPath(Paths.get("examples/src/main/resources/words.txt"))
  })
```

Akka streaming pipelines also support FSM, using a custom graph state:

```scala
val pipeline: DataPipelineT[IO, Int, Akka] = ???
val stateful = pipeline.fsm(/* your FSM definition here */)
```

You can find full examples here

Seamless Akka to Spark integration

Add the following dependency if you want to run your pipeline through both akka streams and spark RDDs:

```scala
libraryDependencies += "ua.pp.itkpi" %% "trembita-seamless-akka-spark" % trembitaV
```

Its goal is to avoid additional overhead when switching between akka and spark. Akka -> Spark is implemented using a custom `Sink`. Spark -> Akka is implemented using `toLocalIterator`.
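The Spark -> Akka direction can be illustrated with a plain-Scala analogue (names are ours): like `RDD.toLocalIterator`, it pulls one partition at a time instead of collecting everything, so the downstream akka stream consumes elements lazily.

```scala
object LocalIteratorSketch {
  // Chain partition iterators lazily: only the partition currently being
  // consumed needs to be resident in memory, mimicking RDD.toLocalIterator.
  def toLocalIteratorLike[A](partitions: Seq[Seq[A]]): Iterator[A] =
    partitions.iterator.flatMap(_.iterator)
}
```

Taking the first three elements of `toLocalIteratorLike(Seq(Seq(1, 2), Seq(3), Seq(4, 5)))` touches only the first two "partitions"; the third is never materialized.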

Spark streaming support

Trembita now allows you to write QL and FSM upon Spark DStreams.

```scala
libraryDependencies += "ua.pp.itkpi" %% "trembita-spark-streaming" % trembitaV
```

For examples see here. Run scripts:

java.util.stream integration

```scala
libraryDependencies += "ua.pp.itkpi" %% "trembita-java-streams" % trembitaV
```

See sources and tests for examples
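A minimal sketch of the interop idea (this is not trembita's actual API, just an illustration of mixing `java.util.stream` with Scala code): run part of a computation through a java stream and come back to plain Scala/Java types.

```scala
import java.util.stream.IntStream

object JavaStreamsSketch {
  // Build 1..n as a java IntStream, square each element, return an array.
  def squares(n: Int): Array[Int] =
    IntStream.rangeClosed(1, n)
      .map(i => i * i)
      .toArray

  // Go back into a java stream to reduce.
  def total(xs: Array[Int]): Int =
    java.util.Arrays.stream(xs).sum()
}
```

Scala lambdas convert to the java functional interfaces (`IntUnaryOperator` here) via SAM conversion, so the two collection worlds compose without wrappers.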

Seamless akka infinispan integration

```scala
libraryDependencies += "ua.pp.itkpi" %% "trembita-seamless-akka-infinispan" % trembitaV
```

Allows you to cache an akka stream. See example.

To be done

Additional information

My speech about trembita at the ScalaUA conference: https://youtu.be/PDBVCVv4mVc

What does trembita mean?


A trembita is an alpine horn made of wood. It is common among the Ukrainian highlanders known as Hutsuls, who used to live in western Ukraine, eastern Poland, Slovakia and northern Romania. In southern Poland it's called trombita, in the north bazuna, and in central Poland ligawka.

Contributors