zalky / cues

Queues on cue: low-latency persistent blocking queues, processors, and graphs via Chronicle Queue
Apache License 2.0

Resultsets from jdbc.next not serializable with cues #8

Closed jamesnyika closed 1 year ago

jamesnyika commented 1 year ago

Hi, thanks for this amazing library wrapper. I am trying to create a processor graph that uses next.jdbc from Sean Corfield (https://cljdoc.org/d/com.github.seancorfield/next.jdbc/1.3.874/doc/datafy-nav-and-schema), which returns functions for datafy and nav in the metadata of its results. Trying to put a result set on a queue (even a plain map, but with functions in the metadata) causes the processors to fail, since next.jdbc adds those functions to the metadata automatically. I tried to set my graphs to ignore metadata, but it does not seem to have an effect:

Here is the graph

(defn pgraph []
    {:id         ::fuel-graph
     :errors     ::graph-errors
     :source     ::source
     :queue-opts {::q/default {:queue-meta false}
                  :util.cues/notification_sources {:queue-meta false}
                  ::source {:queue-meta false}}
     :processors [{:id ::source}
                  {:id :util.processors/jdbc-reader
                   :queue-opts {::q/default {:queue-meta false}}
                   :in {:input ::source}
                   :out {:output :util.cues/notification_sources}}
                  ;; first logger
                  {:id  ::logger1
                   :fn  :util.processors/tap-processor
                   :in  {:input :util.cues/notification_sources}}]})

Here is the processor list:


(defmethod q/processor ::tap-processor
  ;; this processor taps out a message without change
  [ctx msg]
  (println "===> Processor: tap-processor")

  ;; simply taps out a message
  (tap> ctx)
  (tap> msg)
  nil)

(defmethod q/processor ::jdbc-reader
  [stuff {:keys [input]}]
  ;; uses a connection to read a message and return it
  (println "===> Processor: jdbc-reader")
  (let [c (:conspec input)
        cat (:cat input)
        q (:query input)
        ;; This is a core/async call below that will pull the data. works nicely.
        r  (async/<!! (wg/go-runner (eval (symbol q)) c cat) )
        skeys (map #(keyword (str "r" %)) (range 0 (count r)))
        k (map #(select-keys % [:source_uri :source_type]) r)
        z (zipmap skeys k)
        ]

     ;; execute query and return the results as output
    ;; putting z here fails because it somehow still sees the navify functions
    {:output {:rs z}}))

Here is the failure exception


===> Processor: jdbc-reader - skeys:  (:r0 :r1)
===> Processor: jdbc-reader - k:  ({:source_uri https://www.epra.go.ke/wp-content/uploads/2021/11/15th-November-14th-December-2021-1.csv, :source_type excel_file} {:source_uri https://www.epra.go.ke/wp-content/uploads/2021/11/15th-November-14th-December-2021-1.csv, :source_type excel_file})
===> Processor: jdbc-reader - z:  {:r0 {:source_uri https://www.epra.go.ke/wp-content/uploads/2021/11/15th-November-14th-December-2021-1.csv, :source_type excel_file}, :r1 {:source_uri https://www.epra.go.ke/wp-content/uploads/2021/11/15th-November-14th-December-2021-1.csv, :source_type excel_file}}
ERROR [cues.queue:1013] - :util.processors/jdbc-reader ({:input :util.cues/source} -> {:output :util.cues/notification_sources})
                              java.lang.Thread.run              Thread.java:  833
java.util.concurrent.ThreadPoolExecutor$Worker.run  ThreadPoolExecutor.java:  635
 java.util.concurrent.ThreadPoolExecutor.runWorker  ThreadPoolExecutor.java: 1136
               java.util.concurrent.FutureTask.run          FutureTask.java:  264
                                               ...                               
               clojure.core/binding-conveyor-fn/fn                 core.clj: 2047
                          cues.queue/start-join/fn                queue.clj: 1204
                         cues.queue/processor-loop                queue.clj: 1160
                         cues.queue/processor-step                queue.clj: 1005
                          cues.queue/processor-run                queue.clj:  952
                                               ...                               
                           cues.queue/eval16136/fn                queue.clj:  761
                           cues.queue/attempt-full                queue.clj:  747
                          cues.queue/wrap-write/fn                queue.clj:  464
                      cues.queue/wrap-throwable/fn                queue.clj:  735
                        cues.queue/wrap-attempt/fn                queue.clj:  715
                             cues.queue/encode-msg                queue.clj:  694
                      cues.queue/codec/reify/write                queue.clj:   80
                             taoensso.nippy/freeze                nippy.clj: 1331
                             taoensso.nippy/freeze                nippy.clj: 1337
                 taoensso.nippy/call-with-bindings                nippy.clj: 1279
                          taoensso.nippy/freeze/fn                nippy.clj: 1356
                       taoensso.nippy/freeze/fn/fn                nippy.clj: 1356
                     taoensso.nippy/eval14559/fn/G                nippy.clj:  570
                       taoensso.nippy/eval14578/fn                nippy.clj:  578
                     taoensso.nippy/eval14531/fn/G                nippy.clj:  569
                       taoensso.nippy/eval14827/fn                nippy.clj: 1174
                          taoensso.nippy/write-map                nippy.clj:  886
                            clojure.core/reduce-kv                 core.clj: 6919
                       clojure.core.protocols/fn/G            protocols.clj:  175
                                   clojure.core/fn                 core.clj: 6908
                                               ...                               
                       taoensso.nippy/write-map/fn                nippy.clj:  886
                    taoensso.nippy/write-map/fn/fn                nippy.clj:  889
                     taoensso.nippy/eval14559/fn/G                nippy.clj:  570
                       taoensso.nippy/eval14578/fn                nippy.clj:  578
                     taoensso.nippy/eval14531/fn/G                nippy.clj:  569
                       taoensso.nippy/eval14827/fn                nippy.clj: 1174
                          taoensso.nippy/write-map                nippy.clj:  886
                            clojure.core/reduce-kv                 core.clj: 6919
                       clojure.core.protocols/fn/G            protocols.clj:  175
                                   clojure.core/fn                 core.clj: 6908
                                               ...                               
                       taoensso.nippy/write-map/fn                nippy.clj:  886
                    taoensso.nippy/write-map/fn/fn                nippy.clj:  889
                     taoensso.nippy/eval14559/fn/G                nippy.clj:  570
                       taoensso.nippy/eval14578/fn                nippy.clj:  577
                     taoensso.nippy/eval14531/fn/G                nippy.clj:  569
                       taoensso.nippy/eval14827/fn                nippy.clj: 1174
                          taoensso.nippy/write-map                nippy.clj:  886
                            clojure.core/reduce-kv                 core.clj: 6919
                       clojure.core.protocols/fn/G            protocols.clj:  175
                                   clojure.core/fn                 core.clj: 6908
                                               ...                               
                       taoensso.nippy/write-map/fn                nippy.clj:  886
                    taoensso.nippy/write-map/fn/fn                nippy.clj:  889
                     taoensso.nippy/eval14559/fn/G                nippy.clj:  570
                       taoensso.nippy/eval14578/fn                nippy.clj:  578
                     taoensso.nippy/eval14531/fn/G                nippy.clj:  569
                       taoensso.nippy/eval14867/fn                nippy.clj: 1255
                  taoensso.nippy/throw-unfreezable                nippy.clj: 1005
clojure.lang.ExceptionInfo: Unfreezable type: class next.jdbc.result_set$navize_row$fn__49308
    as-str: "#object[next.jdbc.result_set$navize_row$fn__49308 0x23c00cbf \"next.jdbc.result_set$navize_row$fn__49308@23c00cbf\"]"
      type: next.jdbc.result_set$navize_row$fn__49308
clojure.lang.ExceptionInfo: Unfreezable type: class next.jdbc.result_set$navize_row$fn__49308
      err.proc/config: {:id :util.processors/jdbc-reader,
                        :in {:input :util.cues/source},
                        :out {:output :util.cues/notification_sources},
                        :strategy :cues.queue/exactly-once}

..... (THERE IS A LOT MORE BUT YOU CAN SEE THE EXCEPTION INFO JUST 4-5 lines above here)

Any suggestions to help get around this?

zalky commented 1 year ago

Hi, and thanks for submitting this issue!

At first glance I think there are two orthogonal problems:

  1. The Cues metadata options that you're trying to set via the {:queue-meta false} are conceptually different from the Clojure language metadata that is being returned on the result set by JDBC.

    {:queue-meta false} controls provenance data that Cues automatically adds to your message under the :q/meta attribute (information about associated queues and indices). It is not metadata in the sense of Clojure language metadata that is attached to collections inside your messages:

    {:your-data (with-meta {:data "data"} {:meta "meta"})  ; Clojure metadata in your message data
    :q/meta    {:tx/t 83872121356288}}                    ; Cues metadata controlled by `:queue-meta` options

    Any Clojure metadata inside your messages is transparent to the Cues implementation and is handled by the Nippy serialization layer.

    Hopefully that answers why the {:queue-meta false} options are not doing what you expect them to.

  2. As for the exception, the Nippy serialization layer can serialize metadata in your messages by default:

    (require '[cues.queue :as q])
    
    (def q (q/queue ::queue-id))
    (def t (q/tailer q))
    (def a (q/appender q))
    
    (q/write a {:your-data (with-meta {:data "data"} {:meta "meta"})})
    ;; => 
    83872121356288
    
    (meta (:your-data (q/read t)))
    ;; => 
    {:meta "meta"}

    However, it cannot serialize functions by default, and I think that's where the serialization error is being thrown.
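To make the distinction concrete, here is a minimal sketch in plain Clojure (no Cues, Nippy, or next.jdbc calls). The `fake-row` value and its `:example/nav` metadata key are hypothetical stand-ins for a row returned by a JDBC query; the real metadata keys and function classes will differ. The `meta-fn-keys` helper is likewise only an illustration of how one might check a value for functions hiding in its metadata.

```clojure
;; Hypothetical stand-in for a JDBC row: an ordinary map whose
;; Clojure metadata carries a function.
(def fake-row
  (with-meta {:source_type "excel_file"}
    {:example/nav (fn [_coll _k v] v)}))

(defn meta-fn-keys
  "Returns the metadata keys of x whose values are functions."
  [x]
  (->> (meta x)
       (filter (fn [[_k v]] (fn? v)))
       (mapv key)))

;; The map prints and compares like plain data...
(= {:source_type "excel_file"} fake-row)

;; ...but its metadata still holds a function.
(meta-fn-keys fake-row)

;; A literal map carries no metadata at all.
(meta-fn-keys {:source_type "excel_file"})
```

Because metadata does not take part in equality or printing, a value like this looks identical to a plain map at the REPL, which is why the serialization failure can be surprising.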

If I'm correct up to this point, then to me it seems like you have two options:

  1. If you don't need to serialize the functions, simply drop them from your messages.
  2. If you need the functions serialized, the good news is that Nippy is highly extensible and you might be able to use nippy/extend-freeze and nippy/extend-thaw to do this. In fact I think this problem has already been tackled: have a look at this library for an implementation.

Hope this answered your questions! Let me know if you need further clarifications or you are still running into problems.

zalky commented 1 year ago

An addendum: thinking through it a bit more, there's a good chance you will not be able to serialize those functions. They are lazy producers that probably depend on runtime JDBC resources. Having never tried it myself, I don't want to categorically say it can't be done, but I would not hold out hope.

zalky commented 1 year ago

I guess one more question: do you care about preserving the Clojure metadata or are you just trying to write your query result to the queue?

jamesnyika commented 1 year ago

Hi @zalky, first of all, thank you for such a quick response. I appreciate it, and I now understand what the {:queue-meta false} flag handles: not Clojure metadata, only Cues metadata. Got it!

As for this issue of the serialization of functions, I am not actively trying to serialize the metadata. I get a result set back from my JDBC query that I initiate inside the processor. This returned resultset is populated by next.jdbc with functions to support datafy and nav capability that comes with Clojure 1.10+

So when Nippy goes to serialize my regular looking maps returned by next.jdbc, it encounters this metadata with the datafy and nav functions inserted there by next.jdbc. I am trying like hell to get them out of there because I do not care about them at all, ... they just come with the next.jdbc resultset.

So I am happy to use nippy/extend-freeze or nippy/extend-thaw, but I do not know where or how to put them into my code. Where do I implement and place them to get around this issue? That is where I am stuck.

jamesnyika commented 1 year ago

Just an addendum: my initial thought was to ask you for an escape hatch within Cues, similar to your {:queue-meta false} flag, that would STRIP Clojure metadata from messages before serialization. I have never had to deal with metadata, which is probably why I have no idea how to implement the freeze/thaw solution from the Nippy library. Thank you for the library reference; I will take a look at it.

A tangential question for you: why do processors in Cues not place a value on the output queue unless it is written as a map literal? Is there some particular reason why it just freezes unless I do the following?

     (let [z {:hello "its me"}]
     ...
     ;; this works 
     {:output {:hello "its me"}}

      ;; this does NOT work (nothing is ever sent to the follow on queue
     {:output z}

     );; end let
jamesnyika commented 1 year ago

I just tried using the library you recommended, and it simply moves the error up the stack to the Postgres connection class. To be clear: I included it in my deps.edn, then ensured it was required in my processor.clj file, and reran everything else unchanged.


              util.cues$eval49666$fn__49667.invoke             NO_SOURCE_FILE:  194
                     taoensso.nippy/freeze-to-out!                  nippy.clj: 1013
                     taoensso.nippy/eval14559/fn/G                  nippy.clj:  570
                       taoensso.nippy/eval14586/fn                  nippy.clj:  581
                     taoensso.nippy/eval14531/fn/G                  nippy.clj:  569
                       taoensso.nippy/eval14867/fn                  nippy.clj: 1255
                  taoensso.nippy/throw-unfreezable                  nippy.clj: 1005
clojure.lang.ExceptionInfo: Unfreezable type: class org.postgresql.jdbc.PgConnection
    as-str: "#object[org.postgresql.jdbc.PgConnection 0xaf236fe \"org.postgresql.jdbc.PgConnection@af236fe\"]"
      type: org.postgresql.jdbc.PgConnection
clojure.lang.ExceptionInfo: Unfreezable type: class org.postgresql.jdbc.PgConnection
      err.proc/config: {:id :util.processors/jdbc-reader,
                        :in {:input :util.cues/source},
                        :out {:output :util.cues/notification_sources},
                        :strategy :cues.queue/exactly-once}
    err.proc/messages: {:rs
                        {:r0
                         {:source_uri
                          "https://www.epra.go.ke/wp-content/uploads/2021/11/15th-November-14th-December-2021-1.csv",

...

Is there a way for me to make these calls outside the processor, so that they are not subject to serialization and the problem is avoided altogether?

Does this mean it is not possible to write a source processor that pulls data from a DB and loads it onto a queue? If it is possible, how do you do that?

zalky commented 1 year ago

@jamesnyika , ok, I think I understand what you're trying to accomplish.

Short answer is yes, it should definitely be possible to write a source processor that can pull data from a DB onto the queue, I do it all the time.

But there are some considerations here that don't really have much to do with the Cues implementation specifically. Unfortunately I can't respond to all your questions right now, but I'll try to get something a little more detailed tomorrow or Friday.

jamesnyika commented 1 year ago

@zalky Thanks... and I look forward to any examples or input you have. I will standby for now...

zalky commented 1 year ago

@jamesnyika , ok so I don't have perfect insight into what the following does:

(async/<!! (wg/go-runner (eval (symbol q)) c cat) )

but assuming it's returning a query result similar to what this returns:

(next.jdbc/execute! ds ["select * from address"])

Then you should be able to drop all of the metadata with a simple:

(map #(with-meta % nil) query-result)

This resolved the exception in the minimal repro I tried.
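As a self-contained illustration of that fix, here is a sketch in plain Clojure. The `rows` vector and its `:example/nav` metadata key are hypothetical stand-ins for a real query result. It also shows one detail that may explain the original report: `select-keys` copies the metadata of its input map onto its result, so narrowing the rows with `select-keys` alone does not get rid of the functions. Using `mapv` rather than `map` is a choice made here so that the work happens eagerly.

```clojure
;; Hypothetical stand-in for a query result: row maps whose Clojure
;; metadata carries a function.
(def rows
  [(with-meta {:source_uri "a.csv" :source_type "excel_file" :id 1}
     {:example/nav (fn [_coll _k v] v)})
   (with-meta {:source_uri "b.csv" :source_type "excel_file" :id 2}
     {:example/nav (fn [_coll _k v] v)})])

;; select-keys keeps the metadata of the map it was given.
(def narrowed
  (mapv #(select-keys % [:source_uri :source_type]) rows))

(some? (meta (first narrowed)))

;; Dropping the metadata explicitly leaves plain data behind.
(def clean-rows
  (mapv #(with-meta % nil) narrowed))

(every? #(nil? (meta %)) clean-rows)
```

The order of the two steps does not matter for the outcome, as long as the metadata is removed from each row map before the message is returned from the processor.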

But there is one more consideration I would flag. You should have some kind of sense of the upper bound on your query result size, because you might end up experiencing performance issues if you try to write a very large query result in a single message.

As described in the README.md, the queue message size above which you start to experience performance inefficiencies is 16MB with the default configuration. You can go as high as 1GB if you tune the underlying Chronicle Queue configuration.

If you know your data is bounded, this might not be an issue. Otherwise, you might start to think about whether or not it makes sense to partition your result set into separate messages. It really depends on the problem you are trying to solve. Just something to think about.
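If partitioning does turn out to be necessary, the chunking itself can be sketched in plain Clojure. The `chunk-messages` helper, the `:rs` key, and the chunk size of 50 are assumptions for illustration only; how the resulting messages are then written to a queue is deliberately left out, since it depends on how the graph is set up.

```clojure
(defn chunk-messages
  "Splits rows into message maps of at most chunk-size rows each."
  [chunk-size rows]
  (->> rows
       (partition-all chunk-size)
       (mapv (fn [chunk] {:rs (vec chunk)}))))

;; Hypothetical result set of 147 small rows.
(def example-rows
  (mapv (fn [i] {:id i}) (range 147)))

(def messages
  (chunk-messages 50 example-rows))

;; Number of rows carried by each message.
(mapv (comp count :rs) messages)
```

`partition-all` is used rather than `partition` so that the final, shorter chunk is kept instead of being dropped.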

Let me know if the above resolved your issue.

jamesnyika commented 1 year ago

Hi @zalky Thank you for the suggestion!!

     (async/<!! (wg/go-runner (eval (symbol q)) c cat) )

The above is just a go block that takes a function name as a string, converts it to a symbol, and evaluates it. The resulting function is executed with c, a next.jdbc connection spec, and cat, a parameter to the query.

Qn: does a Cues processor run in its own thread, or is it part of the main Clojure thread? Do I even need to fork out to a new thread to execute my JDBC query?

But you are right : it's returning a query result similar to what this returns:

(next.jdbc/execute! ds ["select * from address"])

With respect to the data size, it is not massive and should never be: I get back about 147 rows of 9 columns of short strings and numbers, so it is bounded. So far, no major problems.

I will try to wipe out the metadata as you indicated and see if that clears it up!!! Standby

jamesnyika commented 1 year ago

So it looks like most of the serialization errors are gone, but I also seem to have lost my ability to tap> out values.

Instead I get this exception when I try to print out the result set:

 Exception in thread "async-dispatch-11" java.lang.ClassCastException: class clojure.lang.MapEntry cannot be cast to class clojure.lang.IObj (clojure.lang.MapEntry and clojure.lang.IObj are in unnamed module of loader 'app')

What does this mean?

zalky commented 1 year ago

To answer your question about threads: each Cues processor runs in its own thread, so if all you're doing is a JDBC query, you probably don't need any additional concurrency.

So for example, you could pass your JDBC datasource directly in via the processor :opts and then run your query in the processor body:

(defn pgraph
  [datasource]
  {:id         ::fuel-graph
   ...
   :processors [{:id   ::jdbc-reader
                 ...
                 :opts {:datasource datasource}}
                ...]})

(defmethod q/processor ::jdbc-reader
  [{{:keys [datasource]} :opts} {:keys [input]}]
  {:output (->> (parse-query input)
                (jdbc/execute! datasource)
                (map #(with-meta % nil))
                (map #(select-keys % [:source_uri :source_type]))
                (zipmap (map #(keyword (str "r" %)) (range))))})

As for why you are getting an error when you tap, unfortunately the provided information is not enough to be able to say.
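One possible cause, offered as a guess rather than a diagnosis of the code in question: that exact `ClassCastException` is what Clojure throws when `with-meta` is applied to a map entry. That happens if `(map #(with-meta % nil) ...)` is run over a map (for example, the result of `zipmap`) rather than over a sequence of row maps. Because `map` is lazy, the error would only surface once the value is realized, such as when it is printed or tapped. The `zipped` data and the `cast-error?` helper below are hypothetical.

```clojure
;; Hypothetical stand-in for a zipmapped result: a map of row maps.
(def zipped
  {:r0 {:source_type "excel_file"}
   :r1 {:source_type "excel_file"}})

(defn cast-error?
  "True if fully realizing (map f coll) throws a ClassCastException."
  [f coll]
  (try
    (doall (map f coll))
    false
    (catch ClassCastException _
      true)))

;; Mapping over a map yields map entries, which cannot carry metadata.
(cast-error? #(with-meta % nil) zipped)

;; Mapping over the values reaches the row maps themselves.
(cast-error? #(with-meta % nil) (vals zipped))
```

If this is what happened, applying `with-meta` to the rows before they are combined into a map, as in the thread-last pipeline above, avoids the problem.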

jamesnyika commented 1 year ago

Thank you @zalky, I think the info provided is sufficient for now to close this issue. Maybe you can add some of this info to the docs, particularly that metadata may contain functions that could cause havoc with the serialization?