frankyaorenjie / flume-kafka

A kafka source & sink for flume

Zkclient dependency #2

Closed greatpatton closed 11 years ago

greatpatton commented 11 years ago

Hello,

It seems that the project is missing a dependency on zkclient, which is not included in the pom.xml.

By the way, when I try the Flume sink I get this error (the ZooKeeper connection looks OK):

02 Sep 2013 15:36:45,519 ERROR conf-file-poller-0 - Sink loggerSink has been removed due to an error during configuration
java.lang.NumberFormatException: null
    at java.lang.Integer.parseInt(Integer.java:454)
    at java.lang.Integer.parseInt(Integer.java:527)
    at scala.collection.immutable.StringLike$class.toInt(StringLike.scala:207)
    at scala.collection.immutable.StringOps.toInt(StringOps.scala:31)
    at kafka.producer.ZKBrokerPartitionInfo$$anonfun$kafka$producer$ZKBrokerPartitionInfo$$getZKTopicPartitionInfo$1$$anonfun$5.apply(ZKBrokerPartitionInfo.scala:167)
    at kafka.producer.ZKBrokerPartitionInfo$$anonfun$kafka$producer$ZKBrokerPartitionInfo$$getZKTopicPartitionInfo$1$$anonfun$5.apply(ZKBrokerPartitionInfo.scala:167)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
    at scala.collection.Iterator$class.foreach(Iterator.scala:631)
    at scala.collection.JavaConversions$JIteratorWrapper.foreach(JavaConversions.scala:474)
    at scala.collection.IterableLike$class.foreach(IterableLike.scala:79)
    at scala.collection.JavaConversions$JListWrapper.foreach(JavaConversions.scala:521)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:206)
    at scala.collection.JavaConversions$JListWrapper.map(JavaConversions.scala:521)
    at kafka.producer.ZKBrokerPartitionInfo$$anonfun$kafka$producer$ZKBrokerPartitionInfo$$getZKTopicPartitionInfo$1.apply(ZKBrokerPartitionInfo.scala:167)
    at kafka.producer.ZKBrokerPartitionInfo$$anonfun$kafka$producer$ZKBrokerPartitionInfo$$getZKTopicPartitionInfo$1.apply(ZKBrokerPartitionInfo.scala:163)
    at scala.collection.Iterator$class.foreach(Iterator.scala:631)
    at scala.collection.JavaConversions$JIteratorWrapper.foreach(JavaConversions.scala:474)
    at scala.collection.IterableLike$class.foreach(IterableLike.scala:79)
    at scala.collection.JavaConversions$JListWrapper.foreach(JavaConversions.scala:521)
    at kafka.producer.ZKBrokerPartitionInfo.kafka$producer$ZKBrokerPartitionInfo$$getZKTopicPartitionInfo(ZKBrokerPartitionInfo.scala:163)
    at kafka.producer.ZKBrokerPartitionInfo.<init>(ZKBrokerPartitionInfo.scala:65)
    at kafka.producer.Producer.<init>(Producer.scala:47)
    at kafka.javaapi.producer.Producer.<init>(Producer.scala:33)
    at kafka.javaapi.producer.Producer.<init>(Producer.scala:40)
    at com.vipshop.flume.KafkaUtil.getProducer(KafkaUtil.java:51)
    at com.vipshop.flume.sink.kafka.KafkaSink.configure(KafkaSink.java:56)
    at org.apache.flume.conf.Configurables.configure(Configurables.java:41)
    at org.apache.flume.node.AbstractConfigurationProvider.loadSinks(AbstractConfigurationProvider.java:418)
    at org.apache.flume.node.AbstractConfigurationProvider.getConfiguration(AbstractConfigurationProvider.java:103)
    at org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:140)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
    at java.util.concurrent.FutureTask$Sync.innerRunAndReset(FutureTask.java:351)
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:178)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:724)

frankyaorenjie commented 11 years ago

zookeeper-3.3.4 is in pom.xml. I'm not sure what you mean by zkClient. If a dependency were missing, Maven could not build the jar file, and you have evidently built it.

As for the exception, it looks like some parameters in your Flume conf file are wrong. Could you paste your Flume conf file here?

greatpatton commented 11 years ago

Here is my flume config file:

agent.sinks.loggerSink.type = com.vipshop.flume.sink.kafka.KafkaSink
# Specify the channel the sink should use
agent.sinks.loggerSink.channel = memoryChannel
agent.sinks.loggerSink.zkconnect = 10.192.57.12:2181,10.192.57.13:2181,10.192.57.14:2181,10.192.57.15:2181,10.192.57.16:2181
agent.sinks.loggerSink.topic = test4
agent.sinks.loggerSink.batchsize = 200

As for zkclient, the cause may be that I'm not compiling on the same machine as the one where I run my tests. The test machine was missing the zkclient jar, and I could not find that jar on my compiling machine either, but I may have made a mistake.
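
One quick way to confirm whether the zkclient jar is visible on the machine that runs the agent is to probe for one of its classes. The following is a minimal sketch, assuming the jar provides `org.I0Itec.zkclient.ZkClient`; the `ClasspathProbe` class is hypothetical and not part of flume-kafka.

```java
// Hypothetical helper, not part of flume-kafka: reports whether a class can be loaded.
final class ClasspathProbe {

    // Returns true if the named class is visible to this class's class loader.
    static boolean isOnClasspath(String className) {
        try {
            // initialize=false: only locate the class, do not run its static initializers.
            Class.forName(className, false, ClasspathProbe.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Assumption: the zkclient jar ships this class.
        String zkClientClass = "org.I0Itec.zkclient.ZkClient";
        System.out.println(zkClientClass + " on classpath: " + isOnClasspath(zkClientClass));
    }
}
```

Running it with the same classpath as the Flume agent should show whether the jar is missing at run time rather than at build time.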

frankyaorenjie commented 11 years ago

My code only parses a single ZooKeeper IP. Can you use just one IP for the zkconnect parameter and test again?
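
For anyone who wants to keep the full list in their config while testing with one address, the reduction can be sketched as below. This is only an illustration under the assumption that the value is a comma-separated list of `host:port` entries; `ZkConnect.firstHost` is a hypothetical helper, not the parsing code used in this project.

```java
// Hypothetical helper, not the project's actual parsing code.
final class ZkConnect {

    // Returns the first host:port entry of a comma-separated ZooKeeper connect string.
    static String firstHost(String zkConnect) {
        // A limit of 2 guarantees at least one array element, even for "" or ",".
        String first = zkConnect == null ? "" : zkConnect.split(",", 2)[0].trim();
        if (first.isEmpty()) {
            throw new IllegalArgumentException("zkconnect must start with a host:port entry");
        }
        return first;
    }

    public static void main(String[] args) {
        String configured = "10.192.57.12:2181,10.192.57.13:2181";
        System.out.println(firstHost(configured));
    }
}
```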

greatpatton commented 11 years ago

I ran the test and got the same exception.

frankyaorenjie commented 11 years ago

I think this is caused by your Kafka setup rather than by this project. A similar case is described in http://mail-archives.apache.org/mod_mbox/kafka-users/201306.mbox/%3C51B1F9EE.2090602@gmail.com%3E

Could you reset your Kafka (delete all topics on disk and restart Kafka)?

greatpatton commented 11 years ago

OK found it!

I was using kafka_0.8 before moving to kafka_0.7.2 to test your code. It seems that the structure stored in ZooKeeper differs slightly between the two versions (although I was still able to use the queue). After cleaning up the ZooKeeper structure and Kafka, I'm able to get a connection.
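
The top frames of the stack trace show `Integer.parseInt` receiving `null` from `ZKBrokerPartitionInfo`, which is consistent with the producer reading a value from ZooKeeper that was not in the form it expected. The sketch below reproduces that failure mode and shows a guard that reports it more clearly. The `PartitionCount` class and the example path are hypothetical illustrations, not Kafka code.

```java
// Hypothetical illustration, not Kafka code.
final class PartitionCount {

    // Parses a partition count read from ZooKeeper, failing with a message that names the path.
    static int parse(String znodeData, String path) {
        if (znodeData == null) {
            throw new IllegalStateException(
                "No data at " + path + "; the ZooKeeper layout may come from another Kafka version");
        }
        try {
            return Integer.parseInt(znodeData.trim());
        } catch (NumberFormatException e) {
            throw new IllegalStateException("Unexpected data at " + path + ": " + znodeData, e);
        }
    }

    public static void main(String[] args) {
        String missing = null;
        try {
            // The unguarded call fails the same way as the trace in this thread.
            Integer.parseInt(missing);
        } catch (NumberFormatException e) {
            System.out.println("Unguarded parse failed: " + e);
        }
        // "/example/path" is a placeholder, not a real Kafka znode path.
        System.out.println(parse("4", "/example/path"));
    }
}
```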

Thanks for your help!

frankyaorenjie commented 11 years ago

Oh yes, this project is based on kafka-0.7.2 because 0.7.X is more mature than 0.8. Plus, we use 0.7.2 in our production environment. :)

wayne666 commented 10 years ago

How did you solve the problem? Thanks.