brandonhilkert / sucker_punch

Sucker Punch is a Ruby asynchronous processing library using concurrent-ruby, heavily influenced by Sidekiq and girl_friday.
MIT License
2.65k stars, 114 forks

Sucker Punch leaks in JRuby #181

Closed cshupp1 closed 8 years ago

cshupp1 commented 8 years ago

Consider the following Rails 4.2.5 application:

Gemfile:

gem 'sucker_punch', '~> 2.0.0'

application.rb contains:

config.active_job.queue_adapter = :sucker_punch

An initializer named './config/initializers/zzz_init.rb' contains:

require 'sucker_punch/async_syntax'
TestJob.perform_now("First Job Now")
TestJob.perform_later("First Job Later")

In the jobs directory I have test_job.rb:

class TestJob < ActiveJob::Base
  queue_as :default

  def perform(*args)
    tag = args.shift
    java.lang.Thread.currentThread.setName("TestJob -- #{tag} #{Time.now}")
    puts "Work at #{Time.now}"
    sleep 1
    TestJob.perform_later()
    puts "Done working!"
  end
end

When this stock app is brought up via 'rails server', the most common thing to see in JVisualVM is:

A thread named "TestJob -- First Job Now 2016-(Some Time)"

This thread is perpetually in the 'Running' state. I also have two threads named 'TestJob -- 2016-(Some Time)'. Those threads are perpetually waiting on a monitor.

This is affecting me, as I suspect it is the cause of the following: https://github.com/brandonhilkert/sucker_punch/issues/172

The root thread, which never dies and is never GCed, holds a handle to every task performed later, causing a significant memory leak. Tomcat sees this on 'start/stop' of the application, as it detects these threads that have never died.

I will continue adding my TimerTasks to this small example to see if I can reproduce it, and I will update this bug.

cshupp1 commented 8 years ago

OK, I changed my zzz_init.rb as follows:

require 'sucker_punch/async_syntax'

java_import 'java.util.Timer' do |p, c|
  'JTimer'
end

java_import 'java.util.TimerTask' do |p, c|
  'JTimerTask'
end

class TimerTask < JTimerTask

  def set_runnable(task_lambda)
    @lamb = task_lambda
  end

  def run
    puts "Running..."
    @lamb.call
    puts "Done!"
  end

end

module ActiveJob
  module QueueAdapters
    class SuckerPunchAdapter
      class << self
        def enqueue_at(job, timestamp) #:nodoc:
          time = java.util.Date.new(timestamp * 1000)
          timer = JTimer.new(job.to_s)
          timer_task = TimerTask.new
          timer_task.set_runnable(->{JobWrapper.new.async.perform job.serialize })
          timer.java_send(:schedule,[JTimerTask.java_class, java.util.Date.java_class],timer_task, time)
        end
      end
    end
  end
end

TestJob.perform_now("First Job Now")
#TestJob.perform_later("First Job Later")

And modifying test_job.rb as follows:

class TestJob < ActiveJob::Base
  queue_as :default

  def perform(*args)
    tag = args.shift
    java.lang.Thread.currentThread.setName("TestJob -- #{tag} #{Time.now}")
    puts "Work at #{Time.now}"
    sleep 1
    TestJob.set(wait: 1.seconds).perform_later()
    puts "Done working!"
  end
end

This causes every single thread executed via set(wait:) to leak. I am assuming this is due to the parent never being released.

cshupp1 commented 8 years ago

OK, this might be all my fault. I will post more after more testing.

brandonhilkert commented 8 years ago

No problem, let us know!

cshupp1 commented 8 years ago

OK, here is my latest sucker_punch initializer:

require './app/jobs/prisme_base_job'
Dir['./app/jobs/*.rb'].each { |file| require file }

require 'sucker_punch/async_syntax'

module ActiveJob
  module QueueAdapters
    class SuckerPunchAdapter

      class ThreadFactory
        include java.util.concurrent.ThreadFactory
        include Singleton

        def newThread(lambda)
          #@threads ||= []
          t = java.lang.Thread.new(lambda, "Prisme Scheduled Pool #{Time.now}" )
          # @threads << t
          t
        end

        def destroy
          # @threads.each do |thread|
          #   thread.stop
          #   $log.info("Thread #{thread} has been brutally stopped!")
          # end
          # @threads.clear
        end

      end

      class << self
        def enqueue_at(job, timestamp)
          $log.info("Here I am!")
          java.lang.Thread.currentThread.setName("Prisme Scheduled Job #{job}")
          @@scheduler ||= java.util.concurrent.Executors.newScheduledThreadPool(20, ThreadFactory.instance)
          # Java source code (for one impl) does this already, but to be safe...
          time_delay = [timestamp - Time.now.to_i, 0].max
          @@scheduler.schedule(-> { JobWrapper.new.async.perform job.serialize }, time_delay, java.util.concurrent.TimeUnit::SECONDS)
        end

        def shutdown_scheduler
          begin
            $log.info("Preparing to shutdown future scheduler")
            @@scheduler.shutdown
            bool = @@scheduler.awaitTermination(12, java.util.concurrent.TimeUnit::SECONDS)
            unless bool
              naughty_tasks = @@scheduler.shutdownNow.to_a
              $log.warn("The scheduler had to be stopped via shutdownNow.")
              $log.warn("#{naughty_tasks}")
            end
            #ThreadFactory.instance.destroy
            $log.info("Scheduler stopped!")
          rescue => ex
            $log.error("I could not shut down the scheduler for active job. #{ex}")
            $log.error(ex.backtrace.join("\n"))
          end
        end
      end
    end
  end
end

at_exit do
  ActiveJob::QueueAdapters::SuckerPunchAdapter.shutdown_scheduler
end

unless ($rake || defined?(Rails::Generators))
  #schedule the PrismeCleanupJob to run now, synchronously in the current thread.
  PrismeCleanupJob.perform_now(true)
end

My previous implementation created a new Timer for every scheduled job, despite the fact that Timers can be reused. I also wasn't destroying those Timers via a shutdown hook, hence my leak.
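To illustrate that diagnosis, here is a minimal plain-Ruby sketch of the two patterns. It does not use the JVM at all: `FakeTimer` is a hypothetical stand-in for java.util.Timer, on the assumption that each Timer owns one background thread that stays alive until it is cancelled. The helper names `run_with_timer_per_job` and `run_with_shared_timer` are also made up for this example.

```ruby
# Hypothetical stand-in for java.util.Timer: every instance owns one
# background thread that lives until cancel is called.
class FakeTimer
  attr_reader :thread

  def initialize
    @queue = Queue.new
    @thread = Thread.new do
      # Block until a task arrives; a nil task is the cancel signal.
      while (task = @queue.pop)
        task.call
      end
    end
  end

  def schedule(&task)
    @queue << task
  end

  def cancel
    @queue << nil
    @thread.join
  end
end

# Leaky pattern: a fresh timer per job, none of them ever cancelled.
def run_with_timer_per_job(job_count)
  timers = Array.new(job_count) { FakeTimer.new }
  timers.each { |timer| timer.schedule { :work } }
  timers
end

# Reuse pattern: one timer handles every job.
def run_with_shared_timer(job_count)
  timer = FakeTimer.new
  job_count.times { timer.schedule { :work } }
  timer
end

per_job = run_with_timer_per_job(5)
shared  = run_with_shared_timer(5)

puts "timer per job: #{per_job.count { |t| t.thread.alive? }} live threads"
puts "shared timer:  #{shared.thread.alive? ? 1 : 0} live thread"

# Equivalent of the shutdown hook: cancel whatever is still running.
(per_job + [shared]).each(&:cancel)
puts "after cancel:  #{(per_job + [shared]).count { |t| t.thread.alive? }} live threads"
```

With one timer per job the live thread count grows with the number of jobs, while the shared timer stays at a single thread, and nothing goes away until something explicitly cancels it.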

Regarding your job threads hanging around in a wait state: I got one of our more current Java guys (he has been in the land of Rails for a while now) to take a look. ActiveJob (or your implementation) maps things to some of the newer concurrency APIs in Java, and that behavior wasn't at all alarming to him the way it was to me.
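For anyone else surprised by idle threads in a wait state, here is a rough plain-Ruby sketch of why that is normal for a thread pool. `TinyPool` and `wait_until_idle` are hypothetical names for this example only; this is not how sucker_punch or concurrent-ruby are implemented, just the general idea of workers parking on a queue until work arrives.

```ruby
# Hypothetical minimal fixed-size pool: workers block on a shared queue.
class TinyPool
  attr_reader :workers

  def initialize(size)
    @jobs = Queue.new
    @workers = Array.new(size) do
      Thread.new do
        # An idle worker waits here; a nil job tells it to exit.
        while (job = @jobs.pop)
          job.call
        end
      end
    end
  end

  def post(&job)
    @jobs << job
  end

  def shutdown
    @workers.size.times { @jobs << nil }
    @workers.each(&:join)
  end
end

# Poll until every worker is blocked waiting for work, or give up.
def wait_until_idle(pool, timeout: 2.0)
  deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout
  until pool.workers.all? { |w| w.status == "sleep" }
    return false if Process.clock_gettime(Process::CLOCK_MONOTONIC) > deadline
    sleep 0.01
  end
  true
end

pool = TinyPool.new(2)
finished = Queue.new
pool.post { finished << :done }
finished.pop # wait for the job to complete

wait_until_idle(pool)
pool.workers.each_with_index { |w, i| puts "worker #{i}: #{w.status}" }

pool.shutdown
puts "alive after shutdown: #{pool.workers.count(&:alive?)}"
```

The workers sit in a waiting state between jobs and only terminate when the pool is shut down, which is the same shape as the waiting threads seen in JVisualVM.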

Thanks for the response. It is nice to know my library is still being watched.

brandonhilkert commented 8 years ago

Thanks for the detailed follow-up. Hopefully it's useful for the next person who comes along. 👍🏼
