reiseburo / verspaetung

Verspätung is a small utility which aims to help identify delay of Kafka consumers
http://reiseburo.github.io/verspaetung/
MIT License
11 stars 2 forks source link

Verspaetung can get into an unrecoverable state with Kafka cluster errors #30

Closed rtyler closed 9 years ago

rtyler commented 9 years ago

It's still unclear what behavior in Verspaetung might be causing this but we appear to have some verspaetung workers that spin out of control and fill up disks with logs.

06:10:28.582 [Thread-1] ERROR c.g.lookout.verspaetung.KafkaPoller - Failed to fetch latest for REDACTED_TOPIC:5
java.lang.NullPointerException: Cannot invoke method earliestOrLatestOffset() on null object
        at org.codehaus.groovy.runtime.NullObject.invokeMethod(NullObject.java:88) ~[verspaetung-0.2.0-all.jar:na]
        at org.codehaus.groovy.runtime.callsite.PogoMetaClassSite.call(PogoMetaClassSite.java:45) [verspaetung-0.2.0-all.jar:na
]
        at org.codehaus.groovy.runtime.callsite.CallSiteArray.defaultCall(CallSiteArray.java:45) ~[verspaetung-0.2.0-all.jar:na
]
        at org.codehaus.groovy.runtime.callsite.NullCallSite.call(NullCallSite.java:32) ~[verspaetung-0.2.0-all.jar:na]
        at org.codehaus.groovy.runtime.callsite.CallSiteArray.defaultCall(CallSiteArray.java:45) ~[verspaetung-0.2.0-all.jar:na
]
        at org.codehaus.groovy.runtime.callsite.PojoMetaMethodSite.call(PojoMetaMethodSite.java:55) [verspaetung-0.2.0-all.jar:
na]
        at org.codehaus.groovy.runtime.callsite.AbstractCallSite.call(AbstractCallSite.java:136) [verspaetung-0.2.0-all.jar:na]
        at com.github.lookout.verspaetung.KafkaPoller.latestFromLeader(KafkaPoller.groovy:120) [verspaetung-0.2.0-all.jar:na]
        at com.github.lookout.verspaetung.KafkaPoller$latestFromLeader$4.callCurrent(Unknown Source) ~[na:na]
        at com.github.lookout.verspaetung.KafkaPoller.captureLatestOffsetFor(KafkaPoller.groovy:111) [verspaetung-0.2.0-all.jar
:na]
        at sun.reflect.GeneratedMethodAccessor16.invoke(Unknown Source) ~[na:na]
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.7.0_75]
        at java.lang.reflect.Method.invoke(Method.java:606) ~[na:1.7.0_75]
        at org.codehaus.groovy.reflection.CachedMethod.invoke(CachedMethod.java:90) ~[verspaetung-0.2.0-all.jar:na]
        at groovy.lang.MetaMethod.doMethodInvoke(MetaMethod.java:324) ~[verspaetung-0.2.0-all.jar:na]
        at org.codehaus.groovy.runtime.metaclass.ClosureMetaClass.invokeMethod(ClosureMetaClass.java:382) ~[verspaetung-0.2.0-a
ll.jar:na]
        at groovy.lang.MetaClassImpl.invokeMethod(MetaClassImpl.java:1017) [verspaetung-0.2.0-all.jar:na]
        at org.codehaus.groovy.runtime.callsite.PogoMetaClassSite.callCurrent(PogoMetaClassSite.java:66) [verspaetung-0.2.0-all
.jar:na]
        at org.codehaus.groovy.runtime.callsite.AbstractCallSite.callCurrent(AbstractCallSite.java:169) [verspaetung-0.2.0-all.
jar:na]
        at com.github.lookout.verspaetung.KafkaPoller$_dumpMetadata_closure1.doCall(KafkaPoller.groovy:77) ~[verspaetung-0.2.0-
all.jar:na]
        at sun.reflect.GeneratedMethodAccessor30.invoke(Unknown Source) ~[na:na]
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.7.0_75]
        at java.lang.reflect.Method.invoke(Method.java:606) ~[na:1.7.0_75]
        at org.codehaus.groovy.reflection.CachedMethod.invoke(CachedMethod.java:90) ~[verspaetung-0.2.0-all.jar:na]
        at groovy.lang.MetaMethod.doMethodInvoke(MetaMethod.java:324) ~[verspaetung-0.2.0-all.jar:na]
        at org.codehaus.groovy.runtime.metaclass.ClosureMetaClass.invokeMethod(ClosureMetaClass.java:292) ~[verspaetung-0.2.0-a
ll.jar:na]
        at groovy.lang.MetaClassImpl.invokeMethod(MetaClassImpl.java:1017) [verspaetung-0.2.0-all.jar:na]
        at org.codehaus.groovy.runtime.callsite.PogoMetaClassSite.call(PogoMetaClassSite.java:39) [verspaetung-0.2.0-all.jar:na
]
        at org.codehaus.groovy.runtime.callsite.AbstractCallSite.call(AbstractCallSite.java:128) [verspaetung-0.2.0-all.jar:na]
        at com.github.lookout.verspaetung.KafkaPoller$_withTopicsAndPartitions_closure2$_closure6.doCall(KafkaPoller.groovy:98) [verspaetung-0.2.0-all.jar:na]
        at sun.reflect.GeneratedMethodAccessor29.invoke(Unknown Source) ~[na:na]
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.7.0_75]
        at java.lang.reflect.Method.invoke(Method.java:606) ~[na:1.7.0_75]
        at org.codehaus.groovy.reflection.CachedMethod.invoke(CachedMethod.java:90) ~[verspaetung-0.2.0-all.jar:na]
        at groovy.lang.MetaMethod.doMethodInvoke(MetaMethod.java:324) ~[verspaetung-0.2.0-all.jar:na]
        at org.codehaus.groovy.runtime.metaclass.ClosureMetaClass.invokeMethod(ClosureMetaClass.java:292) ~[verspaetung-0.2.0-all.jar:na]
        at groovy.lang.MetaClassImpl.invokeMethod(MetaClassImpl.java:1017) [verspaetung-0.2.0-all.jar:na]
        at groovy.lang.Closure.call(Closure.java:423) [verspaetung-0.2.0-all.jar:na]
        at groovy.lang.Closure.call(Closure.java:439) [verspaetung-0.2.0-all.jar:na]
        at org.codehaus.groovy.runtime.DefaultGroovyMethods.each(DefaultGroovyMethods.java:2027) [verspaetung-0.2.0-all.jar:na]
        at org.codehaus.groovy.runtime.DefaultGroovyMethods.each(DefaultGroovyMethods.java:2012) [verspaetung-0.2.0-all.jar:na]
        at org.codehaus.groovy.runtime.DefaultGroovyMethods.each(DefaultGroovyMethods.java:2041) [verspaetung-0.2.0-all.jar:na]
        at org.codehaus.groovy.runtime.dgm$160.invoke(Unknown Source) [verspaetung-0.2.0-all.jar:na]
        at org.codehaus.groovy.runtime.callsite.PojoMetaMethodSite$PojoMetaMethodSiteNoUnwrapNoCoerce.invoke(PojoMetaMethodSite.java:271) [verspaetung-0.2.0-all.jar:na]
        at org.codehaus.groovy.runtime.callsite.PojoMetaMethodSite.call(PojoMetaMethodSite.java:53) [verspaetung-0.2.0-all.jar:na]
        at org.codehaus.groovy.runtime.callsite.AbstractCallSite.call(AbstractCallSite.java:120) [verspaetung-0.2.0-all.jar:na]
        at com.github.lookout.verspaetung.KafkaPoller$_withTopicsAndPartitions_closure2.doCall(KafkaPoller.groovy:96) [verspaetung-0.2.0-all.jar:na]
        at sun.reflect.GeneratedMethodAccessor28.invoke(Unknown Source) ~[na:na]
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.7.0_75]
        at java.lang.reflect.Method.invoke(Method.java:606) ~[na:1.7.0_75]
        at org.codehaus.groovy.reflection.CachedMethod.invoke(CachedMethod.java:90) ~[verspaetung-0.2.0-all.jar:na]
        at groovy.lang.MetaMethod.doMethodInvoke(MetaMethod.java:324) ~[verspaetung-0.2.0-all.jar:na]
rtyler commented 9 years ago

So this error means that the consumer we have in the brokerConsumerMap inside of KafkaPoller is nulled out.

Need to investigate why that might be happening, it might be strongly correlated to a broken cluster

rtyler commented 9 years ago

Current theory is that Zookeeper had topic/partition that is/was invalid which Verspaetung was picked up and used to attempt to access an invalid consumer in the map