RubyDevInc / paho.mqtt.ruby

Eclipse Public License 1.0

Mqtt client is not working in background mode in a Rails application #49

Open avishekjana opened 6 years ago

avishekjana commented 6 years ago

Hi. When I start my Rails application with the `rails c` command, the MQTT client works fine and I receive messages from my subscriptions. But when I try to detach the Rails app from the console with `rails c -d`, the MQTT subscriptions stop working. I then followed the instructions in the Foreground and Daemon section, but the config `client.connect('iot.eclipse.org', 1883, client.keep_alive, client.persistence, true)` does not work either. That is, the client connects to my MQTT server (I checked with the on_connack callback), but the on_suback callback never fires.

This is my MQTT handler file:

```ruby
require 'paho-mqtt'

class MqttHandler
  def self.handle_mqtt_subscriptions
    # Set up the MQTT client
    client = PahoMqtt::Client.new({ username: "xxxxx", password: "xxxxxx", persistent: true })

    ### Register a callback triggered on reception of a CONNACK packet
    client.on_connack = proc { puts "Connected to MQTT>>" }

    client.add_topic_callback("device/status/+") do |packet|
      puts "MQTT CHANNEL:: #{packet.topic} \n>>> #{packet.payload}"
    end

    ### Register a callback on SUBACK to confirm the subscription
    waiting_suback = true
    client.on_suback do
      waiting_suback = false
      puts "Subscribed to MQTT>>>"
    end

    ### Connect to the MQTT server on port 1883 (unencrypted mode)
    client.connect('mqtt.mymqttserver.in', 1883, client.keep_alive, client.persistent, true)

    ### Subscribe to a topic
    client.subscribe(['device/status/+', 1])

    ### Wait for the SUBACK answer, which runs the on_suback callback set above
    while waiting_suback do
      sleep 0.001
    end
  end
end
```

And I'm calling the handle_mqtt_subscriptions method from my config/application.rb file like this:

```ruby
config.after_initialize do
  MqttHandler.handle_mqtt_subscriptions
end
```

What could be the issue, any clue?
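
One detail in the handler above is independent of the daemon question: the `while waiting_suback` loop has no upper bound, so if the SUBACK never arrives, `after_initialize` never returns. Below is a minimal sketch of a bounded wait using only the standard library. `wait_until` and its `timeout` and `interval` parameters are hypothetical names, not part of paho-mqtt, and a thread stands in for the `on_suback` callback.

```ruby
# Hypothetical helper (not part of paho-mqtt): polls the given block until it
# returns a truthy value or the timeout elapses. Returns true on success and
# false on timeout.
def wait_until(timeout: 5.0, interval: 0.01)
  deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout
  until yield
    return false if Process.clock_gettime(Process::CLOCK_MONOTONIC) >= deadline
    sleep interval
  end
  true
end

# Stand-in for the on_suback callback: a thread flips the flag after 50 ms.
waiting_suback = true
Thread.new { sleep 0.05; waiting_suback = false }

subscribed = wait_until(timeout: 1.0) { !waiting_suback }
puts(subscribed ? "Subscribed" : "Timed out waiting for SUBACK")
```

With a timeout in place, a missing SUBACK shows up as a logged failure instead of a hung boot.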

rgaufman commented 6 years ago

I wouldn't recommend running an MQTT subscriber inside a Rails app. Every time you open a Rails console or run multiple processes, each one becomes a duplicate subscriber, which is probably undesirable. I have a separate process that acts as the subscriber.

However, publishing MQTT messages from Rails works great!

You want to add something like this in your puma.rb:

```ruby
on_worker_boot do
  # Re-open appenders after forking the process
  SemanticLogger.reopen
  MyMQTT.reopen if defined?(MyMQTT)
end
```

Inside of MyMQTT class I have:

```ruby
def create_client
  logger.info("Starting MQTT Connection (client_id=#{@client_id} " \
              "clean_session=#{@clean_session} username=#{@username} " \
              "will_topic=#{@will_topic})")

  PahoMqtt::Client.new(
    clean_session: @clean_session, client_id: @client_id,
    username: @username, password: @password,
    # persistent
    persistent: true, reconnect_limit: 1000, reconnect_delay: 5,
    # Keep Alives
    keep_alive: 29, ack_timeout: 120,
    # Will
    will_topic: @will_topic, will_payload: 'offline',
    will_qos: 2, will_retain: true
  )
end

# Method to deal with Puma workers
def reopen
  logger.warn('Re-opening the connection...')
  @client = create_client
  @client.connect(@ip, @port)
end
```
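
A sketch of why a hook like `reopen` matters: after `fork`, the child process inherits the parent's open sockets, but in Ruby only the thread that called `fork` keeps running in the child, so a client built before the fork cannot be relied on there. The wrapper below rebuilds the client whenever the current PID differs from the one that created it. `ForkSafeClient` and the string-returning factory are hypothetical stand-ins, not part of paho-mqtt; a real factory might call something like `create_client` and then `connect`.

```ruby
# Hypothetical wrapper (illustrative names, not from paho-mqtt): remembers
# which process built the client and rebuilds it after a fork.
class ForkSafeClient
  def initialize(&factory)
    @factory = factory
    @owner_pid = nil
    @client = nil
  end

  # Returns this process's client, building a fresh one if the PID changed.
  def client
    unless @owner_pid == Process.pid
      @client = @factory.call
      @owner_pid = Process.pid
    end
    @client
  end
end

# Stand-in factory: returns a string instead of opening a real connection.
mqtt = ForkSafeClient.new { "client built in pid #{Process.pid}" }
parent_client = mqtt.client

# Fork a child and report, through a pipe, which client it ends up with.
reader, writer = IO.pipe
child_pid = fork do
  reader.close
  writer.puts mqtt.client # the PID differs here, so the factory runs again
  writer.close
end
writer.close
child_client = reader.read.chomp
Process.wait(child_pid)

puts parent_client
puts child_client
```

The explicit `on_worker_boot` hook does the same thing at a known point in the worker's life; the PID check is only an alternative for code that cannot hook into the server's boot sequence.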

nicovak commented 6 years ago

For my Rails app I run it in a Sidekiq job and it works as expected. I am also on Heroku. It was the best solution I could find to handle it.

Here is my sidekiq.rb, just in case:

```ruby
sidekiq_server = !!Sidekiq.redis(&:info) rescue false

if sidekiq_server
  Sidekiq.configure_client do |config|
    schedule_file = 'config/schedule.yml'
    if File.exist?(schedule_file) && Rails.env.production?
      Sidekiq::Cron::Job.load_from_hash YAML.load_file(schedule_file)
    end

    Rails.application.config.after_initialize do
      MqttJob.perform_async
    end
  end
end
```

DevGW commented 5 years ago

I know this is old, but if someone is looking for a simple implementation of this: create a rake task like the one below and run it from cron every minute, or however often you need:

```ruby
namespace :ext_nodes do
  require 'mqtt'
  require 'awesome_print'

  task sub_nodes: :environment do
    pid_file = 'tmp/sub.pid'

    # Read the PID left by a previous run; 0 if the file is missing or empty
    pid = File.exist?(pid_file) ? File.read(pid_file).to_i : 0
    ap "pid from #{pid_file}: #{pid}"

    if pid.positive? && exist?(pid)
      ap "We Good!"
    else
      ap "Starting subscription"
      fork do
        # Record the child's PID so the next cron run can test its state
        File.write(pid_file, Process.pid.to_s)

        client = MQTT::Client.connect('127.0.0.1')
        client.subscribe('test')
        client.get do |topic, message|
          ap "#{topic} - #{message}"
        end
      end
    end
  end

  # Signal 0 only checks whether the process exists; nothing is delivered
  def exist?(pid)
    Process.kill(0, pid)
    true
  rescue Errno::ESRCH
    false
  rescue Errno::EPERM
    # The process exists but belongs to another user
    true
  end
end
```
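
A PID file can go stale, and two cron runs that start at the same moment can both decide that no subscriber is running. One alternative, which is a different technique from the rake task above, is to hold an advisory lock with `File#flock` for as long as the subscriber runs. This is only a sketch: `with_single_instance` and the lock path are hypothetical names, and the block body stands in for the subscribe loop.

```ruby
require 'tmpdir'

# Hypothetical helper: runs the block only if nobody else holds the lock.
# Returns true if the block ran and false if the lock was already taken.
def with_single_instance(lock_path)
  File.open(lock_path, File::RDWR | File::CREAT, 0o644) do |lock|
    # LOCK_NB makes flock return false at once instead of waiting
    return false unless lock.flock(File::LOCK_EX | File::LOCK_NB)
    yield
    true
  end
end

lock_path = File.join(Dir.mktmpdir, 'sub.lock')

inner = nil
outer = with_single_instance(lock_path) do
  # While the lock is held, a second attempt is refused and its block is skipped
  inner = with_single_instance(lock_path) { :never_runs }
end

# The lock is released when the file is closed, so a later run can take it
later = with_single_instance(lock_path) { :ran }

puts "outer=#{outer} inner=#{inner} later=#{later}"
```

The operating system drops the lock when the holding process exits, so a crash leaves no stale state to clean up.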

lbrnmdev commented 4 years ago

> (Quoting rgaufman's comment above in full: the recommendation to run the subscriber in a separate process, the puma.rb `on_worker_boot` hook, and the `create_client` / `reopen` methods of `MyMQTT`.)

What is best practice for where to put my_mqtt.rb? Does a simple require 'my_mqtt' at the beginning of puma.rb make sense? Or is there a better way to go about it?

Davidzhu001 commented 2 years ago

Is there a better way to do this?