zalky / cues

Queues on cue: low-latency persistent blocking queues, processors, and graphs via Chronicle Queue
Apache License 2.0

Processor Output: Dynamic Maps causing "Key must be an integer" #11

Closed jamesnyika closed 1 year ago

jamesnyika commented 1 year ago

Hi, I have some processor code that dynamically builds the output queue maps, but it fails with an IllegalArgumentException.

Code

The processor generates a map with two keys, where each key's value is a vector of maps.


(defmethod q/processor ::data-splitter
  ;; This processor splits a message across multiple channels.
  [ctx input]
  (println "===> Processor: data-splitter")
  (tap> "======ctx=========")
  (tap> ctx)
  (tap> "======input=========")
  (tap> input)
  ;; Take the input and split it up by output binding key.
  (let [proc-ctx  (:ctx (:config ctx))
        ;; The channel (output binding) keys are the split keys.
        outkeys   (keys (:out (:config ctx)))
        ikey      (:receivekey proc-ctx)
        fullinput (if (nil? ikey) {} (get-in input ikey))
        splitdata (select-keys fullinput outkeys)
        output    (into {} (map vector (keys splitdata) (vals splitdata)))]
    (println "--data split complete--")
    (tap> "----------------->> Output class:")
    (tap> (class output))
    (tap> "----------------->> Output keys:")
    (tap> output)
    output))

The resulting data looks like this:

    {:scheduled_arrivals    [{:flt 1 :gate "C2"} {:flt 2 :gate "C3"} {:flt 3 :gate "B2"}]
     :scheduled_departuress [{:flt 4 :gate "D2"} {:flt 5 :gate "G3"} {:flt 6 :gate "T2"}]}

Error Message

2023-10-27T01:26:31.829Z Aurelius ERROR [cues.queue:1013] - :util.processors/airports-schedule-splitter ({:input :util.cues/airports-schedule-raw} -> {:scheduled_arrivals :util.cues/arrivals, :scheduled_departures :util.cues/departures})
                              java.lang.Thread.run              Thread.java:  833
java.util.concurrent.ThreadPoolExecutor$Worker.run  ThreadPoolExecutor.java:  635
 java.util.concurrent.ThreadPoolExecutor.runWorker  ThreadPoolExecutor.java: 1136
               java.util.concurrent.FutureTask.run          FutureTask.java:  264
                                               ...                               
               clojure.core/binding-conveyor-fn/fn                 core.clj: 2047
                          cues.queue/start-join/fn                queue.clj: 1204
                         cues.queue/processor-loop                queue.clj: 1160
                         cues.queue/processor-step                queue.clj: 1005
                          cues.queue/processor-run                queue.clj:  951
                         cues.queue/default-run-fn                queue.clj:  941
                             cues.queue/merge-meta                queue.clj:  921
                         cues.queue/assoc-meta-out                queue.clj:  909
                            clojure.core/reduce-kv                 core.clj: 6928
                       clojure.core.protocols/fn/G            protocols.clj:  175
                                   clojure.core/fn                 core.clj: 6917
                                               ...                               
                      cues.queue/assoc-meta-out/fn                queue.clj:  912
                               clojure.core/update                 core.clj: 6239
                                clojure.core/assoc                 core.clj:  193
                                               ...                               
java.lang.IllegalArgumentException: Key must be integer

What is interesting is that if I build the map myself, i.e. hard-code a map at the end of the processor, it seems to work, but I cannot seem to do it dynamically. Any suggestions on how to fix this?

Thank you in advance for any help!

jamesnyika commented 1 year ago

Just to add context: the supplied input is

{:arrivals [{:baggage_claim nil
                  :filed_airspeed 360
                  :actual_on "2023-10-30T22:36:28Z"
                  :filed_ete 6300
                  :terminal_origin "2"
                  :estimated_on "2023-10-30T22:36:28Z"
                  :route_distance 724
                  :gate_destination nil
                  :scheduled_on "2023-10-30T22:10:00Z"
                  :gate_origin nil
                  :diverted false
                  :actual_off "2023-10-30T20:57:16Z"
                  :scheduled_out "2023-10-30T20:15:00Z"
                  :atc_ident nil
                  :fa_flight_id "ETH308-1698524165-schedule-1701p"
                  :type "Airline"
                  :progress_percent 100
                  :ident_iata "ET308"
                  :aircraft_type "B738 "
                  :route nil
                  :estimated_off "2023-10-30T20:57:16Z"
                  :actual_in nil
                  :scheduled_off "2023-10-30T20:25:00Z"
                  :cancelled false
                  :ident_icao "ETH308"
                  :actual_out nil
                  :position_only false
                  :estimated_out "2023-10-30T20:15:00Z"
                  :status "Landed / Taxiing"
                  :arrival_delay 1936
                  :seats_cabin_coach nil
                  :codeshares_iata ["KP1039" "XY6308" "UL3654" "AI7546"]
                  :terminal_destination "1A"
                  :scheduled_in "2023-10-30T22:20:00Z"
                  :ident "ETH308"
                  :operator "ETH"
                  :operator_icao "ETH"
                  :inbound_fa_flight_id "ETH830-1698502691-schedule-86p"
                  :operator_iata "ET"
                  :origin {:code "HAAB"
                           :code_icao "HAAB"
                           :code_iata "ADD"
                           :code_lid nil
                           :timezone "Africa/Addis_Ababa"
                           :name "Bole Int'l"
                           :city "Addis Ababa"
                           :airport_info_url "/airports/HAAB"}
                  :blocked false
                  :codeshares ["SKK1039" "KNE6308" "ALK3654" "AIC7546"]
                  :seats_cabin_business nil
                  :flight_number "308"
                  :seats_cabin_first nil
                  :registration "ET-ASJ"
                  :estimated_in "2023-10-30T22:52:16Z"
                  :filed_altitude nil
                  :departure_delay 0
                  :destination {:code "HKJK"
                                :code_icao "HKJK"
                                :code_iata "NBO"
                                :code_lid nil
                                :timezone "Africa/Nairobi"
                                :name "Jomo Kenyatta Int'l"
                                :city "Nairobi"
                                :airport_info_url "/airports/HKJK"}}]
      :departures []
      :scheduled_arrivals []
      :scheduled_departures []
      :links {:next "/airports/HKJK/flights?start=2023-10-30T17%3A46%3A41Z&end=2023-10-31T17%3A46%3A41Z&cursor=1f0de2e17fbf8e5"}
      :num_pages 1}

...and it chokes on this function, cues.queue/assoc-meta-out:

.... 

;; on line 912 - the assoc seems to get a key that is not an integer and it expects one

(defn- assoc-meta-out
  [out m]
  (reduce-kv
   (fn [out k v]
     (cond-> out
       v (assoc k (update v :q/meta merge-deep m))))  ;; <--- this assoc is the choke point
   {}
   out))

zalky commented 1 year ago

Hi @jamesnyika , thanks for posting the issue, I'll have a look at this over the next couple of days.

zalky commented 1 year ago

@jamesnyika , to clarify, when you say "The resulting data just looks like so":

   {:scheduled_arrivals    [{:flt 1 :gate "C2"} {:flt 2 :gate "C3"} {:flt 3 :gate "B2"}]
    :scheduled_departuress [{:flt 4 :gate "D2"} {:flt 5 :gate "G3"} {:flt 6 :gate "T2"}]}

Do you mean that this is the value of the output binding that you return in your ::data-splitter processor?

If so, then a couple of observations:

  1. Double check that the keys in your output map correspond exactly to your output bindings: :scheduled_departuress looks like a typo.
  2. The output binding map that a processor returns is a one-to-one mapping between binding and message:

    {:binding-1 msg-1
     :binding-2 msg-2}

    Here msg-1 and msg-2 are each a single message, and each must be a map. But your values above appear to be vectors of maps, which I think is the cause of the error you are seeing (I should probably have a more robust check on the processor output values to provide a more informative error message).

    If your intention was to pass each vector as a single message, then you have to wrap your vector in a map, maybe something like:

    {:scheduled_arrivals   {:flights [{:flt 1 :gate "C2"} ...]}
     :scheduled_departures {:flights [{:flt 4 :gate "D2"} ...]}}

    However, if your intention was actually to put multiple messages on the output queues, then your problem boils down to what you asked about in the other issue that you posted: #9. In that case, I would recommend the solutions we discussed in that thread.

  3. This is unrelated to your error, but I noticed that you're getting (:receivekey proc-ctx) from the processor :config property. Is this a user-specified processor configuration option? If so, the recommended way to do that is via :opts:

    {:id   ::data-splitter
     ...
     :opts {:receivekey :my-key}}

    (defmethod q/processor ::data-splitter
      [{{:keys [receivekey]} :opts} msgs]
      ...)

    Specifying arbitrary user options directly in the top-level processor config technically works:

    {:id   ::data-splitter
     ...
     :my-options {:receivekey :my-key}}

    However, this was not the intended API usage and may not be supported in the future.
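Returning to the second point, the exception in the stack trace can be reproduced in plain Clojure without cues. The following is a minimal sketch: `attach-meta` is a hypothetical stand-in for the per-message step in `assoc-meta-out`, and it uses `clojure.core/merge` in place of the library's `merge-deep`.

```clojure
;; Hypothetical stand-in for the per-message step in assoc-meta-out:
;; it updates the :q/meta key of a single message. clojure.core/merge
;; is used here in place of the library's merge-deep.
(defn attach-meta
  [msg meta-map]
  (update msg :q/meta merge meta-map))

;; A map message works: :q/meta is simply added to the map.
(def map-result
  (attach-meta {:flights [{:flt 1 :gate "C2"}]} {:tx 1}))
;; map-result => {:flights [{:flt 1, :gate "C2"}], :q/meta {:tx 1}}

;; A vector message fails: update calls assoc on the vector, and
;; vectors only accept integer keys, not the keyword :q/meta.
(def vector-error
  (try
    (attach-meta [{:flt 1 :gate "C2"}] {:tx 1})
    (catch IllegalArgumentException e
      (.getMessage e))))
;; vector-error => "Key must be integer"
```

This matches the tail of the trace, where `clojure.core/update` calls `clojure.core/assoc` just before the exception is thrown.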

jamesnyika commented 1 year ago

Hi, and thank you for your answer.

Your question: Do you mean that this is the value of the output binding that you return in your ::data-splitter processor?

Answer: Yes, this is correct. Thank you for pointing out the typo, but I only made it in this message to you. It is correct in my code.

I think the breakthrough information here is this:

Here msg-1 and msg-2 are each a single message, and each must be a map. But your values above appear to be vectors of maps, which I think is the cause of the error you are seeing (I should probably have a more robust check on the processor output values to provide a more informative error message).

In other words, the values of a binding MUST BE MAPS. That did not occur to me. Even if you do not change your error message, it might be good to highlight this in the documentation (or both depending on your bandwidth). I will attempt to wrap the data in a map to start with and see how that goes. It is the easiest solution for now. If that solves it, I will close this out with a confirmation message.

On the last item you mentioned, I did not realize that this key could be used in that way. I pass in :receivekey as a way for the processor to know what key to use in the incoming data to pull out what it needs to process. I will check and see what you have written with respect to this keyword. It could be a nice code clarification technique on my end and reduce the size of my context maps for each processor.

Thanks, and stay tuned!

zalky commented 1 year ago

Even if you do not change your error message, it might be good to highlight this in the documentation (or both depending on your bandwidth).

This is a good suggestion. The queue primitives section does mention this, but the graph processors section should definitely also explicitly do so. At minimum I will update the docs to reflect this.
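A more informative check of the kind mentioned earlier in the thread could look something like the sketch below. It is plain Clojure and only illustrative: `check-output` is a hypothetical helper, not a function in cues, and it assumes that nil values are allowed because `assoc-meta-out` skips them.

```clojure
;; Hypothetical validation helper: throws a descriptive error when a
;; processor output value is neither nil nor a map, otherwise returns
;; the output map unchanged.
(defn check-output
  [out]
  (doseq [[binding msg] out]
    (when-not (or (nil? msg) (map? msg))
      (throw (ex-info "Processor output values must be maps"
                      {:binding binding
                       :type    (type msg)}))))
  out)

;; Passes through a valid output map.
(check-output {:scheduled_arrivals {:flights [{:flt 1 :gate "C2"}]}})

;; Reports which binding carried a non-map value.
(def bad-binding
  (try
    (check-output {:scheduled_arrivals [{:flt 1 :gate "C2"}]})
    (catch clojure.lang.ExceptionInfo e
      (:binding (ex-data e)))))
```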

On the last item you mentioned, I did not realize that this key could be used in that way. I pass in :receivekey as a way for the processor to know what key to use in the incoming data to pull out what it needs to process. I will check and see what you have written with respect to this keyword. It could be a nice code clarification technique on my end and reduce the size of my context maps for each processor.

I don't know how helpful it will be, but at least it will bring your code in alignment with the intended API usage. The intention was for the top level processor attributes to be restricted to the API, and for :opts to be the way to pass arbitrary configuration options to your processor.

{; only API attributes allowed here
 :id   ::processor
 :in   {:input-binding ::input-queue}
 :out  {:output-binding ::output-queue}
 :opts {; here is where you put user config
        :receivekey :my-key}}
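
The nested destructuring used to read `:opts` can be tried on its own. The sketch below uses a plain function instead of `q/processor`, and assumes, as the earlier `defmethod` example suggests, that the map passed as the first argument carries `:opts` at the top level. The names `read-receivekey` and `sample-process`, and the `:example/...` keywords, are hypothetical.

```clojure
;; Plain-function stand-in for a processor method: destructures
;; :receivekey out of the :opts map of its first argument.
(defn read-receivekey
  [{{:keys [receivekey]} :opts} _msgs]
  receivekey)

;; Hypothetical processor map shaped like the config above.
(def sample-process
  {:id   :example/processor
   :in   {:input-binding :example/input-queue}
   :out  {:output-binding :example/output-queue}
   :opts {:receivekey :my-key}})

(def found-key (read-receivekey sample-process {}))
;; found-key => :my-key
```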

jamesnyika commented 1 year ago

OK, I can confirm that ensuring each value of the output map is ALSO a map fixed the issue. I quite literally did this:

{:scheduled_arrivals   {:flights [{:flt 1 :gate "C2"} ...]}
 :scheduled_departures {:flights [{:flt 4 :gate "D2"} ...]}} 

Thank you. This solves and closes the issue.
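
For the dynamic case described at the top of the thread, the wrapping step can be folded into the code that builds the output map. This is a minimal sketch in plain Clojure, assuming the output binding keys match keys in the input data. `wrap-outputs` and the `:flights` wrapper key are illustrative choices, not part of cues.

```clojure
;; Hypothetical helper: selects out-keys from input and wraps each
;; value in a map under wrapper-key, so that every binding maps to a
;; single map message rather than a bare vector.
(defn wrap-outputs
  [input out-keys wrapper-key]
  (reduce-kv
   (fn [acc k v]
     (assoc acc k {wrapper-key v}))
   {}
   (select-keys input out-keys)))

(def wrapped
  (wrap-outputs {:scheduled_arrivals   [{:flt 1 :gate "C2"}]
                 :scheduled_departures [{:flt 4 :gate "D2"}]
                 :num_pages            1}
                [:scheduled_arrivals :scheduled_departures]
                :flights))
;; wrapped => {:scheduled_arrivals   {:flights [{:flt 1, :gate "C2"}]}
;;             :scheduled_departures {:flights [{:flt 4, :gate "D2"}]}}
```

Keys that are absent from the input are simply left out of the result, which mirrors the `select-keys` call in the original processor.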