bergamoyat closed this 8 years ago
Could you please test with the ASLv2 branch? Master will be available shortly.
The test on the ASLv2 branch may not be OK.
Is the problem solved?
Not yet. The ASLv2 branch may be missing a class or function; I'm not good at Java.
Please attach the stack trace here.
Flume supports Kafka 0.8+ originally.
I am having trouble loading the sink.
I tried stopping Kafka and removing all topics from disk; it did not help.
Flume - version flume-1.4.0
Kafka - kafka_2.8.0
Flume sink config:
applog_agent.sinks = kafka
applog_agent.sinks.kafka.type = com.vipshop.flume.sink.kafka.KafkaSink
applog_agent.sinks.kafka.channel = C1
applog_agent.sinks.kafka.zk.connect = kafkanode:2181
applog_agent.sinks.kafka.topic = all
applog_agent.sinks.kafka.batchsize = 200
applog_agent.sinks.kafka.producer.type = async
applog_agent.sinks.kafka.serializer.class = kafka.serializer.StringEncoder
Plugin lib:
ls -lrt plugins.d/flume-kafka-plugin/libext
total 18820
-rw-r--r-- 1 kafkausr kafkausr 604182 Nov 26 17:21 zookeeper-3.3.4.jar
-rw-r--r-- 1 kafkausr kafkausr 6160791 Nov 26 17:21 scala-library.jar
-rw-r--r-- 1 kafkausr kafkausr 8671416 Nov 26 17:21 scala-compiler.jar
-rw-r--r-- 1 kafkausr kafkausr 2520145 Nov 26 17:21 kafka_2.8.0-0.8.0.jar
-rw-r--r-- 1 kafkausr kafkausr 53244 Nov 26 17:21 jopt-simple-3.2.jar
-rw-r--r-- 1 kafkausr kafkausr 64009 Nov 26 17:21 zkclient-0.3.jar
-rw-r--r-- 1 kafkausr kafkausr 995968 Nov 26 17:21 snappy-java-1.0.4.1.jar
-rw-r--r-- 1 kafkausr kafkausr 82123 Nov 26 17:21 metrics-core-2.2.0.jar
-rw-r--r-- 1 kafkausr kafkausr 4229 Nov 26 17:21 metrics-annotation-2.2.0.jar
ls -lrt plugins.d/flume-kafka-plugin/lib
total 12
-rw-rw-r-- 1 kafkausr kafkausr 6884 Dec 6 06:33 flume-kafka-plugin.jar
Error in Flume log :
06 Dec 2013 11:09:10,520 INFO conf-file-poller-0-SendThread(kafkanode:2181) - Socket connection established to kafkanode/x.x.x.x:2181, initiating session
06 Dec 2013 11:09:10,536 INFO conf-file-poller-0-SendThread(kafkanode:2181) - Session establishment complete on server kafkanode/x.x.x.x:2181, sessionid = 0x42c8a2118b0004, negotiated timeout = 6000
06 Dec 2013 11:09:10,540 INFO conf-file-poller-0-EventThread - zookeeper state changed (SyncConnected)
06 Dec 2013 11:09:10,633 ERROR conf-file-poller-0 - Sink kafka has been removed due to an error during configuration
java.lang.NumberFormatException: For input string: "-1, "port""
at java.lang.NumberFormatException.forInputString(NumberFormatException.java:48)
at java.lang.Integer.parseInt(Integer.java:458)
at java.lang.Integer.parseInt(Integer.java:499)
at scala.collection.immutable.StringLike$class.toInt(StringLike.scala:207)
at scala.collection.immutable.StringOps.toInt(StringOps.scala:31)
at kafka.cluster.Broker$.createBroker(Broker.scala:28)
at kafka.producer.ZKBrokerPartitionInfo$$anonfun$kafka$producer$ZKBrokerPartitionInfo$$getZKBrokerInfo$1.apply(ZKBrokerPartitionInfo.scala:195)
at kafka.producer.ZKBrokerPartitionInfo$$anonfun$kafka$producer$ZKBrokerPartitionInfo$$getZKBrokerInfo$1.apply(ZKBrokerPartitionInfo.scala:193)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:57)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:43)
at kafka.producer.ZKBrokerPartitionInfo.kafka$producer$ZKBrokerPartitionInfo$$getZKBrokerInfo(ZKBrokerPartitionInfo.scala:193)
at kafka.producer.ZKBrokerPartitionInfo.<init>(ZKBrokerPartitionInfo.scala:67)
at kafka.producer.Producer.<init>(Producer.scala:47)
at kafka.javaapi.producer.Producer.<init>(Producer.scala:33)
at kafka.javaapi.producer.Producer.<init>(Producer.scala:40)
at com.vipshop.flume.KafkaUtil.getProducer(KafkaUtil.java:43)
at com.vipshop.flume.sink.kafka.KafkaSink.configure(KafkaSink.java:47)
at org.apache.flume.conf.Configurables.configure(Configurables.java:41)
at org.apache.flume.node.AbstractConfigurationProvider.loadSinks(AbstractConfigurationProvider.java:418)
at org.apache.flume.node.AbstractConfigurationProvider.getConfiguration(AbstractConfigurationProvider.java:103)
at org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:140)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:441)
at java.util.concurrent.FutureTask$Sync.innerRunAndReset(FutureTask.java:317)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:150)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$101(ScheduledThreadPoolExecutor.java:98)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.runPeriodic(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:204)
at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
at java.lang.Thread.run(Thread.java:662)
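
For anyone reading the trace above: the failing frame is `kafka.cluster.Broker$.createBroker` calling `toInt`, and the rejected input is `-1, "port"`. That is the value you would get if a broker registration stored as JSON were split on `:` and the third piece were parsed as a port number. Below is a minimal sketch of that failure mode in plain Java. It is not the Kafka source; the class name, the method name, both registration strings, and the host and port values are assumptions made up for illustration.

```java
public class BrokerInfoParseDemo {
    // Hypothetical JSON-style broker registration (assumed shape, illustrative values).
    static final String JSON_REGISTRATION =
        "{ \"host\":\"kafkanode\", \"jmx_port\":-1, \"port\":9092, \"version\":1 }";

    // Hypothetical colon-separated registration (assumed shape: id:host:port).
    static final String COLON_REGISTRATION = "creator-1:kafkanode:9092";

    // Mimics a parser that expects colon-separated fields and reads the third one as the port.
    static int parsePortFromColonSeparated(String brokerInfo) {
        String[] fields = brokerInfo.split(":");
        return Integer.parseInt(fields[2]);
    }

    public static void main(String[] args) {
        // The colon-separated form parses without error.
        System.out.println(parsePortFromColonSeparated(COLON_REGISTRATION));

        // The JSON form makes the third field a fragment that is not a number.
        try {
            parsePortFromColonSeparated(JSON_REGISTRATION);
        } catch (NumberFormatException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

If this reading is right, the producer code that actually gets loaded expects a different ZooKeeper registration format than the one the broker wrote, so it may be worth checking which Kafka client classes end up on the Flume classpath. The thread does not confirm this.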