keedio / flume-ng-sql-source

Flume Source to import data from SQL Databases
Apache License 2.0

Issue with the start.from property #67

Open mounikarudra opened 6 years ago

mounikarudra commented 6 years ago

Hi,

I am unable to fetch records starting from the desired index, even though I set agent.sources.sql-source.start.from to a number.

Here is the config file I'm using:

agent1.channels.ch1.type = memory
agent1.sources.sql-source.channels = ch1
agent1.channels = ch1
agent1.sinks = HDFS

agent1.sources = sql-source
agent1.sources.sql-source.type = org.keedio.flume.source.SQLSource

# URL to connect to database (currently only mysql is supported)
agent1.sources.sql-source.connection.url = jdbc:sybase:Tds:XXXX/test

# Database connection properties
agent1.sources.sql-source.user = XX
agent1.sources.sql-source.password = XXXX
agent1.sources.sql-source.table = employee
agent1.sources.sql-source.database = test

agent1.sources.sql-source.columns.to.select = *
agent1.sources.sql-source.start.from = 7
agent1.sources.sql-source.run.query.delay=10000

# Status file is used to save last readed row
agent1.sources.sql-source.status.file.path = /var/lib/flume
agent1.sources.sql-source.status.file.name = sql-source.status

agent1.sinks.HDFS.channel = ch1
agent1.sinks.HDFS.type = logger

I am also unable to make use of the incremental properties. Am I missing some configuration? Could someone please let me know? I would also like to know exactly how the status file is used to do an incremental load when the agent is restarted.

Thanks in advance.

zhipcui commented 6 years ago

You should use a custom query: https://github.com/keedio/flume-ng-sql-source#custom-query

One thing to be careful about is that the incremental field must be the first returned column. @lazaromedina, should we add a configuration property that specifies the incremental field index (for example, incremental.field.index)?

lazaromedina commented 6 years ago

Hi mounikarudra, zhipcui:

@mounikarudra, I am not sure I understand what you mean when you say:

I am also unable to make use of the incremental properties.

Besides the start.from property, which other incremental properties do you mean? Could you please add the flume-source-sql version you are using, the full config file, and the log trace from when you start the flume agent?

@zhipcui, thanks for your help. The incremental column does not need to be the first returned column, because an incremental column is not needed at all. Please check the following example:

mysql> show create table customers3;
+------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| Table      | Create Table                                                                                                                                                                     |
+------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| customers3 | CREATE TABLE `customers3` (
  `first_name` text COLLATE utf8_unicode_ci,
  `last_name` text COLLATE utf8_unicode_ci
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_unicode_ci |
+------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
1 row in set (0.00 sec)

customers3 contains 100 rows:

mysql> select * from customers3;
+------------------------------------------+-----------+
| first_name                               | last_name |
+------------------------------------------+-----------+
| event_1_GenericCSV061418_16_0728_xmRhP   | FXDO      |
| event_2_GenericCSV061418_16_0728_xmRhP   | h5hJ      |
| event_26_GenericCSV061418_16_0728_xmRhP  | e47R      |
| event_3_GenericCSV061418_16_0728_xmRhP   | aIvL      |
| event_27_GenericCSV061418_16_0728_xmRhP  | H7yi      |
| event_4_GenericCSV061418_16_0728_xmRhP   | guco      |
| event_28_GenericCSV061418_16_0728_xmRhP  | ECK6      |
| event_5_GenericCSV061418_16_0728_xmRhP   | Jt03      |
| event_29_GenericCSV061418_16_0728_xmRhP  | wIGP      |

..... (for clarity I have intentionally omitted the values
..... from row 10 to row 80, both inclusive)

| event_89_GenericCSV061418_16_0728_xmRhP  | Y11R      |
| event_82_GenericCSV061418_16_0728_xmRhP  | yq4z      |
| event_80_GenericCSV061418_16_0728_xmRhP  | mk05      |
| event_90_GenericCSV061418_16_0728_xmRhP  | EFrp      |
| event_83_GenericCSV061418_16_0728_xmRhP  | 4YYM      |
| event_81_GenericCSV061418_16_0728_xmRhP  | FuBR      |
| event_91_GenericCSV061418_16_0728_xmRhP  | G2Zu      |
| event_85_GenericCSV061418_16_0728_xmRhP  | i6o3      |
| event_92_GenericCSV061418_16_0728_xmRhP  | MBqG      |
| event_86_GenericCSV061418_16_0728_xmRhP  | vjxM      |
| event_93_GenericCSV061418_16_0728_xmRhP  | OBky      |
| event_87_GenericCSV061418_16_0728_xmRhP  | X8gY      |
| event_94_GenericCSV061418_16_0728_xmRhP  | sTug      |
| event_95_GenericCSV061418_16_0728_xmRhP  | ZzxE      |
| event_84_GenericCSV061418_16_0728_xmRhP  | qsxH      |
| event_96_GenericCSV061418_16_0728_xmRhP  | kfkb      |
| event_97_GenericCSV061418_16_0728_xmRhP  | lSfm      |
| event_98_GenericCSV061418_16_0728_xmRhP  | u3q7      |
| event_99_GenericCSV061418_16_0728_xmRhP  | MA1L      |
| event_100_GenericCSV061418_16_0728_xmRhP | 1XwN      |
+------------------------------------------+-----------+
100 rows in set (0.00 sec)

My config file says to retrieve rows from row 86 (inclusive) onwards. Also note that I did not use the custom.query setting:


### FLUME-SQL

agent.sinks = k1
agent.channels = ch1 
agent.sources = sql1

# For each one of the sources, the type is defined
agent.sources.sql1.type = org.keedio.flume.source.SQLSource

agent.sources.sql1.hibernate.connection.url = jdbc:mysql://127.0.0.1:3306/testdb

# Hibernate Database connection properties
agent.sources.sql1.hibernate.connection.user = root
agent.sources.sql1.hibernate.connection.password = root
agent.sources.sql1.hibernate.connection.autocommit = true
agent.sources.sql1.hibernate.dialect = org.hibernate.dialect.MySQL5Dialect
agent.sources.sql1.hibernate.connection.driver_class =  com.mysql.jdbc.Driver

agent.sources.sql1.table = customers3

# Columns to import to kafka (default * import entire row)
agent.sources.sql1.columns.to.select = *

# Query delay, each configured milisecond the query will be sent
agent.sources.sql1.run.query.delay=10000

# Status file is used to save last readed row
agent.sources.sql1.status.file.path = /var/log/flume-sql
agent.sources.sql1.status.file.name = sql1.status

#------------------------------------

agent.sources.sql1.start.from = 85 

#------------------------------------  

agent.sources.sql1.batch.size = 1000
agent.sources.sql1.max.rows = 1000000
agent.sources.sql1.delimiter.entry = ,
agent.sources.sql1.enclose.by.quotes = false

agent.sources.sql1.hibernate.connection.provider_class = org.hibernate.connection.C3P0ConnectionProvider
agent.sources.sql1.hibernate.c3p0.min_size=1
agent.sources.sql1.hibernate.c3p0.max_size=10

agent.sinks.k1.type = file_roll
agent.sinks.k1.sink.directory = /var/log/flume-sql
agent.sinks.k1.sink.rollInterval = 7200
agent.sinks.k1.channel = ch1

agent.channels.ch1.type = memory
agent.channels.ch1.capacity = 10000
agent.channels.ch1.transactionCapacity = 1000

agent.sources.sql1.channels = ch1
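
To make the effect of start.from = 85 concrete, here is a minimal Python sketch. It assumes, based only on the behaviour described in this thread, that start.from acts as a row offset: the first 85 rows are skipped and reading begins at row 86. The function name rows_from and the row_N labels are made up for illustration; they are not part of flume-ng-sql-source.

```python
def rows_from(rows, start_from):
    """Skip the first `start_from` rows and return the rest (offset semantics).

    Hypothetical helper: models the behaviour described in this thread,
    not the actual flume-ng-sql-source implementation.
    """
    return rows[start_from:]


# Stand-in for the 100 rows of customers3, labelled by their position
# in the table (row_1 is the first row returned, row_100 the last).
rows = [f"row_{position}" for position in range(1, 101)]

selected = rows_from(rows, 85)

print(len(selected))   # 15 rows remain: positions 86 to 100
print(selected[0])     # row_86
print(selected[-1])    # row_100
```

Under that assumption, 100 rows minus an offset of 85 leaves 15 rows to be delivered.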

I am dumping events to a file:

[root@quickstart flume-sql]# cat 1535355205987-1

event_81_GenericCSV061418_16_0728_xmRhP,FuBR
event_91_GenericCSV061418_16_0728_xmRhP,G2Zu
event_85_GenericCSV061418_16_0728_xmRhP,i6o3
event_92_GenericCSV061418_16_0728_xmRhP,MBqG
event_86_GenericCSV061418_16_0728_xmRhP,vjxM
event_93_GenericCSV061418_16_0728_xmRhP,OBky
event_87_GenericCSV061418_16_0728_xmRhP,X8gY
event_94_GenericCSV061418_16_0728_xmRhP,sTug
event_95_GenericCSV061418_16_0728_xmRhP,ZzxE
event_84_GenericCSV061418_16_0728_xmRhP,qsxH
event_96_GenericCSV061418_16_0728_xmRhP,kfkb
event_97_GenericCSV061418_16_0728_xmRhP,lSfm
event_98_GenericCSV061418_16_0728_xmRhP,u3q7
event_99_GenericCSV061418_16_0728_xmRhP,MA1L
event_100_GenericCSV061418_16_0728_xmRhP,1XwN
[root@quickstart flume-sql]# cat 1535355205987-1 | wc -l

15
[root@quickstart flume-sql]# cat sql1.status

{"SourceName":"sql1","URL":"jdbc:mysql:\/\/127.0.0.1:3306\/testdb","LastIndex":"100","ColumnsToSelect":"*","Table":"customers3"}

@zhipcui, you are almost right. In the first versions of flume-sql-source there were two restrictions on the incremental column: it had to exist, and it had to be the first column retrieved. Later, it no longer had to be the first column, because we added property incremental.column.name to identify the incremental column. Currently, that property is disabled.

best, Luis

zhipcui commented 6 years ago

Has the incremental.column.name property been added?

lazaromedina commented 6 years ago

Sorry, I meant "we added property".