zio / zio-flow

Resilient, distributed applications powered by ZIO
https://zio.dev/zio-flow
Apache License 2.0

ZFlow.recurseSimple is very slow and takes up huge amount of space in key-value store #567

Open mikrasilnikov opened 1 year ago

mikrasilnikov commented 1 year ago

Hi.

I tried to run a simple flow that recursively counts from 0 to 100. It ran for ~30 seconds and the key-value store took up 150 MB of disk space. I assume this is not expected behavior.

libraryDependencies ++= Seq(
  "dev.zio" %% "zio" % "2.0.15",
  "dev.zio" %% "zio-flow" % "1.0.0-RC4",
  "dev.zio" %% "zio-flow-runtime" % "1.0.0-RC4",
  "dev.zio" %% "zio-flow-rocksdb" % "1.0.0-RC4",
  "dev.zio" %% "zio-config" % "4.0.0-RC16",
)

import zio._
import zio.flow._
import zio.flow.rocksdb._
import zio.flow.runtime._
import zio.flow.runtime.internal.PersistentState
import zio.flow.runtime.serialization

object App extends ZIOAppDefault {

  val recursiveFlow = ZFlow.recurseSimple[Any, ZNothing, Int](0) { case (prev, rec) =>
    ZFlow.ifThenElse(prev > 100)(
      ifTrue = ZFlow.succeed(prev),
      ifFalse =
        ZFlow.log(rs"prev = ${prev.toString}") *>
        rec(prev + 1)
    )
  }

  override def run: ZIO[ZIOAppArgs, Any, Any] = {

    val flowId = "23217c97-a5b7-4770-a902-8d8fbb7df03c"

    ZFlowExecutor.run(FlowId.unsafeMake(flowId), recursiveFlow)
      .provide(
        Configuration.inMemory,
        ZFlowExecutor.defaultJson,
        RocksDbKeyValueStore.layer,
        RocksDbIndexedStore.layer,
        ZLayer.succeed(serialization.json),
        PersistentState.configured(PersisterConfig.SnapshotsOnly)
      )
      .timed
      .debug
  }
}

vigoo commented 1 year ago

Thanks for reporting, I will take a look!

vigoo commented 1 year ago

Sorry for taking so long, but I finally had time to check this. Unfortunately, zio-flow is currently not optimized for performance at all - most of the work went into making a correct durable executor and making the DSL as ergonomic as possible.

That said, the options above, especially PersisterConfig.SnapshotsOnly, can be tweaked to achieve much better results. SnapshotsOnly was used during development but was never intended to be the persister for real-world scenarios. You can switch it to PersisterConfig.PeriodicSnapshots(None, None) to have a journal only, and configure its two parameters to take a snapshot after every N operations or after some elapsed time. The snapshots are big (they contain the full stack of the executor) and should only be used to speed up recovery from the journal for long-running flows.
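
As a rough sketch of the persister options described above: the parameter order (operation count first, elapsed time second) and the parameter types (Option[Int] and Option[Duration]) are assumptions inferred from the description, and the values 1000 and 5 minutes are arbitrary illustrations, so check them against the zio-flow version you use.

```scala
import zio._
import zio.flow._
import zio.flow.runtime._

object PersisterOptions {
  // Journal only: neither snapshot trigger is set.
  val journalOnly: PersisterConfig =
    PersisterConfig.PeriodicSnapshots(None, None)

  // Assumption: the first parameter is the number of operations between snapshots.
  val snapshotEvery1000Ops: PersisterConfig =
    PersisterConfig.PeriodicSnapshots(Some(1000), None)

  // Assumption: the second parameter is the elapsed time between snapshots.
  val snapshotEvery5Minutes: PersisterConfig =
    PersisterConfig.PeriodicSnapshots(None, Some(5.minutes))
}
```

Any of these values would replace PersisterConfig.SnapshotsOnly in the PersistentState.configured(...) layer of the original example.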

You can further reduce the storage size by using the built-in protobuf mode instead of json.

With journal-only and protobuf, the above example runs in around 8 seconds and takes ~2.5 MB of storage.
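
For reference, the journal-only plus protobuf setup could look roughly like the following. It is the original example with three lines changed. ZFlowExecutor.defaultProtobuf and serialization.protobuf are assumed to be the protobuf counterparts of the JSON values used in the original post; verify the names against your zio-flow version.

```scala
import zio._
import zio.flow._
import zio.flow.rocksdb._
import zio.flow.runtime._
import zio.flow.runtime.internal.PersistentState
import zio.flow.runtime.serialization

object App extends ZIOAppDefault {

  // Same flow as in the original report.
  val recursiveFlow = ZFlow.recurseSimple[Any, ZNothing, Int](0) { case (prev, rec) =>
    ZFlow.ifThenElse(prev > 100)(
      ifTrue = ZFlow.succeed(prev),
      ifFalse =
        ZFlow.log(rs"prev = ${prev.toString}") *>
        rec(prev + 1)
    )
  }

  override def run: ZIO[ZIOAppArgs, Any, Any] = {

    val flowId = "23217c97-a5b7-4770-a902-8d8fbb7df03c"

    ZFlowExecutor.run(FlowId.unsafeMake(flowId), recursiveFlow)
      .provide(
        Configuration.inMemory,
        ZFlowExecutor.defaultProtobuf,          // assumed counterpart of defaultJson
        RocksDbKeyValueStore.layer,
        RocksDbIndexedStore.layer,
        ZLayer.succeed(serialization.protobuf), // assumed counterpart of serialization.json
        // Journal only, no periodic snapshots.
        PersistentState.configured(PersisterConfig.PeriodicSnapshots(None, None))
      )
      .timed
      .debug
  }
}
```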

For better results we would need general optimisations in the executor; there is no specific bug to fix related to this issue.