MeltwaterArchive / datasift-ruby

Ruby client for DataSift
http://datasift.com
MIT License
24 stars 22 forks source link

How to stop the consumer? #68

Closed heaven closed 9 years ago

heaven commented 9 years ago

I have the next patches:

# Add stop method
module DataSift
  class LiveStream < DataSift::ApiResource
    def stop
      subscriptions.keys.each do |hash|
        unsubscribe(hash)
      end
    end
  end
end

# Fix zombie processes
module WebsocketTD
  class Websocket
    def send(data, type = :text)
      if IS_WINDOWS
        super
      else
        pid = fork do
          do_send(data, type)
        end

        Process.detach(pid)
      end
    end
  end
end

In my start method I have something like this:

self.streams << Thread.new do
  on_delete = proc { ... }
  on_error = proc { ... }
  on_message = proc { ... }
  on_open = proc { ... }
  on_ds_message = proc { ... }

  self.consumers << consumer = DataSift::new_stream(@config, on_delete, on_error, on_open)

  consumer.on_datasift_message = on_ds_message
  consumer.stream.read_thread.join
end

And the stop method does look like this:

def stop
  self.consumers.each(&:stop)
  self.streams.each(&:join)
  self.queue and self.queue.join
  self.observer and self.observer.join
end

And it freezes at this line: self.streams.each(&:join) so I have to use kill instead.

dugjason commented 9 years ago

Hi @heaven, I'm afraid I'm not quite sure what you're asking for here - is this a pull request, or are you asking for support? I'm not sure I understand your question

dugjason commented 9 years ago

Closing due to lack of activity.

heaven commented 9 years ago

@dugjason I was pointing that there's no stop method to shut down the streams gracefully.

zcourts commented 9 years ago

@heaven you do that be unsubscribing from the stream as per https://github.com/datasift/datasift-ruby/blob/master/examples/live_stream_eg.rb#L34

heaven commented 9 years ago

So if I have the next code:

self.streams << Thread.new do
  on_delete = proc { ... }
  on_error = proc { ... }
  on_message = proc { ... }
  on_open = proc { ... }
  on_ds_message = proc { ... }

  self.consumers << consumer = DataSift::new_stream(@config, on_delete, on_error, on_open)

  consumer.on_datasift_message = on_ds_message
  consumer.stream.read_thread.join
end

How do I stop the stream consumer?