spring-attic / spring-integration-kafka

Apache License 2.0

Possible Memory Leak ? #10

Closed a-marcel closed 9 years ago

a-marcel commented 9 years ago

Is it possible that the kafka.consumer.ZookeeperConsumerConnector watcher executor thread is started but never stopped?

I get the following in my log:

[frontend] INFO    2015-01-06 01:16:14,504 [default_XXXXXX-1420503374061-2286bb4a_watcher_executor] kafka.consumer.ZookeeperConsumerConnector   - [default_XXXXX-1420503374061-2286bb4a], starting watcher executor thread for consumer default_XXXXXX-1420503374061-2286bb4a

And when I shut down my Tomcat 8:

Exception in thread "default_XXXXXX-1420503374061-2286bb4a_watcher_executor" java.lang.IllegalStateException: Can't overwrite cause with java.lang.IllegalStateException: Illegal access: this web application instance has been stopped already.  Could not load kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anon$1$$anonfun$run$4.  The eventual following stack trace is caused by an error thrown for debugging purposes as well as to attempt to terminate the thread which caused the illegal access, and has no functional impact.
    at java.lang.Throwable.initCause(Throwable.java:457)
    at org.apache.catalina.loader.WebappClassLoaderBase.checkStateForClassLoading(WebappClassLoaderBase.java:1306)
    at org.apache.catalina.loader.WebappClassLoaderBase.loadClass(WebappClassLoaderBase.java:1186)
    at org.apache.catalina.loader.WebappClassLoaderBase.loadClass(WebappClassLoaderBase.java:1147)
    at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anon$1.run(ZookeeperConsumerConnector.scala:360)
Caused by: java.lang.ClassNotFoundException
    at org.apache.catalina.loader.WebappClassLoaderBase.checkStateForClassLoading(WebappClassLoaderBase.java:1305)
    ... 3 more

I use Milestone 2 from the Maven repo.

Thanks

a-marcel commented 9 years ago

This happens when I connect to a Kafka topic that is not available.

And another log message:

SEVERE: The web application [] created a ThreadLocal with key of type [scala.util.DynamicVariable$$anon$1] (value [scala.util.DynamicVariable$$anon$1@7def8249]) and a value of type [scala.Some] (value [Some([1.97] failure: end of input

{"version":1,"subscription":{"YYY":1},"pattern":"static","timestamp":"1420504423575"}
                                                                                                ^)]) but failed to remove it when the web application was stopped. Threads are going to be renewed over time to try and avoid a probable memory leak.
Jan 06, 2015 1:34:10 AM org.apache.catalina.loader.WebappClassLoaderBase checkThreadLocalMapForLeaks
SEVERE: The web application [] created a ThreadLocal with key of type [scala.util.DynamicVariable$$anon$1] (value [scala.util.DynamicVariable$$anon$1@3ff05a6a]) and a value of type [scala.None$] (value [None]) but failed to remove it when the web application was stopped. Threads are going to be renewed over time to try and avoid a probable memory leak.

artembilan commented 9 years ago

Hi @marcelalburg !

Looking at the Kafka source code, we have https://github.com/apache/kafka/blob/0.8.1/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala#L339, where that internal thread relies on the isShuttingDown state, which shutdown() changes so that the thread can die.
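The flag-based pattern described above can be illustrated with a small, self-contained Java sketch. This is not Kafka's code: the class name `WatcherSketch`, the thread name, the timeouts, and the final `join` are assumptions made purely for illustration. It only shows the general idea of a background thread that keeps looping until a shutdown flag is set.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/** Hypothetical stand-in for a connector that owns a background watcher thread. */
public class WatcherSketch {
    private final AtomicBoolean isShuttingDown = new AtomicBoolean(false);
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition wakeUp = lock.newCondition();
    private final Thread watcher;

    public WatcherSketch() {
        watcher = new Thread(this::watchLoop, "sketch_watcher_executor");
        watcher.start();
    }

    private void watchLoop() {
        // The loop exits only once shutdown() has flipped the flag.
        while (!isShuttingDown.get()) {
            lock.lock();
            try {
                // Re-check under the lock so a signal from shutdown() is not missed,
                // and wait with a timeout so the flag is polled periodically anyway.
                if (!isShuttingDown.get()) {
                    wakeUp.await(1, TimeUnit.SECONDS);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            } finally {
                lock.unlock();
            }
        }
    }

    /** Flips the flag, wakes the watcher, and waits up to five seconds for it to end. */
    public void shutdown() {
        if (isShuttingDown.compareAndSet(false, true)) {
            lock.lock();
            try {
                wakeUp.signalAll();
            } finally {
                lock.unlock();
            }
            try {
                watcher.join(5000);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    public boolean isWatcherAlive() {
        return watcher.isAlive();
    }

    public static void main(String[] args) {
        WatcherSketch connector = new WatcherSketch();
        System.out.println("alive before shutdown: " + connector.isWatcherAlive());
        connector.shutdown();
        System.out.println("alive after shutdown: " + connector.isWatcherAlive());
    }
}
```

If `shutdown()` is never called in a sketch like this, the watcher thread keeps running after its owner is gone, which is the kind of leftover thread a servlet container reports when a web application stops.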

Spring Integration Kafka does something like this: KafkaConsumerContext.destroy() -> ConsumerConfiguration.getConsumerConnector().shutdown() -> ZookeeperConsumerConnector.shutdown().

Since KafkaConsumerContext is a DisposableBean, its destroy() should be invoked on ApplicationContext.close().

You can look in the server logs for a message like this:

Invoking destroy() on bean with name ...

when you switch on DEBUG for the org.springframework.beans.factory.support category.

Of course, if you manage this manually, you have to take care of calling ZookeeperConsumerConnector.shutdown() yourself.

Let us know how it goes for you now. I don't see any issues in the framework code.

a-marcel commented 9 years ago

Hello,

Thanks for the fast answer.

I created an example project which shows the error on my system.

https://github.com/marcelalburg/spring-integration-kafka-memory-leak

You can see the debug log in the Readme.md.

Thanks

artembilan commented 9 years ago

Thank you for such comprehensive feedback!

What I see in your code:

  1. The inputFromKafka is a DirectChannel without any subscriber. Just make it a <queue> channel and we get rid of some of the issues.
  2. How does it work if you don't use task-executor="kafkaTaskExecutor" for the <poller>?
  3. Would you mind minimizing your app further (without MVC, Spring Session, etc.) to localize the issue to the SI Kafka adapters (if it is there, of course)?

I need some time to build something similar with Spring Boot to see how it works with embedded Tomcat...

a-marcel commented 9 years ago

Hey,

I just copied some lines of code into this example project, which is why there are some unnecessary lines of code.

But I changed the GitHub repo and removed (hopefully) all unnecessary dependencies and lines of code (though I cannot remove webmvc).

Thanks for your help.

artembilan commented 9 years ago

No problem! I've just put together a simple test case with Spring Boot, and I do see an issue on stop:

15:30:00.693 WARN [Thread-1][org.springframework.beans.factory.support.DisposableBeanAdapter] Invocation of destroy method failed on bean with name 'consumerContext': java.lang.ExceptionInInitializerError

Where the root cause is:

java.lang.IllegalStateException: Shutdown in progress

Unfortunately, I don't see more info in the logs, but when I inject KafkaConsumerContext into the test method and destroy() it manually at the end, Tomcat stops without any issues. Maybe it really is an issue with the ClassLoader...

Investigating how to see more in the logs with Spring Boot...

a-marcel commented 9 years ago

What exactly did you mean? I tried this too (take a look at https://github.com/marcelalburg/spring-integration-kafka-memory-leak/blob/master/src/main/java/org/example/kafkamemoryleak/listener/KafkaConsumerStarter.java#L38), but it doesn't work for me, so I commented it out.

artembilan commented 9 years ago

Well, after switching on logging.level.org.springframework=DEBUG I see this in logs:

15:37:27.033 WARN [Thread-1][org.springframework.beans.factory.support.DisposableBeanAdapter] Invocation of destroy method failed on bean with name 'consumerContext'
java.lang.ExceptionInInitializerError
    at kafka.metrics.KafkaMetricsGroup$class.newGauge(KafkaMetricsGroup.scala:43)
    at kafka.server.AbstractFetcherManager.newGauge(AbstractFetcherManager.scala:29)
    at kafka.server.AbstractFetcherManager.<init>(AbstractFetcherManager.scala:36)
    at kafka.consumer.ConsumerFetcherManager.<init>(ConsumerFetcherManager.scala:40)
    at kafka.consumer.ZookeeperConsumerConnector.createFetcher(ZookeeperConsumerConnector.scala:151)
    at kafka.consumer.ZookeeperConsumerConnector.<init>(ZookeeperConsumerConnector.scala:115)
    at kafka.javaapi.consumer.ZookeeperConsumerConnector.<init>(ZookeeperConsumerConnector.scala:65)
    at kafka.javaapi.consumer.ZookeeperConsumerConnector.<init>(ZookeeperConsumerConnector.scala:67)
    at kafka.consumer.Consumer$.createJavaConsumerConnector(ConsumerConnector.scala:100)
    at kafka.consumer.Consumer.createJavaConsumerConnector(ConsumerConnector.scala)
    at org.springframework.integration.kafka.support.ConsumerConnectionProvider.getConsumerConnector(ConsumerConnectionProvider.java:34)
    at org.springframework.integration.kafka.support.ConsumerConfiguration.getConsumerConnector(ConsumerConfiguration.java:220)
    at org.springframework.integration.kafka.support.KafkaConsumerContext.destroy(KafkaConsumerContext.java:85)
    at org.springframework.beans.factory.support.DisposableBeanAdapter.destroy(DisposableBeanAdapter.java:258)
    at org.springframework.beans.factory.support.DefaultSingletonBeanRegistry.destroyBean(DefaultSingletonBeanRegistry.java:538)
    at org.springframework.beans.factory.support.DefaultSingletonBeanRegistry.destroySingleton(DefaultSingletonBeanRegistry.java:514)
    at org.springframework.beans.factory.support.DefaultListableBeanFactory.destroySingleton(DefaultListableBeanFactory.java:831)
    at org.springframework.beans.factory.support.DefaultSingletonBeanRegistry.destroySingletons(DefaultSingletonBeanRegistry.java:483)
    at org.springframework.context.support.AbstractApplicationContext.destroyBeans(AbstractApplicationContext.java:923)
    at org.springframework.context.support.AbstractApplicationContext.doClose(AbstractApplicationContext.java:897)
    at org.springframework.boot.context.embedded.EmbeddedWebApplicationContext.doClose(EmbeddedWebApplicationContext.java:141)
    at org.springframework.context.support.AbstractApplicationContext$1.run(AbstractApplicationContext.java:811)
Caused by: java.lang.IllegalStateException: Shutdown in progress
    at java.lang.ApplicationShutdownHooks.add(ApplicationShutdownHooks.java:66)
    at java.lang.Runtime.addShutdownHook(Runtime.java:211)
    at com.yammer.metrics.Metrics.<clinit>(Metrics.java:21)
    ... 22 more
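Reading the trace above, destroy() appears to create a new consumer connector while the context is already being closed from a JVM shutdown hook, and the Metrics class then tries to register a shutdown hook of its own, which the JVM rejects. The JVM behaviour itself can be reproduced without Kafka or Spring. The following is a minimal sketch; the class name `ShutdownHookSketch` and the helper `tryRegisterHook` are hypothetical names chosen for illustration.

```java
public class ShutdownHookSketch {

    /**
     * Tries to register (and immediately remove) a no-op shutdown hook.
     * Returns null on success, or the exception message if the JVM refuses.
     */
    static String tryRegisterHook() {
        Thread hook = new Thread(() -> { });
        try {
            Runtime.getRuntime().addShutdownHook(hook);
            Runtime.getRuntime().removeShutdownHook(hook);
            return null;
        } catch (IllegalStateException e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        // While the JVM is running normally, registration is accepted.
        System.out.println("while running: " + tryRegisterHook());

        // This hook runs during JVM shutdown, where registering another hook is refused.
        Runtime.getRuntime().addShutdownHook(new Thread(
                () -> System.out.println("during shutdown: " + tryRegisterHook())));
    }
}
```

Run as a plain main class, the second attempt happens inside a shutdown hook and should be refused with the same kind of IllegalStateException that appears as the cause in the trace.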

But it looks like this isn't related to your concern...

On the other hand, does it matter that much if you see that blocking thread issue only on Tomcat shutdown?

a-marcel commented 9 years ago

First, I like to develop without memory leaks ;) At the moment I'm developing a new application, so I have to start and stop Tomcat quite a lot. Normally a Tomcat shutdown takes half a second, but with the memory leak it takes more than 10 seconds, which is not so nice.

Do you have a workaround? That would be OK for me, at least during development.

artembilan commented 9 years ago

Would you mind sharing this on the Kafka mailing list (http://kafka.apache.org/contact.html) to involve more people? I was able to run an app in a standalone Tomcat 8 and see only this issue:

06-Jan-2015 18:09:00.128 WARNING [localhost-startStop-2] org.apache.catalina.loader.WebappClassLoaderBase.clearReferencesThreads The web application [kafka_gh10-0.5.0] appears to have started a thread named [localhost-startStop-2-SendThread(localhost:2181)] but has failed to stop it. This is very likely to create a memory leak. Stack trace of thread:
 java.lang.Thread.sleep(Native Method)
 org.apache.zookeeper.ClientCnxn$SendThread.cleanup(ClientCnxn.java:1261)
 org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1211)

On the other hand, please share how you configured Tomcat to get more logs, like in your stack trace shown above.

Thank you!

a-marcel commented 9 years ago

Hello,

I just imported this project into Eclipse, configured the deployment properties, and ran it on a local Tomcat (configured in Eclipse).

The default settings already produce this "verbose" logging.

I'll write an e-mail to the list and link it to this issue.

Thanks

artembilan commented 9 years ago

Well, thanks for the additional info. Now I can reproduce it with the Tomcat plugin from IDEA, but I don't see the same with the embedded and standalone ones...

a-marcel commented 9 years ago

Hi, I got one answer from the list.


Please call shutdown on https://github.com/apache/kafka/blob/0.8.1/core/src/main/scala/kafka/javaapi/consumer/ConsumerConnector.java#L69 in the cleanup method, and also find out who is starting the "kafkaTaskExecutor-1-SendThread" thread and interrupt or kill it in the destroy method. It does not look like a Kafka thread. It seems to be a problem with your application thread, since this is not a daemon thread (and it comes from the executor thread pool). I think your Tomcat is not even shutting down gracefully. Most likely, this is not a Kafka-related bug.

Thanks,

Bhavesh

But I think that you (spring-integration-kafka) are already calling shutdown on the connector...

Marcel
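The suggestion from the list to find out which thread is still running, and whether it is a daemon thread, can be checked from inside the JVM with standard APIs only. The following is a minimal sketch; the class name `LiveThreadReport`, the helper `describeThreads`, and the demo thread name are hypothetical and exist only for illustration.

```java
import java.util.ArrayList;
import java.util.List;

public class LiveThreadReport {

    /** Lists live threads whose name contains the fragment, together with their daemon flag. */
    static List<String> describeThreads(String nameFragment) {
        List<String> result = new ArrayList<>();
        for (Thread t : Thread.getAllStackTraces().keySet()) {
            if (t.isAlive() && t.getName().contains(nameFragment)) {
                result.add(t.getName() + " daemon=" + t.isDaemon());
            }
        }
        return result;
    }

    public static void main(String[] args) throws InterruptedException {
        // A stand-in for a leftover worker thread; the name is made up for this demo.
        Thread worker = new Thread(() -> {
            try {
                Thread.sleep(60_000);
            } catch (InterruptedException e) {
                // Interrupted on purpose below; just let the thread end.
            }
        }, "demo-SendThread");
        worker.setDaemon(false);
        worker.start();

        System.out.println(describeThreads("SendThread"));

        worker.interrupt();
        worker.join();
    }
}
```

Calling a helper like this from a destroy or cleanup method, after shutdown() has been invoked, would show which matching threads are still alive at that point.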

garyrussell commented 9 years ago

Merged to master

a-marcel commented 9 years ago

Hey,

I saw this patch and tried it: Tomcat now stops normally, without the errors above. Great work.

Now I have just one small issue:

starting ...
[frontend] INFO    2015-01-08 22:30:44,529 [default_Marcels-MacBook-Pro.local-1420752644064-c4f47dde_watcher_executor] kafka.consumer.ZookeeperConsumerConnector   - [default_Marcels-MacBook-Pro.local-1420752644064-c4f47dde], starting watcher executor thread for consumer default_Marcels-MacBook-Pro.local-1420752644064-c4f47dde

stopping....

WARNING: The web application [adminfrontend] appears to have started a thread named [default_Marcels-MacBook-Pro.local-1420752644064-c4f47dde_watcher_executor] but has failed to stop it. This is very likely to create a memory leak. Stack trace of thread:
 sun.misc.Unsafe.park(Native Method)
 java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
 java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2163)
 kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anon$1.run(ZookeeperConsumerConnector.scala:348)
...
Exception: java.lang.IllegalStateException thrown from the UncaughtExceptionHandler in thread "kafkaTaskExecutor-1-SendThread(10.0.1.5:2181)"
Jan 08, 2015 10:30:53 PM org.apache.coyote.AbstractProtocol stop
INFO: Stopping ProtocolHandler ["ajp-nio-8010"]
Jan 08, 2015 10:30:54 PM org.apache.coyote.AbstractProtocol destroy
INFO: Destroying ProtocolHandler ["http-nio-8085"]
Jan 08, 2015 10:30:54 PM org.apache.coyote.AbstractProtocol destroy
INFO: Destroying ProtocolHandler ["ajp-nio-8010"]
Exception in thread "default_Marcels-MacBook-Pro.local-1420752644064-c4f47dde_watcher_executor" java.lang.IllegalStateException: Can't overwrite cause with java.lang.IllegalStateException: Illegal access: this web application instance has been stopped already.  Could not load kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anon$1$$anonfun$run$4.  The eventual following stack trace is caused by an error thrown for debugging purposes as well as to attempt to terminate the thread which caused the illegal access, and has no functional impact.
    at java.lang.Throwable.initCause(Throwable.java:457)
    at org.apache.catalina.loader.WebappClassLoaderBase.checkStateForClassLoading(WebappClassLoaderBase.java:1306)
    at org.apache.catalina.loader.WebappClassLoaderBase.loadClass(WebappClassLoaderBase.java:1186)
    at org.apache.catalina.loader.WebappClassLoaderBase.loadClass(WebappClassLoaderBase.java:1147)
    at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anon$1.run(ZookeeperConsumerConnector.scala:360)
Caused by: java.lang.ClassNotFoundException
    at org.apache.catalina.loader.WebappClassLoaderBase.checkStateForClassLoading(WebappClassLoaderBase.java:1305)
    ... 3 more

Thanks for the great work.

Marcel

artembilan commented 9 years ago

@marcelalburg, do you need anything else from us on this matter? To be honest, we use spring-integration-kafka extensively in Spring XD, and there have not been any concerns about it so far.

Let us know how to proceed with this issue. Can we just close it as resolved?

a-marcel commented 9 years ago

Yeah, you can close it. Thanks.