NetherlandsForensicInstitute / kafka-spout

Kafka consumer emitting messages as storm tuples
Apache License 2.0

Always an error whenever I use this spout: java.lang.NoSuchMethodError #8

Closed: jamesweb1 closed this issue 10 years ago

jamesweb1 commented 10 years ago

I really need a Kafka spout to read messages from Kafka into my Storm topology. I followed the wiki and got this exception. My code is as follows:

    // imports needed for this snippet (KafkaSpout lives in nl.minvenj.nfi.storm.kafka)
    import backtype.storm.Config;
    import backtype.storm.LocalCluster;
    import backtype.storm.topology.TopologyBuilder;
    import nl.minvenj.nfi.storm.kafka.KafkaSpout;

    TopologyBuilder builder = new TopologyBuilder();
    Config conf = new Config();
    conf.put("kafka.spout.topic", "test");
    conf.put("kafka.zookeeper.connect", "127.0.0.1:2181");
    conf.put("kafka.consumer.timeout.ms", 100);

    builder.setSpout("Input", new KafkaSpout(), 1);
    builder.setBolt("output", new Mybolt(), 1).shuffleGrouping("Input");

    LocalCluster cluster = new LocalCluster();
    cluster.submitTopology("test", conf, builder.createTopology());

And I get this error [java.lang.NoSuchMethodError]:

    7494 [Thread-6] INFO backtype.storm.daemon.worker - Worker has topology config {"storm.id" "test-1-1400576467", "dev.zookeeper.path" "/tmp/dev-storm-zookeeper", "topology.tick.tuple.freq.secs" nil, "topology.builtin.metrics.bucket.size.secs" 60, "topology.fall.back.on.java.serialization" true, "kafka.zookeeper.connect" "127.0.0.1:2181", "topology.max.error.report.per.interval" 5, "zmq.linger.millis" 0, "topology.skip.missing.kryo.registrations" true, "storm.messaging.netty.client_worker_threads" 1, "ui.childopts" "-Xmx768m", "storm.zookeeper.session.timeout" 20000, "nimbus.reassign" true, "topology.trident.batch.emit.interval.millis" 50, "kafka.consumer.timeout.ms" 100, "nimbus.monitor.freq.secs" 10, "logviewer.childopts" "-Xmx128m", "java.library.path" "/usr/local/lib:/opt/local/lib:/usr/lib", "topology.executor.send.buffer.size" 1024, "storm.local.dir" "/tmp/b4d2a5cc-23ce-4bc3-902c-5e44ab7bbe38", "storm.messaging.netty.buffer_size" 5242880, "supervisor.worker.start.timeout.secs" 120, "topology.enable.message.timeouts" true, "nimbus.cleanup.inbox.freq.secs" 600, "nimbus.inbox.jar.expiration.secs" 3600, "drpc.worker.threads" 64, "topology.worker.shared.thread.pool.size" 4, "nimbus.host" "localhost", "storm.messaging.netty.min_wait_ms" 100, "storm.zookeeper.port" 2000, "transactional.zookeeper.port" nil, "topology.executor.receive.buffer.size" 1024, "transactional.zookeeper.servers" nil, "storm.zookeeper.root" "/storm", "storm.zookeeper.retry.intervalceiling.millis" 30000, "supervisor.enable" true, "storm.messaging.netty.server_worker_threads" 1, "storm.zookeeper.servers" ["localhost"], "transactional.zookeeper.root" "/transactional", "topology.acker.executors" nil, "topology.kryo.decorators" (), "kafka.spout.topic" "test", "topology.name" "test", "topology.transfer.buffer.size" 1024, "topology.worker.childopts" nil, "drpc.queue.size" 128, "worker.childopts" "-Xmx768m", "supervisor.heartbeat.frequency.secs" 5, "topology.error.throttle.interval.secs" 10, "zmq.hwm" 0, "drpc.port" 3772, "supervisor.monitor.frequency.secs" 3, "drpc.childopts" "-Xmx768m", "topology.receiver.buffer.size" 8, "task.heartbeat.frequency.secs" 3, "topology.tasks" nil, "storm.messaging.netty.max_retries" 30, "topology.spout.wait.strategy" "backtype.storm.spout.SleepSpoutWaitStrategy", "nimbus.thrift.max_buffer_size" 1048576, "topology.max.spout.pending" nil, "storm.zookeeper.retry.interval" 1000, "topology.sleep.spout.wait.strategy.time.ms" 1, "nimbus.topology.validator" "backtype.storm.nimbus.DefaultTopologyValidator", "supervisor.slots.ports" (1024 1025 1026), "topology.debug" false, "nimbus.task.launch.secs" 120, "nimbus.supervisor.timeout.secs" 60, "topology.kryo.register" nil, "topology.message.timeout.secs" 30, "task.refresh.poll.secs" 10, "topology.workers" 1, "supervisor.childopts" "-Xmx256m", "nimbus.thrift.port" 6627, "topology.stats.sample.rate" 0.05, "worker.heartbeat.frequency.secs" 1, "topology.tuple.serializer" "backtype.storm.serialization.types.ListDelegateSerializer", "topology.disruptor.wait.strategy" "com.lmax.disruptor.BlockingWaitStrategy", "nimbus.task.timeout.secs" 30, "storm.zookeeper.connection.timeout" 15000, "topology.kryo.factory" "backtype.storm.serialization.DefaultKryoFactory", "drpc.invocations.port" 3773, "logviewer.port" 8000, "zmq.threads" 1, "storm.zookeeper.retry.times" 5, "storm.thrift.transport" "backtype.storm.security.auth.SimpleTransportPlugin", "topology.state.synchronization.timeout.secs" 60, "supervisor.worker.timeout.secs" 30, "nimbus.file.copy.expiration.secs" 600, "storm.messaging.transport" "backtype.storm.messaging.netty.Context", "logviewer.appender.name" "A1", "storm.messaging.netty.max_wait_ms" 1000, "drpc.request.timeout.secs" 600, "storm.local.mode.zmq" false, "ui.port" 8080, "nimbus.childopts" "-Xmx1024m", "storm.cluster.mode" "local", "topology.optimize" true, "topology.max.task.parallelism" nil}
    7495 [Thread-6] INFO backtype.storm.daemon.worker - Worker b75052a5-2ccd-4908-bd67-450177112b66 for storm test-1-1400576467 on 39bc6b46-d5d3-473c-8fba-0ef03273b500:1024 has finished loading
    7619 [Thread-22-Input] ERROR backtype.storm.util - Async loop died!
    java.lang.NoSuchMethodError: scala.Predef$.augmentString(Ljava/lang/String;)Ljava/lang/String;
        at kafka.utils.VerifiableProperties.getBoolean(Unknown Source) ~[kafka_2.10-0.8.0.jar:0.8.0]
        at kafka.consumer.ConsumerConfig.<init>(Unknown Source) ~[kafka_2.10-0.8.0.jar:0.8.0]
        at kafka.consumer.ConsumerConfig.<init>(Unknown Source) ~[kafka_2.10-0.8.0.jar:0.8.0]
        at nl.minvenj.nfi.storm.kafka.KafkaSpout.createConsumer(KafkaSpout.java:136) ~[kafka-spout-0.2-SNAPSHOT.jar:na]
        at nl.minvenj.nfi.storm.kafka.KafkaSpout.open(KafkaSpout.java:207) ~[kafka-spout-0.2-SNAPSHOT.jar:na]
        at backtype.storm.daemon.executor$eval5100$fn__5101$fn__5116.invoke(executor.clj:519) ~[na:na]
        at backtype.storm.util$async_loop$fn__390.invoke(util.clj:431) ~[na:na]
        at clojure.lang.AFn.run(AFn.java:24) [clojure-1.4.0.jar:na]
        at java.lang.Thread.run(Thread.java:662) [na:1.6.0_26]
    7620 [Thread-22-Input] ERROR backtype.storm.daemon.executor - java.lang.NoSuchMethodError: scala.Predef$.augmentString(Ljava/lang/String;)Ljava/lang/String;
        at kafka.utils.VerifiableProperties.getBoolean(Unknown Source) ~[kafka_2.10-0.8.0.jar:0.8.0]
        at kafka.consumer.ConsumerConfig.<init>(Unknown Source) ~[kafka_2.10-0.8.0.jar:0.8.0]
        at kafka.consumer.ConsumerConfig.<init>(Unknown Source) ~[kafka_2.10-0.8.0.jar:0.8.0]
        at nl.minvenj.nfi.storm.kafka.KafkaSpout.createConsumer(KafkaSpout.java:136) ~[kafka-spout-0.2-SNAPSHOT.jar:na]
        at nl.minvenj.nfi.storm.kafka.KafkaSpout.open(KafkaSpout.java:207) ~[kafka-spout-0.2-SNAPSHOT.jar:na]
        at backtype.storm.daemon.executor$eval5100$fn__5101$fn__5116.invoke(executor.clj:519) ~[na:na]
        at backtype.storm.util$async_loop$fn__390.invoke(util.clj:431) ~[na:na]
        at clojure.lang.AFn.run(AFn.java:24) [clojure-1.4.0.jar:na]
        at java.lang.Thread.run(Thread.java:662) [na:1.6.0_26]
    7671 [Thread-22-Input] INFO backtype.storm.util - Halting process: ("Worker died")

Process finished with exit code 1

Please give me some suggestions, thank you.

akaIDIOT commented 10 years ago

Judging from the log, the method with signature scala.Predef$.augmentString(Ljava/lang/String;)Ljava/lang/String; is missing. I'm not too familiar with Scala itself, but I think your classpath doesn't contain something like scala-lang or scala-reflect.

Could you post the classpath information of your Storm runtime?
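
A quick way to check is a reflective probe run with the same classpath as the topology; the sketch below is only illustrative (the class name ScalaRuntimeCheck is made up) and assumes nothing beyond the JDK. It fails with ClassNotFoundException or NoSuchMethodException when the Scala runtime is missing or mismatched:

    import java.lang.reflect.Method;

    public class ScalaRuntimeCheck {
        public static void main(String[] args) throws Exception {
            // scala.Predef$ is the object that holds augmentString, per the stack trace
            Class<?> predef = Class.forName("scala.Predef$");
            // getMethod matches on name and parameter types only, so this prints
            // whichever augmentString(String) overload the classpath provides
            Method m = predef.getMethod("augmentString", String.class);
            System.out.println(m);
        }
    }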

jamesweb1 commented 10 years ago

I think you are right. When I add the scala-lang dependency, it seems to work. But it comes with another problem. When I use my terminal as a publisher, I send something as follows:

    [2014-05-21 10:14:35,337] INFO Accepted socket connection from /127.0.0.1:36341 (org.apache.zookeeper.server.NIOServerCnxn)
    [2014-05-21 10:14:35,338] INFO Client attempting to establish new session at /127.0.0.1:36341 (org.apache.zookeeper.server.NIOServerCnxn)
    [2014-05-21 10:14:35,362] INFO Established session 0x14617b66aa80017 with negotiated timeout 6000 for client /127.0.0.1:36341 (org.apache.zookeeper.server.NIOServerCnxn)
    [2014-05-21 10:14:35,666] INFO Closing socket connection to /127.0.0.1. (kafka.network.Processor)
    test
    abc
    OneMore?
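
(The publisher here is presumably Kafka 0.8's stock console producer, started with something like the line below; the broker address is an assumption.)

    bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test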

And my bolt is doing:

    byte[] message = (byte[]) tuple.getValue(0);
    String tmp = new String(message);
    System.out.println("***Content: " + tmp);

I can only see the first message:

    ***Content: test

And it shows the same message several times instead of showing the other messages like "abc" and "OneMore?":

    307073 [Thread-16-input] INFO backtype.storm.daemon.task - Emitting: input default [[B@52929ed4]
    307074 [Thread-16-input] INFO backtype.storm.daemon.task - Emitting: input __ack_init [-4311306768572226661 8851113207906713379 2]
    307074 [Thread-18-output] INFO backtype.storm.daemon.executor - Processing received message source: input:2, stream: default, id: {-4311306768572226661=8851113207906713379}, [[B@52929ed4]
    ***Content: First!!!
    307074 [Thread-22-acker] INFO backtype.storm.daemon.executor - Processing received message source: input:2, stream: __ack_init, id: {}, [-4311306768572226661 8851113207906713379 2]
    307104 [Thread-18-output] INFO backtype.storm.daemon.executor - Processing received message source: system:-1, stream: __metrics_tick, id: {}, [60]
    307105 [Thread-18-output] INFO backtype.storm.daemon.task - Emitting: output __metrics [#<TaskInfo backtype.storm.metric.api.IMetricsConsumer$TaskInfo@765141a5> [#<DataPoint [ack-count = {}]> #<DataPoint [sendqueue = {write_pos=-1, read_pos=-1, capacity=1024, population=0}]> #<DataPoint [receive = {write_pos=11, read_pos=10, capacity=1024, population=1}]> #<DataPoint [process-latency = {}]> #<DataPoint [transfer-count = {}]> #<DataPoint [execute-latency = {}]> #<DataPoint [fail-count = {}]> #<DataPoint [emit-count = {}]> #<DataPoint [execute-count = {}]>]]
    307117 [Thread-20-system] INFO backtype.storm.daemon.executor - Processing received message source: system:-1, stream: __metrics_tick, id: {}, [60]
    307118 [Thread-20-system] INFO backtype.storm.daemon.task - Emitting: system __metrics [#<TaskInfo backtype.storm.metric.api.IMetricsConsumer$TaskInfo@50acf452> [#<DataPoint [ack-count = {}]> #<DataPoint [GC/PSScavenge = {count=0, timeMs=0}]> #<DataPoint [memory/heap = {unusedBytes=188282928, usedBytes=177866704, maxBytes=1845493760, initBytes=129712576, virtualFreeBytes=1667627056, committedBytes=366149632}]> #<DataPoint [receive = {write_pos=5, read_pos=4, capacity=1024, population=1}]> #<DataPoint [GC/PSMarkSweep = {count=0, timeMs=0}]> #<DataPoint [fail-count = {}]> #<DataPoint [execute-latency = {}]> #<DataPoint [emit-count = {}]> #<DataPoint [newWorkerEvent = 0]> #<DataPoint [execute-count = {}]> #<DataPoint [sendqueue = {write_pos=-1, read_pos=-1, capacity=1024, population=0}]> #<DataPoint [memory/nonHeap = {unusedBytes=153904, usedBytes=52602576, maxBytes=136314880, initBytes=24313856, virtualFreeBytes=83712304, committedBytes=52756480}]> #<DataPoint [uptimeSecs = 307.176]> #<DataPoint [transfer = {write_pos=12, read_pos=12, capacity=1024, population=0}]> #<DataPoint [startTimeSecs = 1.400641911496E9]> #<DataPoint [process-latency = {}]> #<DataPoint [transfer-count = {}]>]]
    307121 [Thread-22-acker] INFO backtype.storm.daemon.executor - Processing received message source: system:-1, stream: __metrics_tick, id: {}, [60]
    307122 [Thread-22-acker] INFO backtype.storm.daemon.task - Emitting: acker __metrics [#<TaskInfo backtype.storm.metric.api.IMetricsConsumer$TaskInfo@adc5cdf> [#<DataPoint [ack-count = {input:__ack_init=0}]> #<DataPoint [sendqueue = {write_pos=-1, read_pos=-1, capacity=1024, population=0}]> #<DataPoint [receive = {write_pos=11, read_pos=10, capacity=1024, population=1}]> #<DataPoint [process-latency = {}]> #<DataPoint [transfer-count = {}]> #<DataPoint [execute-latency = {}]> #<DataPoint [fail-count = {}]> #<DataPoint [emit-count = {}]> #<DataPoint [execute-count = {}]>]]
    307282 [Thread-16-input] INFO backtype.storm.daemon.executor - Processing received message source: system:-1, stream: __metrics_tick, id: {}, [60]
    307282 [Thread-16-input] INFO backtype.storm.daemon.task - Emitting: input __metrics [#<TaskInfo backtype.storm.metric.api.IMetricsConsumer$TaskInfo@9553a0b> [#<DataPoint [ack-count = {}]> #<DataPoint [sendqueue = {write_pos=11, read_pos=11, capacity=1024, population=0}]> #<DataPoint [complete-latency = {}]> #<DataPoint [receive = {write_pos=15, read_pos=14, capacity=1024, population=1}]> #<DataPoint [transfer-count = {}]> #<DataPoint [fail-count = {}]> #<DataPoint [emit-count = {}]>]]
    337073 [Thread-16-input] INFO backtype.storm.daemon.executor - Processing received message source: system:-1, stream: __tick, id: {}, [30]
    367073 [Thread-16-input] INFO backtype.storm.daemon.executor - Processing received message source: system:-1, stream: __tick, id: {}, [30]
    367073 [Thread-16-input] INFO backtype.storm.daemon.task - Emitting: input default [[B@52929ed4]
    367076 [Thread-16-input] INFO backtype.storm.daemon.task - Emitting: input __ack_init [-2486615009280095393 -5195998769667913953 2]
    367076 [Thread-18-output] INFO backtype.storm.daemon.executor - Processing received message source: input:2, stream: default, id: {-2486615009280095393=-5195998769667913953}, [[B@52929ed4]
    ***Content: First!!!

What's wrong with my Storm setup?

akaIDIOT commented 10 years ago

Interesting. Do the messages come out of Kafka when you read them with a different consumer? I haven't had issues getting the same message more than once, except when the spout explicitly re-emits it. It will re-emit messages if Storm determines they've failed (either through an explicit fail() from a bolt or through the ack() timing out). Assuming you haven't set kafka.spout.buffer.size.max to 1, the spout should be emitting other tuples as well, waiting for acks for all of those tuples.
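
To illustrate the acking side, here is a minimal sketch of a bolt that acks explicitly (the class name PrintingBolt and the println are illustrative, not your code). If a bolt extends BaseRichBolt and never calls ack(), every tuple eventually times out and the spout re-emits it:

    import java.util.Map;

    import backtype.storm.task.OutputCollector;
    import backtype.storm.task.TopologyContext;
    import backtype.storm.topology.OutputFieldsDeclarer;
    import backtype.storm.topology.base.BaseRichBolt;
    import backtype.storm.tuple.Tuple;

    public class PrintingBolt extends BaseRichBolt {
        private OutputCollector collector;

        @Override
        public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
            this.collector = collector;
        }

        @Override
        public void execute(Tuple tuple) {
            byte[] message = (byte[]) tuple.getValue(0);
            System.out.println("***Content: " + new String(message));
            // ack, so Storm doesn't time the tuple out and trigger a re-emit
            collector.ack(tuple);
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            // this bolt emits no streams
        }
    }

Extending BaseBasicBolt instead would get the acking done automatically.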

If you're willing to share your code (at least the bolt and the building of the topology), I can try to reproduce the behavior you're seeing and maybe tackle a bug :)

akaIDIOT commented 10 years ago

@jamesweb1: is this still causing issues for you? I'd be happy to investigate a possible bug, but I can't reproduce your problems here.

akaIDIOT commented 10 years ago

Feel free to reopen the issue if the problem persists, with reproduction steps if possible.