keedio / flume-ng-sql-source

Flume Source to import data from SQL Databases
Apache License 2.0
264 stars 164 forks source link

Has a litte issue with rowsList`s size #77

Open EdQzz opened 5 years ago

EdQzz commented 5 years ago

Hi my english is not good i hope you can understand my means

source code like bellow if (!rowsList.isEmpty()){
sqlSourceHelper.setCurrentIndex( Integer.toString((Integer.parseInt(sqlSourceHelper.getCurrentIndex())+ rowsList.size())) ); }

In this code,if table has id from 1 to 10 and 20 to 100,we know the rowsList`s size is 90,the currentIndex starts from 1,so the new currentIndex will be 91, that causes we extract id from 92 to 100 again

so we changed code. if (!rowsList.isEmpty()){
Long max=0l; for (List list : rowsList) { if(Long.parseLong(list.get(0).toString())>max) { max=Long.parseLong(list.get(0).toString()); } } sqlSourceHelper.setCurrentIndex(max.toString()); } we use the changeDate(timestamp) as refer column,replaced id

shiMingZheng commented 5 years ago

Thank you,you are right!You can set Current Index from column which you want determined by “list.get(i)”。