kieker-monitoring / kieker

Kieker's main repository
Apache License 2.0
70 stars 41 forks source link

[KIEKER-1367] (Async FS) writer in blocking mode tries to shutdown but fails #2480

Open rju opened 3 days ago

rju commented 3 days ago

JIRA Issue: KIEKER-1367 (Async FS) writer in blocking mode tries to shutdown but fails Original Reporter: Andre van Hoorn


Writer configured to use Async FS with blocking queue full behavior (mode 1) and a queue size of 100000. During monitoring, the writer tries to re-add records to the queue 10 times and then initiates the shutdown. However, monitoring just continues.

Some excerpts from the log. The full log is attached. Thanks to Teerat for reporting this issue and for providing the log.

INFO [main] 2015-03-19 08:29:56,118 LogImplSLF4JLogging.java (line 85) Using Kieker's AspectJLoader. This is not recommended for multi-classloader environments such as JavaEE and OSGI. Use the additional VM parameter '-Dkieker.monitoring.skipDefaultAOPConfiguration=true'. to disable Kieker's AspectJLoader
 WARN [main] 2015-03-19 08:29:57,983 URLConfigurationSource.java (line 122) No URLs will be polled as dynamic configuration sources.
 INFO [main] 2015-03-19 08:29:57,984 URLConfigurationSource.java (line 123) To enable URLs as dynamic configuration sources, define System property archaius.configurationSource.additionalUrls or make config.properties available on classpath.
 INFO [main] 2015-03-19 08:29:58,028 DynamicPropertyFactory.java (line 271) DynamicPropertyFactory is initialized with configuration sources: com.netflix.config.ConcurrentCompositeConfiguration248e68a2
 INFO [main] 2015-03-19 08:29:58,412 LogImplSLF4JLogging.java (line 85) Loading configuration from JVM-specified location: 'kieker.monitoring.properties'
 INFO [main] 2015-03-19 08:29:58,718 LogImplSLF4JLogging.java (line 85) Current State of kieker.monitoring (1.11-SNAPSHOT) Status: 'enabled'
        Name: 'KIEKER'; Hostname: 'edge.rssreader.rssperf.emulab.net'; experimentID: '1'
JMXController: JMX disabled
RegistryController: 0 strings registered.
TimeSource: 'kieker.monitoring.timer.SystemNanoTimer'
        Time in nanoseconds (with nanoseconds precision) since Wed Dec 31 17:00:00 MST 1969'
ProbeController: disabled
WriterController:
        Number of Inserts: '0'
        Automatic assignment of logging timestamps: 'true'
Writer: 'kieker.monitoring.writer.filesystem.AsyncFsWriter'
        Configuration:
                kieker.monitoring.writer.filesystem.AsyncFsWriter.flush='true'
                kieker.monitoring.writer.filesystem.AsyncFsWriter.maxLogSize='-1'
                kieker.monitoring.writer.filesystem.AsyncFsWriter.QueueFullBehavior='1'
                kieker.monitoring.writer.filesystem.AsyncFsWriter.MaxShutdownDelay='-1'
                kieker.monitoring.writer.filesystem.AsyncFsWriter.maxEntriesInFile='25000'
                kieker.monitoring.writer.filesystem.AsyncFsWriter.bufferSize='8192'
                kieker.monitoring.writer.filesystem.AsyncFsWriter.maxLogFiles='-1'
                kieker.monitoring.writer.filesystem.AsyncFsWriter.customStoragePath='/tmp'
                kieker.monitoring.writer.filesystem.AsyncFsWriter.QueueSize='100000'
                kieker.monitoring.writer.filesystem.AsyncFsWriter.PrioritizedQueueSize='100'
        Records lost: 0
        Writer Threads (2): 
                Finished: 'false'; Writing to Directory: '/tmp/kieker-20150319-142958671-UTC-edge.rssreader.rssperf.emulab.net-KIEKER'
                Finished: 'false'; Writing to Directory: '/tmp/kieker-20150319-142958671-UTC-edge.rssreader.rssperf.emulab.net-KIEKER'
Sampling Controller: Periodic Sensor available: Poolsize: '0'; Scheduled Tasks: '0'
 INFO [main] 2015-03-19 08:29:58,721 LogImplSLF4JLogging.java (line 85) First threadId will be 3174474787342778368
INFO [hystrix-RSSThreadPool-5] 2015-03-19 08:31:57,784 LogImplSLF4JLogging.java (line 85) Received response from http://middletier2.rssreader.rssperf.emulab.net:9191/middletier/r
ss/user/user_2 responseHeaderList = [3174474787342778887,null,10]
 WARN [hystrix-RSSThreadPool-9] 2015-03-19 08:31:57,941 LogImplSLF4JLogging.java (line 101) Interupted when adding new monitoring record to queue. Try: 0
 WARN [hystrix-RSSThreadPool-9] 2015-03-19 08:31:57,942 LogImplSLF4JLogging.java (line 101) Interupted when adding new monitoring record to queue. Try: 1
 WARN [hystrix-RSSThreadPool-9] 2015-03-19 08:31:57,943 LogImplSLF4JLogging.java (line 101) Interupted when adding new monitoring record to queue. Try: 2
 WARN [hystrix-RSSThreadPool-9] 2015-03-19 08:31:57,943 LogImplSLF4JLogging.java (line 101) Interupted when adding new monitoring record to queue. Try: 3
 WARN [hystrix-RSSThreadPool-9] 2015-03-19 08:31:57,944 LogImplSLF4JLogging.java (line 101) Interupted when adding new monitoring record to queue. Try: 4
 WARN [hystrix-RSSThreadPool-9] 2015-03-19 08:31:57,944 LogImplSLF4JLogging.java (line 101) Interupted when adding new monitoring record to queue. Try: 5
 WARN [hystrix-RSSThreadPool-9] 2015-03-19 08:31:57,945 LogImplSLF4JLogging.java (line 101) Interupted when adding new monitoring record to queue. Try: 6
 WARN [hystrix-RSSThreadPool-9] 2015-03-19 08:31:57,946 LogImplSLF4JLogging.java (line 101) Interupted when adding new monitoring record to queue. Try: 7
 WARN [hystrix-RSSThreadPool-9] 2015-03-19 08:31:57,947 LogImplSLF4JLogging.java (line 101) Interupted when adding new monitoring record to queue. Try: 8
 WARN [hystrix-RSSThreadPool-9] 2015-03-19 08:31:57,947 LogImplSLF4JLogging.java (line 101) Interupted when adding new monitoring record to queue. Try: 9
ERROR [hystrix-RSSThreadPool-9] 2015-03-19 08:31:57,948 LogImplSLF4JLogging.java (line 117) Failed to add new monitoring record to queue (Finally interruped while blocked).
ERROR [hystrix-RSSThreadPool-9] 2015-03-19 08:31:57,949 LogImplSLF4JLogging.java (line 117) Error writing the monitoring data. Will terminate monitoring!
ERROR [hystrix-RSSThreadPool-9] 2015-03-19 08:31:57,951 LogImplSLF4JLogging.java (line 125) Error while trying to stop writer thread
java.lang.InterruptedException
        at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireInterruptibly(AbstractQueuedSynchronizer.java:1219)
        at java.util.concurrent.locks.ReentrantLock.lockInterruptibly(ReentrantLock.java:340)
        at java.util.concurrent.ArrayBlockingQueue.put(ArrayBlockingQueue.java:321)
        at kieker.monitoring.writer.AbstractAsyncThread.initShutdown(AbstractAsyncThread.java:70)
        at kieker.monitoring.writer.AbstractAsyncWriter.terminate(AbstractAsyncWriter.java:116)
        at kieker.monitoring.core.controller.WriterController.cleanup(WriterController.java:83)
        at kieker.monitoring.core.controller.AbstractController.terminate(AbstractController.java:72)
        at kieker.monitoring.core.controller.WriterController.newMonitoringRecord(WriterController.java:124)
        at kieker.monitoring.core.controller.MonitoringController.newMonitoringRecord(MonitoringController.java:260)
        at kieker.monitoring.probe.aspectj.jersey.OperationExecutionJerseyClientInterceptor.operation(OperationExecutionJerseyClientInterceptor.java:176)
        at com.sun.jersey.client.apache4.ApacheHttpClient4Handler.handle(ApacheHttpClient4Handler.java:153)
        at com.sun.jersey.api.client.Client.handle(Client.java:648)
        at com.sun.jersey.api.client.WebResource.handle(WebResource.java:680)
        at com.sun.jersey.api.client.WebResource.access$200(WebResource.java:74)
        at com.sun.jersey.api.client.WebResource$Builder.get(WebResource.java:507)
        at com.netflix.niws.client.http.RestClient.execute(RestClient.java:498)
        at com.netflix.niws.client.http.RestClient.execute(RestClient.java:418)
        at com.netflix.niws.client.http.RestClient.execute(RestClient.java:78)
        at com.netflix.client.AbstractLoadBalancerAwareClient.executeOnSingleServer(AbstractLoadBalancerAwareClient.java:192)
        at com.netflix.client.AbstractLoadBalancerAwareClient.executeWithLoadBalancer(AbstractLoadBalancerAwareClient.java:402)
        at com.netflix.recipes.rss.hystrix.GetRSSCommand.run_aroundBody0(GetRSSCommand.java:88)
        at com.netflix.recipes.rss.hystrix.GetRSSCommand$AjcClosure1.run(GetRSSCommand.java:1)
        at org.aspectj.runtime.reflect.JoinPointImpl.proceed(JoinPointImpl.java:149)
        at kieker.monitoring.probe.aspectj.operationExecution.AbstractOperationExecutionAspect.operation(AbstractOperationExecutionAspect.java:93)
        at com.netflix.recipes.rss.hystrix.GetRSSCommand.run(GetRSSCommand.java:66)
        at com.netflix.recipes.rss.hystrix.GetRSSCommand.run(GetRSSCommand.java:36)
        at com.netflix.hystrix.HystrixCommand.executeCommand(HystrixCommand.java:764)
        at com.netflix.hystrix.HystrixCommand.access$1400(HystrixCommand.java:81)
        at com.netflix.hystrix.HystrixCommand$2.call(HystrixCommand.java:706)
       at com.netflix.hystrix.strategy.concurrency.HystrixContextCallable.call(HystrixContextCallable.java:45)
        at java.util.concurrent.FutureTask.run(FutureTask.java:262)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)
 INFO [hystrix-RSSThreadPool-9] 2015-03-19 08:31:57,952 LogImplSLF4JLogging.java (line 85) Shutting down writers.
rju commented 3 days ago

author André van Hoorn -- Fri, 3 Jul 2015 14:26:39 +0200

The following code is included in the AbstractAsyncWriter's newMonitoringRecord method:

                        case 1: // blocks when queue full
                                for (int i = 0; i < 10; i++) { // drop out if more than 10 times interrupted
                                        try {
                                                this.blockingQueue.put(monitoringRecord);
                                                return true;
                                        } catch (final InterruptedException ignore) {
                                                LOG.warn("Interupted when adding new monitoring record to queue. Try: " + i);
                                                Thread.currentThread().interrupt(); // propagate interrupt
                                        }
                                }
                                LOG.error("Failed to add new monitoring record to queue (Finally interruped while blocked).");
                                return false;

Assumption: this way, the interrupt flag is not reset, leading to the continuous interruption of subsequent calls to put.

Teerat will try whether the following change resolves the issue:

                          case 1: // blocks when queue full
                                for (int i = 0; i < 10; i++) { // drop out if more than 10 times interrupted
                                        try {
                                                this.blockingQueue.put(monitoringRecord);
                                                return true;
                                        } catch (final InterruptedException ignore) {
                                                if(Thread.currentThread().interrupted()) { // test and reset thread's interrupted status
                                                     LOG.warn("Interupted when adding new monitoring record to queue. Try: " + i);
                                                }
                                        }
                                 }
                                LOG.error("Failed to add new monitoring record to queue (Finally interruped while blocked).");
                                return false;

See http://docs.oracle.com/javase/7/docs/api/java/lang/Thread.html#interrupted%28%29

rju commented 3 days ago

author teeratpitakrat -- Sun, 5 Jul 2015 19:02:28 +0200

Replying to [avh|comment:1]:
> The following code is included in the AbstractAsyncWriter's newMonitoringRecord method:

Moved to https://kieker-monitoring.atlassian.net/ticket/1662 as it is a separate issue.

rju commented 3 days ago

author teeratpitakrat -- Mon, 6 Jul 2015 14:50:12 +0200

One reason for the writer not shutting down is because the interrupt flag of AbstractAsyncWriter is not reset https://kieker-monitoring.atlassian.net/ticket/1662. This causes an InterruptedException in the put method of AbstractAsyncThread's initShutdown while adding end-marker to the queue. As a consequence, the AbstractAsyncThread never receives the end-marker and does not shutdown. Fixing https://kieker-monitoring.atlassian.net/ticket/1662 should prevent this case from occurring.

However, there might be another rare circumstance, in which an interrupt occurs while adding an end-marker to the queue in AbstractAsyncThread's initShutdown. This may lead to the same problem that the writer does not receive the end-marker and does not terminate.

One possible fix for this would be to periodically check for the shutdown signal (i.e. shutdownLatch) in AbstractAsyncThread's run. For example, changing the following code:

Override
public final void run() {
    if (LOG.isDebugEnabled()) {
        LOG.debug(this.getClass().getName() + " running");
    }
    try {
        // making it a local variable for faster access
        final BlockingQueue<IMonitoringRecord> writeQueueLocal = this.writeQueue;
        while (true) {
            try {
                IMonitoringRecord monitoringRecord = writeQueueLocal.take();
                if (monitoringRecord == END_OF_MONITORING_MARKER) { // NOPMD (CompareObjectsWithEquals
                    ...

to

Override
public final void run() {
    if (LOG.isDebugEnabled()) {
        LOG.debug(this.getClass().getName() + " running");
    }
    try {
        // making it a local variable for faster access
        final BlockingQueue<IMonitoringRecord> writeQueueLocal = this.writeQueue;
        while (true) {
            try {
                IMonitoringRecord monitoringRecord = writeQueueLocal.poll(5, TimeUnit.SECONDS);
                if (monitoringRecord != null) {
                    if (monitoringRecord == END_OF_MONITORING_MARKER) { // NOPMD (CompareObjectsWithEquals
                        ...
                } else {
                    // Check for shutdown signal
                }