ruby-concurrency / concurrent-ruby

Modern concurrency tools including agents, futures, promises, thread pools, supervisors, and more. Inspired by Erlang, Clojure, Scala, Go, Java, JavaScript, and classic concurrency patterns.
https://ruby-concurrency.github.io/concurrent-ruby/
Other
5.68k stars 418 forks source link

Shutting down TimerTask is stopping already executing tasks #691

Closed stravid closed 9 months ago

stravid commented 6 years ago

Shutting down a timer task is not waiting for already executing tasks to finish. I talked to @headius and he encouraged me to open an issue since it is not clear if this is a feature or bug.

Example code:

require "rubygems"
require "bundler/setup"
require "concurrent"

tasks = 10.times.map do |s|
  Concurrent::TimerTask.new(execution_interval: 1) do
    sleep s
    puts "s: #{s}\n"
  end
end

begin
  tasks.each { |t| t.execute }
  sleep
rescue Interrupt
  puts "Received INT"
  puts "Shutting down tasks"
  tasks.each { |t| t.shutdown }
ensure
  puts "Releasing resources"
end
headius commented 6 years ago

It seems like a bug if shutting down a TimerTask kills a timed execution that has already started. Seems like it should wait for completion, or there should be another way to say "shutdown_after_current_task_has_completed".

pitr-ch commented 6 years ago

Thank you, I'll have a look.

ajlai commented 5 years ago

The TimerTask seems to complete if you wait around long enough (not getting killed):

require 'concurrent'

t = Concurrent::TimerTask.new(execution_interval: 1) do
  puts "#{Time.now}: start task"
  sleep 5
  puts "#{Time.now}: finish task"
end

begin
  t.execute
  sleep
rescue Interrupt => e
  puts "#{Time.now}: interrupt rescued, shutting down"
  t.shutdown
  sleep 5 # Waiting 'long enough' here
end

#=> 2019-01-24 15:54:16 -0800: start task
#=> 2019-01-24 15:54:21 -0800: finish task
#=> 2019-01-24 15:54:22 -0800: start task
#=> ^C2019-01-24 15:54:24 -0800: interrupt rescued, shutting down
#=> 2019-01-24 15:54:27 -0800: finish task

there should be another way to say "shutdown_after_current_task_has_completed".

There is a method t.wait_for_termination which should block until the shutdown is totally done if it behaves the same way as threadpool#wait_for_termination (http://ruby-concurrency.github.io/concurrent-ruby/master/file.thread_pools.html#Thread_Pool_Status_and_Shutdown).

Unfortunately, it doesn't seem to be working for TimerTask (Task is still running even after wait_for_termination returns.)

jamesarosen commented 2 years ago

I would have expected Concurrent.global_timer_set.wait_for_termination to be the implementation of "wait around long enough," but it resolves as soon as the stopped_event is synchronized across the threads. That's because TimerTask#ns_shutdown_execution only blocks on synchronizing the @running variable. #execute_task doesn't record the current state in any way, so #ns_shutdown_execution can't watch for it.

jamesarosen commented 2 years ago

This is my current workaround:

module ShutdownConcurrentTasks
  def self.all(tasks, timeout)
    observers = tasks.map { |t| t.add_observer(ShutdownConcurrentTasks::Observer.new) }
    stop_at = Time.now
    Concurrent.global_timer_set.shutdown
    tasks.each(&:shutdown)
    timeout.times do
      break if observers.all? { |o| o.task_run_since?(stop_at) }
      sleep 1
    end
  end

  class Observer
    def initialize
      @last_run = Time.now
    end

    def update(time, *args)
      @last_run = time
    end

    def task_run_since?(time)
      @last_run > time
    end
  end
end

tasks = build_some_tasks

begin
  tasks.each(&:execute)
  sleep
rescue Interrupt
  ShutdownConcurrentTasks.all(tasks, 20)
ensure
  puts "Shut down all tasks"
end

That will give the tasks up to 20 seconds to finish their current run. By calling task.shutdown on each task, task.running? will be correct in the task block, which lets you build well-behaved tasks:

Concurrent::TimerTask do |task|
  10.times do
    break if !task.running?
    puts 'still alive'
    sleep 1
  end
end
bensheldon commented 1 year ago

TimerTask executes the task as a ScheduledTask on the default global executor (Concurrent.global_io_executor):

https://github.com/ruby-concurrency/concurrent-ruby/blob/9f40827be9a8a192a6993a8d157bd3ed0662ada0/lib/concurrent-ruby/concurrent/timer_task.rb#L292-L292

I'm not sure if that's a mistake when it was introduced and instead it should have the argument executor: self; though going back farther in git history I don't see a TimerTask ever making use of itself inheritting from RubyExecutorService.

So as it is today, to safely shutdown, you'll want to shutdown and wait on Concurrent.global_io_executor when an interrupt signal is trapped.

I think this could also imply:

headius commented 1 year ago

I'm not sure who is maintaining this currently but we should probably try to come to some resolution here.

bensheldon commented 1 year ago

I'm planning to make a PR. There are maintainers, but fairly quiet.

headius commented 1 year ago

I'll be watching and will help get maintenance going on this project again.

bensheldon commented 1 year ago

Not to get too offtopic, but I think this project is incredibly important and I'm personally invested in contributing because of GoodJob (and I'm interested in becoming a formal maintainer too). My colleague @matthewd is a current maintainer, so I don't think we'll be blocked on necessary/important work by absence. But forming consensus/decisions will probably need some pushing on 👍🏻

eregon commented 1 year ago

I'm not sure who is maintaining this currently

It's documented in the README: https://github.com/ruby-concurrency/concurrent-ruby/blob/master/README.md#maintainers It's also a pretty big project, so it's maintained more on a need basis I'd say.

bensheldon commented 1 year ago

Setting executor: self is a bit spicier than I realized. While the TimerTask is RubyExecutorService it does not implement any kind of execution behavior. So I think simply providing an argument for executor: Concurrent.global_io_executor on initialize will be the path I plan to take.

eregon commented 1 year ago

I'll be watching and will help get maintenance going on this project again.

@headius Would you like to help maintain this project and e.g. reviews PRs? I would happily add you to the list of active maintainers at https://github.com/ruby-concurrency/concurrent-ruby/blob/master/README.md#maintainers (or you can do it yourself if you like). You already have the rights anyway. I should have asked you earlier when we were searching new maintainers, I missed it somehow, sorry about that.

eregon commented 9 months ago

Not to get too offtopic, but I think this project is incredibly important and I'm personally invested in contributing because of GoodJob (and I'm interested in becoming a formal maintainer too).

Given your many PRs and interest I would be delighted to add you as maintainer, I'll send the GitHub invite now.