Closed MaheshSankaran closed 9 years ago
Can you post the console consumer command? From the logs, it appears to be connecting to your broker as expected. Currently it is setup to read from the head of the Kafka queue from the time which you first connect the consumer group. So one you establish the consumer group anything submitted to the broker, via a producer, should show up on the other side. Try firing a console producer after logstash is running that places messages on the testing 1 topic. They should show up.
Note that if you run a single consumers you can set reset_beginning to true and you should get all messages that were on the queue for that topic.
Hi joe, As you told, i set reset_beginning=>true.Now it is working fine.
Thank you Mahesh.S
Hi joe, This is my logstash config file.It does not receive data from kafka.but when i tried kafka-console-consumer.sh command it received data from kafka. input { kafka { zk_connect => "x.x.x.x:2181" queue_size => 30 group_id => "logstash" topic_id => "testing1" reset_beginning => false rebalance_max_retries => 3 rebalance_backoff_ms => 3000 consumer_timeout_ms => 30000
}
output { stdout { codec => "json" } } My output from logstash with logging is following.kindly help me to resolve this issue.
[root@datanode logstash-1.4.2]# bin/logstash agent -f test1.conf -v Using milestone 1 input plugin 'kafka'. This plugin should work, but would benefit from use by folks like you. Please let us know if you find bugs or have suggestions on how to improve this plugin. For more information on plugin milestones, see http://logstash.net/docs/1.4.2/plugin-milestones {:level=>:warn} Registering kafka {:group_id=>"logstash", :topic_id=>"testing1", :zk_connect=>"10.10.100.62:2181", :level=>:info} Pipeline started {:level=>:info} Running kafka {:group_id=>"logstash", :topic_id=>"testing1", :zk_connect=>"10.10.100.62:2181", :level=>:info} log4j, [2015-01-28T15:12:30.842] INFO: kafka.utils.VerifiableProperties: Verifying properties log4j, [2015-01-28T15:12:30.891] INFO: kafka.utils.VerifiableProperties: Property auto.commit.enable is overridden to true log4j, [2015-01-28T15:12:30.892] INFO: kafka.utils.VerifiableProperties: Property auto.commit.interval.ms is overridden to 1000 log4j, [2015-01-28T15:12:30.892] INFO: kafka.utils.VerifiableProperties: Property auto.offset.reset is overridden to largest log4j, [2015-01-28T15:12:30.892] INFO: kafka.utils.VerifiableProperties: Property consumer.timeout.ms is overridden to 300000 log4j, [2015-01-28T15:12:30.892] INFO: kafka.utils.VerifiableProperties: Property fetch.message.max.bytes is overridden to 1048576 log4j, [2015-01-28T15:12:30.892] INFO: kafka.utils.VerifiableProperties: Property fetch.min.bytes is overridden to 1 log4j, [2015-01-28T15:12:30.893] INFO: kafka.utils.VerifiableProperties: Property fetch.wait.max.ms is overridden to 100 log4j, [2015-01-28T15:12:30.893] INFO: kafka.utils.VerifiableProperties: Property group.id is overridden to logstash log4j, [2015-01-28T15:12:30.893] INFO: kafka.utils.VerifiableProperties: Property queued.max.message.chunks is overridden to 10 log4j, [2015-01-28T15:12:30.893] INFO: kafka.utils.VerifiableProperties: Property rebalance.backoff.ms is overridden to 3000 log4j, [2015-01-28T15:12:30.893] INFO: kafka.utils.VerifiableProperties: Property rebalance.max.retries is overridden to 3 log4j, [2015-01-28T15:12:30.893] INFO: kafka.utils.VerifiableProperties: Property refresh.leader.backoff.ms is overridden to 200 log4j, [2015-01-28T15:12:30.894] INFO: kafka.utils.VerifiableProperties: Property socket.receive.buffer.bytes is overridden to 65536 log4j, [2015-01-28T15:12:30.894] INFO: kafka.utils.VerifiableProperties: Property socket.timeout.ms is overridden to 30000 log4j, [2015-01-28T15:12:30.894] INFO: kafka.utils.VerifiableProperties: Property zookeeper.connect is overridden to 10.10.100.62:2181 log4j, [2015-01-28T15:12:30.894] INFO: kafka.utils.VerifiableProperties: Property zookeeper.connection.timeout.ms is overridden to 6000 log4j, [2015-01-28T15:12:30.894] INFO: kafka.utils.VerifiableProperties: Property zookeeper.session.timeout.ms is overridden to 6000 log4j, [2015-01-28T15:12:30.895] INFO: kafka.utils.VerifiableProperties: Property zookeeper.sync.time.ms is overridden to 2000 log4j, [2015-01-28T15:12:30.915] INFO: kafka.consumer.ZookeeperConsumerConnector: [logstash_datanode.sumtwo.com-1422438150910-8bf76627], Connecting to zookeeper instance at 10.10.100.62:2181 log4j, [2015-01-28T15:12:30.945] INFO: org.I0Itec.zkclient.ZkEventThread: Starting ZkClient event thread. log4j, [2015-01-28T15:12:30.954] INFO: org.apache.zookeeper.ZooKeeper: Client environment:zookeeper.version=3.3.3-1203054, built on 11/17/2011 05:47 GMT log4j, [2015-01-28T15:12:30.954] INFO: org.apache.zookeeper.ZooKeeper: Client environment:host.name=datanode.sumtwo.com log4j, [2015-01-28T15:12:30.954] INFO: org.apache.zookeeper.ZooKeeper: Client environment:java.version=1.7.0_67 log4j, [2015-01-28T15:12:30.954] INFO: org.apache.zookeeper.ZooKeeper: Client environment:java.vendor=Oracle Corporation log4j, [2015-01-28T15:12:30.955] INFO: org.apache.zookeeper.ZooKeeper: Client environment:java.home=/usr/jdk64/jdk1.7.0_67/jre log4j, [2015-01-28T15:12:30.955] INFO: org.apache.zookeeper.ZooKeeper: Client environment:java.class.path=/root/logstash-1.4.2/vendor/jar/jruby-complete-1.7.11.jar log4j, [2015-01-28T15:12:30.955] INFO: org.apache.zookeeper.ZooKeeper: Client environment:java.library.path=/usr/java/packages/lib/amd64:/usr/lib64:/lib64:/lib:/usr/lib log4j, [2015-01-28T15:12:30.955] INFO: org.apache.zookeeper.ZooKeeper: Client environment:java.io.tmpdir=/tmp log4j, [2015-01-28T15:12:30.955] INFO: org.apache.zookeeper.ZooKeeper: Client environment:java.compiler=
log4j, [2015-01-28T15:12:30.955] INFO: org.apache.zookeeper.ZooKeeper: Client environment:os.name=Linux
log4j, [2015-01-28T15:12:30.955] INFO: org.apache.zookeeper.ZooKeeper: Client environment:os.arch=amd64
log4j, [2015-01-28T15:12:30.955] INFO: org.apache.zookeeper.ZooKeeper: Client environment:os.version=2.6.32-358.el6.x86_64
log4j, [2015-01-28T15:12:30.955] INFO: org.apache.zookeeper.ZooKeeper: Client environment:user.name=root
log4j, [2015-01-28T15:12:30.955] INFO: org.apache.zookeeper.ZooKeeper: Client environment:user.home=/root
log4j, [2015-01-28T15:12:30.955] INFO: org.apache.zookeeper.ZooKeeper: Client environment:user.dir=/root/logstash-1.4.2
log4j, [2015-01-28T15:12:30.956] INFO: org.apache.zookeeper.ZooKeeper: Initiating client connection, connectString=10.10.100.62:2181 sessionTimeout=6000 watcher=org.I0Itec.zkclient.ZkClient@3451b8b2
log4j, [2015-01-28T15:12:30.971] INFO: org.apache.zookeeper.ClientCnxn: Opening socket connection to server /10.10.100.62:2181
log4j, [2015-01-28T15:12:30.977] INFO: org.apache.zookeeper.ClientCnxn: Socket connection established to datanode.sumtwo.com/10.10.100.62:2181, initiating session
log4j, [2015-01-28T15:12:31.013] INFO: org.apache.zookeeper.ClientCnxn: Session establishment complete on server datanode.sumtwo.com/10.10.100.62:2181, sessionid = 0x14b2ef6f1830021, negotiated timeout = 6000
log4j, [2015-01-28T15:12:31.015] INFO: org.I0Itec.zkclient.ZkClient: zookeeper state changed (SyncConnected)
log4j, [2015-01-28T15:12:31.043] INFO: kafka.consumer.ZookeeperConsumerConnector: [logstash_datanode.sumtwo.com-1422438150910-8bf76627], starting auto committer every 1000 ms
log4j, [2015-01-28T15:12:31.136] INFO: kafka.consumer.ZookeeperConsumerConnector: [logstash_datanode.sumtwo.com-1422438150910-8bf76627], begin registering consumer logstash_datanode.sumtwo.com-1422438150910-8bf76627 in ZK
log4j, [2015-01-28T15:12:31.224] INFO: kafka.consumer.ZookeeperConsumerConnector: [logstash_datanode.sumtwo.com-1422438150910-8bf76627], end registering consumer logstash_datanode.sumtwo.com-1422438150910-8bf76627 in ZK
log4j, [2015-01-28T15:12:31.228] INFO: kafka.consumer.ZookeeperConsumerConnector: [logstash_datanode.sumtwo.com-1422438150910-8bf76627], starting watcher executor thread for consumer logstash_datanode.sumtwo.com-1422438150910-8bf76627
log4j, [2015-01-28T15:12:31.259] INFO: kafka.consumer.ZookeeperConsumerConnector: [logstash_datanode.sumtwo.com-1422438150910-8bf76627], begin rebalancing consumer logstash_datanode.sumtwo.com-1422438150910-8bf76627 try #0
log4j, [2015-01-28T15:12:31.545] INFO: kafka.consumer.ConsumerFetcherManager: [ConsumerFetcherManager-1422438151022] Stopping leader finder thread
log4j, [2015-01-28T15:12:31.546] INFO: kafka.consumer.ConsumerFetcherManager: [ConsumerFetcherManager-1422438151022] Stopping all fetchers
log4j, [2015-01-28T15:12:31.547] INFO: kafka.consumer.ConsumerFetcherManager: [ConsumerFetcherManager-1422438151022] All connections stopped
log4j, [2015-01-28T15:12:31.548] INFO: kafka.consumer.ZookeeperConsumerConnector: [logstash_datanode.sumtwo.com-1422438150910-8bf76627], Cleared all relevant queues for this fetcher
log4j, [2015-01-28T15:12:31.550] INFO: kafka.consumer.ZookeeperConsumerConnector: [logstash_datanode.sumtwo.com-1422438150910-8bf76627], Cleared the data chunks in all the consumer message iterators
log4j, [2015-01-28T15:12:31.551] INFO: kafka.consumer.ZookeeperConsumerConnector: [logstash_datanode.sumtwo.com-1422438150910-8bf76627], Committing all offsets after clearing the fetcher queues
log4j, [2015-01-28T15:12:31.552] INFO: kafka.consumer.ZookeeperConsumerConnector: [logstash_datanode.sumtwo.com-1422438150910-8bf76627], Releasing partition ownership
log4j, [2015-01-28T15:12:31.555] INFO: kafka.consumer.ZookeeperConsumerConnector: [logstash_datanode.sumtwo.com-1422438150910-8bf76627], Consumer logstash_datanode.sumtwo.com-1422438150910-8bf76627 rebalancing the following partitions: ArrayBuffer(0) for topic testing1 with consumers: List(logstash_datanode.sumtwo.com-1422438150910-8bf76627-0)
log4j, [2015-01-28T15:12:31.558] INFO: kafka.consumer.ZookeeperConsumerConnector: [logstash_datanode.sumtwo.com-1422438150910-8bf76627], logstash_datanode.sumtwo.com-1422438150910-8bf76627-0 attempting to claim partition 0
log4j, [2015-01-28T15:12:31.577] INFO: kafka.consumer.ZookeeperConsumerConnector: [logstash_datanode.sumtwo.com-1422438150910-8bf76627], logstash_datanode.sumtwo.com-1422438150910-8bf76627-0 successfully owned partition 0 for topic testing1
log4j, [2015-01-28T15:12:31.579] INFO: kafka.consumer.ZookeeperConsumerConnector: [logstash_datanode.sumtwo.com-1422438150910-8bf76627], Updating the cache
log4j, [2015-01-28T15:12:31.582] INFO: kafka.consumer.ZookeeperConsumerConnector: [logstash_datanode.sumtwo.com-1422438150910-8bf76627], Consumer logstash_datanode.sumtwo.com-1422438150910-8bf76627 selected partitions : testing1:0: fetched offset = 10000: consumed offset = 10000
log4j, [2015-01-28T15:12:31.587] INFO: kafka.consumer.ZookeeperConsumerConnector: [logstash_datanode.sumtwo.com-1422438150910-8bf76627], end rebalancing consumer logstash_datanode.sumtwo.com-1422438150910-8bf76627 try #0
log4j, [2015-01-28T15:12:31.587] INFO: kafka.consumer.ConsumerFetcherManager$LeaderFinderThread: [logstash_datanode.sumtwo.com-1422438150910-8bf76627-leader-finder-thread], Starting
log4j, [2015-01-28T15:12:31.639] INFO: kafka.utils.VerifiableProperties: Verifying properties
log4j, [2015-01-28T15:12:31.640] INFO: kafka.utils.VerifiableProperties: Property client.id is overridden to logstash
log4j, [2015-01-28T15:12:31.640] INFO: kafka.utils.VerifiableProperties: Property metadata.broker.list is overridden to datanode.sumtwo.com:6667
log4j, [2015-01-28T15:12:31.640] INFO: kafka.utils.VerifiableProperties: Property request.timeout.ms is overridden to 30000
log4j, [2015-01-28T15:12:31.672] INFO: kafka.client.ClientUtils$: Fetching metadata from broker id:0,host:datanode.sumtwo.com,port:6667 with correlation id 0 for 1 topic(s) Set(testing1)
log4j, [2015-01-28T15:12:31.675] INFO: kafka.producer.SyncProducer: Connected to datanode.sumtwo.com:6667 for producing
log4j, [2015-01-28T15:12:31.704] INFO: kafka.producer.SyncProducer: Disconnecting from datanode.sumtwo.com:6667
log4j, [2015-01-28T15:12:31.725] INFO: kafka.consumer.ConsumerFetcherThread: [ConsumerFetcherThread-logstash_datanode.sumtwo.com-1422438150910-8bf76627-0-0], Starting
log4j, [2015-01-28T15:12:31.729] INFO: kafka.consumer.ConsumerFetcherManager: [ConsumerFetcherManager-1422438151022] Added fetcher for partitions ArrayBuffer([[testing1,0], initOffset 10000 to broker id:0,host:datanode.sumtwo.com,port:6667] )
Thanks Mahesh.S