onyx-platform / onyx

Catch unserializable exception errors and re-wrap them #893

Closed (sundbry closed this 5 years ago)

sundbry commented 5 years ago

Problem: Unserializable exceptions throw further exceptions when we try to write them to the ZooKeeper log. As a result, the job that caused them is not killed, and we get stuck in a retry loop.

Solution: When this happens, nippy throws a clojure.lang.ExceptionInfo: Unfreezable type exception, which is itself serializable, so we catch that and serialize it instead of the original exception.
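To illustrate the idea, here is a minimal sketch of the catch-and-re-wrap pattern. It is not the code from this PR: serialize-exception and demo-serialize are hypothetical names, and the serializer is passed in as a plain function so the example runs without nippy or ZooKeeper. The demo serializer only mimics the "Unfreezable type" failure by rejecting one marker exception class.

```clojure
;; Sketch only: `serialize` stands in for whatever writes the exception
;; to the log (for example a nippy-based compress function). It is a
;; hypothetical parameter, not an Onyx API.
(defn serialize-exception
  "Tries to serialize exception `e` with `serialize`. If that throws an
  ExceptionInfo (as nippy does for unfreezable types), serializes that
  ExceptionInfo instead of the original exception."
  [serialize e]
  (try
    (serialize e)
    (catch clojure.lang.ExceptionInfo unfreezable
      (serialize unfreezable))))

;; Stand-in serializer for demonstration purposes. It rejects
;; UnsupportedOperationException to mimic an unfreezable type and
;; otherwise "serializes" an exception to its message string.
(defn demo-serialize [x]
  (if (instance? UnsupportedOperationException x)
    (throw (ex-info "Unfreezable type" {:type (class x)}))
    (.getMessage ^Throwable x)))

;; A serializable exception passes straight through.
(serialize-exception demo-serialize (RuntimeException. "boom"))

;; An unserializable one is replaced by the serializer's own error.
(serialize-exception demo-serialize (UnsupportedOperationException. "nope"))
```

In this sketch the second serialize call is not guarded, so a serializer that fails on its own error would still throw; the description above relies on nippy's exception being serializable.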

Here's an example stack trace where this can happen; in this case, a schema error in one of my lifecycle setup functions:

Exception in thread "async-thread-macro-45" clojure.lang.ExceptionInfo: Unfreezable type: class clojure.lang.ExceptionInfo {:type clojure.lang.ExceptionInfo, :as-str "#error {\n :cause \"Input to my-function does not match schema: ...", :prod$run_task_lifecycle_BANG_.invokeStatic(task_lifecycle.clj:550)
        at onyx.peer.task_lifecycle$run_task_lifecycle_BANG_.invoke(task_lifecycle.clj:540)
        at onyx.peer.task_lifecycle$start_task_lifecycle_BANG_$fn__43878.invoke(task_lifecycle.clj:1155)
        at clojure.core.async$thread_call$fn__11217.invoke(async.clj:442)
        at clojure.lang.AFn.run(AFn.java:22)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
:original-exception :clojure.lang.ExceptionInfo}\n   :at [onyx.plugin.mongo$eval62494$write_op__62499 invoke \"mongo.clj\" 39]}]\n :trace\n [[onyx.plugin.mongo$eval62494$write_op__62499 invoke \"mongo.clj\" 39]\n  [onyx.plugin.mongo.MongoOutput$fn__62531 invoke \"mongo.clj\" 84]\n  [onyx.plugin.mongo.MongoOutput write_batch \"mongo.clj\" 81]\n  [onyx.peer.task_lifecycle$write_batch invokeStatic \"task_lifecycle.clj\" 164]\n  [onyx.peer.task_lifecycle$write_batch invoke \"task_lifecycle.clj\" 160]\n  [onyx.peer.task_lifecycle$wrap_lifecycle_metrics$fn__43857 invoke \"task_lifecycle.clj\" 1097]\n  [onyx.peer.task_lifecycle.TaskStateMachine exec \"task_lifecycle.clj\" 1070]\n  [onyx.peer.task_lifecycle$run_task_lifecycle_BANG_ invokeStatic \"task_lifecycle.clj\" 550]\n  [onyx.peer.task_lifecycle$run_task_lifecycle_BANG_ invoke \"task_lifecycle.clj\" 540]\n  [onyx.peer.task_lifecycle$start_task_lifecycle_BANG_$fn__43878 invoke \"task_lifecycle.clj\" 1155]\n  [clojure.core.async$thread_call$fn__11217 invoke \"async.clj\" 442]\n  [clojure.lang.AFn run \"AFn.java\" 22]\n  [java.util.concurrent.ThreadPoolExecutor runWorker \"ThreadPoolExecutor.java\" 1149]\n  [java.util.concurrent.ThreadPoolExecutor$Worker run \"ThreadPoolExecutor.java\" 624]\n  [java.lang.Thread run \"Thread.java\" 748]]}"}
        at taoensso.nippy$throw_unfreezable.invokeStatic(nippy.clj:720)
        at taoensso.nippy$throw_unfreezable.invoke(nippy.clj:718)
        at taoensso.nippy$eval33876$fn__33877.invoke(nippy.clj:924)
        at taoensso.nippy$eval33573$fn__33574$G__33564__33581.invoke(nippy.clj:314)
        at taoensso.nippy$eval33628$fn__33629.invoke(nippy.clj:331)
        at taoensso.nippy$eval33601$fn__33602$G__33592__33609.invoke(nippy.clj:315)
        at taoensso.nippy$freeze$fn__33904.invoke(nippy.clj:982)
        at taoensso.nippy$freeze.invokeStatic(nippy.clj:982)
        at taoensso.nippy$freeze.invoke(nippy.clj:958)
        at onyx.compression.nippy$zookeeper_compress.invokeStatic(nippy.clj:22)
        at onyx.compression.nippy$zookeeper_compress.invoke(nippy.clj:21)
        at onyx.log.zookeeper$eval44984$fn__44986.invoke(zookeeper.clj:585)
        at clojure.lang.MultiFn.invoke(MultiFn.java:244)
        at onyx.peer.task_lifecycle$handle_exception.invokeStatic(task_lifecycle.clj:178)
        at onyx.peer.task_lifecycle$handle_exception.invoke(task_lifecycle.clj:168)
        at onyx.peer.task_lifecycle.TaskLifeCycle$handle_exception_fn__43936.invoke(task_lifecycle.clj:1229)
        at onyx.peer.task_lifecycle$run_task_lifecycle_BANG_.invokeStatic(task_lifecycle.clj:562)
        at onyx.peer.task_lifecycle$run_task_lifecycle_BANG_.invoke(task_lifecycle.clj:540)
        at onyx.peer.task_lifecycle$start_task_lifecycle_BANG_$fn__43878.invoke(task_lifecycle.clj:1155)
        at clojure.core.async$thread_call$fn__11217.invoke(async.clj:442)
        at clojure.lang.AFn.run(AFn.java:22)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

solatis commented 5 years ago

Would probably be a good idea to still log a warning when this happens; other than that, looks good to go, been running into this as well.
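A rough sketch of what logging a warning on the fallback path could look like. The names serialize-with-warning, warn! and warn-to-stderr are hypothetical and not taken from the PR; the warning function is injected so the example does not assume a particular logging library.

```clojure
;; Sketch only: `serialize` and `warn!` are hypothetical parameters.
;; `warn!` receives a single message string.
(defn serialize-with-warning
  "Like a plain catch-and-re-wrap, but calls `warn!` before serializing
  the serializer's own ExceptionInfo in place of `e`."
  [serialize warn! e]
  (try
    (serialize e)
    (catch clojure.lang.ExceptionInfo unfreezable
      (warn! (str "Could not serialize exception of type "
                  (.getName (class e))
                  "; writing the serialization error instead"))
      (serialize unfreezable))))

;; One possible `warn!`: print to stderr. A real deployment would
;; more likely pass its logging library's warn function here.
(defn warn-to-stderr [msg]
  (binding [*out* *err*]
    (println "WARN" msg)))
```

Called as (serialize-with-warning some-serializer warn-to-stderr e), this emits one warning only when the original exception could not be serialized.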

sundbry commented 5 years ago

Agreed @solatis

solatis commented 5 years ago

Thanks!