frankyaorenjie / flume-kafka

A kafka source & sink for flume

Error: java.lang.IllegalStateException: close() called when transaction is OPEN - you must either commit or rollback first #4

Closed snowice closed 10 years ago

snowice commented 10 years ago

serverA: kafka_2.8.0-0.8.0-beta1
serverB: flume-ng 1.4

Run on serverB:
flume-ng agent -c . -f flume-kafka.conf -n a1 -Dflume.root.logger=INFO,console

flume sink config:
a1.sinks.k1.type = com.vipshop.flume.sink.kafka.KafkaSink
a1.sinks.k1.channel = c1
a1.sinks.k1.zkconnect = 192.168.165.39:2181
a1.sinks.k1.topic = test2
a1.sinks.k1.producer.type = async
a1.sinks.k1.batchsize = 400
a1.sinks.k1.serializer.class = kafka.serializer.StringEncoder

Error:

2013-11-21 00:13:25,909 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:173)] Starting Sink k1
2013-11-21 00:13:25,910 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:184)] Starting Source r1
2013-11-21 00:13:25,910 (lifecycleSupervisor-1-4) [INFO - org.apache.flume.source.ExecSource.start(ExecSource.java:163)] Exec source starting with command:tail -F /var/log/maillog
2013-11-21 00:13:25,914 (lifecycleSupervisor-1-4) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.register(MonitoredCounterGroup.java:110)] Monitoried counter group for type: SOURCE, name: r1, registered successfully.
2013-11-21 00:13:25,914 (lifecycleSupervisor-1-4) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:94)] Component type: SOURCE, name: r1 started
2013-11-21 00:13:28,924 (SinkRunner-PollingRunner-DefaultSinkProcessor) [ERROR - org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:160)] Unable to deliver event. Exception follows.
java.lang.IllegalStateException: close() called when transaction is OPEN - you must either commit or rollback first
    at com.google.common.base.Preconditions.checkState(Preconditions.java:172)
    at org.apache.flume.channel.BasicTransactionSemantics.close(BasicTransactionSemantics.java:179)
    at com.vipshop.flume.sink.kafka.KafkaSink.process(KafkaSink.java:45)
    at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
    at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
    at java.lang.Thread.run(Thread.java:722)
2013-11-21 00:13:34,931 (SinkRunner-PollingRunner-DefaultSinkProcessor) [ERROR - org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:160)] Unable to deliver event. Exception follows.
java.lang.IllegalStateException: close() called when transaction is OPEN - you must either commit or rollback first
    at com.google.common.base.Preconditions.checkState(Preconditions.java:172)
    at org.apache.flume.channel.BasicTransactionSemantics.close(BasicTransactionSemantics.java:179)
    at com.vipshop.flume.sink.kafka.KafkaSink.process(KafkaSink.java:45)
    at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
    at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
    at java.lang.Thread.run(Thread.java:722)
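
For anyone reading the trace: the exception is raised in BasicTransactionSemantics.close(), called from KafkaSink.process(), which suggests the sink reached close() on a path where neither commit() nor rollback() had run. The KafkaSink source is not shown in this thread, so the following is only a sketch of the usual begin / commit-or-rollback / close shape. ToyTransaction and deliver are hypothetical stand-ins written for illustration, not Flume classes; only the state check mirrors the message in the log.

```java
public class TransactionSketch {

    /** Toy stand-in for a channel transaction. NOT Flume's class; illustration only. */
    static class ToyTransaction {
        enum State { NEW, OPEN, COMPLETED, CLOSED }

        State state = State.NEW;

        void begin() {
            require(state == State.NEW, "begin() called when transaction is " + state);
            state = State.OPEN;
        }

        void commit() {
            require(state == State.OPEN, "commit() called when transaction is " + state);
            state = State.COMPLETED;
        }

        void rollback() {
            require(state == State.OPEN, "rollback() called when transaction is " + state);
            state = State.COMPLETED;
        }

        void close() {
            // Same rule the log message describes: an OPEN transaction cannot be closed.
            require(state == State.NEW || state == State.COMPLETED,
                    "close() called when transaction is " + state
                            + " - you must either commit or rollback first");
            state = State.CLOSED;
        }

        private static void require(boolean ok, String message) {
            if (!ok) {
                throw new IllegalStateException(message);
            }
        }
    }

    /**
     * Hypothetical delivery step. Every path out of the try block has either
     * committed or rolled back before close() runs in the finally block.
     * Returns true if the transaction was committed.
     */
    static boolean deliver(ToyTransaction tx, boolean sendFails) {
        tx.begin();
        try {
            if (sendFails) {
                throw new RuntimeException("simulated send failure");
            }
            tx.commit();
            return true;
        } catch (RuntimeException e) {
            tx.rollback();
            return false;
        } finally {
            tx.close();
        }
    }

    public static void main(String[] args) {
        System.out.println("committed: " + deliver(new ToyTransaction(), false));
        System.out.println("committed: " + deliver(new ToyTransaction(), true));
    }
}
```

In a real sink the same shape would wrap the channel take and the Kafka send. An early return between begin() and close() (for example, when the channel has no event) that skips both commit() and rollback() would produce exactly the error above.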

frankyaorenjie commented 10 years ago

I refactored some code last week, and I think that is the root cause of this issue. I'll take a look at it tomorrow, thanks.

frankyaorenjie commented 10 years ago

I've fixed it in the master branch; please give it a try.

snowice commented 10 years ago

Thanks, I'll give it a try.

duraiv commented 10 years ago

reduce-client-shuffle-2.2.0.jar:/home/durai/yarn/hadoop-2/share/hadoop/mapreduce/hadoop-mapreduce-examples-2.2.0.jar:/home/durai/yarn/hadoop-2/share/hadoop/mapreduce/lib:/home/durai/yarn/hadoop-2/share/hadoop/mapreduce/lib-examples:/home/durai/yarn/hadoop-2/share/hadoop/mapreduce/sources:/home/durai/yarn/hbase-9/conf' -Djava.library.path=:/home/durai/yarn/hadoop-2/lib/:/home/durai/yarn/hadoop-2/lib/ org.apache.flume.node.Application --conf-file /home/durai/yarn/flume-4/conf/flume.conf --name a1
log4j:ERROR Could not find value for key log4j.appender.SQL_APPENDER
log4j:ERROR Could not instantiate appender named "SQL_APPENDER".
2014-03-20 17:41:05,478 [conf-file-poller-0] ERROR org.apache.flume.node.AbstractConfigurationProvider - Source r1 has been removed due to an error during configuration
java.lang.IllegalStateException: The parameter command must be specified
    at com.google.common.base.Preconditions.checkState(Preconditions.java:145)
    at org.apache.flume.source.ExecSource.configure(ExecSource.java:221)
    at org.apache.flume.conf.Configurables.configure(Configurables.java:41)
    at org.apache.flume.node.AbstractConfigurationProvider.loadSources(AbstractConfigurationProvider.java:331)
    at org.apache.flume.node.AbstractConfigurationProvider.getConfiguration(AbstractConfigurationProvider.java:102)
    at org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:140)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:304)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:744)
2014-03-20 17:41:06,083 [conf-file-poller-0] WARN org.apache.hadoop.util.NativeCodeLoader - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
2014-03-20 17:43:36,608 [conf-file-poller-0] ERROR org.apache.flume.node.AbstractConfigurationProvider - Source r1 has been removed due to an error during configuration
java.lang.IllegalStateException: The parameter command must be specified
    at com.google.common.base.Preconditions.checkState(Preconditions.java:145)
    at org.apache.flume.source.ExecSource.configure(ExecSource.java:221)
    at org.apache.flume.conf.Configurables.configure(Configurables.java:41)
    at org.apache.flume.node.AbstractConfigurationProvider.loadSources(AbstractConfigurationProvider.java:331)
    at org.apache.flume.node.AbstractConfigurationProvider.getConfiguration(AbstractConfigurationProvider.java:102)
    at org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:140)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:304)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:744)
2014-03-20 17:43:36,612 [conf-file-poller-0] INFO org.apache.flume.lifecycle.LifecycleSupervisor - Stopping component: SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@320cc97f counterGroup:{ name:null counters:{runner.backoffs.consecutive=20, runner.backoffs=20} } }
2014-03-20 17:43:36,613 [conf-file-poller-0] INFO org.apache.flume.lifecycle.LifecycleSupervisor - Stopping component: org.apache.flume.channel.MemoryChannel{name: c1}
2014-03-20 17:43:37,115 [lifecycleSupervisor-1-9] ERROR org.apache.flume.instrumentation.MonitoredCounterGroup - Failed to register monitored counter group for type: SINK, name: k1
javax.management.InstanceAlreadyExistsException: org.apache.flume.sink:type=k1
    at com.sun.jmx.mbeanserver.Repository.addMBean(Repository.java:437)
    at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerWithRepository(DefaultMBeanServerInterceptor.java:1898)
    at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerDynamicMBean(DefaultMBeanServerInterceptor.java:966)
    at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerObject(DefaultMBeanServerInterceptor.java:900)
    at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerMBean(DefaultMBeanServerInterceptor.java:324)
    at com.sun.jmx.mbeanserver.JmxMBeanServer.registerMBean(JmxMBeanServer.java:522)
    at org.apache.flume.instrumentation.MonitoredCounterGroup.register(MonitoredCounterGroup.java:108)
    at org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:88)
    at org.apache.flume.sink.hdfs.HDFSEventSink.start(HDFSEventSink.java:484)
    at org.apache.flume.sink.DefaultSinkProcessor.start(DefaultSinkProcessor.java:46)
    at org.apache.flume.SinkRunner.start(SinkRunner.java:79)
    at org.apache.flume.lifecycle.LifecycleSupervisor$MonitorRunnable.run(LifecycleSupervisor.java:251)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:304)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:744)

frankyaorenjie commented 10 years ago

It's too long to dig through; could you please summarize it?