Closed bhurlow closed 9 years ago
upon further investigation this looks to be a problem with thread access to the connection object:
```clojure
;; ====== MAKE IT EXPLODE =====
(defn dummy-insert [conn]
  (-> (r/db "test")
      (r/table :things)
      (r/insert [{:prop (rand-int 5000)}])
      (r/run conn)))

(defn explode []
  (let [conn (connect)]
    (pmap (fn [v] (dummy-insert conn))
          (range 50))))

(explode)
```
This will invariably throw an `Assert failed (= token recvd-token)` exception, making the library almost impossible to use in multi-threaded situations, e.g. with a web server like http-kit.
That's bizarre, I've never had issues with it. I suppose I could have just been lucky.
I suppose a temporary solution would be to open a new connection before each query (and close it after) so that threads don't try to use the same connection at the same time. It could perhaps also be fixed through connection pooling (#23), which I may take a look at next week.
I have no idea what it would take to fix the connection to be thread safe though. I'm willing to give it a try if @apa512 can point me in the right direction, though.
Ok, I think I see the issue:
Since all calls to `run` on the same connection read the same `DataInputStream`, there is a race condition: a response may not go to the call to `send-query` that sent the query, hence `token` won't match and the assert fails.
A solution might be to store the response somewhere, so that if it's not the one this thread expects (i.e. `token` does not match), the thread tries again and leaves the response for another thread to pick up.
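A minimal sketch of that idea (not the library's actual code; all names here are hypothetical): park each query's result behind a promise keyed by its token, so whichever thread happens to read a response can deliver it to the right waiter.

```clojure
;; token -> promise of the eventual response
(def pending (atom {}))

(defn register-query!
  "Register a pending query and return a promise for its response.
  The actual socket write is elided and must itself be serialised."
  [token]
  (let [p (promise)]
    (swap! pending assoc token p)
    p))

(defn handle-response!
  "Called by whichever thread reads a response off the socket:
  hand the response to the thread that is waiting on this token."
  [token response]
  (when-let [p (get @pending token)]
    (swap! pending dissoc token)
    (deliver p response)))

;; A caller then blocks on its own promise:
;; @(register-query! token)
```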
@apa512 do you have any thoughts or preferences as to how that should (or you'd like it to) be done? I can give it a shot over the weekend then.
Actually, now that I think about it, a connection isn't thread safe at all, because when you send a query, it makes multiple writes [1] to the connection's `DataOutputStream`, which can of course end up interleaved and therefore send corrupt requests. Similarly for reads. So both reading and writing would need to be serialised.
[1] https://github.com/apa512/clj-rethinkdb/blob/master/src/rethinkdb/net.clj#L55-L57
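To make the interleaving concrete: the fix is to make the multi-part write atomic. A simple illustrative approach (not the library's code, and ignoring that the real wire protocol uses little-endian framing) is to guard the writes with a lock so token, length, and payload always go out as one unit:

```clojure
(import '[java.io DataOutputStream])

(defn write-query!
  "Illustrative only: write one query's frames as an atomic unit.
  Without the `locking`, two threads could interleave their writes
  and corrupt the request stream."
  [^DataOutputStream out token ^bytes payload]
  (locking out
    (.writeLong out token)
    (.writeInt out (alength payload))
    (.write out payload 0 (alength payload))
    (.flush out)))
```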
Yep, this library is basically unusable in production since a lot of tools like webservers will parallelize where possible automatically. Revise for instance uses agents to handle the race condition and it works pretty well: https://github.com/bitemyapp/revise/blob/master/connections.md
Seems like a quick update to support parallel use cases would make a big difference. Otherwise this library is solid and pretty up to date.
Maybe we can do a similar design.
I also see that they use core.async internally. It would be reasonably straightforward to serialise communication to and from connections through core.async channels. If this approach were taken, there would already be a dependency on core.async, so I'd suggest also (as an option) supporting core.async (as well as callbacks) as discussed in #20. If going this route, it may be worth thinking about the connection approach as a whole to make sure things like changefeeds are properly streamed too (I haven't actually tried them with clj-rethinkdb, so I don't know how they currently behave).
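The channel approach might look something like this sketch (assumed names, not the actual implementation): a single writer thread owns the output stream, so concurrent callers can never interleave their bytes.

```clojure
(require '[clojure.core.async :as async])

(defn start-writer
  "Start a writer loop that owns `out` exclusively. Returns a channel;
  putting a query on it hands it to the single writer thread.
  `encode-and-write!` is a stand-in for the library's actual query writer."
  [out]
  (let [ch (async/chan 100)]
    (async/thread
      (loop []
        (when-let [query (async/<!! ch)]
          ;; (encode-and-write! out query)  ; hypothetical
          (recur))))
    ch))

;; Callers put queries on the channel instead of touching the stream:
;; (async/>!! writer-ch {:token 1 :term query-term})
```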
Anyway, as I said, I'd be happy to work on this issue over the weekend if @apa512 can chime in and let us know how he wishes to proceed.
I can confirm that this is the case. You have to use one connection per thread.
This will be fixed when I (or someone else) takes the time to implement async querying.
cool! I will look into forking and adding if I have time
@apa512 do you have any thoughts/ideas on how you would like async querying to work? I have a little bit of time on my hands right now and would happily try it, but if you have any thoughts would like to hear them before I spend time on it.
If you don't then I'll go with a core.async approach and see where that leads.
@bhurlow let me know if you do. Perhaps we can collaborate.
Let me do a little looking around. Revise basically maps queries to tokens so their responses can be handled asynchronously. Not sure you need core.async necessarily, seems like Agents may be a lighter approach but I'm not totally sure yet. Will post back here soon. How are you using rethinkdb currently?
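For comparison, the agent-based alternative could be sketched like this (again with stand-in names): actions sent to a single agent run strictly one at a time, which serialises writes without a core.async dependency.

```clojure
;; One agent per connection; send-off guarantees its actions never overlap.
(def writer (agent nil))

(defn send-query-via-agent
  "Queue a query for the connection's writer agent.
  `encode-and-write!` is hypothetical, standing in for the real writer."
  [out query]
  (send-off writer
            (fn [_]
              ;; (encode-and-write! out query)  ; hypothetical
              nil)))
```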
This commit https://github.com/bitemyapp/revise/commit/2d52fe2efa230f738480972138877c26508d88a8 replaces promises with core.async channels. Not sure if we need that here?
I'm not using it right now (stopped working on the project a week ago). Didn't encounter any issues when I was, though, but I guess it was just luck.
Ahh I see
I work with @bhurlow and I can help too.
We can also use connection pooling. As a data point, Carmine (https://github.com/ptaoussanis/carmine) uses the Apache Commons Pool library to good effect, I've used it in a busy (15req/s) production system for more than two years now. The pooling code is in https://github.com/ptaoussanis/carmine/blob/master/src/taoensso/carmine/connections.clj
One thing I noticed when building systems with Carmine is that a db library should not impose a connection policy. There are cases where I want pooling, there are cases where I want some pooling, e.g. I don't want a different connection from the pool on every library call, and there are cases where I want to control connections manually, for example with long-lived connections that persist for the entire life of the process.
As an example, Carmine's `wcar` macro isn't usable for me, because it always gets a connection from the pool.
Whatever ends up implemented here, I'd suggest considering different use cases.
@jwr I agree that pools should be optional and the connection thread safety should be fixed regardless of what happens with connection pooling.
I've also taken a look at Carmine as you suggested and I think that using Apache Commons Pool looks like a pretty nice solution for connection pooling.
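A rough sketch of what wrapping connections in a commons-pool2 `GenericObjectPool` might look like, assuming a `connect`/`close` pair of functions like clj-rethinkdb's (untested, illustrative only):

```clojure
(import '[org.apache.commons.pool2 BasePooledObjectFactory]
        '[org.apache.commons.pool2.impl GenericObjectPool DefaultPooledObject])

(defn connection-pool
  "Build a pool of connections from a connect fn and a close fn."
  [connect-fn close-fn]
  (GenericObjectPool.
   (proxy [BasePooledObjectFactory] []
     (create [] (connect-fn))
     (wrap [conn] (DefaultPooledObject. conn))
     (destroyObject [pooled]
       (close-fn (.getObject pooled))))))

;; Usage sketch: borrow, run, and always return the connection.
;; (let [pool (connection-pool #(connect) close)
;;       conn (.borrowObject pool)]
;;   (try (run query conn)
;;        (finally (.returnObject pool conn))))
```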
> One thing I noticed when building systems with Carmine is that a db library should not impose a connection policy.
What if `rethinkdb.query/run` takes either a pool or a connection as its second parameter? That is, the current behaviour doesn't change (you can pass a single un-pooled connection), but it also accepts a pool, in which case it will take a connection from the pool and use that. As for your second use case, perhaps you can manually request a connection from the pool and then call `run` with that connection. That is, `run` takes two forms, `(run query connection)` and `(run query connection-pool)`, and your second use case uses the first form after getting a connection from a pool manually.
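As a sketch of that dispatch (hypothetical names: `run-with-conn` stands in for the current single-connection behaviour, and the pool is assumed to be a commons-pool `GenericObjectPool`):

```clojure
(import '[org.apache.commons.pool2.impl GenericObjectPool])

(defn run
  "Run a query against either a bare connection or a connection pool.
  With a pool, borrow a connection, run, and always return it."
  [query conn-or-pool]
  (if (instance? GenericObjectPool conn-or-pool)
    (let [conn (.borrowObject ^GenericObjectPool conn-or-pool)]
      (try
        (run-with-conn query conn)          ; hypothetical
        (finally (.returnObject ^GenericObjectPool conn-or-pool conn))))
    (run-with-conn query conn-or-pool)))
```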
I'm going to think on it and then put something together later. If anyone has ideas or concerns, please chime in (especially @apa512, as it's your library and I'd like to make sure whatever I try has a reasonable chance of getting merged).
I'm thinking the basic set of requirements is:
Thoughts?
@danielytics Everything you wrote makes sense. I haven't given the thread safety issue a lot of thought but using channels seems reasonable.
Something like
:+1: to `#'rethinkdb.query/run` accepting a connection or a connection pool.
It seems to me that enforcing the proposed channel model without a connection pool alternative would create an undesirable bottleneck in parallelized clients.
In our (@bhurlow) implementations, we often (read: almost always) have concurrent threads making requests and it would be greatly preferable not to force them to wait in line, so to speak.
Something really important to consider is that this library is on v3 of the RethinkDB client protocol, not v4. The best source of release notes I could find on this was this Google Groups thread.
Of particular note is:
New protocol version that supports parallel execution
We added a new protocol version V0_4: https://github.com/rethinkdb/rethinkdb/blob/next/src/rdb_protocol/ql2.proto#L50 . V0_3 is still supported, and has the same behavior as before, but if your driver sends V0_4 as its protocol version and then sends multiple queries, the server will execute those queries in parallel and return results in whatever order they finish. (If you’re currently relying on results being returned in order, you should instead look at the token of the response to figure out which query it corresponds to.)
This is especially useful for projects in asynchronous languages like JS, and for projects that do prefetching for streams (which we strongly recommend).
This is particularly important if you're working with change feeds. From what I can tell (without too much testing), under v3 if you execute a changefeed query followed by a regular query, the regular query will never return from RethinkDB until the changefeed query is finished, i.e. potentially never.
I've made a first-pass implementation using core.async here: https://github.com/danielytics/clj-rethinkdb/tree/connection-core.async (specifically https://github.com/danielytics/clj-rethinkdb/blob/connection-core.async/src/rethinkdb/core.clj#L47 and https://github.com/danielytics/clj-rethinkdb/blob/connection-core.async/src/rethinkdb/net.clj#L138). Note that for now I've left the old code in for testing.
My unscientific benchmark has running connect, query, disconnect 100 times at about 7% slower than before, but running a query 100 times against the same connection is the same as before. This version also runs @bhurlow's `explode` function without exploding.
I'm going to look into doing a version that doesn't use core.async next and after that I'll look into connection pooling. The code still needs to be cleaned up, but if anyone has any feedback, I'd like to hear it.
Now would also probably be a good time to decide on a solution to #20
@danielytics, I swapped `[rethinkdb "0.8.39"]` for your branch in one of our apps and can confirm that it does not explode with real usage either. Given the way this particular app works, the old version would explode after about five seconds and maybe one successful write.
With your version, it processed several hundred jobs without any hiccups.
@danielytics Good job! I haven't been able to test this (only access to stupid Windows computer) but this seems very promising.
@danielytics The core.async branch looks good. :+1: for trying to get up to the latest protocol version too, like @danielcompton mentioned, though perhaps that's a separate issue.
From what I can see, the only difference between protocol v3 and v4 is that v4 will issue queries in parallel while v3 does it sequentially. If that's correct, then my new version is compatible with v4 and we can upgrade.
I'm going to play around with this some more (and also fix #20) and then do a code cleanup before doing a PR.
@danielytics That was pretty much it; there were also a few term name changes in v4, which were fixed in #25. Also, I think you've got this, but to be clear, the change between v3 and v4 is that the server can return results out of the order of delivery.
@danielytics What's the status on this? Your fork looks solid.
@bhurlow I was working on #20 and wanted to get that working before doing a PR, but that issue seems more complicated than anticipated. I'll clean up my fork and do a PR later tonight so that this issue can be closed and then continue thinking about/working on #20 separately.
@danielytics Thanks for the work you've put into this. #20 looks desirable as well. If you're feeling comfortable with your branch for this issue I'd say go ahead and do a PR. I for one am looking forward to using it.
:+1: to separation of concerns (and PRs)
I need to clean up the code a bit. Should get time within the next few hours.
Apologies for the delay. I've done a PR now: #32
:+1: At least at a cursory glance, it looks good to me.
I think we can safely close this now. Reopen it if there's more to discuss though.
If the RethinkDB server socket is down or disconnected, the assert here (https://github.com/apa512/clj-rethinkdb/blob/master/src/rethinkdb/net.clj#L45) throws before a java.net.ConnectException does.
I found this a bit confusing in production as I wasn't sure if the db had been disconnected or what.