ce07c3 closed this issue 5 years ago.
Unfortunately, I don't think there is much we can do for this request. The `AsyncPublisher` and `Subscriber` need to be stopped in a thread-safe way, and according to Ruby's signals documentation:
> As with implementing signal handlers in C or most other languages, all code passed to `Signal.trap` must be reentrant... Most importantly, “thread-safety” does not guarantee reentrancy; and methods such as `Mutex#lock` and `Mutex#synchronize` which are commonly used for thread-safety even prevent reentrancy.
`Subscriber#stop` does call `Thread.new`, but even if `Thread.new` were not called directly in the method, the method would still raise `ThreadError`. Calls to thread-safe methods that use a mutex, or calls to Concurrent Ruby's thread pools, will also raise.
Consider the following Ruby script:
```ruby
require "concurrent"

tp = Concurrent::ThreadPoolExecutor.new

Signal.trap("INT") {
  tp.shutdown
}

sleep
```
Running this script and then pressing CTRL+C will produce the following:
```
$ ruby repro-sigint.rb
^CTraceback (most recent call last):
        5: from repro-sigint.rb:9:in `<main>'
        4: from repro-sigint.rb:9:in `sleep'
        3: from repro-sigint.rb:6:in `block in <main>'
        2: from /Users/blowmage/.gem/repos/gcloud-ruby/gems/concurrent-ruby-1.1.5/lib/concurrent/executor/ruby_executor_service.rb:28:in `shutdown'
        1: from /Users/blowmage/.gem/repos/gcloud-ruby/gems/concurrent-ruby-1.1.5/lib/concurrent/synchronization/mutex_lockable_object.rb:41:in `synchronize'
/Users/blowmage/.gem/repos/gcloud-ruby/gems/concurrent-ruby-1.1.5/lib/concurrent/synchronization/mutex_lockable_object.rb:41:in `synchronize': can't be called from trap context (ThreadError)
```
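The same failure can be reproduced with only the standard library, which shows it comes from `Mutex` itself rather than from concurrent-ruby. This is an illustrative sketch: the method name `mutex_error_in_trap` and the choice of `USR1` are arbitrary, and `USR1` assumes a Unix-like platform. The trap handler calls `Mutex#synchronize` and hands back whatever `ThreadError` it catches:

```ruby
# Illustrative sketch: Mutex#synchronize raises ThreadError when called from
# inside a Signal.trap handler. Assumes a Unix-like OS (SIGUSR1 is not
# available on Windows).
def mutex_error_in_trap(signal = "USR1")
  lock = Mutex.new
  captured = nil
  handled = false

  previous = Signal.trap(signal) do
    begin
      lock.synchronize { :never_reached }
    rescue ThreadError => e
      captured = e # "can't be called from trap context"
    ensure
      handled = true
    end
  end

  Process.kill(signal, Process.pid)
  # Trap handlers run on the main thread; wait up to ~2.5 s for it to run.
  50.times do
    break if handled
    sleep 0.05
  end
  captured
ensure
  # Restore whatever handler was installed before.
  Signal.trap(signal, previous || "DEFAULT")
end

error = mutex_error_in_trap
puts error ? "#{error.class}: #{error.message}" : "no error raised"
```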
The guidance we have given before is to avoid signal traps when using concurrent libraries such as Pub/Sub, Logging, and Spanner.
@blowmage I think your analysis is correct here. A subscriber cannot be stopped directly from a signal handler, given the way it is currently set up. There are probably ways to work around this. HOWEVER, I believe doing so would be the wrong approach. Here's why:
The use case, I assume (@ce07c3, please correct me if I'm wrong), is that an application has received a SIGTERM and wants to clean up, including finishing and closing any publishers and subscribers (e.g. flushing buffers, handling any remaining pending messages, etc.). This is a reasonable requirement, but it should not be implemented directly in the signal handler. It is generally bad design for a signal handler to block for any significant amount of time (the reentrancy issue is one reason among several).
Instead, if an application wants to clean up in response to a SIGTERM, it should handle the signal by putting itself into a cleanup state (for example, by setting a global flag that affects the processing in its main event loop). The signal handler should then return immediately and let the application continue running in that modified state. The application's main loop should itself detect that its state has changed and respond by initiating cleanup, such as calling `stop` on subscribers and publishers (and probably then calling `wait!` to block until the cleanup has completed). Then the application can exit normally. All of this should be the responsibility of the application. Neither Pub/Sub nor any other single library should be responsible for application cleanup and exit.
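A minimal sketch of that approach, assuming a stand-in object rather than the real library: `HypotheticalSubscriber` and `run_until_signaled` are illustrative names, not part of google-cloud-pubsub, and the 0.1 s polling interval is an arbitrary choice. The stand-in uses a `Mutex` internally, so calling its `stop` from inside the trap would raise `ThreadError`. Here the trap only flips a flag (a local captured by the trap block rather than a global), and the main loop performs the cleanup in normal context:

```ruby
# Sketch of the "flag in the trap, cleanup in the main loop" pattern.
# HypotheticalSubscriber is an illustrative stand-in, NOT the
# google-cloud-pubsub Subscriber; it only mimics stop and wait!.
class HypotheticalSubscriber
  def initialize
    @lock = Mutex.new # calling stop from a trap would raise ThreadError here
    @stopped = false
  end

  def stop
    @lock.synchronize { @stopped = true }
    self
  end

  # A real subscriber would block here until pending messages are handled.
  def wait!
    @lock.synchronize { @stopped }
    self
  end

  def stopped?
    @lock.synchronize { @stopped }
  end
end

# Installs a trap that only sets a flag, waits in the main loop until the flag
# flips, then runs the Mutex-based cleanup outside trap context.
# poll_interval is an assumed value chosen for illustration.
def run_until_signaled(subscriber, signal: "TERM", poll_interval: 0.1)
  shutdown_requested = false
  previous = Signal.trap(signal) { shutdown_requested = true }

  # Main loop: a real application would do its normal work here.
  sleep poll_interval until shutdown_requested

  subscriber.stop
  subscriber.wait!
ensure
  # Put the previous handler back once cleanup is done.
  Signal.trap(signal, previous || "DEFAULT")
end
```

In an application, a call like `run_until_signaled(subscriber)` would take the place of a bare `sleep` at the end of the main script.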
Then there's no solution here, I presume. I use `at_exit` to stop the subscriber, which is what the thread pools in `concurrent-ruby` use.
I'd expect the `stop` method to set flags that make the subscriber cease operations, since there is no other way to stop the subscriber than calling `stop!`. So this isn't application cleanup; it's getting the subscriber to stop as part of application cleanup. Doing so during exit handling seems like a valid use case to me, given that the subscriber's primary purpose is to process streamed messages in the background. As a precedent, as pointed out above, `concurrent-ruby` does this.
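The kind of `stop` proposed here, where stopping only sets a flag and the blocking work happens elsewhere, can be sketched with a hypothetical `BackgroundWorker` (an illustrative class, not part of google-cloud-pubsub). `request_stop` only assigns an instance variable, so it takes no lock and starts no thread and can be called from a trap; `join` does the blocking part outside the trap. Reading a plain instance variable from the worker thread relies on CRuby's global VM lock for visibility, which is an assumption of this sketch:

```ruby
# Hypothetical worker illustrating a trap-safe stop request: the trap only
# sets a flag, and the background thread checks it between units of work.
class BackgroundWorker
  attr_reader :processed

  def initialize(interval: 0.05)
    @interval = interval
    @stop_requested = false
    @processed = 0
    @thread = nil
  end

  def start
    @thread = Thread.new do
      until @stop_requested
        @processed += 1 # stand-in for handling one message
        sleep @interval
      end
    end
    self
  end

  # Safe to call from Signal.trap: no Mutex, no Thread.new, no blocking.
  def request_stop
    @stop_requested = true
  end

  # Blocks until the worker thread finishes; call this outside the trap.
  def join
    @thread&.join
    self
  end

  def stopped?
    !@thread.nil? && !@thread.alive?
  end
end
```

In a script, this could be wired up as `Signal.trap("TERM") { worker.request_stop }` followed by `worker.join` on the main thread, so the trap stays trivial while the wait happens in normal context.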
I think using `at_exit` is the right thing to do. With it you can stop a subscriber and wait for all the pending messages to be processed before the process exits.
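```ruby
require "google/cloud/pubsub"

# "my-subscription" is a placeholder; use your own subscription name.
pubsub = Google::Cloud::PubSub.new
sub = pubsub.subscription "my-subscription"

subscriber = sub.listen do |msg|
  # TODO: process the message
  msg.ack!
end

at_exit do
  subscriber.stop
  subscriber.wait!
end

subscriber.start
sleep
```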
I was under the impression that `at_exit` had the same issues with spawning new threads, though? I'll try it out.
AFAIK `at_exit` does not have the same requirement for being reentrant. I have not seen any issues using `at_exit`, unlike `Signal.trap`.
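One way to check this is to run a throwaway child script that registers an `at_exit` hook which takes a `Mutex`, then sends itself `SIGTERM`. Ruby's default `SIGTERM` handling raises `SignalException` on the main thread, so the hook runs during normal exit rather than in trap context. This sketch assumes a Unix-like platform, runs the child with the current interpreter via `RbConfig.ruby` and `Open3.capture2e`, and uses a hypothetical helper name:

```ruby
require "open3"
require "rbconfig"

# Child script: the at_exit hook takes a Mutex, which would raise ThreadError
# if this code were running inside a Signal.trap handler.
CHILD_SCRIPT = <<~'RUBY'
  $stdout.sync = true
  lock = Mutex.new
  at_exit do
    lock.synchronize { puts "cleanup ran" }
  end
  Process.kill("TERM", Process.pid)
  sleep
RUBY

# Hypothetical helper: runs the child with the current Ruby and returns its
# combined stdout/stderr and process status. Assumes a Unix-like OS.
def run_at_exit_child
  Open3.capture2e(RbConfig.ruby, "-e", CHILD_SCRIPT)
end

output, _status = run_at_exit_child
puts output
```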
@ce07c3 Did `at_exit` work for you?
I'll try it out next week! Sorry for the delay here.
@ce07c3 Have you been able to verify this yet? If so, can we close this issue?
`at_exit` does not shut down cleanly for me. I don't know whether it's due to threads; I'll debug a bit more.
Seems fine; I can't reproduce it. Will reopen if I can. Thanks a lot, @blowmage and @dazuma, for your help!
https://github.com/googleapis/google-cloud-ruby/blob/master/google-cloud-pubsub/lib/google/cloud/pubsub/subscriber.rb#L140
If my code gets an interrupt and I call `stop`, this spawns threads. Ruby does not guarantee that mutexes or threads work inside a signal handler, which leads to exit timeouts on Heroku.