keedio / flume-ng-sql-source

Flume Source to import data from SQL Databases
Apache License 2.0

flume stopped after first get data from mysql #23

Closed tyipwsi closed 7 years ago

tyipwsi commented 8 years ago

Flume stopped after the first fetch of data from MySQL. Is it supposed to run continuously?

mvalleavila commented 8 years ago

Hello,

The source is designed to execute queries at the interval configured in run.query.delay. This value defaults to 10000 ms, so a new request is made to the database every 10 seconds. If you want to process data closer to real time, set this parameter to a smaller value (keeping in mind that this puts more load on the database).
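As a sketch, using the agent and source names that appear later in this thread, a faster polling interval would look like this (1000 ms is an illustrative value, not a recommendation):

```properties
# Poll the database every 1 second instead of the 10 s default.
# More frequent polling means more load on the database.
tier1.sources.sql-source.run.query.delay = 1000
```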

If you are seeing different behaviour, can you provide some more information about the error?

Database you are using, Configuration properties, log traces...

Thanks, Marcelo


2efPer commented 7 years ago

I'm running into this problem too. Is there any way to print DEBUG-level information in Cloudera Flume? There is no further detail at the INFO level.

mvalleavila commented 7 years ago

Hello,

To print DEBUG-level information in Cloudera Flume, search the Cloudera Manager Flume properties page for the string "level". There you can set the log level you need.
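Outside Cloudera Manager, the usual way to get the same effect is through Flume's log4j configuration. A sketch (the logger name assumes the keedio package seen in the source type org.keedio.flume.source.SQLSource; adjust if your build differs):

```properties
# flume-ng conf/log4j.properties: enable DEBUG for the SQL source only,
# keeping the rest of the agent at its existing level.
log4j.logger.org.keedio.flume.source=DEBUG
```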

In any case, can you share more information about your environment?

Thanks

2efPer commented 7 years ago

Hi, here is my environment info.

1. Properties file:

tier1.sources = sql-source
tier1.channels = ch1
tier1.sinks = HDFS

tier1.sources.sql-source.channels = ch1
tier1.sources.sql-source.type = org.keedio.flume.source.SQLSource  
tier1.sources.sql-source.hibernate.connection.url = jdbc:mysql://localhost:3306/nova  
tier1.sources.sql-source.hibernate.connection.user = sss
tier1.sources.sql-source.hibernate.connection.password = Password!
tier1.sources.sql-source.table = sessions  
tier1.sources.sql-source.columns.to.select = *
#tier1.sources.sql-source.incremental.column.name = id  
#tier1.sources.sql-source.incremental.value = 0  
tier1.sources.sql-source.run.query.delay=10000
tier1.sources.sql-source.status.file.path = /var/log/flume-ng/
tier1.sources.sql-source.status.file.name = sql-source.status  
tier1.sources.sql-source.delimiter.entry = |
tier1.sources.sql-source.applyquotes = false

tier1.channels.ch1.type = memory  

tier1.sinks.HDFS.channel = ch1  
tier1.sinks.HDFS.type = hdfs  
tier1.sinks.HDFS.hdfs.path = hdfs://node1:8020/data/
tier1.sinks.HDFS.hdfs.fileType = DataStream 
tier1.sinks.HDFS.hdfs.writeFormat = Text
tier1.sinks.HDFS.hdfs.rollSize = 268435456 
tier1.sinks.HDFS.hdfs.rollInterval = 60
tier1.sinks.HDFS.hdfs.rollCount = 0

2. mysql Ver 14.14 Distrib 5.7.18, for Linux (x86_64)

3. mysql-connector-java-5.1.17.jar

Thanks for your reply.

mvalleavila commented 7 years ago

Thanks, we are working on reproducing the bug.

mvalleavila commented 7 years ago

Hello,

I reproduced the behavior, but it isn't a bug; the Flume source is designed to operate this way.

I used the same sql-source configuration as you, and this is how it works:


With a table Persons with initial content:

PersonID  LastName  FirstName  Address  City
1         Valle     Marcelo    Rivas    Madrid

I start the Flume agent: the content going out through the sink is "1"|"Valle"|"Marcelo"|"Rivas"|"Madrid"

If the Flume agent keeps running and no new rows are inserted, no new events are processed.

If a new row is then inserted into the MySQL table:

PersonID  LastName  FirstName  Address  City
2         Ranchal   Alberto    Getafe   Madrid

the new event is read and processed by the sink: "2"|"Ranchal"|"Alberto"|"Getafe"|"Madrid"

If we now restart the agent, the source loads the status file (tier1.sources.sql-source.status.file.path = /var/log/flume-ng/). The status file content is: {"ColumnsToSelect":"*","Table":"Persons","LastIndex":"2","SourceName":"sql-source","URL":"jdbc:mysql:\/\/dos:3306\/test"}

The source uses the value "LastIndex":"2" to start reading the table from position 2. The table contains only 2 rows, so no event is processed. If we insert a new row into the table:

PersonID  LastName  FirstName  Address    City
1         Valle     Marcelo    Rivas      Madrid
2         Ranchal   Alberto    Getafe     Madrid
3         Sanchez   Daniel     Zabalburu  Bilbao

the new row is read and processed, and sent through the sink: "3"|"Sanchez"|"Daniel"|"Zabalburu"|"Bilbao"

If a previous execution has already run and you want to export all the rows of the table when starting the agent, you need to manually remove the status file first (/var/log/flume-ng/sql-source.status, per the paths in your config).
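The inspect-and-reset steps above can be sketched as follows. This demo works on a temporary copy of a status file like the one shown earlier, so it is safe to run anywhere; against a real agent, stop Flume first and point at your actual status.file.path/status.file.name.

```shell
# Demo in a temp file: create a sample status file like the one above,
# read LastIndex, then delete the file to force a full re-read on next start.
STATUS=$(mktemp)
echo '{"ColumnsToSelect":"*","Table":"Persons","LastIndex":"2","SourceName":"sql-source","URL":"jdbc:mysql://dos:3306/test"}' > "$STATUS"

# The source resumes from this value on restart:
grep -o '"LastIndex":"[0-9]*"' "$STATUS"    # prints "LastIndex":"2"

# Stop the agent, then delete the real file
# (e.g. /var/log/flume-ng/sql-source.status in the config above):
rm -f "$STATUS"
```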

Regards!