apa512 / clj-rethinkdb

Eclipse Public License 1.0
204 stars 42 forks source link

Change feed into core.async channel #139

Closed favetelinguis closed 8 years ago

favetelinguis commented 8 years ago

Are there any example on how to turn a change feed into core.async channel that receives each update?

danielytics commented 8 years ago

The simplest way is to pass {:async true} to run, which will then return a channel that will receive the changes:

https://github.com/apa512/clj-rethinkdb/blob/3ee5796643fe73e55c517fd2dc8da17304fe6bf3/test/rethinkdb/core_test.clj#L171-L174

If you need the changes to go into an existing channel, you can use (core.async/pipe chan-from-run destination-chan)

Instead of passing {:async true} to run, you can also pass it to connect: https://github.com/apa512/clj-rethinkdb#coreasync

favetelinguis commented 8 years ago

Trying this but c never gets a message in the example below:

(defn get-changes []
  (let [c
        (-> (r/table "ticks")
            (r/changes)
            (r/run conn {:async? true}))]
    (go-loop []
      (when-let [msg (<! c)]
        (println msg)
        (recur)))))

However doing this in the data explorer works fine:

r.db("bfg").table('ticks').changes()

Anything im missing?

lenaschoenburg commented 8 years ago

Are your sure that the connection you are using stays open? For example, wrapping the above in a with-open would not work because the go-loop returns immediately so the connection gets closed and all running queries are stopped.

favetelinguis commented 8 years ago

Yes the connection is open, it is stored in an atom.

apa512 commented 8 years ago

It still sounds to me like you're passing in a non-active connection somewhere. Can you show a working example where this happens?

The previous example certainly wouldn't work if conn is an atom.

favetelinguis commented 8 years ago

Sorry, needed a secound pair of eyes, I was missing:

(r/run @conn {:async? true})