onyx-platform / onyx

Distributed, masterless, high performance, fault tolerant data processing
http://www.onyxplatform.org
Eclipse Public License 1.0

Fix running an output plugin with :onyx.core/params longer than 0 #894

Open sundbry opened 5 years ago

sundbry commented 5 years ago

When :onyx.core/params is set, the identity function used in operations.cljc throws, because identity accepts exactly one argument and the params are passed to it along with the segment. Instead of using identity, we return the last argument (the segment).
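
A minimal sketch of the failure and the proposed fix, in plain Clojure with no Onyx dependency. It assumes, as the clojure.core/partial frame in the stack trace suggests, that the peer binds :onyx.core/params onto the task function with partial and then calls the result with the segment. apply-task-fn and segment-passthrough are hypothetical names for illustration, not the actual code in operations.cljc.

```clojure
;; Hypothetical stand-in for how the peer invokes a task function:
;; params are bound first with `partial`, then the segment is passed last.
;; With no params, (partial f) simply returns f.
(defn apply-task-fn
  [f params segment]
  ((apply partial f params) segment))

;; Hypothetical replacement for `identity`: accept any number of leading
;; params and return the last argument, i.e. the segment.
(defn segment-passthrough
  [& args]
  (last args))

(def conn {:db "example"})   ; stand-in for a connection object
(def segment {:id 1})

(apply-task-fn identity [] segment)                 ;=> {:id 1}
(apply-task-fn segment-passthrough [conn] segment)  ;=> {:id 1}

;; identity takes exactly one argument, so a single param makes the call
;; (identity conn segment) and throws clojure.lang.ArityException.
(try
  (apply-task-fn identity [conn] segment)
  (catch clojure.lang.ArityException e
    (.getMessage e)))
```

The variadic function keeps the old behaviour when there are no params, which is why it can replace identity without affecting tasks that never set :onyx.core/params.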

This is a somewhat unusual thing to do, attaching params to a lifecycle that affects an output task, but it supports a 'generic' set of lifecycle calls for my use case in the MongoDB plugin I am developing. We provide the :mongo connection as a parameter so you can run queries in normal :function tasks, and use the same lifecycles for :output tasks to open and close the connection.

#error {
 :cause "Segment threw exception"
 :data {:exception #error {
 :cause "Wrong number of args (2) passed to: clojure.core/identity"
 :via
 [{:type clojure.lang.ArityException
   :message "Wrong number of args (2) passed to: clojure.core/identity"
   :at [clojure.lang.AFn throwArity "AFn.java" 429]}]
 :trace
 [[clojure.lang.AFn throwArity "AFn.java" 429]
  [clojure.lang.AFn invoke "AFn.java" 36]
  [clojure.core$partial$fn__5824 invoke "core.clj" 2624]
  [onyx.peer.transform$collect_next_segments$fn__38085 invoke "transform.clj" 8]
  [onyx.peer.transform$collect_next_segments invokeStatic "transform.clj" 8]
  [onyx.peer.transform$collect_next_segments invoke "transform.clj" 7]
  [onyx.peer.transform$apply_fn_single$fn__38090 invoke "transform.clj" 17]
  [clojure.core$map$fn__5851 invoke "core.clj" 2753]
  [clojure.lang.LazySeq sval "LazySeq.java" 42]
  [clojure.lang.LazySeq seq "LazySeq.java" 51]
  [clojure.lang.RT seq "RT.java" 531]
  [clojure.core$seq__5387 invokeStatic "core.clj" 137]
  [clojure.core$dorun invokeStatic "core.clj" 3133]
  [clojure.core$doall invokeStatic "core.clj" 3148]
  [clojure.core$doall invoke "core.clj" 3148]
  [onyx.peer.transform$apply_fn_single invokeStatic "transform.clj" 17]
  [onyx.peer.transform$apply_fn_single invoke "transform.clj" 14]
  [onyx.peer.transform$apply_fn invokeStatic "transform.clj" 54]
  [onyx.peer.transform$apply_fn invoke "transform.clj" 48]
  [onyx.peer.task_lifecycle$build_apply_fn$fn__64006 invoke "task_lifecycle.clj" 620]
  [onyx.peer.task_lifecycle$wrap_lifecycle_metrics$fn__64159 invoke "task_lifecycle.clj" 1098]
  [onyx.peer.task_lifecycle.TaskStateMachine exec "task_lifecycle.clj" 1071]
  [onyx.peer.task_lifecycle$run_task_lifecycle_BANG_ invokeStatic "task_lifecycle.clj" 550]
  [onyx.peer.task_lifecycle$run_task_lifecycle_BANG_ invoke "task_lifecycle.clj" 540]
  [onyx.peer.task_lifecycle$start_task_lifecycle_BANG_$fn__64180 invoke "task_lifecycle.clj" 1156]
  [clojure.core.async$thread_call$fn__11217 invoke "async.clj" 442]
  [clojure.lang.AFn run "AFn.java" 22]
  [java.util.concurrent.ThreadPoolExecutor runWorker "ThreadPoolExecutor.java" 1149]
  [java.util.concurrent.ThreadPoolExecutor$Worker run "ThreadPoolExecutor.java" 624]
  [java.lang.Thread run "Thread.java" 748]]
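
A rough sketch of the shared lifecycle setup described in this report, assuming Onyx's convention that lifecycle functions take (event lifecycle) and return a map that is merged into the event. open-connection, close-connection!, the :mongo/uri and :mongo/connection keys, and the function names are hypothetical stand-ins, not the onyx-mongo plugin's actual code.

```clojure
;; Hypothetical stand-in for a real MongoDB client constructor.
(defn open-connection
  [task-map]
  (atom {:uri (:mongo/uri task-map) :open? true}))

;; Hypothetical stand-in for closing the client.
(defn close-connection!
  [conn]
  (swap! conn assoc :open? false))

;; before-task-start: open a connection, keep it in the event so it can be
;; closed later, and append it to :onyx.core/params so an :onyx/fn receives
;; it as a leading argument.
(defn inject-connection
  [event _lifecycle]
  (let [conn (open-connection (:onyx.core/task-map event))]
    {:mongo/connection conn
     :onyx.core/params (conj (vec (:onyx.core/params event)) conn)}))

;; after-task-stop: close the connection opened above.
(defn release-connection
  [event _lifecycle]
  (close-connection! (:mongo/connection event))
  {})

;; Calls map referenced from a lifecycle entry such as
;; {:lifecycle/task :all :lifecycle/calls :your.ns/mongo-calls}
(def mongo-calls
  {:lifecycle/before-task-start inject-connection
   :lifecycle/after-task-stop release-connection})
```

Applied to :all tasks, this also appends a param for output tasks whose function, per the description above, is identity, which is the combination that produces the ArityException in the trace.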
solatis commented 5 years ago

So the use case you're trying to accommodate is passing the actual connection as an argument to the output task, rather than the details of how it can establish the connection?

Is there any reason you're not using the "regular" pattern of opening a connection when the output plugin is constructed, e.g. in your case here: https://github.com/arctype-co/onyx-mongo/blob/master/src/onyx/plugin/mongo.clj#L129 ?

sundbry commented 5 years ago

Hello Leon @solatis,

I first developed the plugin using that pattern, and I will probably go back to it for the output instead of requiring the lifecycle hooks, because it is simpler for the end user. The lifecycle hook that adds the function params is there to support reads/queries in :onyx/type :function tasks, so that the same lifecycle hooks can be used in either context.

Regardless of my design choices there, however, I still think this is a bug: if I, for example, added a lifecycle hook with some :onyx.core/params targeting :all tasks, which would be a more valid use case, it would also raise this exception.

solatis commented 5 years ago

OK, I think accepting this patch would be fine, but I'd like @lbradstreet's input on this; perhaps there was a good reason why these params only work with a single-argument function.

lbradstreet commented 5 years ago

I'm not sure I completely understand the use case, but the way I would make this work is to check whether it's an output task in https://github.com/arctype-co/onyx-mongo/blob/master/src/onyx/plugin/mongo.clj#L90 (by inspecting the task map), and simply not inject into :onyx.core/params if no :onyx/fn is supplied in the task map.

Ignoring the params, as you've done, also works, but I think it makes you more likely to write code that silently drops params. It's probably fine, though.
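
A minimal sketch of the alternative suggested in this comment: inspect the task map in the lifecycle and only inject into :onyx.core/params when an :onyx/fn is supplied. It assumes the same (event lifecycle) lifecycle signature and merge-into-event behaviour; open-connection, :mongo/uri, :mongo/connection and inject-connection-when-fn are hypothetical names, not the plugin's actual code.

```clojure
;; Hypothetical stand-in for a real MongoDB client constructor.
(defn open-connection
  [task-map]
  {:uri (:mongo/uri task-map)})

;; before-task-start: keep the connection in the event for every task (so
;; other lifecycle calls can still close it), but only append it to
;; :onyx.core/params when the task map supplies an :onyx/fn to receive it.
;; Output tasks without :onyx/fn are left with their params untouched.
(defn inject-connection-when-fn
  [event _lifecycle]
  (let [task-map (:onyx.core/task-map event)
        conn     (open-connection task-map)]
    (cond-> {:mongo/connection conn}
      (:onyx/fn task-map)
      (assoc :onyx.core/params (conj (vec (:onyx.core/params event)) conn)))))
```

Compared with making the default function ignore extra arguments, this keeps params from being dropped silently: a param is only added where a function has been declared to consume it.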