twitter / scalding

A Scala API for Cascading
http://twitter.com/scalding
Apache License 2.0

TypedPipe.toClosableIteratorExecution #1874

Closed dieu closed 5 years ago

dieu commented 5 years ago

Hey,

At Twitter, some customers want to use Scalding in long-running services to run Scalding workflows on demand and then read the data back via toIterableExecution.

toIterableExecution assumes that the JVM will eventually shut down, at which point we clean up the temporary files. That does not work for long-running services: we capture a lot of data (the Hadoop configuration) in the shutdown hook, and that eventually leads to an OOM.

So, by introducing CloseableIterator, we give these customers a mechanism to control when the resources are deleted.
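A minimal sketch of the idea, not the code in this PR: the `CloseableIterator` trait below is a hypothetical shape (its real definition is not shown in this thread), and a local temp file stands in for a job's temporary output. The point is only that cleanup happens in `close()`, when the caller asks for it, rather than in a JVM shutdown hook.

```scala
import java.io.Closeable
import java.nio.file.{Files, Path}
import scala.io.Source

object CloseableIteratorSketch {
  // Hypothetical shape: an Iterator that also owns the resources behind it.
  trait CloseableIterator[A] extends Iterator[A] with Closeable

  // Iterates over the lines of a file and deletes the file on close(),
  // instead of registering a shutdown hook for the cleanup.
  def linesOf(path: Path): CloseableIterator[String] = {
    val source = Source.fromFile(path.toFile, "UTF-8")
    val lines = source.getLines()
    new CloseableIterator[String] {
      def hasNext: Boolean = lines.hasNext
      def next(): String = lines.next()
      def close(): Unit = {
        source.close()
        Files.deleteIfExists(path)
      }
    }
  }

  // The caller decides when the temporary data goes away.
  // Returns the rows read and whether the file still exists afterwards.
  def demo(): (List[String], Boolean) = {
    val tmp = Files.createTempFile("scalding-sketch", ".txt")
    Files.write(tmp, "a\nb\nc\n".getBytes("UTF-8"))
    val it = linesOf(tmp)
    val rows = try it.toList finally it.close()
    (rows, Files.exists(tmp))
  }
}
```

In a long-running service nothing accumulates in a shutdown hook this way, but the safety of the scheme depends entirely on the caller remembering to call `close()`.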

johnynek commented 5 years ago

This looks like it is against a pretty old version.

I did a lot of work to factor out how these executions are created. This jumps in before all that work.

johnynek commented 5 years ago

It would be nice to think about how we can make this as safe as possible. Execution is an effect, so the close operation could be an Execution[Unit] that users combine in when it is safe to remove the data.

dieu commented 5 years ago

> This looks like it is against a pretty old version.
>
> I did a lot of work to factor out how these executions are created. This jumps in before all that work.

Since we are still on 0.17.x, I made the changes against it so that we can do a safe release internally, but I will definitely port this to the new version.

dieu commented 5 years ago

> It would be nice to think about how we can make this as safe as possible. Execution is an effect, so the close operation could be an Execution[Unit] that users combine in when it is safe to remove the data.

I'm not sure how to make that useful. We allocate some resources, and I attach them to the iterator, which is then responsible for those resources.

Also, our customers convert the Execution into a Future and supply the iterator to the Finagle stack to serve data back via an API.

What do you think?

johnynek commented 5 years ago

so, if we had an API like:

def toIterableWithClose: Execution[(Iterable[A], Execution[Unit])]

then you would have some function like

// myFunction stands for any user function of type Iterable[A] => Execution[B]
def use[A, B](res: (Iterable[A], Execution[Unit])): Execution[B] = {
  val (itA, close) = res
  for {
    b <- myFunction(itA)
    _ <- close
  } yield b
}

In fact, it suggests a better API:

// on TypedPipe
def useAsIterable[B](fn: Iterable[A] => Execution[B]): Execution[B] =
  toIterableWithClose.flatMap { case (iterable, close) =>
    val exb = fn(iterable)
    exb.flatMap { b => close.map(_ => b) }
  }

This API makes it impossible to use the Iterable after the function.

I prefer this API since we don't rely on the user to close.

Scalding has been becoming more functional, and more composable as a result. Passing a () => Unit function reverses this, so I don't want to use that API and would prefer using an Execution[Unit] based API, which composes well. A type like () => Unit is really dangerous because it is not clear when it is safe to call it when combined with a pure API like Execution.
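To make the ordering concrete, here is a self-contained sketch. `Eff` is a toy stand-in for Execution (a lazy description that only does work when `run()` is called), and `toIterableWithClose` here is a hypothetical in-memory version that records what happens in a log. None of this is Scalding code; it only mirrors the shape of the API proposed above.

```scala
import scala.collection.mutable.ListBuffer

object UseSketch {
  // Toy stand-in for Execution: nothing happens until run() is called.
  final case class Eff[A](run: () => A) {
    def map[B](f: A => B): Eff[B] = Eff(() => f(run()))
    def flatMap[B](f: A => Eff[B]): Eff[B] = Eff(() => f(run()).run())
  }
  object Eff { def from[A](a: => A): Eff[A] = Eff(() => a) }

  // Records the order in which the steps actually execute.
  val log: ListBuffer[String] = ListBuffer.empty[String]

  // Hypothetical analogue of toIterableWithClose: the data plus an effect that frees it.
  def toIterableWithClose: Eff[(Iterable[Int], Eff[Unit])] =
    Eff.from {
      log += "open"
      (List(3, 1, 2), Eff.from { log += "close"; () })
    }

  // Same shape as the proposed API: close is sequenced after fn's effect,
  // so the caller never holds the close handle.
  def useAsIterable[B](fn: Iterable[Int] => Eff[B]): Eff[B] =
    toIterableWithClose.flatMap { case (iterable, close) =>
      fn(iterable).flatMap(b => close.map(_ => b))
    }

  // Building the program performs no work; it is only a description.
  val program: Eff[Int] =
    useAsIterable[Int](it => Eff.from { log += "use"; it.sum })
}
```

Running `program` executes the steps in the order open, use, close. As in the snippet above, close is skipped if `fn`'s effect fails; a real implementation would presumably want to handle that case too.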

dieu commented 5 years ago

@johnynek that looks good.

Can you clarify two aspects for me:

  1. At which point will close: Execution be executed? If we combine everything together then, if I understand correctly, close will be executed in the same context, right after the previous step. That forces the user to materialize the Iterator before all the executions finish.
  2. With this API, the user cannot keep the iterator lazy outside of the Execution?

johnynek commented 5 years ago

The intent of the Execution API is that it is run at the "end of the world". This can be a main function in Scala, or it can be a request that terminates state when it is closed.

For instance, you could have a method like:

def runToFuture[A](ex: Execution[A]): Future[A] = ...

which can return a Twitter Future. That should be the final return value to the Finagle service. If you want to do other Future things, you should lift them into the Execution, not the other way around (because the Execution does state and cache management, we don't want to run many of them, but ideally only one context).

So, yes, what you are saying is true, that you need to be inside an outer Execution to run close in this picture, but that is the way the API is designed.

I don't want to bolt on some unsafe thing because people don't want to use the API that way. I would rather tell them to manage the state themselves and not use toIterableExecution:

They can make their own sink to write data into and read it outside of Execution and remove the data themselves.

johnynek commented 5 years ago

Put another way, what I would suggest to your team is to make an ExecutionService API for your users that has the user returning an Execution rather than a Future.
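A rough sketch of what such a service boundary could look like. Everything here is an assumption for illustration: `Eff` is a toy stand-in for Execution, `scala.concurrent.Future` stands in for the Twitter Future, and `Handler` and `serve` are hypothetical names, not an existing API.

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object ExecutionServiceSketch {
  // Toy stand-in for Execution: nothing happens until run() is called.
  final case class Eff[A](run: () => A) {
    def map[B](f: A => B): Eff[B] = Eff(() => f(run()))
    def flatMap[B](f: A => Eff[B]): Eff[B] = Eff(() => f(run()).run())
  }

  // Hypothetical service API: a handler describes its work as an Eff
  // instead of returning a Future.
  type Handler[Req, Rep] = Req => Eff[Rep]

  // The only place an Eff is run: once per request, at the service edge.
  def serve[Req, Rep](handler: Handler[Req, Rep])(req: Req)(
      implicit ec: ExecutionContext): Future[Rep] =
    Future(handler(req).run())

  // Example handler: count the words in the request.
  val wordCount: Handler[String, Int] =
    text => Eff(() => text.split("\\s+").count(_.nonEmpty))

  def demo(): Int = {
    import ExecutionContext.Implicits.global
    Await.result(serve(wordCount)("run it at the edge"), 5.seconds)
  }
}
```

Because handlers never see the Future, any cleanup step can be composed into the Eff before the service runs it.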

dieu commented 5 years ago

@johnynek I see your point. My only concern is the early materialization of the Iterator, which makes a streaming API impossible, since we need to load the data before we leave the Execution.

dieu commented 5 years ago

@johnynek can you take a look at the new changes?

dieu commented 5 years ago

> I wonder if we should pass the Iterable here. We have it.
>
> What do you think?

My thinking is that, since the data is only available here and not afterwards, an Iterator forces the user to consume it once and right away.

But I'm okay to change to Iterable.

dieu commented 5 years ago

> Let's add to the comment: you must fully use the Iterator BEFORE you return Execution[B]. It will be an error, for instance, to put the Iterator inside an Execution and return it.
>
> Maybe we should give an example of good usage, e.g. something that goes through the Iterator and keeps the maximum and puts it in Execution.from.

yep, sounds good, will take care of it.
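For illustration, a self-contained sketch of the good and bad usage described in that review comment. `Eff` is a toy stand-in for Execution, and this `useAsIterator` is a hypothetical in-memory analogue that invalidates the iterator once the returned effect has run; the PR's actual implementation is not shown in this thread.

```scala
object MaxSketch {
  // Toy stand-in for Execution: nothing happens until run() is called.
  final case class Eff[A](run: () => A) {
    def map[B](f: A => B): Eff[B] = Eff(() => f(run()))
    def flatMap[B](f: A => Eff[B]): Eff[B] = Eff(() => f(run()).run())
  }
  object Eff { def from[A](a: => A): Eff[A] = Eff(() => a) }

  // Hypothetical analogue of useAsIterator over in-memory data.
  def useAsIterator[B](data: List[Int])(fn: Iterator[Int] => Eff[B]): Eff[B] =
    Eff.from {
      var closed = false
      val underlying = data.iterator
      val it = new Iterator[Int] {
        def hasNext: Boolean = !closed && underlying.hasNext
        def next(): Int =
          if (closed) throw new IllegalStateException("iterator used after close")
          else underlying.next()
      }
      val b = fn(it).run()
      closed = true // stands in for deleting the temporary data
      b
    }

  // Good: consume the iterator inside fn and return only the summary.
  val good: Eff[Option[Int]] =
    useAsIterator(List(4, 9, 2)) { it =>
      val max = it.reduceOption(_ max _)
      Eff.from(max)
    }

  // Bad: the iterator escapes and is already closed when the caller sees it.
  val bad: Eff[Iterator[Int]] =
    useAsIterator(List(4, 9, 2))(it => Eff.from(it))
}
```

In this sketch `good.run()` yields the maximum, while the iterator returned by `bad.run()` reports no elements and throws on `next()`.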

dieu commented 5 years ago

> This refactoring is nice but won’t merge with 0.18.

I haven't looked closely at 0.18. Do you think this new API is portable to 0.18?

johnynek commented 5 years ago

Well, in 0.18 TypedPipe and Execution don’t depend on Cascading. There is a Cascading backend, so the API can be ported, but the implementation will have to be plumbed through.

dieu commented 5 years ago

> Well, in 0.18 TypedPipe and Execution don’t depend on Cascading. There is a Cascading backend, so the API can be ported, but the implementation will have to be plumbed through.

sounds reasonable.

dieu commented 5 years ago

The build is failing because:

[info] scalding-core: found 2 potential binary incompatibilities while checking against com.twitter:scalding-core_2.11:0.17.3 
[error]  * method useAsIterator(scala.Function1)com.twitter.scalding.Execution in trait com.twitter.scalding.typed.TypedPipe is present only in current version
[error]    filter with: ProblemFilters.exclude[ReversedMissingMethodProblem]("com.twitter.scalding.typed.TypedPipe.useAsIterator")
[error]  * method toIterableExecutionWithClose()com.twitter.scalding.Execution in trait com.twitter.scalding.typed.TypedPipe is present only in current version
[error]    filter with: ProblemFilters.exclude[ReversedMissingMethodProblem]("com.twitter.scalding.typed.TypedPipe.toIterableExecutionWithClose")

johnynek commented 5 years ago

yeah, you can't add this and not break binary compatibility.

You can do it with an enrichment:

implicit class IteratorCloseMethods[A](val pipe: TypedPipe[A]) extends AnyVal {
...
}

I am a bit worried we have already forked scalding since we have been on 0.18 branch for more than 6 months and twitter still can't seem to get on it.

dieu commented 5 years ago

> yeah, you can't add this and not break binary compatibility.
>
> You can do it with an enrichment:
>
> implicit class IteratorCloseMethods[A](val pipe: TypedPipe[A]) extends AnyVal {
> ...
> }
>
> I am a bit worried we have already forked scalding since we have been on 0.18 branch for more than 6 months and twitter still can't seem to get on it.

  1. Can we simply ignore this, since it is not really a backward-compatibility issue?
  2. Yep, I'm with you, but we are short on resources. I hope it will move soon.

johnynek commented 5 years ago

I am not really crazy about ignoring the error since in the past we have ignored things we thought were safe, but were wrong.

johnynek commented 5 years ago

if you put the implicit class in the TypedPipe companion, it should work just as well.
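A small self-contained sketch of why the companion placement works. `Pipe` is a toy stand-in for TypedPipe, and the extension method body is made up for illustration; only the names `IteratorCloseMethods` and `useAsIterator` come from this thread.

```scala
object EnrichmentSketch {
  // Toy stand-in for TypedPipe. The trait itself gains no new members,
  // so existing subclasses are unaffected.
  trait Pipe[A] { def toList: List[A] }

  object Pipe {
    def from[A](as: List[A]): Pipe[A] = new Pipe[A] { def toList: List[A] = as }

    // Defined in the companion, so it is in implicit scope for every Pipe[A]
    // and callers need no extra import.
    implicit class IteratorCloseMethods[A](val pipe: Pipe[A]) {
      // Hypothetical body: hands an iterator to fn and returns its result.
      def useAsIterator[B](fn: Iterator[A] => B): B = fn(pipe.toList.iterator)
    }
  }

  // Reads like a method on Pipe, but resolves to the implicit class.
  val total: Int = Pipe.from(List(1, 2, 3)).useAsIterator(_.sum)
}
```

The snippet in the thread also marks the class `extends AnyVal` to avoid allocating a wrapper. That is left out here only so the sketch still compiles when pasted into a local scope, where value classes are not allowed.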

dieu commented 5 years ago

> I am not really crazy about ignoring the error since in the past we have ignored things we thought were safe, but were wrong.

Can you share your experience of how it went wrong?

> if you put the implicit class in the TypedPipe companion, it should work just as well.

will do.

johnynek commented 5 years ago

As to how we can get it wrong: I'm just saying that ignoring tooling warnings is inherently dangerous, and if we can sidestep any risk by taking the implicit class approach, it is safer.

At Stripe, we did have a few subclasses of TypedPipe which would have been hit by this (which was not a safe thing to do previously, but it was done).

dieu commented 5 years ago

> As to how we can get it wrong: I'm just saying that ignoring tooling warnings is inherently dangerous, and if we can sidestep any risk by taking the implicit class approach, it is safer.
>
> At Stripe, we did have a few subclasses of TypedPipe which would have been hit by this (which was not a safe thing to do previously, but it was done).

Yep, I forgot about the case of TypedPipe subclasses.