alibaba / canal

Alibaba MySQL binlog incremental subscription & consumption component
Apache License 2.0

Full sync of a 200-million-row table to ES via the etl command slows down noticeably after about 50 million rows #2490

Closed · level2player closed this 4 years ago

level2player commented 4 years ago

Question

Sync SQL: select t.id as _id,t.id,t.timestamp,t.maker_account,t.maker_order_id,t.maker_order_index,t.taker_account,t.taker_order_id,t.taker_order_index,t.symbol,t.side,t.size,t.price,t.is_otc from engine.trades t

My initial suspicion is that as the sync progresses, the offset grows very large and degrades SQL query performance. Is there any way to solve this?

1120025919 commented 4 years ago

From ESEtlService.importData:

```java
if (cnt >= 10000) {
    int threadCount = 3; // read from config, defaults to 3
    long perThreadCnt = cnt / threadCount;
    ExecutorService executor = Util.newFixedThreadPool(threadCount, 5000L);
    for (int i = 0; i < threadCount; i++) {
        long offset = i * perThreadCnt;
        Long size = null;
        if (i != threadCount - 1) {
            size = perThreadCnt;
        }
        String sqlFinal;
        if (size != null) {
            sqlFinal = sql + " LIMIT " + offset + "," + size;
        } else {
            sqlFinal = sql + " LIMIT " + offset + "," + cnt;
        }
        executor.execute(() -> executeSqlImport(dataSource, sqlFinal, mapping, impCount, errMsg));
    }
}
```

This is the source code for the full import. As you can see, it pages with LIMIT offset,size, which causes deep pagination: the further the offset, the more rows the database must scan and discard, so queries get very slow, and with a data set this large it can even run out of memory. The alternative is to rewrite the source yourself to run multithreaded queries over id ranges (id >= from and id <= to, with a window size such as 1000 that you can choose yourself), rebuild, replace plugin/client-adapter.elasticsearch-1.1.4-jar-with-dependencies.jar with your jar, and restart.
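
A minimal sketch of that id-range rewrite, reusing Util.newFixedThreadPool and the executeSqlImport signature from the snippet above. fetchMinMaxId, the range size, and appending a WHERE clause to the base sql are illustrative assumptions, not the project's actual code:

```java
// Sketch: replace LIMIT offset,size paging with primary-key range scans.
// Assumes an auto-increment primary key `id` and that `sql` has no WHERE
// clause of its own. fetchMinMaxId is a hypothetical helper that runs
// SELECT MIN(id), MAX(id) on the table.
long[] bounds = fetchMinMaxId(dataSource, "engine.trades");
long rangeSize = 1000L; // the from/to window; pick whatever suits your table
int threadCount = 3;
ExecutorService executor = Util.newFixedThreadPool(threadCount, 5000L);
for (long from = bounds[0]; from <= bounds[1]; from += rangeSize) {
    long to = Math.min(from + rangeSize - 1, bounds[1]);
    // id >= from AND id <= to walks the primary key index, so each chunk
    // costs O(rangeSize) instead of O(offset + size) like deep LIMIT paging
    String sqlFinal = sql + " WHERE t.id >= " + from + " AND t.id <= " + to;
    executor.execute(() -> executeSqlImport(dataSource, sqlFinal, mapping, impCount, errMsg));
}
executor.shutdown();
```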

level2player commented 4 years ago

I solved this by setting etlCondition: "where t.id >={} and t.id<{}" to control the id range manually.

1120025919 commented 4 years ago

> I solved this by setting etlCondition: "where t.id >={} and t.id<{}" to control the id range manually.

With etlCondition: "where t.id >={} and t.id<{}", a single full ETL run only imports one slice of the data, so you still have to write a program that calls http://ip:port/etl/es/xxx.yml once per id range, passing in the id parameters (see the sketch below). Personally I think patching the source, rebuilding, and replacing the jar is the better option. Either way, the MySQL table must be created with an auto-increment id; once it has one, importing other data later is much more convenient.
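
A sketch of that driver program: loop over id windows and POST each one to the adapter's ETL endpoint. The host, port, yml name, window size, and max id are placeholders; the `params` form field with values separated by `;` follows the canal client-adapter ETL API as documented, but verify it against your adapter version:

```java
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;

public class EtlByIdRange {
    public static void main(String[] args) throws Exception {
        long maxId = 200_000_000L; // rough table size from the issue title (assumption)
        long step  = 1_000_000L;   // ids per ETL call; tune to taste
        for (long from = 0; from < maxId; from += step) {
            long to = from + step;
            // each call fills the two {} placeholders in
            // etlCondition: "where t.id >={} and t.id<{}"
            URL url = new URL("http://127.0.0.1:8081/etl/es/xxx.yml");
            HttpURLConnection conn = (HttpURLConnection) url.openConnection();
            conn.setRequestMethod("POST");
            conn.setDoOutput(true);
            conn.setRequestProperty("Content-Type", "application/x-www-form-urlencoded");
            byte[] body = ("params=" + from + ";" + to).getBytes(StandardCharsets.UTF_8);
            try (OutputStream os = conn.getOutputStream()) {
                os.write(body);
            }
            System.out.println("etl [" + from + ", " + to + ") -> HTTP " + conn.getResponseCode());
            conn.disconnect();
        }
    }
}
```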

cynen commented 1 month ago

See: https://github.com/alibaba/canal/issues/3149

Per that issue: for a large table, it's still best to set etlCondition and pass parameters so the query goes through an index. A bare select * from table doesn't use an index and is very slow; use select * from table where id >= {} and id <= {} instead.