iralance / myblog

notes
0 stars 0 forks source link

kafka使用教程 #41

Open iralance opened 6 years ago

iralance commented 6 years ago

开发环境mac

安装

1.os环境下

brew install kafka
brew install zookeeper

2.启动

brew services start kafka(./kafka-server-start /usr/local/etc/kafka/server.properties)
brew services start zookeeper

3.kafka-manage

git clone https://github.com/yahoo/kafka-manager.git
nohup bin/kafka-manager -Dconfig.file=conf/application.conf -Dhttp.port=9090 &

打开localhost:9090就可以看到了 4.使用 打开2个终端,分别执行以下命令

# 创建 topic
./bin/kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
# 创建一个向 topictest 发送消息的 producer
./bin/kafka-console-producer --broker-list localhost:9092 --topic test
# 创建一个从 topic test 读取消息的 consumer
./bin/kafka-console-consumer --zookeeper localhost:2181 --topic test --from-beginning

5.外部调用(python)

pip install kafka-python
#新建个文件
from kafka import KafkaProducer
# 设置 Kafka 地址
producer = KafkaProducer(bootstrap_servers='localhost:9092')
# 设置需要发送的 topic 及内容
producer.send('test', b'I am the most handsome man in the world! I think! ').get(timeout=30)

aa

如果要使用内网访问,新版配置直接修改server.properties

listeners=PLAINTEXT://内网ip:9092

默认zookeeper.properties是不用修改的

Kafka接入Elasticsearch

  1. Logstash->kafka
    
    #log-to-kafka.conf
    input {
    file {
    # 确定需要检测的文件
    path => [ "/Users/qianlei/tmp/test/*.log"]
    # 日志类型
    type => "syslog"
    add_field => { "service" => "system-log"}
    # stat_interval => 30
    }
    }

output {

输出到命令行,一般用于调试

stdout { codec => rubydebug }

输出到 Kafka,topic 名称为 logs,地址为默认的端口号

kafka { topic_id => "logs" bootstrap_servers => "10.200.136.20:9092" } }

说明:
add_field 添加一个 topic 字段,用作之后导入 elasticsearch 的索引标识
stat_interval 单位是秒,这里 30 分钟进行一次检测,不过测试的时候需要去掉这个配置

2.kafka->elasticsearch

kafka-to-es.conf

input { kafka { bootstrap_servers => "10.200.136.20:9092" topics => ["logs"] } } output {

for debugging

stdout { codec => rubydebug }

elasticsearch { hosts => "localhost:9200" index => "system-log" } }


最终可以看到如下效果
![kafka1](https://user-images.githubusercontent.com/4393443/38775735-26e8209e-40bc-11e8-909f-7df44d864af7.png)
![kafka2](https://user-images.githubusercontent.com/4393443/38775740-2d49ea80-40bc-11e8-9065-c5093634104a.png)

## 参考链接
* [kafka的advertised.host.name参数 外网访问配置](https://www.cnblogs.com/snifferhu/p/5102629.html)
* [kafka入门介绍](http://orchome.com/5)
* [python error messages not delivering from producer](https://github.com/dpkp/kafka-python/issues/563)
* [使用Kafka Manager管理Kafka集群](https://www.jianshu.com/p/e59c777f7f4e)