iralance / myblog

notes
0 stars 0 forks source link

logstash-input-jdbc实现mysql数据同步 #40

Open iralance opened 6 years ago

iralance commented 6 years ago

mysql全量同步数据到elasticsearch,配合中文分词做搜索

安装

1.修改ruby仓库镜像 如果没有安装 gem 的话 安装gem

sudo yum install gem

替换国内的镜像

gem sources --add https://ruby.taobao.org/ --remove https://rubygems.org/
gem sources -l

然后开始安装

./bin/logstash-plugin install logstash-input-jdbc

如果安装不上也可以考虑用源码安装 安装后执行就能看到安装的插件了

./bin/logstash-plugin list

同步mysql文件

一个 mysql 的Java驱动包,理论上任何版本都可以。 参考elk环境搭建篇-Logstash在该目录下新建个logstash-mysql.conf文件,重新启动logstash。

input {
  jdbc {
    jdbc_driver_library => "/Users/qianlei/soft/logstash/mysql-connector-java-5.1.46.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://localhost:3306/laravel-phpunit?useSSL=false"
    jdbc_user => "root"
    jdbc_password => "******"
    jdbc_paging_enabled => "true"
    clean_run => false
    jdbc_page_size => "50000"
    record_last_run => true
    use_column_value => true
    tracking_column => id
    schedule => "* * * * *"
    statement_filepath => "/Users/qianlei/soft/logstash/run.sql"
    last_run_metadata_path => "/Users/qianlei/soft/logstash/last_id"
    type => "jdbc"
  }
}

output {
    elasticsearch {
        hosts => ["localhost:9200"]
        index => "articles"
        document_id => "%{id}"
    }
    stdout {
        codec => json_lines
    }
}

在外层目录新建run.sql文件用于存放上一次同步的tracking_column的值。

#run.sql
SELECT * from articles where id > :sql_last_value

补充说明: 1.elasticsearch数据重复以及增量同步 在默认配置下,tracking_column这个值是@timestamp,存在elasticsearch就是_id值,是logstash存入elasticsearch的时间,这个值的主要作用类似mysql的主键,是唯一的,但是我们的时间戳其实是一直在变的,所以我们每次使用select语句查询的数据都会存入elasticsearch中,导致数据重复。 解决方法:在要查询的表中,找主键或者自增值的字段,将它设置为_id的值,因为_id值是唯一的,所以,当有重复的_id的时候,数据就不会重复

// 是否记录上次执行结果, 如果为真,将会把上次执行到的 tracking_column 字段的值记录下来,保存到 last_run_metadata_path 指定的文件中
  record_last_run => "true"

  // 是否需要记录某个column 的值,如果record_last_run为真,可以自定义我们需要 track 的 column 名称,此时该参数就要为 true. 否则默认 track 的是 timestamp 的值.
  use_column_value => "true"

  // 如果 use_column_value 为真,需配置此参数. track 的数据库 column 名,该 column 必须是递增的. 一般是mysql主键
  tracking_column => "autoid"

2.数据同步频繁,影响mysql数据库性能 设置mysql查询范围,防止大量的查询拖死数据库

// 这里类似crontab,可以定制定时操作,比如每分钟执行一次同步(分 时 天 月 年)
  schedule => "* * * * *"
  // 如果 use_column_value 为真,需配置此参数. track 的数据库 column 名,该 column 必须是递增的. 一般是mysql主键
  tracking_column => "id"

  // 上次执行数据库的值,该值是上次查询时tracking_column设置的字段最大值
  last_run_metadata_path => "/Users/qianlei/soft/logstash/last_id"

  // 是否清除 last_run_metadata_path 的记录,如果为真那么每次都相当于从头开始查询所有的数据库记录
  clean_run => "false"

在sql语句这里设置select * from WHERE autoid > :sql_last_value; 注意:如果你的语句比较复杂,autoid > :sql_last_value一定要写在WHERE后面,然后接AND即可 3.jdbc_connection_string加参数useSSL=false。默认mysql新版本5.7需要https,加这个参数就不会出现warning了。 log1 log2

4.如果要同步多个表可以这么写

   if[type] == "jdbc_office"{
        elasticsearch {
        hosts  => "localhost:9200"
        index => "contacts4"
        document_type => "office1"
        document_id => "%{id}"
        }
    }
    if[type] == "jdbc_user"{
        elasticsearch {
        hosts  => "localhost:9200"
        index => "contacts4"
        document_type => "user1"
        document_id => "%{id}"
        }
    }

参考链接

logstash-input-jdbc同步mysql数据到elasticsearch