onyx-platform / onyx-local-rt

A pure, deterministic Clojure(Script) runtime for Onyx
Eclipse Public License 1.0
32 stars 5 forks source link

Compiler NullPointerException thrown with core-async lifecycle entries #2

Open enragedginger opened 7 years ago

enragedginger commented 7 years ago

Summary: A generic compiler NullPointerException is thrown when lifecycle entries reference the core-async plugin with onyx-local-rt and (api/stop) is called.

Expected behavior: Generic NPEs are very difficult to debug. Is there a way that we can validate this ahead of time and throw an exception with a helpful message about such things not being supported?

Code:

(defn my-inc [segment]
  (update-in segment [:n] inc))

(def job
  {:workflow [[:in :inc] [:inc :out]]
   :catalog [{:onyx/name :in
              :onyx/plugin :onyx.plugin.core-async/input
              :onyx/medium :core.async
              :onyx/max-peers 1
              :onyx/type :input
              :onyx/batch-size 20}
             {:onyx/name :inc
              :onyx/type :function
              :onyx/fn ::my-inc
              :onyx/batch-size 20}
             {:onyx/name :out
              :onyx/plugin :onyx.plugin.core-async/output
              :onyx/medium :core.async
              :onyx/max-peers 1
              :onyx/type :output
              :onyx/batch-size 20}]
   :lifecycles []})

(def borken-job
  (merge job
         {:lifecycles
          [{:lifecycle/task :in,
            :core.async/id #uuid"26617cbb-c689-4e7e-8fa0-71c782677db3",
            :core.async/size 1000,
            :lifecycle/calls :onyx.plugin.core-async/in-calls}
           {:lifecycle/task :in, :lifecycle/calls :onyx.plugin.core-async/reader-calls}
           {:lifecycle/task :out,
            :core.async/id #uuid"5cdf4749-4587-43cd-bb9e-17d0686d469b",
            :core.async/size 1001,
            :lifecycle/calls :onyx.plugin.core-async/out-calls}
           {:lifecycle/task :out, :lifecycle/calls :onyx.plugin.core-async/writer-calls}]}))

(defn do-the-job [jorb]
  (-> (api/init jorb)
      (api/new-segment :in {:n 41})
      (api/new-segment :in {:n 84})
      (api/drain)
      (api/stop)))

;;works
(do-the-job job)

;;throws NPE
(do-the-job borken-job)

print-cause-trace output:

clojure.lang.Compiler$CompilerException: java.lang.NullPointerException, compiling:(/some/path/to/my/src/basic_test.clj:103:1)
 at clojure.lang.Compiler.load (Compiler.java:7391)
    user$eval27897.invokeStatic (form-init7241215670317235052.clj:1)
    user$eval27897.invoke (form-init7241215670317235052.clj:1)
    clojure.lang.Compiler.eval (Compiler.java:6927)
    clojure.lang.Compiler.eval (Compiler.java:6890)
    clojure.core$eval.invokeStatic (core.clj:3105)
    clojure.core$eval.invoke (core.clj:3101)
    clojure.main$repl$read_eval_print__7408$fn__7411.invoke (main.clj:240)
    clojure.main$repl$read_eval_print__7408.invoke (main.clj:240)
    clojure.main$repl$fn__7417.invoke (main.clj:258)
    clojure.main$repl.invokeStatic (main.clj:258)
    clojure.main$repl.doInvoke (main.clj:174)
    clojure.lang.RestFn.invoke (RestFn.java:1523)
    clojure.tools.nrepl.middleware.interruptible_eval$evaluate$fn__650.invoke (interruptible_eval.clj:87)
    clojure.lang.AFn.applyToHelper (AFn.java:152)
    clojure.lang.AFn.applyTo (AFn.java:144)
    clojure.core$apply.invokeStatic (core.clj:646)
    clojure.core$with_bindings_STAR_.invokeStatic (core.clj:1881)
    clojure.core$with_bindings_STAR_.doInvoke (core.clj:1881)
    clojure.lang.RestFn.invoke (RestFn.java:425)
    clojure.tools.nrepl.middleware.interruptible_eval$evaluate.invokeStatic (interruptible_eval.clj:85)
    clojure.tools.nrepl.middleware.interruptible_eval$evaluate.invoke (interruptible_eval.clj:55)
    clojure.tools.nrepl.middleware.interruptible_eval$interruptible_eval$fn__695$fn__698.invoke (interruptible_eval.clj:222)
    clojure.tools.nrepl.middleware.interruptible_eval$run_next$fn__690.invoke (interruptible_eval.clj:190)
    clojure.lang.AFn.run (AFn.java:22)
    java.util.concurrent.ThreadPoolExecutor.runWorker (ThreadPoolExecutor.java:1142)
    java.util.concurrent.ThreadPoolExecutor$Worker.run (ThreadPoolExecutor.java:617)
    java.lang.Thread.run (Thread.java:745)
Caused by: java.lang.NullPointerException: null
 at clojure.core$deref_future.invokeStatic (core.clj:2208)
    clojure.core$deref.invokeStatic (core.clj:2228)
    clojure.core$deref.invoke (core.clj:2214)
    onyx.plugin.core_async$log_retry_count$fn__25647.invoke (core_async.clj:31)
    clojure.lang.Delay.deref (Delay.java:37)
    clojure.core$deref.invokeStatic (core.clj:2228)
    clojure.core$deref.invoke (core.clj:2214)
    taoensso.timbre$_log_BANG_.invokeStatic (timbre.clj:351)
    taoensso.timbre$_log_BANG_.invoke (timbre.clj:338)
    onyx.plugin.core_async$log_retry_count.invokeStatic (core_async.clj:31)
    onyx.plugin.core_async$log_retry_count.invoke (core_async.clj:29)
    onyx.lifecycles.lifecycle_compile$compile_lifecycle_functions$fn__17780$fn__17781.invoke (lifecycle_compile.cljc:63)
    clojure.core$comp$fn__4727.invoke (core.clj:2460)
    onyx_local_rt.impl$eval26933$fn__26934.invoke (impl.cljc:499)
    clojure.lang.MultiFn.invoke (MultiFn.java:238)
    onyx_local_rt.impl$integrate_task_updates$fn__26952.invoke (impl.cljc:554)
    clojure.core.protocols$fn__6755.invokeStatic (protocols.clj:167)
    clojure.core.protocols/fn (protocols.clj:124)
    clojure.core.protocols$fn__6710$G__6705__6719.invoke (protocols.clj:19)
    clojure.core.protocols$seq_reduce.invokeStatic (protocols.clj:31)
    clojure.core.protocols$fn__6736.invokeStatic (protocols.clj:75)
    clojure.core.protocols/fn (protocols.clj:75)
    clojure.core.protocols$fn__6684$G__6679__6697.invoke (protocols.clj:13)
    clojure.core$reduce.invokeStatic (core.clj:6545)
    clojure.core$reduce.invoke (core.clj:6527)
    onyx_local_rt.impl$integrate_task_updates.invokeStatic (impl.cljc:551)
    onyx_local_rt.impl$integrate_task_updates.invoke (impl.cljc:550)
    onyx_local_rt.api$eval26986$fn__26987.invoke (api.cljc:82)
    clojure.lang.MultiFn.invoke (MultiFn.java:233)
    onyx_local_rt.api$stop.invokeStatic (api.cljc:99)
    onyx_local_rt.api$stop.invoke (api.cljc:94)
    twitter_much.jobs.basic_test$do_the_job.invokeStatic (basic_test.clj:99)
    twitter_much.jobs.basic_test$do_the_job.invoke (basic_test.clj:94)
    twitter_much.jobs.basic_test$eval27901.invokeStatic (basic_test.clj:103)
    twitter_much.jobs.basic_test$eval27901.invoke (basic_test.clj:103)
    clojure.lang.Compiler.eval (Compiler.java:6927)
    clojure.lang.Compiler.load (Compiler.java:7379)
    user$eval27897.invokeStatic (form-init7241215670317235052.clj:1)
    user$eval27897.invoke (form-init7241215670317235052.clj:1)
    clojure.lang.Compiler.eval (Compiler.java:6927)
    clojure.lang.Compiler.eval (Compiler.java:6890)
    clojure.core$eval.invokeStatic (core.clj:3105)
    clojure.core$eval.invoke (core.clj:3101)
    clojure.main$repl$read_eval_print__7408$fn__7411.invoke (main.clj:240)
    clojure.main$repl$read_eval_print__7408.invoke (main.clj:240)
    clojure.main$repl$fn__7417.invoke (main.clj:258)
    clojure.main$repl.invokeStatic (main.clj:258)
    clojure.main$repl.doInvoke (main.clj:174)
    clojure.lang.RestFn.invoke (RestFn.java:1523)
    clojure.tools.nrepl.middleware.interruptible_eval$evaluate$fn__650.invoke (interruptible_eval.clj:87)
    clojure.lang.AFn.applyToHelper (AFn.java:152)
    clojure.lang.AFn.applyTo (AFn.java:144)
    clojure.core$apply.invokeStatic (core.clj:646)
    clojure.core$with_bindings_STAR_.invokeStatic (core.clj:1881)
    clojure.core$with_bindings_STAR_.doInvoke (core.clj:1881)
    clojure.lang.RestFn.invoke (RestFn.java:425)
    clojure.tools.nrepl.middleware.interruptible_eval$evaluate.invokeStatic (interruptible_eval.clj:85)
    clojure.tools.nrepl.middleware.interruptible_eval$evaluate.invoke (interruptible_eval.clj:55)
    clojure.tools.nrepl.middleware.interruptible_eval$interruptible_eval$fn__695$fn__698.invoke (interruptible_eval.clj:222)
    clojure.tools.nrepl.middleware.interruptible_eval$run_next$fn__690.invoke (interruptible_eval.clj:190)
    clojure.lang.AFn.run (AFn.java:22)
    java.util.concurrent.ThreadPoolExecutor.runWorker (ThreadPoolExecutor.java:1142)
    java.util.concurrent.ThreadPoolExecutor$Worker.run (ThreadPoolExecutor.java:617)
    java.lang.Thread.run (Thread.java:745)
MichaelDrogalis commented 7 years ago

Plugin resolution intentionally never takes place. Since the local-rt doesn’t support I/O plugins, we don’t go in and look at the plugin to set up connections/state/sockets or whatever. Since you used the core.async plugin, Onyx tried to use it anyway without plugin resolution having happened, and it’s trying to rederef and atom that’s normally there.

I need to think on what the best place to would be to capture and stop plugins from being executed. We don’t want to make it a rule that you can’t submit them because the spirit is that you can take your same job and run it on both runtimes.