twitter / scalding

A Scala API for Cascading
http://twitter.com/scalding
Apache License 2.0

Support CounterPipe in spark-backend #1989

Closed. daniel-sudz closed this issue 2 years ago.

daniel-sudz commented 2 years ago

Is your feature request related to a problem? Please describe.
This is currently marked as a todo. I am interested in implementing it.

johnynek commented 2 years ago

note: many old school jobs would directly get a handle on a cascading counter object and increment it, i.e. incrementing counters as a side effect.

There is a pure API for incrementing counters in TypedPipe e.g. https://github.com/twitter/scalding/blob/1813d8f32904598d34b92ffd71010f85c18d58db/scalding-base/src/main/scala/com/twitter/scalding/typed/TypedPipe.scala#L545

The pure one is relatively easy to support, I think. The old-fashioned one is harder to support and may involve getting cascading on the classpath:

https://github.com/twitter/scalding/blob/1813d8f32904598d34b92ffd71010f85c18d58db/scalding-core/src/main/scala/com/twitter/scalding/Stats.scala#L84

Although, I guess when you allow global mutable state you could reach in and create a pointer from the UniqueID to the platform counter system... so it is probably doable to support that as well; you just need some mechanism to run some initialization code before the Execution starts, so that the UniqueID dispatches onto the right incrementer.
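
Roughly, the pure style looks something like this (a sketch from memory, assuming TypedPipe's tallyAll method, which I believe builds a CounterPipe node under the hood; see the linked source for the exact signature):

  import com.twitter.scalding.typed.TypedPipe

  // counters are attached to the pipe itself, so a backend can plan the
  // increments without any global mutable state
  val counted: TypedPipe[String] =
    TypedPipe
      .from(List("a", "bb", "ccc"))
      .tallyAll("example_group", "lines_seen") // assumed name, per the link above
      .filter(_.length > 1)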

daniel-sudz commented 2 years ago

I would start with the pure API for now. I do have a general question: it seems like a lot of code for the spark-backend is being pushed into just a handful of files. Does it make sense to split it up more for readability?

johnynek commented 2 years ago

Yes. In general one file per type is a good guideline but sometimes that is violated.

daniel-sudz commented 2 years ago

one more general question if you don't mind:

on the spark backend a lot of operations wrap RDDs with Future. It is my understanding that spark follows a lazy evaluation model for all operations. What is the purpose of having everything go through this pattern:

  final case class Transformed[Z, A](input: Op[Z], fn: RDD[Z] => RDD[A]) extends Op[A] {
    @transient private val cache = new FutureCache[SparkSession, RDD[_ <: A]]

    def run(session: SparkSession)(implicit ec: ExecutionContext): Future[RDD[_ <: A]] =
      cache.getOrElseUpdate(session, input.run(session).map(rdd => fn(widen(rdd))))
  }

it seems to me that the very first implementation that you wrote just had operations directly done on the RDDs: https://github.com/twitter/scalding/commit/db64ad36522d9ac7593ab2df6be4dee905790484

Is there some context that I am missing? Thanks!

daniel-sudz commented 2 years ago

speaking of which, changing the above block to this passes all tests:

  final case class Transformed[Z, A](input: Op[Z], fn: RDD[Z] => RDD[A]) extends Op[A] {
    def run(session: SparkSession)(implicit ec: ExecutionContext): Future[RDD[_ <: A]] =
      input.run(session).map(rdd => fn(widen(rdd)))
  }

maybe I'm not understanding something, but isn't the cache effectively doing nothing here?

never mind, that helps avoid recomputation
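
for concreteness, a rough sketch (with a hypothetical source: Op[String], reusing the Transformed type quoted above): when one Op feeds two downstream Ops, the cache means fn only runs once per SparkSession rather than once per consumer:

  // hypothetical example: `parsed` is consumed by both `evens` and `odds`
  val parsed: Op[Int] = Transformed(source, (rdd: RDD[String]) => rdd.map(_.toInt))
  val evens: Op[Int]  = Transformed(parsed, (rdd: RDD[Int]) => rdd.filter(_ % 2 == 0))
  val odds: Op[Int]   = Transformed(parsed, (rdd: RDD[Int]) => rdd.filter(_ % 2 != 0))
  // evens.run(session) and odds.run(session) both call parsed.run(session),
  // but the FutureCache hands back the same Future, so the parse fn is only
  // applied once per SparkSession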

johnynek commented 2 years ago

Why do we basically have a function to a Future? Some spark operations are not lazy, e.g. collect, which blocks.

Since you can always put a lazy thing in a Future as well, there is no problem with lifting to Future. But if you don't, you end up blocking when you want to do a blocking action (which I think we do: toIterableExecution is basically a wrapper of collect, or could be).
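
As a rough illustration (not the actual backend code, just a hypothetical toIterable built on the Op type quoted above), lifting the blocking collect into the Future keeps the caller non-blocking:

  import scala.concurrent.{ExecutionContext, Future}
  import org.apache.spark.sql.SparkSession

  // hypothetical helper: run the Op, then run the eager, blocking collect
  // inside the resulting Future so the calling thread never blocks
  def toIterable[A](op: Op[A], session: SparkSession)(implicit ec: ExecutionContext): Future[Iterable[A]] =
    op.run(session).map(rdd => rdd.collect().toList)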

daniel-sudz commented 2 years ago

I looked into accumulators in spark and this idea seems promising to me: https://stackoverflow.com/questions/66438570/create-accumulator-on-executor-dynamically

because spark expects accumulators to be created "statically" on the driver prior to execution, we would probably find it difficult to support creating an accumulator per counter name at the planning phase.

however, we can emulate having multiple accumulators by initializing a single accumulator using the V2 API and managing counter groups within the single accumulator on the executors themselves.
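
a rough sketch of what I mean (my own illustration, not the actual PR), using spark's AccumulatorV2 with a map keyed by (group, counter):

  import org.apache.spark.util.AccumulatorV2

  // one accumulator tracks every counter, so nothing has to be registered
  // per counter name on the driver ahead of time
  class CounterAccumulator
      extends AccumulatorV2[((String, String), Long), Map[(String, String), Long]] {

    private var counts: Map[(String, String), Long] = Map.empty

    def isZero: Boolean = counts.isEmpty

    def copy(): CounterAccumulator = {
      val c = new CounterAccumulator
      c.counts = counts
      c
    }

    def reset(): Unit = counts = Map.empty

    def add(v: ((String, String), Long)): Unit = {
      val (key, n) = v
      counts = counts.updated(key, counts.getOrElse(key, 0L) + n)
    }

    def merge(other: AccumulatorV2[((String, String), Long), Map[(String, String), Long]]): Unit =
      other.value.foreach { case (k, n) => counts = counts.updated(k, counts.getOrElse(k, 0L) + n) }

    def value: Map[(String, String), Long] = counts
  }

  // the driver registers it once, e.g. session.sparkContext.register(acc, "scalding-counters"),
  // and executors increment with acc.add((("group", "counter"), 1L)) from inside tasks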

daniel-sudz commented 2 years ago

here is the sketch: https://github.com/twitter/scalding/pull/1992