faye / faye-websocket-ruby

Standards-compliant WebSocket client and server
Other
1.03k stars 97 forks source link

Cleaning up resources created during Open within Close #104

Closed TechRsch closed 7 years ago

TechRsch commented 7 years ago

I am issuing a Redis subscribe within WebSocket (WS). When I receive the WS open, I thread the request and then instantiate the Redis client. Within the open, I thread for Redis and issue the subscription. This all works fine, until I receive an unexpected WS close. At that point, the thread running the Redis subscription is gone. If I issue an unsubscribe, I get a hang because the Redis thread has been terminated.

Is there some method that can be used to permit me to clean up resources created during open prior to termination and destruction?

Sample Ruby code is:

class Backend
  include MInit

  def initialize(app)
    setup
    @app = app
  end

  def run!(env)
    if Faye::WebSocket.websocket?(env)
      ws = Faye::WebSocket.new(env, [], ping: KEEPALIVE_TIME)
      ws_thread = Thread.fork(env) do
        credis = Redis.new(host: @redis_uri.host, port: @redis_uri.port, password: @redis_uri.password)

        ws.on :open do |event|
          channel = URI.parse(event.target.url).path[1..URI.parse(event.target.url).path.length]
          redis_thread = Thread.fork do
            credis.subscribe(channel) do |on|
              on.message do |message_channel, message|
                sent = ws.send(message)
              end
              on.unsubscribe do |message_channel|
                puts "Unsubscribe on channel:#{channel};"
              end
            end
          end
        end

        ws.on :message do |event|
          handoff(ws: ws, event: event)
        end

        ws.on :close do |event|
          # Hang occurs here
          unsubscribed = credis.unsubscribe(channel)
        end

        ws.on :error do |event|
          ws.close
        end

        # Return async Rack response
        ws.rack_response

      end
    end
  else
    @app.call(env)
  end

  private
  def handoff(ws: nil, event: nil, source: nil, message: nil)
    # processing
  end
end
jcoglan commented 7 years ago

So I wouldn't have any idea about your threads or your Redis client, but onclose is the correct place to clean up resources you created in onopen. Faye::WebSocket won't know about any of these threads you create and isn't responsible for stopping them.

Can I ask why you're calling ws_thread = Thread.fork? I don't think that thread should be necessary, and it might interfere with Faye running in EventMachine.

TechRsch commented 7 years ago

Good question, actually, especially because I really don't need that anymore. I was using it for device isolation and recovery because the server wasn't clearing failed devices without each being in their own thread. The isolation really helped. However, I recently implemented Sinatra's Prototype pattern that isolates each device at the call level instead. The thread is now redundant and I'll probably remove it. Thanks!

On the open versus close, I'm bothered by the fact that the resources that I am trying to access, and clean up, are being deleted before close is reached. Therefore, I have no capability to access the resources, the Redis subscription in this case, so that I can resolve any issues there. Is there any option for me?

TechRsch commented 7 years ago

I've removed the unnecessary WS thread. It really wasn't a problem but neither was it helpful.

Regardless of that, I cannot clean up the resources in close because the Redis subscription thread is gone before I get control. So, I've had to build a hack for Redis, literally using a KILL to purge the orphaned connection. What I really need is a yield block prior to the open'd resources being deleted so that I can clean them up myself.

On the Redis side, whenever I issue a command to close or terminate or unsubscribe, the first thing it does is a "synchronize" that is intended to switch to the original Redis thread prior to command processing. This causes a permanent hang because that thread was deleted prior to my getting control within WS.close. Thus, I need control prior to that deletion. Can that happen?

I admit being a little confused about your point regarding "Faye::WebSocket won't know about any of these threads you create and isn't responsible for stopping them." The resources exist and are accessible up to the point where WS.close runs, which is when they disappear. If this gem isn't deleting them, I cannot imagine where they go.

jcoglan commented 7 years ago

I found some time to try running your code. At first, the errors I found in it were related to variables not being in scope. I cleaned things up and ended up with this:

class Backend
  KEEPALIVE_TIME = 20

  def initialize(app)
    @app = app
  end

  def call(env)
    return @app.call(env) unless Faye::WebSocket.websocket?(env)

    ws = Faye::WebSocket.new(env, [], ping: KEEPALIVE_TIME)

    credis = Redis.new(host: '0.0.0.0', port: 6379)
    channel = nil
    redis_thread = nil

    ws.on :open do |event|
      uri = URI.parse(event.target.url)
      channel = uri.path[1..uri.path.length]

      redis_thread = Thread.fork do
        credis.subscribe(channel) do |on|
          on.message do |message_channel, message|
            sent = ws.send(message)
          end
          on.unsubscribe do |message_channel|
            puts "Unsubscribe on channel:#{channel};"
          end
        end
      end
    end

    ws.on :message do |event|
      handoff(ws: ws, event: event)
    end

    ws.on :close do |event|
      # Hang occurs here
      unsubscribed = credis.unsubscribe(channel)
    end

    # Return async Rack response
    ws.rack_response
  end

  private

  def handoff(ws: nil, event: nil, source: nil, message: nil)
    # processing
  end
end

What I've observed (using the redis gem) is that the credis.subscribe and credis.unsubscribe calls both block, and only SUBSCRIBE is received by the redis server. Of course, running credis.subscribe in a thread stops it from blocking the WebSocket thread, which is fine.

I have checked redis_thread.alive? when onclose happens, and the thread is still running, as I would expect as faye-websocket has no idea about any threads you might have started in its event handlers and won't attempt to stop them. The thread is still running because the call to credis.subscribe is blocking it.

That's as far as I've got, and I'm not too familiar with this gem. Personally, for doing subscriptions I would use em-hiredis, but I hope the above is useful.

TechRsch commented 7 years ago

Well, there you go after all. I did not know of alive?. Should have. Didn't. Your answer made me think in a different direction. If the thread actually still existed, why is Redis hanging? Obviously, Redis is waiting for the thread to gain control. What do I have to do to switch to the thread, or allow it to take control? I need to give up control. Thus, within close, I must run an EM.next_tick block:

EM.next_tick do
      # Hang no longer occurs here
      unsubscribed = credis.unsubscribe(channel)
end

And, that fixed my hang. Thanks for your help.