twitter / scalding

A Scala API for Cascading
http://twitter.com/scalding
Apache License 2.0
3.5k stars 706 forks source link

Cancellable Executions #1920

Closed stephbian closed 5 years ago

stephbian commented 5 years ago

This change tries to enable cancellation of executions by stopping the underlying flow.

Prior to this change, parallel executions that have started running could not be cancelled, even if one execution failed. This is problematic because if one parallel execution fails, others will continue running and the overall job will not fail until all executions complete.

In this change, I define a CancellationHandler which is built up alongside its corresponding Future.

johnynek commented 5 years ago

I think travis is unhappy:

https://travis-ci.org/twitter/scalding/builds/589230369?utm_source=github_status&utm_medium=notification

maybe set oraclejdk8 to openjdk8 in the .travis.yml and I think CI will run?

johnynek commented 5 years ago

@ttim I assume you have no concerns since you haven't commented. I'll merge tomorrow morning unless you have some concerns.

The goal here is to abort large and expensive jobs when another part of the flow fails.

ttim commented 5 years ago

@johnynek I'll take a look until tomorrow's evening. Seems like a great change!

stephanie-stripe commented 5 years ago

i would hold off on merging this for now... i am doing some testing and noticing that jobs are killed before retries can happen, so trying to debug that now.

stephanie-stripe commented 5 years ago

i've tested this on the cluster, and it looks like things are being cancelled as expected (both in separate parallel executions and within the same one)! one strange thing though: since adding the FlowStepListener, i've been seeing a NPE from the main process where i kick off the job:

Exception in thread "main" java.lang.Throwable: If you know what exactly caused this error, please consider contributing to GitHub via following link.
https://github.com/twitter/scalding/wiki/Common-Exceptions-and-possible-reasons#javalangnullpointerexception
        at com.twitter.scalding.Tool$.main(Tool.scala:153)
        at com.twitter.scalding.Tool.main(Tool.scala)
Caused by: java.lang.NullPointerException
        at scala.util.Failure.get(Try.scala:207)
        at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:227)
        at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:190)
        at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
        at scala.concurrent.Await$.result(package.scala:190)
        at com.twitter.scalding.ExecutionJob$$anonfun$6.apply(Job.scala:526)
        at com.twitter.scalding.ExecutionJob$$anonfun$6.apply(Job.scala:525)
        at scala.util.Success$$anonfun$map$1.apply(Try.scala:237)
        at scala.util.Try$.apply(Try.scala:192)
        at scala.util.Success.map(Try.scala:237)
        at com.twitter.scalding.ExecutionJob.run(Job.scala:525)
        at com.twitter.scalding.Tool.start$1(Tool.scala:124)
        at com.twitter.scalding.Tool.run(Tool.scala:141)
        at com.twitter.scalding.Tool.run(Tool.scala:68)
        at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:76)
        at com.twitter.scalding.Tool$.main(Tool.scala:149)

i'm still not sure where this is coming from, but it doesn't seem to be affecting shutdown afaict.

stephanie-stripe commented 5 years ago

@johnynek and i paired to fix the npe with the latest commit, so this is good to go from my perspective!

johnynek commented 5 years ago

I'll merge when it's green!

Great work on this!