Closed: tinco closed this 6 years ago.
Hi Tinco,
This is very cool!
I have a few high level comments:
Should we be using .changes(:include_states => true) for defer_subscription_confirmation and transmit_subscription_confirmation?
Should we do .changes(:include_initial => true)?
Let me know what your thoughts are.
Have a good one, Nico
Should we do .changes(:include_initial => true)?
That's why I gave it an optional options hash that gets passed into changes. I think include_initial really depends on the use case, though I expect include_initial would be most common, so we could make it the default.
I'm a little unclear on the concurrency model. Are we doing one database connection per browser connection? If so, this might be difficult to scale. Should we use EventMachine? Here's an example: http://nobrainer.io/docs/real_time_tutorial/
I discussed this with someone from the RethinkDB team about a year back, I think Daniel Mewes, and they said that having thousands of database connections should be no issue. It is required, by the way, to have a database connection per client connection; there's no other way to do a changefeed.
The bigger problem, I think, is that I block a global_io_executor thread per connection. We need some kind of asynchronous socket to circumvent this, or we really do need a thread per connection (outside the global_io_executor). It would be nice if we could avoid depending on EventMachine. Celluloid has nio4r (https://github.com/celluloid/nio4r), a nice, small library that might allow us to use evented sockets without depending on EM or the whole of Celluloid. I will read up on exactly what Rails does; it has to keep track of websockets internally, so surely it uses evented sockets for that as well.
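To illustrate the evented-socket idea without committing to a library, here is a minimal sketch that services many connections from one thread instead of parking a thread per connection. It deliberately swaps in Ruby's standard IO.select rather than nio4r (which offers a comparable selector API), and uses pipes as stand-ins for database sockets; the EventLoop class is hypothetical.

```ruby
# One thread services many connections: IO.select reports which IOs are
# readable, and only those callbacks run. nio4r offers a similar selector
# API; this sketch uses the stdlib so it runs without extra gems.
class EventLoop
  def initialize
    @handlers = {} # IO => callback invoked with each chunk read
  end

  # Register a callback to run whenever +io+ has data.
  def watch(io, &callback)
    @handlers[io] = callback
  end

  # Run until every watched IO has been closed by its peer.
  def run
    until @handlers.empty?
      readable, = IO.select(@handlers.keys)
      readable.each do |io|
        begin
          @handlers[io].call(io.read_nonblock(4096))
        rescue IO::WaitReadable
          next # spurious wakeup: nothing to read yet
        rescue EOFError
          @handlers.delete(io) # peer closed: stop watching and release the fd
          io.close
        end
      end
    end
  end
end

# Usage: three pipes stand in for three database connections.
received = []
event_loop = EventLoop.new
writers = 3.times.map do |i|
  reader, writer = IO.pipe
  event_loop.watch(reader) { |data| received << "feed#{i}:#{data}" }
  writer
end
writers.each_with_index do |writer, i|
  writer.write("change#{i}")
  writer.close
end
event_loop.run
```

The single thread only wakes up for connections that actually have data, which is the property that avoids blocking one executor thread per changefeed.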
Should we be using .changes(:include_states => true) for defer_subscription_confirmation and transmit_subscription_confirmation?
You are right. This will allow us to send subscription_confirmation as soon as we get the :initializing message.
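With include_states: true, RethinkDB interleaves status documents such as {"state" => "initializing"} and {"state" => "ready"} with the regular change documents; the first status document is "initializing" when initial values are included and "ready" otherwise. A minimal sketch of confirming on the first status document, assuming the changefeed is exposed as an Enumerable of hashes; ConfirmingFeed, on_confirm and on_change are hypothetical names, not part of nobrainer_streams or ActionCable:

```ruby
# Consumes a changefeed opened with include_states: true.
# Status documents ({"state" => ...}) are never forwarded as changes;
# the first one triggers the subscription confirmation.
class ConfirmingFeed
  def initialize(feed, on_confirm:, on_change:)
    @feed = feed
    @on_confirm = on_confirm
    @on_change = on_change
    @confirmed = false
  end

  def consume
    @feed.each do |doc|
      if doc.key?("state")
        # The first status document is the earliest point at which we know
        # the server accepted the query, so confirm exactly once here.
        unless @confirmed
          @confirmed = true
          @on_confirm.call(doc["state"])
        end
      else
        @on_change.call(doc)
      end
    end
  end
end

events = []
feed = [
  { "state" => "initializing" },
  { "new_val" => { "id" => 1 } },
  { "state" => "ready" },
  { "old_val" => { "id" => 1 }, "new_val" => { "id" => 1, "name" => "x" } }
]
ConfirmingFeed.new(
  feed,
  on_confirm: ->(state) { events << [:confirmed, state] },
  on_change:  ->(doc)   { events << [:change, doc] }
).consume
```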
I'm a bit short on time at the moment; hopefully I'll find some time to get to the bottom of these concerns later this week.
That's why I gave it an optional options hash that gets passed into changes. I think include_initial really depends on the use case, though I expect include_initial would be most common, so we could make it the default.
The options hash is good. Let's keep RethinkDB's default, which is false.
I think Daniel Mewes, and they said that having thousands of database connections should be no issue.
That seems less than ideal to me, especially for performance.
It is required, by the way, to have a database connection per client connection; there's no other way to do a changefeed.
Actually, the example at http://nobrainer.io/docs/real_time_tutorial/ uses a single connection.
The bigger problem, I think, is that I block a global_io_executor thread per connection. We need some kind of asynchronous socket to circumvent this, or we really do need a thread per connection (outside the global_io_executor). It would be nice if we could avoid depending on EventMachine.
I agree. I don't think it's a good idea to use a thread+connection per channel. The faye-websocket gem depends on EventMachine, so maybe it's not a big deal to depend on EventMachine.
I'm a bit short on time at the moment; hopefully I'll find some time to get to the bottom of these concerns later this week.
Awesome! Let me know how it goes :+1:
Also, if you need to write some tests, run them with EM=1; this will configure NoBrainer to use EventMachine.
Thanks, Nico
Actually, the example at http://nobrainer.io/docs/real_time_tutorial/ uses a single connection
I don't really get how that works. Doesn't the changes function always fully block a RethinkDB connection? I think that's what I was told when I researched it last year.
Let me show you real quick: ssh 3BHZt0Awbd96nmqLzVbCsTMI4@ny2.tmate.io or https://tmate.io/t/3BHZt0Awbd96nmqLzVbCsTMI4
I guess I'll show you another time, but yes, changes doesn't block the connection with EventMachine.
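Why one connection can carry several changefeeds: in the RethinkDB wire protocol each query is sent with a client-chosen token and every response echoes that token, so responses for different queries can be interleaved on one socket and routed back to the right consumer. The following is a simplified model of that routing, not the driver's actual code; Multiplexer and the response hashes are hypothetical.

```ruby
# Routes interleaved responses from one shared connection to the callback
# that issued the matching query, keyed by token.
class Multiplexer
  def initialize
    @next_token = 0
    @callbacks = {} # token => callback
  end

  # "Send" a query: allocate a fresh token and remember who wants results.
  def start_query(&callback)
    token = (@next_token += 1)
    @callbacks[token] = callback
    token
  end

  # Called for every response read off the socket, in arrival order.
  def dispatch(response)
    callback = @callbacks[response[:token]]
    return unless callback # query already stopped: drop the response

    callback.call(response[:data])
  end

  def stop_query(token)
    @callbacks.delete(token)
  end
end

mux = Multiplexer.new
messages = Hash.new { |hash, key| hash[key] = [] }
users = mux.start_query { |data| messages[:users] << data }
chats = mux.start_query { |data| messages[:chats] << data }

# Responses for the two feeds arrive interleaved on the same socket.
[
  { token: chats, data: "chat 1" },
  { token: users, data: "user 1" },
  { token: chats, data: "chat 2" }
].each { |response| mux.dispatch(response) }

mux.stop_query(chats)
mux.dispatch(token: chats, data: "chat 3") # ignored after stop
```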
Hey, sorry! That was a very cool invitation; unfortunately I was at work, knee-deep in customer relations stuff :(
Alright, I found how you did it, in this doc: https://rethinkdb.com/docs/async-connections/#ruby-with-eventmachine. Very cool! I'll dig into it some more so I can use it in my own RethinkDB wrapper as well.
Yes, NoBrainer uses the EventMachine API internally with Fibers: https://github.com/nviennot/nobrainer/blob/master/lib/no_brainer/query_runner/em_driver.rb
I mentioned that we should have an async API instead of being tied to EventMachine: https://github.com/rethinkdb/rethinkdb/issues/4630#issuecomment-128055788
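The Fiber technique in a nutshell: the calling Fiber issues an asynchronous request, yields, and is resumed by the event loop's callback with the result, so the calling code reads as if it were synchronous. A self-contained sketch with a toy event loop standing in for EventMachine; ToyReactor, async_query and sync_query are hypothetical and not NoBrainer's em_driver.

```ruby
require "fiber" # Fiber.current needs this on older Rubies

# Minimal stand-in for an event loop such as EventMachine: callbacks are
# queued and executed later, never inline.
class ToyReactor
  def initialize
    @pending = []
  end

  def next_tick(&block)
    @pending << block
  end

  def run
    @pending.shift.call until @pending.empty?
  end
end

REACTOR = ToyReactor.new

# Callback-style API, like a driver's asynchronous run.
def async_query(value, &on_result)
  REACTOR.next_tick { on_result.call(value * 2) }
end

# Synchronous facade: park the current Fiber until the callback resumes it.
def sync_query(value)
  fiber = Fiber.current
  async_query(value) { |result| fiber.resume(result) }
  Fiber.yield # returns whatever the callback passed to resume
end

results = []
Fiber.new do
  # Reads like blocking code, but each call hands control back to the reactor.
  results << sync_query(1)
  results << sync_query(20)
end.resume
REACTOR.run
```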
awesome stuff!
Oh jeez, I forgot all about this PR; I went on holiday and it just slipped my mind.
Very interesting! You are probably already aware, but in case it helps, Rails ActionCable uses the following three libraries under the hood:
(perhaps they may come in handy here)
Just posting a WIP as a reminder to myself. My heart isn't really in it; I have a kind of principle against EM, so I guess that demotivated me a bit. Hopefully tomorrow I'll feel like implementing some specs. And since NoBrainer is an ORM, we should probably deserialize the values, right?
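On deserializing: RethinkDB change documents carry raw hashes in old_val and new_val, and either may be nil (old_val on inserts, new_val on deletes). A hedged sketch of turning them into model instances before they reach the channel; Message is a hypothetical Struct-based stand-in, and a real NoBrainer model would be instantiated through NoBrainer's own API instead.

```ruby
# Hypothetical model; a real NoBrainer document would be built through
# NoBrainer's own instantiation path instead.
Message = Struct.new(:id, :body, keyword_init: true) do
  def self.from_db(attrs)
    attrs && new(id: attrs["id"], body: attrs["body"])
  end
end

# Converts a raw change document into [old_model, new_model].
# old_val is nil on insert; new_val is nil on delete.
def deserialize_change(change, model)
  [model.from_db(change["old_val"]), model.from_db(change["new_val"])]
end

insert = { "old_val" => nil, "new_val" => { "id" => "a1", "body" => "hi" } }
old_msg, new_msg = deserialize_change(insert, Message)

delete = { "old_val" => { "id" => "a1", "body" => "hi" }, "new_val" => nil }
deleted_old, deleted_new = deserialize_change(delete, Message)
```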
@tinco no worries. I think it's okay to wait until the RethinkDB driver offers a better async API.
I made a PR to RethinkDB that implements #async_run, and I got it to work in NoBrainer; hopefully I'll have a nice patch tomorrow. The RethinkDB team has been a little slow to give feedback, but I think we'll get something in.
Alright, I pushed what I have right now. @nviennot, could you advise me on this commit? There are two things I do that are awkward:
I check out a new connection from your ConnectionManager and call .raw on it. Ideally I would do that in such a way that the connection is automatically re-established, or maybe even taken from a pool, or at least thread-local, since writing to sockets is not thread-safe.
The second thing is that I call to_rql on the query before running changes on it, to avoid some code in your drivers. I should probably integrate better, but your driver/middleware system is a little complex and I couldn't immediately figure out what I should do.
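For the "at least thread-local" option mentioned in the first point above, a plain-Ruby sketch: each thread lazily opens and caches its own connection, so no two threads write to the same socket. ConnectionPerThread and its factory block are hypothetical; in practice the factory would open a RethinkDB connection, and reconnection is not handled here.

```ruby
# Caches one connection per thread. Thread#[] storage is fiber-local in
# Ruby, so thread_variable_get/set is used to make it truly per-thread
# (this matters when fibers are involved, as in the EventMachine driver).
class ConnectionPerThread
  def initialize(&factory)
    @factory = factory
    @key = :"changefeed_connection_#{object_id}"
  end

  def connection
    thread = Thread.current
    conn = thread.thread_variable_get(@key)
    return conn if conn

    conn = @factory.call
    thread.thread_variable_set(@key, conn)
    conn
  end
end

opened = Queue.new # thread-safe record of every connection opened
pool = ConnectionPerThread.new { Object.new.tap { |conn| opened << conn } }
main_a = pool.connection
main_b = pool.connection
other = Thread.new { pool.connection }.value
```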
Hi @tinco :)
It makes me happy that you are working on this :) :) We'll get it working.
I don't know much about ActionCable's internals and API semantics. Can you explain a bit how it works? Does the user "controller" return once it calls stream_from? The websocket remains open until when? How does it get closed? Does writing to that websocket happen in a synchronous fashion? I see that ActionCable depends on websocket-driver-ruby, which depends on EventMachine. So is EventMachine powering all that under the hood? If you could explain a bit how everything works with ActionCable and its API semantics, that'd be great; otherwise it's a little hard for me to validate your design.
Let's worry about connections later :)
The middleware stack is definitely something we want to integrate with, because it handles various things like creating databases or tables on demand, reconnections, logging, run options, etc. The middleware stack is comparable to the Rack middleware stack; it's essentially a bunch of "around_filter"s, to speak in Rails controller terms. Each middleware implements a call() method, which is responsible for calling the next middleware. The problem with the middlewares is that they are designed to be synchronous. With EventMachine, the stack is the following: https://github.com/nviennot/nobrainer/blob/master/lib/no_brainer/query_runner.rb#L45-L55. It works by using a custom driver to make the EventMachine calls synchronous with the help of fibers. I would be able to help a bit more if I understood what ActionCable needs.
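To make the "around filter" comparison concrete, here is a stripped-down model of a Rack-style chain where each middleware's call invokes the next one. The classes and the env hash are hypothetical and much simpler than NoBrainer's QueryRunner stack; note that every layer waits for the next one to return, which is exactly the synchronous constraint described above.

```ruby
# Innermost layer: pretends to send the query over a connection. It fails
# the first +failures+ times to simulate a dropped connection.
class FakeDriver
  def initialize(failures: 0)
    @failures = failures
  end

  def call(env)
    if @failures.positive?
      @failures -= 1
      raise IOError, "Connection is closed."
    end
    "result of #{env[:query]}"
  end
end

# Each middleware wraps the next one, like a Rails around_filter.
class LoggingMiddleware
  def initialize(next_middleware, log)
    @next = next_middleware
    @log = log
  end

  def call(env)
    @log << "start #{env[:query]}"
    result = @next.call(env)
    @log << "done #{env[:query]}"
    result
  end
end

class RetryMiddleware
  def initialize(next_middleware, attempts: 2)
    @next = next_middleware
    @attempts = attempts
  end

  def call(env)
    tries = 0
    begin
      tries += 1
      @next.call(env)
    rescue IOError
      # A real middleware would reconnect here before trying again.
      retry if tries < @attempts
      raise
    end
  end
end

log = []
stack = LoggingMiddleware.new(RetryMiddleware.new(FakeDriver.new(failures: 1)), log)
result = stack.call(query: "r.table('users')")
```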
Does the user "controller" return once it calls stream_from?
It just adds the subscription; you could do more things in the subscribed method if you like, for example calling stream_from on a second collection, which is actually something I've done before.
The websocket remains open until when? How does it get closed?
It gets closed when the application explicitly calls close on the channel, when the TCP connection dies, or when the client explicitly closes or kills the websocket connection. So by default it stays open forever (Rails automatically sends heartbeats).
Does writing to that websocket happen in a synchronous fashion?
Yes, at the moment, though it might be decoupled later.
I see that ActionCable depends on websocket-driver-ruby, which depends on EventMachine. So is eventmachine powering all that under the hood?
No, all EventMachine code has been eliminated from Rails 5; the dependency you see is only a development dependency (according to this gemspec), I suppose for their test suite.
If you could explain a bit on how everything works with ActionCable and their API semantics, that'd be great, otherwise, it's a little hard for me to validate your design.
There's a great introduction here. The basic idea is very simple: instead of a Controller you have a Channel. All public methods are actually RPC-exposed, so a client could call a method you define in the channel. The subscribed and unsubscribed methods are special lifecycle methods you can override to set the user up. In this case we use them to subscribe the user to a stream.
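The "public methods are RPC-exposed" idea can be modelled in a few lines: an incoming message names an action, and the channel only dispatches it if it is a public method declared by the concrete channel. This is a simplified sketch, not ActionCable's actual implementation; BaseChannel, dispatch and RoomChannel are hypothetical.

```ruby
class BaseChannel
  # Lifecycle hooks subclasses may override.
  def subscribed; end
  def unsubscribed; end

  # Only public methods declared below BaseChannel are callable, so
  # lifecycle hooks and inherited helpers cannot be triggered remotely.
  def self.action_methods
    public_instance_methods(true) - BaseChannel.public_instance_methods(true)
  end

  def dispatch(data)
    action = data.fetch("action")
    unless self.class.action_methods.include?(action.to_sym)
      raise ArgumentError, "unable to process #{action}"
    end

    public_send(action, data)
  end
end

class RoomChannel < BaseChannel
  def initialize(transcript)
    @transcript = transcript
  end

  # Public, therefore callable by the client.
  def speak(data)
    @transcript << data["message"]
  end

  private

  # Private, therefore not reachable through dispatch.
  def wipe_transcript(_data)
    @transcript.clear
  end
end

transcript = []
channel = RoomChannel.new(transcript)
channel.dispatch("action" => "speak", "message" => "hello")
rejected =
  begin
    channel.dispatch("action" => "wipe_transcript")
    false
  rescue ArgumentError
    true
  end
```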
So as I write this, I realize it might not be such a good idea to override stream_from this way. Normally you pass stream_from a string that identifies a Redis data stream. I like how my code makes it listen to a RethinkDB query instead, but to play nicely with the ecosystem, stream_from should somehow still support generic event streams as well: either by calling the super method when the argument isn't an RQL query, or by letting RethinkDB manage generic streams too. (I guess the latter would be cool, but I don't know if it competes well with Redis, and we would have to let the user decide, so calling super is probably the better option, even if super just uses a RethinkDB driver for that.)
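A minimal sketch of the "call super if it's not RQL" option: a module prepended to the channel class intercepts stream_from, turns query objects into changefeeds, and hands anything else (such as a broadcasting string) to the original method unchanged. StreamsBase, QueryLike and QueryStreams are hypothetical stand-ins for ActionCable and NoBrainer classes, and duck typing on respond_to?(:changes) is an assumption about how queries would be recognised.

```ruby
# Stand-in for ActionCable's behaviour: streams are broadcasting strings.
class StreamsBase
  attr_reader :streams

  def initialize
    @streams = []
  end

  def stream_from(broadcasting, callback = nil)
    @streams << [:broadcast, broadcasting, callback]
  end
end

# Stand-in for a NoBrainer query; only changes() matters here.
QueryLike = Struct.new(:table) do
  def changes(options = {})
    [:changes, table, options]
  end
end

module QueryStreams
  # Queries become changefeeds; anything else keeps the original
  # semantics because super receives the exact same arguments and block.
  def stream_from(source, *args, &block)
    return super unless source.respond_to?(:changes)

    options, callback = args
    @streams << [:changefeed, source.changes(options || {}), callback]
  end
end

StreamsBase.prepend(QueryStreams)

channel = StreamsBase.new
channel.stream_from("chat_room_1")
on_change = ->(change) { change }
channel.stream_from(QueryLike.new(:messages), { include_initial: true }, on_change)
```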
I see.
So the following is more of a brain dump. I'm just laying out my thoughts for what we do moving forward.
Regarding EventMachine, I see this: https://github.com/rails/rails/blob/master/actioncable/lib/action_cable/channel/periodic_timers.rb#L66 -- it seems that there's an event loop running in the background, given the name connection.server.event_loop.
Also, this comment: https://github.com/rails/rails/blob/master/actioncable/lib/action_cable/channel/streams.rb#L118-L119 suggests that there's one event loop, assisted by a bunch of worker threads. The messages are received by the loop, and the callbacks are invoked from the workers.
Also, in https://github.com/rails/rails/tree/master/actioncable/lib/action_cable/subscription_adapter it seems that they have two implementations for Redis: one that runs on EventMachine and one that doesn't.
If we end up using EventMachine, we'll use EM under the hood and not expose anything to the user (like the evented Redis implementation they have). I don't really like EM, so if we can avoid it, that'd be great. Hopefully your PR to RethinkDB will make it through. I feel like we could have both flavors, similar to what they have for Redis. Using EM or not should not be relevant from a user's perspective; we should offer the same API regardless.
Let's talk about the API that we can expose to the user.
I don't think we should use stream_from. I don't think it's a good idea to reuse their "stream" naming and semantics (their streams are paired with the notion of broadcast). Their API has the notion of "coders" for JSON marshalling as arguments to these methods, which is irrelevant to us.
Their API has https://github.com/rails/rails/blob/master/actioncable/lib/action_cable/channel/broadcasting.rb#L21, which essentially provides the string related to a model, which is useful for https://github.com/rails/rails/blob/master/actioncable/lib/action_cable/channel/streams.rb#L99 (e.g. stream_for(User.first)). We really don't need that, as we talk to models directly.
So I think we should have our own API (which is like adding a module in https://github.com/rails/rails/tree/master/actioncable/lib/action_cable/channel). The name changefeed may appear in the API name.
The API should:
1) Provide a way for the user to know that the subscription succeeded. It seems that there is already some way to do so: https://github.com/rails/rails/blob/master/actioncable/lib/action_cable/channel/base.rb#L272 and https://github.com/rails/rails/blob/master/actioncable/lib/action_cable/channel/streams.rb#L87
2) Provide a way to report errors. Some errors will happen before the subscription succeeds (e.g. the query is invalid), some will happen once the subscription has been confirmed (e.g. the connection to the DB dropped). We might want to handle these two kinds of errors separately. How should we propagate errors to the user?
3) Maintain message ordering from the changefeed. When I see this: https://github.com/rails/rails/blob/master/actioncable/lib/action_cable/channel/streams.rb#L124 I'm a bit suspicious. It seems that when two messages A and B are received, both can be processed in parallel by two different workers. This is not a good idea, as the two messages may get processed in the opposite order and we end up sending B and then A to the websocket, which can be a problem for a client keeping track of a last-updated value.
4) Allow subscribing to and unsubscribing from changefeeds from anywhere, even from a callback (either ours or a stream callback).
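On point 3, one common way to keep per-feed ordering is to give each subscription its own serial queue: changes for one feed are delivered strictly in arrival order, while different feeds can still proceed in parallel. A minimal sketch with Ruby's Thread and Queue; OrderedSubscription is hypothetical and uses one consumer thread per subscription for simplicity, where a real implementation would more likely serialize per subscription on top of a shared worker pool.

```ruby
# Delivers messages for one subscription in exactly the order they were
# received, on a single dedicated consumer thread.
class OrderedSubscription
  STOP = Object.new.freeze

  def initialize(&deliver)
    @queue = Queue.new
    @worker = Thread.new do
      loop do
        message = @queue.pop
        break if message.equal?(STOP)

        deliver.call(message)
      end
    end
  end

  # Called by the event loop; never waits for delivery.
  def push(message)
    @queue << message
  end

  # Deliver everything already queued, then stop the worker.
  def close
    @queue << STOP
    @worker.join
  end
end

sent = []
subscription = OrderedSubscription.new do |message|
  sleep(rand * 0.001) # simulate variable transmit latency
  sent << message
end
%w[A B C D].each { |message| subscription.push(message) }
subscription.close
```

Because a single consumer drains the queue, a slow transmit of A delays B instead of letting B overtake it.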
Let me know if all this makes sense to you. Thank you :)
it seems that there's an event loop running in the background given the name of connection.server.event_loop
Correct, the event loop class that is used by default is located here. There is a mode where you use Faye instead of direct websockets, and Faye depends on EventMachine, but that's not enabled by default.
suggest that there's one event loop, assisted by a bunch of worker threads
OK, I dispatched on the main event loop, but it would be better to do as they recommend and use the workers for that.
I don't think we should use stream_from
I like the idea of using stream_from on RQL and having it magically work, because it's intuitive and it highlights the idea that you don't need a separate broadcasting service if your database natively supports changefeeds. It also makes semantic sense: you can stream from generic broadcasts, but you can also stream from queries. If we allow both uses, I don't think they would conflict.
Of course, we could use a unique name like changefeed_stream_from or something. It would avoid any conflict, even though I feel that the first argument being a NoBrainer query already avoids all conflicts.
I implemented a small demonstration of my idea of extending stream_from instead of introducing a new method.
Hi everyone. I have an issue with streams in Rails 5. For example, if we set up an ActionCable channel with NoBrainer, stream data from RethinkDB's changes(), and RethinkDB restarts, NoBrainer fails to reset the connection and tries to use the old, invalid one. Here are some Rails logs:
2017-03-30T20:35:58+00:00 INFO: Started GET "/chats" for 195.191.175.244 at 2017-03-30 20:35:58 +0000
2017-03-30T20:35:58+00:00 INFO: Processing by ChatController#index as HTML
2017-03-30T20:35:58+00:00 INFO: Rendering chat/index.html.haml within layouts/dashboard
2017-03-30T20:35:58+00:00 INFO: Rendered layouts/_alert_notice_flash.haml (0.2ms)
2017-03-30T20:35:58+00:00 INFO: Rendered layouts/_ribbon.html.erb (0.5ms)
2017-03-30T20:35:58+00:00 INFO: Rendered chat/_message_thread.html.haml (0.4ms)
2017-03-30T20:35:58+00:00 INFO: Rendered chat/_chat_list_united.html.haml (71.0ms)
2017-03-30T20:35:58+00:00 INFO: Rendered chat/_chats_list.haml (72.2ms)
2017-03-30T20:35:58+00:00 INFO: Rendered chat/_no_messages_found.html.haml (0.2ms)
2017-03-30T20:35:58+00:00 INFO: Rendered message_templates/_variables.haml (0.5ms)
2017-03-30T20:35:58+00:00 INFO: Rendered chat/_message_template_modal.html.haml (3.3ms)
2017-03-30T20:35:58+00:00 INFO: Rendered chat/_chat_footer.html.haml (6.7ms)
2017-03-30T20:35:58+00:00 INFO: Rendered chat/_chat_synchronize_dialog.haml (0.4ms)
2017-03-30T20:35:58+00:00 INFO: Rendered message_templates/_upload.haml (0.7ms)
2017-03-30T20:35:58+00:00 INFO: Rendered rdb/messages/_airbnb_message.haml (0.2ms)
2017-03-30T20:35:58+00:00 INFO: Rendered rdb/messages/_email_message.haml (0.2ms)
2017-03-30T20:35:58+00:00 INFO: Rendered rdb/messages/_status_message.haml (0.3ms)
2017-03-30T20:35:58+00:00 INFO: Rendered rdb/messages/_message_templates.haml (1.3ms)
2017-03-30T20:35:58+00:00 INFO: Rendered chat/_chat_body.haml (10.1ms)
2017-03-30T20:36:00+00:00 INFO: Started GET "/auth/login" for 50.112.95.211 at 2017-03-30 20:36:00 +0000
2017-03-30T20:36:00+00:00 INFO: Processing by Auth::SessionsController#new as */*
2017-03-30T20:36:00+00:00 INFO: Rendering auth/sessions/new.html.erb within layouts/auth
2017-03-30T20:36:00+00:00 INFO: Rendered auth/_header_regist.html.erb (0.3ms)
2017-03-30T20:36:00+00:00 INFO: Rendered auth/_slider.html.erb (0.2ms)
2017-03-30T20:36:00+00:00 INFO: Rendered auth/sessions/new.html.erb within layouts/auth (1.9ms)
2017-03-30T20:36:00+00:00 INFO: Rendered layouts/_footer.html.erb (0.2ms)
2017-03-30T20:36:00+00:00 INFO: Rendered layouts/_google_analytics.html.erb (0.1ms)
2017-03-30T20:36:00+00:00 INFO: Completed 200 OK in 10ms (Views: 3.4ms | ActiveRecord: 0.0ms)
2017-03-30T20:36:01+00:00 INFO: Rendered chat/_chat_list_filter_form.html.haml (2577.8ms)
2017-03-30T20:36:01+00:00 INFO: Rendered chat/_chat_list_filter_modal.html.haml (2578.2ms)
2017-03-30T20:36:01+00:00 INFO: Rendered chat/index.html.haml within layouts/dashboard (2664.0ms)
2017-03-30T20:36:01+00:00 INFO: Rendered layouts/_locale_select.html.haml (0.8ms)
2017-03-30T20:36:01+00:00 INFO: Rendered layouts/_dashboard_header.html.erb (1.1ms)
2017-03-30T20:36:01+00:00 INFO: Rendered layouts/_user_stats.html.haml (6.5ms)
2017-03-30T20:36:01+00:00 INFO: Rendered layouts/_left_panel.html.haml (10.1ms)
2017-03-30T20:36:01+00:00 INFO: Rendered layouts/_page_footer.html.erb (0.2ms)
2017-03-30T20:36:01+00:00 INFO: Rendered layouts/_scripts.html.erb (0.1ms)
2017-03-30T20:36:01+00:00 INFO: Rendered layouts/_logout_modal.html.erb (0.4ms)
2017-03-30T20:36:01+00:00 INFO: Rendered layouts/_google_analytics.html.erb (0.1ms)
2017-03-30T20:36:01+00:00 INFO: Completed 200 OK in 2690ms (Views: 592.6ms | NoBrainer: 2085.6ms (read) | ActiveRecord: 0.0ms)
2017-03-30T20:36:01+00:00 INFO: Finished "/cable/" [WebSocket] for 195.191.175.244 at 2017-03-30 20:36:01 +0000
2017-03-30T20:36:03+00:00 INFO: Started POST "/chat/confirm_online" for 195.191.175.244 at 2017-03-30 20:36:03 +0000
2017-03-30T20:36:03+00:00 INFO: Processing by ChatController#confirm_online as */*
2017-03-30T20:36:03+00:00 INFO: Completed 200 OK in 24ms (NoBrainer: 14.1ms (write) | ActiveRecord: 0.0ms)
2017-03-30T20:36:03+00:00 INFO: Started GET "/cable" for 195.191.175.244 at 2017-03-30 20:36:03 +0000
2017-03-30T20:36:03+00:00 INFO: Started GET "/cable/" [WebSocket] for 195.191.175.244 at 2017-03-30 20:36:03 +0000
2017-03-30T20:36:03+00:00 INFO: Successfully upgraded to WebSocket (REQUEST_METHOD: GET, HTTP_CONNECTION: upgrade, HTTP_UPGRADE: websocket)
2017-03-30T20:36:03+00:00 INFO: Registered connection (Z2lkOi8vbWlucGFrdS1kYXNoYm9hcmQvVXNlci8xOTA1ZDk1OC1jODRmLTExZTYtODQ3MC0wNmMwMGVjMzIzMzE)
2017-03-30T20:36:04+00:00 INFO: ChatListChannel is transmitting the subscription confirmation
2017-03-30T20:36:04+00:00 INFO: ChatsSyncChannel is transmitting the subscription confirmation
2017-03-30T20:36:04+00:00 INFO: ChatMessagesChannel is transmitting the subscription confirmation
2017-03-30T20:36:04+00:00 INFO: UnreadThreadsChannel is transmitting the subscription confirmation
2017-03-30T20:36:04+00:00 INFO: ChatsSyncChannel#stream_channel_data
2017-03-30T20:36:04+00:00 ERROR: Could not execute command from {"command"=>"message", "identifier"=>"{\"channel\":\"ChatsSyncChannel\"}", "data"=>"{\"action\":\"stream_channel_data\"}"}) [RethinkDB::ReqlRuntimeError - Connection is closed.]: /opt/rbenv/versions/2.3.1/lib/ruby/gems/2.3.0/gems/nobrainer_streams-0.2.0/lib/nobrainer_streams/rethinkdb_monkeypatch.rb:258:in `run' | /opt/rbenv/versions/2.3.1/lib/ruby/gems/2.3.0/gems/nobrainer_streams-0.2.0/lib/nobrainer_streams/rethinkdb_monkeypatch.rb:249:in `block in async_run' | /opt/rbenv/versions/2.3.1/lib/ruby/gems/2.3.0/gems/nobrainer_streams-0.2.0/lib/nobrainer_streams.rb:68:in `run' | /opt/rbenv/versions/2.3.1/lib/ruby/gems/2.3.0/gems/nobrainer_streams-0.2.0/lib/nobrainer_streams/rethinkdb_monkeypatch.rb:248:in `async_run' | /opt/rbenv/versions/2.3.1/lib/ruby/gems/2.3.0/gems/nobrainer_streams-0.2.0/lib/nobrainer_streams.rb:42:in `nobrainer_stream_from'
2017-03-30T20:36:04+00:00 INFO: UnreadThreadsChannel#stream_channel_data
2017-03-30T20:36:04+00:00 INFO: ChatListChannel#stream_channel_data({"unread"=>false})
2017-03-30T20:36:04+00:00 INFO: UnreadThreadsChannel transmitting {:unread_count=>15}
2017-03-30T20:36:04+00:00 ERROR: Could not execute command from {"command"=>"message", "identifier"=>"{\"channel\":\"UnreadThreadsChannel\"}", "data"=>"{\"action\":\"stream_channel_data\"}"}) [RethinkDB::ReqlRuntimeError - Connection is closed.]: /opt/rbenv/versions/2.3.1/lib/ruby/gems/2.3.0/gems/nobrainer_streams-0.2.0/lib/nobrainer_streams/rethinkdb_monkeypatch.rb:258:in `run' | /opt/rbenv/versions/2.3.1/lib/ruby/gems/2.3.0/gems/nobrainer_streams-0.2.0/lib/nobrainer_streams/rethinkdb_monkeypatch.rb:249:in `block in async_run' | /opt/rbenv/versions/2.3.1/lib/ruby/gems/2.3.0/gems/nobrainer_streams-0.2.0/lib/nobrainer_streams.rb:68:in `run' | /opt/rbenv/versions/2.3.1/lib/ruby/gems/2.3.0/gems/nobrainer_streams-0.2.0/lib/nobrainer_streams/rethinkdb_monkeypatch.rb:248:in `async_run' | /opt/rbenv/versions/2.3.1/lib/ruby/gems/2.3.0/gems/nobrainer_streams-0.2.0/lib/nobrainer_streams.rb:42:in `nobrainer_stream_from'
2017-03-30T20:36:04+00:00 ERROR: Could not execute command from {"command"=>"message", "identifier"=>"{\"channel\":\"ChatListChannel\"}", "data"=>"{\"unread\":false,\"action\":\"stream_channel_data\"}"}) [RethinkDB::ReqlRuntimeError - Connection is closed.]: /opt/rbenv/versions/2.3.1/lib/ruby/gems/2.3.0/gems/nobrainer_streams-0.2.0/lib/nobrainer_streams/rethinkdb_monkeypatch.rb:258:in `run' | /opt/rbenv/versions/2.3.1/lib/ruby/gems/2.3.0/gems/nobrainer_streams-0.2.0/lib/nobrainer_streams/rethinkdb_monkeypatch.rb:249:in `block in async_run' | /opt/rbenv/versions/2.3.1/lib/ruby/gems/2.3.0/gems/nobrainer_streams-0.2.0/lib/nobrainer_streams.rb:68:in `run' | /opt/rbenv/versions/2.3.1/lib/ruby/gems/2.3.0/gems/nobrainer_streams-0.2.0/lib/nobrainer_streams/rethinkdb_monkeypatch.rb:248:in `async_run' | /opt/rbenv/versions/2.3.1/lib/ruby/gems/2.3.0/gems/nobrainer_streams-0.2.0/lib/nobrainer_streams.rb:42:in `nobrainer_stream_from'
The channel looks like this:
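```ruby
class ChatListChannel < ApplicationCable::Channel
  # NoBrainer::Streams comes from the nobrainer_streams gem seen in the
  # stack traces above; it lets stream_from accept a NoBrainer query.
  include NoBrainer::Streams

  def subscribed
    # message_threads_collection, LIST_LIMIT and transmit_thread are
    # defined elsewhere in the application.
    chats_list = message_threads_collection
                 .order_by(filtering_index: :desc)
                 .limit(LIST_LIMIT)
                 .with_index(:filtering_index)
    stream_from chats_list, { include_initial: true }, ->(change) { transmit_thread(change) }
  end

  def unsubscribed
    # Any cleanup needed when channel is unsubscribed
  end
end
```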
The code by @tinco doesn't handle DB connection drops well (see point 2 of my previous comment at https://github.com/nviennot/nobrainer/pull/192#issuecomment-234822138). As this is outside the NoBrainer codebase, may I suggest that you open an issue on the nobrainer_streams project? By the way, I'm happy to help if NoBrainer's code needs adjusting to support nobrainer_streams.
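For the "Connection is closed." errors in the log above, a hedged sketch of the missing behaviour: when running the feed fails because the connection is gone, discard the cached connection, open a fresh one, and restart the feed a bounded number of times. ConnectionClosed, ResilientFeed and the connection factory are hypothetical; the real error in the log is RethinkDB::ReqlRuntimeError. Changes that happen while disconnected are not replayed, so re-sending initial values (include_initial) may be needed to resynchronise clients.

```ruby
# Hypothetical error for an unusable connection; stands in for
# RethinkDB::ReqlRuntimeError("Connection is closed.").
class ConnectionClosed < StandardError; end

class ResilientFeed
  def initialize(max_attempts: 3, &connect)
    @connect = connect
    @max_attempts = max_attempts
  end

  # Yields the current connection to +run_feed+. On ConnectionClosed the
  # stale connection is dropped and a fresh one is opened before retrying.
  def run(&run_feed)
    attempts = 0
    begin
      attempts += 1
      @connection ||= @connect.call
      run_feed.call(@connection)
    rescue ConnectionClosed
      @connection = nil # never reuse the dead connection
      retry if attempts < @max_attempts
      raise
    end
  end
end

opened = []
feed = ResilientFeed.new { opened << "conn#{opened.size + 1}"; opened.last }
result = feed.run do |conn|
  raise ConnectionClosed, "Connection is closed." if conn == "conn1"

  "streaming on #{conn}"
end
```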
He already opened an issue here: https://github.com/tinco/nobrainer_streams/issues/3. I'll try to find some time to look at it.
Cool, thank you :))
@tinco do you have an update on this? It's really cool!
@tinco @nviennot shouldn't we close this PR and just keep the nobrainer_streams gem? In my opinion, it makes sense to keep them separated into two pieces.
Sure!
Hi,
I made this small module to make the new ActionCable feature of Rails 5.0 work super conveniently with NoBrainer. You can see an example of its use here:
https://github.com/tinco/rails-chat-game/blob/master/app/channels/room_channel.rb
I use DHH's weird server-side rendering technique there; if you skip that, the code could be as simple as:
Pretty neat right?
I haven't tested it in your project yet; if you think this is a good addition, I'll work on getting the tests right so you can merge it.
Kind regards, Tinco