sclasen / akka-kafka

Question : I try to use it but nothing ... #26

Closed richiesgr closed 9 years ago

richiesgr commented 9 years ago

Hi,

I've tried to use it but I can't get it working. I receive messages from Kafka only at first, and then it goes into this state:

[test-akka.actor.default-dispatcher-3] INFO com.sclasen.akka.kafka.ConnectorFSM - at=created-streams
[test-akka.actor.default-dispatcher-3] INFO akka.actor.ActorSystemImpl - at=consumer-started
[test-akka.actor.default-dispatcher-5] INFO com.sclasen.akka.kafka.ConnectorFSM - at=transition from=Receiving to=Committing uncommitted=0
[test-akka.actor.default-dispatcher-5] WARN com.sclasen.akka.kafka.ConnectorFSM - state=Committing msg=StateTimeout drained=1 streams=1
[test-akka.actor.default-dispatcher-5] WARN com.sclasen.akka.kafka.ConnectorFSM - state=Committing msg=StateTimeout drained=1 streams=1
[test-akka.actor.default-dispatcher-2] WARN com.sclasen.akka.kafka.ConnectorFSM - state=Committing msg=StateTimeout drained=1 streams=1

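After that I don't receive anything anymore. I've tried playing with CommitConfig without success. Could you help me? My code is:

import akka.actor.{Actor, ActorLogging, ActorSystem, Props}
import akka.util.Timeout
import com.sclasen.akka.kafka.{AkkaConsumer, AkkaConsumerProps, CommitConfig, StreamFSM}
import kafka.serializer.DefaultDecoder
import scala.concurrent.duration._
import scala.language.postfixOps

object Main extends App {
  val system = ActorSystem("test")

  // Actor that receives every message consumed from Kafka
  val senderRequest = system.actorOf(Props[SenderRequest])
  val consumerProps = AkkaConsumerProps.forSystem(
    system = system,
    zkConnect = "localhost:2181",
    topic = "HTTP",
    group = "1",
    streams = 1, // one per partition
    keyDecoder = new DefaultDecoder(),
    msgDecoder = new DefaultDecoder(),
    receiver = senderRequest,
    commitConfig = new CommitConfig(Some(10 seconds), Some(1), Timeout(10 second))
  )

  val consumer = new AkkaConsumer(consumerProps)
  consumer.start()
}

class SenderRequest extends Actor with ActorLogging {
  override def receive: Actor.Receive = {
    case data: Any =>
      log.info("RECEIVE HERE !!!!!!!!!!!!!!!")
      // Acknowledge the message back to the stream
      sender() ! StreamFSM.Processed
  }
}

Thanks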

sclasen commented 9 years ago

Hmm, perhaps there is an off-by-one error. What happens if you use

commitConfig = new CommitConfig(Some(10 seconds), Some(10), Timeout(10 second))

Also, is there any process sending messages to Kafka?

sclasen commented 9 years ago

Also, which version are you using?

richiesgr commented 9 years ago

Thanks for the super quick help!

I'm using 0.9 from the Maven repo. I tried your commitConfig, but it doesn't help.

Here is my log with debug:

[test-akka.actor.default-dispatcher-6] INFO com.sclasen.akka.kafka.ConnectorFSM - at=created-streams
2015-01-13 19:21:51 DEBUG ClientCnxn:839 - Reading reply sessionid:0x14ae3aafd0c0028, packet:: clientPath:null serverPath:null finished:false header:: 16,4 replyHeader:: 16,453,0 request:: '/brokers/ids/0,F response:: #7b226a6d785f706f7274223a2d312c2274696d657374616d70223a2231343231313538383536393333222c22686f7374223a223139322e3136382e35302e3731222c2276657273696f6e223a312c22706f7274223a393039327d,s{218,218,1421158856939,1421158856939,0,0,0,93137065843884032,90,0,218}
[test-akka.actor.default-dispatcher-6] INFO akka.actor.ActorSystemImpl - at=consumer-started
2015-01-13 19:21:51 INFO VerifiableProperties:68 - Verifying properties
2015-01-13 19:21:51 INFO VerifiableProperties:68 - Property client.id is overridden to 1
2015-01-13 19:21:51 INFO VerifiableProperties:68 - Property metadata.broker.list is overridden to 192.168.50.71:9092
2015-01-13 19:21:51 INFO VerifiableProperties:68 - Property request.timeout.ms is overridden to 30000
2015-01-13 19:21:51 INFO ClientUtils$:68 - Fetching metadata from broker id:0,host:192.168.50.71,port:9092 with correlation id 0 for 1 topic(s) Set(HTTP)
2015-01-13 19:21:51 DEBUG BlockingChannel:52 - Created socket with SO_TIMEOUT = 30000 (requested 30000), SO_RCVBUF = 530904 (requested -1), SO_SNDBUF = 102400 (requested 102400).
2015-01-13 19:21:51 INFO SyncProducer:68 - Connected to 192.168.50.71:9092 for producing
2015-01-13 19:21:51 INFO SyncProducer:68 - Disconnecting from 192.168.50.71:9092
2015-01-13 19:21:51 DEBUG ClientUtils$:52 - Successfully fetched metadata for 1 topic(s) Set(HTTP)
2015-01-13 19:21:51 DEBUG ConsumerFetcherManager$LeaderFinderThread:52 - [1_richard-Latitude-E6440-1421169710915-2dd7b19d-leader-finder-thread], {TopicMetadata for topic HTTP -> Metadata for partition [HTTP,0] is partition 0 leader: 0 (192.168.50.71:9092) replicas: 0 (192.168.50.71:9092) isr: 0 (192.168.50.71:9092) isUnderReplicated: false}
2015-01-13 19:21:51 INFO ConsumerFetcherThread:68 - [ConsumerFetcherThread-1_richard-Latitude-E6440-1421169710915-2dd7b19d-0-0], Starting
2015-01-13 19:21:51 DEBUG BlockingChannel:52 - Created socket with SO_TIMEOUT = 30000 (requested 30000), SO_RCVBUF = 65536 (requested 65536), SO_SNDBUF = 1313280 (requested -1).
2015-01-13 19:21:51 DEBUG PartitionTopicInfo:52 - reset fetch offset of ( HTTP:0: fetched offset = 20996: consumed offset = -1 ) to 20996
2015-01-13 19:21:51 DEBUG PartitionTopicInfo:52 - reset consume offset of HTTP:0: fetched offset = 20996: consumed offset = 20996 to 20996
2015-01-13 19:21:51 INFO ConsumerFetcherManager:68 - [ConsumerFetcherManager-1421169710957] Added fetcher for partitions ArrayBuffer([[HTTP,0], initOffset -1 to broker id:0,host:192.168.50.71,port:9092] )
2015-01-13 19:21:53 DEBUG ClientCnxn:759 - Got ping response for sessionid: 0x14ae3aafd0c0028 after 1ms
2015-01-13 19:21:55 DEBUG ClientCnxn:759 - Got ping response for sessionid: 0x14ae3aafd0c0028 after 1ms
2015-01-13 19:21:57 DEBUG ClientCnxn:759 - Got ping response for sessionid: 0x14ae3aafd0c0028 after 1ms
2015-01-13 19:21:59 DEBUG ClientCnxn:759 - Got ping response for sessionid: 0x14ae3aafd0c0028 after 0ms
2015-01-13 19:22:01 DEBUG PartitionTopicInfo:52 - updated fetch offset of (HTTP:0: fetched offset = 20997: consumed offset = 20996) to 20997
2015-01-13 19:22:01 DEBUG ClientCnxn:759 - Got ping response for sessionid: 0x14ae3aafd0c0028 after 0ms
[test-akka.actor.default-dispatcher-2] INFO com.sclasen.akka.kafka.ConnectorFSM - at=transition from=Receiving to=Committing uncommitted=0
[test-akka.actor.default-dispatcher-4] WARN com.sclasen.akka.kafka.ConnectorFSM - state=Committing msg=StateTimeout drained=1 streams=1

sclasen commented 9 years ago

Ooh, sorry, I think 0.0.9 is broken. Can you try 0.0.10-SNAPSHOT from oss.sonatype? I am publishing 0.0.10 right now!
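For readers following along, switching versions might look like the sbt fragment below. This is only a sketch under assumptions: the thread does not say which build tool is in use, and the `com.sclasen` organization name and the Sonatype snapshots URL are not stated here, so verify both against the project's README before relying on them.

```scala
// build.sbt (sketch; organization name and resolver URL are assumptions)

// Needed only while testing the snapshot build mentioned above
resolvers += "Sonatype OSS Snapshots" at "https://oss.sonatype.org/content/repositories/snapshots"

// Snapshot build to try first
libraryDependencies += "com.sclasen" %% "akka-kafka" % "0.0.10-SNAPSHOT"

// Once the 0.0.10 release is published, the resolver line can be dropped
// and the dependency pinned to the release instead:
// libraryDependencies += "com.sclasen" %% "akka-kafka" % "0.0.10"
```

Pinning to the release rather than the snapshot keeps the build reproducible, since snapshot artifacts can change between resolutions.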

sclasen commented 9 years ago

0.0.10 should be live in just a few minutes

richiesgr commented 9 years ago

Thanks, I'll give it a try.

richiesgr commented 9 years ago

Thanks a lot for your help. It's working now on 0.0.10.