rails / solid_cable

A database backed ActionCable adapter
MIT License
217 stars 14 forks source link

Add auto trimming #10

Closed dhh closed 1 month ago

dhh commented 1 month ago

I think the last missing piece of Solid Cable before we can proceed with its default inclusion in Rails 8 is to add auto trimming. I understood that there was a performance issue if you did it on every broadcast, but we could do it less frequently than that. @kevinmcconnell had built a Solid Cable like setup for our ONCE/Campfire product before we decided at the last minute to go with Redis for other reasons. Here's the code from that setup:

module ActionCable
  module SubscriptionAdapter
    class BoxCable < Base
      prepend ChannelPrefix

      def broadcast(channel, payload=nil)
        ::BoxCable::Message.create!(channel: channel, payload: payload)
      end

      def subscribe(channel, callback, success_callback = nil)
        listener.add_subscriber(channel, callback, success_callback)
      end

      def unsubscribe(channel, callback)
        listener.remove_subscriber(channel, callback)
      end

      def shutdown
        listener.shutdown
      end

      def clear
        shutdown
        @listener = nil
      end

      private
        def listener
          @listener ||= @server.mutex.synchronize { Listener.new(@server.event_loop) }
        end

        def db_filename
          config_options[:db] || Rails.root.join("tmp", "cable.sqlite3")
        end

        def config_options
          @config_options ||= @server.config.cable.deep_symbolize_keys
        end

        class Listener < SubscriberMap
          # Keep history for 10 seconds, in case any listeners fall a bit behind.
          TRIM_MESSAGES_AFTER = 10

          # Poll the database for new messages every 50ms
          POLLING_INTERVAL = 0.05

          attr_reader :connection
          delegate :latest_message_id, :fetch_messages, :trim_messages, to: ::BoxCable::Message

          def initialize(event_loop)
            super()
            @event_loop = event_loop
            @running = Thread::Queue.new

            monitor_messages
          end

          def shutdown
            @running.close
          end

          private
            def monitor_messages
              last_id = Rails.application.executor.wrap { latest_message_id } || 0
              last_trim = Time.now

              Thread.new do
                ActiveRecord::Base.logger.silence do
                  Thread.current.abort_on_exception = true

                  until @running.closed? do
                    Rails.application.executor.wrap do
                      messages = fetch_messages(last_id)

                      if messages.any?
                        messages.each do |message|
                          broadcast(message.channel, message.payload)
                          last_id = message.id
                        end
                      else
                        sleep POLLING_INTERVAL
                      end

                      if Time.now - last_trim > TRIM_MESSAGES_AFTER
                        trim_messages(TRIM_MESSAGES_AFTER)
                        last_trim = Time.now
                      end
                    end
                  end
                end
              end
            end

            def invoke_callback(*)
              @event_loop.post { super }
            end
        end
    end
  end
end

So that's one way of doing it. Another way is the Solid Cache setup which has both the option of a job and an inline way, and can be found here: https://github.com/rails/solid_cache/blob/main/lib/solid_cache/store/expiry.rb#L39.