zhanyingf15 / datax-elasticsearch

elasticsearch reader and writer plugin for datax

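DataX is an offline synchronization tool for heterogeneous data sources. It aims to provide stable, efficient data synchronization between relational databases (MySQL, Oracle, etc.), HDFS, Hive, MaxCompute (formerly ODPS), HBase, FTP, and other heterogeneous sources.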

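datax-elasticsearch is a DataX plugin for Elasticsearch. It consists of two parts, elasticsearchreader and elasticsearchwriter. Built on the DataX framework, it can efficiently synchronize data between Elasticsearch and heterogeneous sources such as Oracle, MySQL, and HDFS.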

Plugin installation

Option 1: unzip the downloaded release.zip, then copy elasticsearchreader into ${DATAX_HOME}\plugin\reader and elasticsearchwriter into ${DATAX_HOME}\plugin\writer.

Option 2: clone the source, build and package it with Maven, and place the resulting artifacts in the following structure.

${DATAX_HOME}
|-- bin       
|   `-- datax.py
|-- conf
|   |-- core.json
|   `-- logback.xml
|-- lib
|   `-- datax-core-dependencies.jar
`-- plugin
    |-- reader
    |   `-- elasticsearchreader
    |       |-- libs
    |       |   `-- elasticsearchreader-plugin-dependencies.jar
    |       |-- elasticsearchreader-0.0.1.jar
    |       `-- plugin.json
    `-- writer
        |-- elasticsearchwriter
        |   |-- libs
        |   |   `-- elasticsearchwriter-plugin-dependencies.jar
        |   |-- elasticsearchwriter-0.0.1.jar
        |   `-- plugin.json
        |-- oceanbasewriter
        `-- odpswriter
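Before running a job, it can help to confirm that the plugin files landed where DataX expects them. Below is a minimal Python sketch with a hypothetical helper, `missing_plugin_files`, that checks each plugin folder for its `plugin.json`, its main jar, and at least one jar under `libs`. Jar names are matched with a glob, assuming the `name-version.jar` pattern shown in the tree.

```python
from pathlib import Path

# Plugin folders relative to ${DATAX_HOME}/plugin, taken from the layout above.
EXPECTED = {
    "reader": "elasticsearchreader",
    "writer": "elasticsearchwriter",
}


def _has_match(directory: Path, pattern: str) -> bool:
    # Guard with is_dir() so a missing folder simply counts as "no match".
    return directory.is_dir() and any(directory.glob(pattern))


def missing_plugin_files(datax_home):
    """Return a list of expected plugin files that are absent."""
    missing = []
    for kind, name in EXPECTED.items():
        plugin_dir = Path(datax_home) / "plugin" / kind / name
        if not (plugin_dir / "plugin.json").is_file():
            missing.append(f"{plugin_dir}/plugin.json")
        # Assumed naming: <name>-<version>.jar, e.g. elasticsearchreader-0.0.1.jar
        if not _has_match(plugin_dir, f"{name}-*.jar"):
            missing.append(f"{plugin_dir}/{name}-*.jar")
        if not _has_match(plugin_dir / "libs", "*.jar"):
            missing.append(f"{plugin_dir}/libs/*.jar")
    return missing
```

An empty list means all six expected entries are present.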

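elasticsearchreader and elasticsearchwriter ship in the same jar and differ only in their entry point; DataX instantiates each plugin via reflection, using the entry class declared in its plugin.json. The plugin.json contents for the reader and the writer are in the source's resources folder as plugin_reader_template.json and plugin_writer_template.json. Rename each file to plugin.json and place it in the corresponding DataX plugin folder.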
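The rename-and-copy step can be scripted. Below is a minimal Python sketch, assuming the two templates sit in a local resources directory (in a Maven project this is typically `src/main/resources`, but treat the path as an assumption) and that the target folders follow the layout shown earlier. The helper name `install_plugin_json` is hypothetical; it only copies files, and the reflection-based loading remains DataX's job.

```python
import shutil
from pathlib import Path

# Template file -> plugin folder (relative to ${DATAX_HOME}) that should receive it.
TEMPLATES = {
    "plugin_reader_template.json": "plugin/reader/elasticsearchreader",
    "plugin_writer_template.json": "plugin/writer/elasticsearchwriter",
}


def install_plugin_json(resources_dir, datax_home):
    """Copy each template to <plugin folder>/plugin.json and return the destinations."""
    installed = []
    for template, target in TEMPLATES.items():
        dest_dir = Path(datax_home) / target
        dest_dir.mkdir(parents=True, exist_ok=True)
        dest = dest_dir / "plugin.json"  # DataX looks for this exact file name
        shutil.copyfile(Path(resources_dir) / template, dest)
        installed.append(dest)
    return installed
```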

elasticsearchreader

The elasticsearchreader job configuration is as follows:

{
    "name": "elasticsearchreader",
    "parameter": {
        "connection": [
            "10.43.164.113:9300"
        ],
        "index":"test2",
        "type":"game",
        "pageSize":100,
        "column": ["_id","contact_order_id"]
    }
}
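Each `connection` entry is a `host:port` string. Port 9300 is Elasticsearch's default transport port (9200 is the HTTP port), which suggests the plugin connects over the transport protocol; that is an inference, not something this README states. The hypothetical helper `parse_connection` below sketches how such entries could be split into host/port pairs before building a client. Falling back to 9300 when no port is given is an assumption of the sketch, and bare IPv6 addresses are not handled.

```python
from typing import List, Tuple

DEFAULT_TRANSPORT_PORT = 9300  # assumed fallback, not a documented plugin default


def parse_connection(entries: List[str]) -> List[Tuple[str, int]]:
    """Turn ["host:port", ...] into [(host, port), ...]."""
    nodes = []
    for entry in entries:
        host, sep, port = entry.strip().rpartition(":")
        if not sep:  # no colon: the whole entry is the host
            host, port = entry.strip(), str(DEFAULT_TRANSPORT_PORT)
        nodes.append((host, int(port)))
    return nodes
```

For the configuration above, `parse_connection(["10.43.164.113:9300"])` yields `[("10.43.164.113", 9300)]`.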

elasticsearchwriter

The elasticsearchwriter job configuration is as follows:

{
    "name": "elasticsearchwriter",
    "parameter": {
        "connection": [
            "10.43.164.113:9300"
        ],
        "index":"test2",
        "type":"game",
        "bulkNum":1000,
        "refresh":false,
        "idField":"_id",
        "column": ["_id","contact_order_id","name"]
    }
}
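The README does not spell out how the writer uses `bulkNum` and `idField`. Going by the names, a plausible reading is that records are sent in bulk requests of at most `bulkNum` documents and that the value in the `idField` column becomes each document's `_id`. The hypothetical helper below illustrates that assumed behaviour; it is not the plugin's code, and removing the id column from the document body is a choice made only for this sketch.

```python
from typing import Any, Dict, Iterable, Iterator, List, Sequence, Tuple

Doc = Tuple[Any, Dict[str, Any]]  # (document id, document body)


def to_bulk_batches(
    rows: Iterable[Sequence[Any]],
    column: List[str],
    id_field: str = "_id",
    bulk_num: int = 1000,
) -> Iterator[List[Doc]]:
    """Group rows into batches of at most bulk_num (id, body) pairs.

    Assumes row values map positionally onto `column`, as in the config above.
    """
    if bulk_num < 1:
        raise ValueError("bulk_num must be positive")
    batch: List[Doc] = []
    for row in rows:
        body = dict(zip(column, row))
        doc_id = body.pop(id_field, None)  # sketch choice: id kept out of the body
        batch.append((doc_id, body))
        if len(batch) >= bulk_num:
            yield batch
            batch = []
    if batch:  # flush the final, possibly smaller, batch
        yield batch
```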

connection, index, type, and column have the same meaning as in elasticsearchreader.
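The reader and writer blocks above are fragments. DataX runs them inside a job file whose `job.content` list pairs a reader with a writer and whose `job.setting.speed.channel` sets the number of concurrent channels. Below is a minimal Python sketch that assembles such a file from the two configurations. The target index `test2_copy`, the file name `es2es.json`, and the helpers `build_job` and `write_job` are hypothetical, and both column lists are set to the same two fields so the reader's output lines up with the writer's input.

```python
import json

reader = {
    "name": "elasticsearchreader",
    "parameter": {
        "connection": ["10.43.164.113:9300"],
        "index": "test2",
        "type": "game",
        "pageSize": 100,
        "column": ["_id", "contact_order_id"],
    },
}

writer = {
    "name": "elasticsearchwriter",
    "parameter": {
        "connection": ["10.43.164.113:9300"],
        "index": "test2_copy",  # hypothetical target index
        "type": "game",
        "bulkNum": 1000,
        "refresh": False,
        "idField": "_id",
        "column": ["_id", "contact_order_id"],
    },
}


def build_job(reader, writer, channel=1):
    """Wrap one reader/writer pair in the DataX job layout."""
    return {
        "job": {
            "setting": {"speed": {"channel": channel}},
            "content": [{"reader": reader, "writer": writer}],
        }
    }


def write_job(path, job):
    with open(path, "w", encoding="utf-8") as f:
        json.dump(job, f, indent=4)
```

After `write_job("es2es.json", build_job(reader, writer))`, the job would be started with `python ${DATAX_HOME}/bin/datax.py es2es.json`. Keeping `channel` low is one way to limit how many writers hit the cluster at once.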

Complete example

When writing to Elasticsearch, always throttle the write rate to match what the Elasticsearch cluster can handle. Otherwise writes may fail with an error such as:

org.elasticsearch.common.util.concurrent.EsRejectedExecutionException: rejected execution of org.elasticsearch.transport.TransportService$4@12748abb on EsThreadPoolExecutor[bulk, queue capacity = 50, org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor@29b145ab[Running, pool size = 4, active threads = 4, queued tasks = 50, completed tasks = 31817]]
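In the error above, the node's bulk thread pool had all 4 threads busy (`active threads = 4`) and its queue full (`queued tasks = 50` against `queue capacity = 50`), so further bulk requests were rejected. Besides running fewer concurrent channels, a client can retry rejected batches with exponential backoff. The sketch below is a generic illustration of that technique, not part of this plugin: `RejectedError` is a hypothetical stand-in for the rejection exception, and `send` for whatever function issues the bulk request.

```python
import time


class RejectedError(Exception):
    """Hypothetical stand-in for a bulk rejection (e.g. a full bulk queue)."""


def send_with_backoff(send, batch, max_retries=5, base_delay=0.5, sleep=time.sleep):
    """Call send(batch), retrying on RejectedError with doubling delays.

    Re-raises the last RejectedError once max_retries retries are used up.
    `sleep` is injectable so the behaviour can be tested without waiting.
    """
    delay = base_delay
    for attempt in range(max_retries + 1):
        try:
            return send(batch)
        except RejectedError:
            if attempt == max_retries:
                raise
            sleep(delay)  # give the node time to drain its bulk queue
            delay *= 2
```

Doubling the delay spreads retries out so that many writers do not hit a saturated queue again at the same moment.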