keedio / flume-ng-sql-source

Flume Source to import data from SQL Databases
Apache License 2.0

Can not stream the newly inserted data in mysql table to HDFS #57

Closed · zorowonder closed this issue 5 years ago

zorowonder commented 6 years ago

Hi, I'm trying to stream data from a MySQL table into HDFS, but I can't stream the newly inserted data from the table.

I'm using flume-release-1.7.0 + flume-ng-sql-source-1.5.0 + Hadoop 2.7.6 + MySQL 5.7.17

At first, I followed

https://community.toadworld.com/platforms/oracle/w/wiki/11114.streaming-mysql-database-table-data-to-hdfs-with-flume

to build the entire system. Then I found the updated versions of each project listed above and updated the conf file.

My flume.conf looks like this:

===================================================================

agent1.channels = ch1
agent1.sinks = h1
agent1.sources = ss1

agent1.channels.ch1.type = memory

agent1.sources.ss1.channels = ch1
agent1.sources.ss1.type = org.keedio.flume.source.SQLSource
agent1.sources.ss1.hibernate.connection.url = jdbc:mysql://129.1.99.1:3306/test
agent1.sources.ss1.hibernate.connection.user = root
agent1.sources.ss1.hibernate.connection.password = root
agent1.sources.ss1.table = user
agent1.sources.ss1.start.from = 0
agent1.sources.ss1.custom.query = select * from user where id > $@$ order by id asc
agent1.sources.ss1.batch.size = 1000
agent1.sources.ss1.max.rows = 1000
agent1.sources.ss1.run.query.delay = 15000
agent1.sources.ss1.hibernate.connection.driver_class = com.mysql.jdbc.Driver
agent1.sources.ss1.hibernate.dialect = org.hibernate.dialect.MySQL5Dialect
agent1.sources.ss1.connection.autocommit = true
agent1.sources.ss1.status.file.path = /opt/flume/status
agent1.sources.ss1.status.file.name = ss1.status

agent1.sinks.h1.channel = ch1
agent1.sinks.h1.type = hdfs
agent1.sinks.h1.hdfs.path = hdfs://129.1.99.5:9000/flume/mysql/%y-%m-%d/%H%M/%S
agent1.sinks.h1.hdfs.writeFormat = Text
agent1.sinks.h1.hdfs.rollSize = 268435456
agent1.sinks.h1.hdfs.rollInterval = 0
agent1.sinks.h1.hdfs.rollCount = 0
agent1.sinks.h1.hdfs.fileType = DataStream
agent1.sinks.h1.hdfs.threadsPoolSize = 10
agent1.sinks.h1.hdfs.filePrefix = evt-
agent1.sinks.h1.hdfs.useLocalTimeStamp = true
agent1.sinks.h1.hdfs.round = true
agent1.sinks.h1.hdfs.roundValue = 10
agent1.sinks.h1.hdfs.roundUnit = minute
agent1.sinks.h1.hdfs.retryInterval = 60

===================================================================

I start the Flume agent with the command:

bin/flume-ng agent -c conf/ -f conf/flume.conf -Dflume.root.logger=DEBUG,console -n agent1

Part of the console log is:

=====================================

2018-05-16 17:36:32,580 (PollableSourceRunner-SQLSource-ss1) [DEBUG - org.hibernate.engine.jdbc.spi.SqlStatementLogger.logStatement(SqlStatementLogger.java:109)] select * from user where id > 89 order by id asc limit ?
2018-05-16 17:36:47,525 (conf-file-poller-0) [DEBUG - org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:127)] Checking file:conf/flume.conf for changes
2018-05-16 17:36:47,583 (PollableSourceRunner-SQLSource-ss1) [DEBUG - org.hibernate.engine.jdbc.spi.SqlStatementLogger.logStatement(SqlStatementLogger.java:109)] select * from user where id > 89 order by id asc limit ?
2018-05-16 17:37:02,585 (PollableSourceRunner-SQLSource-ss1) [DEBUG - org.hibernate.engine.jdbc.spi.SqlStatementLogger.logStatement(SqlStatementLogger.java:109)] select * from user where id > 89 order by id asc limit ?
2018-05-16 17:37:17,526 (conf-file-poller-0) [DEBUG - org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:127)] Checking file:conf/flume.conf for changes
2018-05-16 17:37:17,588 (PollableSourceRunner-SQLSource-ss1) [DEBUG - org.hibernate.engine.jdbc.spi.SqlStatementLogger.logStatement(SqlStatementLogger.java:109)] select * from user where id > 89 order by id asc limit ?

=====================================

The table user is very simple. It contains id (pk), name, and age. I'm confused about the 'limit ?' in my query. After inserting new data into the MySQL table, for example a row with an id larger than 89, the log does not change and keeps printing the same statement: 'select * from user where id > 89 order by id asc limit ?'.

Did I do something wrong?

Thanks!

lazaromedina commented 6 years ago

Hi zorowonder, I could reproduce the same weird behavior. It seems to be a bug when using the "max.rows" property and retrieving the LastIndex for the file tracking status. My table has 3 columns and 20000 rows. If I set "max.rows = 10", I get 10 rows dumped to file_roll:

mysql> select * from customers limit 10;
+------------+-----------+----+
| first_name | last_name | id |
+------------+-----------+----+
| OnNgny     | Q9nLQOU   |  1 |
| F8jB       | z6Uo3     |  2 |
| Mx3wrqo    | huZg89    |  3 |
| 8qFWeW0    | BE08      |  4 |
| FyOGkxq    | ZcNXhQp   |  5 |
| laiiXk     | Qhp9S     |  6 |
| aGCFFcl    | GQ49      |  7 |
| MQzQ1G     | Mv1b69    |  8 |
| CzmRy7J    | dT5U0rQ   |  9 |
| qoUv       | Kl4fF7D   | 10 |
+------------+-----------+----+
10 rows in set (0,00 sec)

dumped data is :

F8jB~z6Uo3~2
Mx3wrqo~huZg89~3
8qFWeW0~BE08~4
FyOGkxq~ZcNXhQp~5
laiiXk~Qhp9S~6
aGCFFcl~GQ49~7
MQzQ1G~Mv1b69~8
CzmRy7J~dT5U0rQ~9
qoUv~Kl4fF7D~10

For some reason, the last value of the first column is taken as the column name, as you can see in my log:

2018-05-18 10:06:37,796 (PollableSourceRunner-SQLSource-sql1) [DEBUG - org.hibernate.engine.jdbc.spi.SqlStatementLogger.logStatement(SqlStatementLogger.java:109)] SELECT * FROM customers WHERE id > p31K order by id asc limit ?
2018-05-18 10:06:37,796 (PollableSourceRunner-SQLSource-sql1) [DEBUG - org.hibernate.engine.jdbc.internal.LogicalConnectionImpl.obtainConnection(LogicalConnectionImpl.java:226)] Obtaining JDBC connection
2018-05-18 10:06:37,796 (PollableSourceRunner-SQLSource-sql1) [DEBUG - com.mchange.v2.resourcepool.BasicResourcePool.trace(BasicResourcePool.java:1747)] trace com.mchange.v2.resourcepool.BasicResourcePool@204bd92f [managed: 4, unused: 3, excluded: 0] (e.g. com.mchange.v2.c3p0.impl.NewPooledConnection@398ca38d)
2018-05-18 10:06:37,797 (PollableSourceRunner-SQLSource-sql1) [DEBUG - org.hibernate.engine.jdbc.internal.LogicalConnectionImpl.obtainConnection(LogicalConnectionImpl.java:232)] Obtained JDBC connection
2018-05-18 10:06:37,798 (PollableSourceRunner-SQLSource-sql1) [DEBUG - com.mchange.v2.c3p0.impl.NewPooledConnection.handleThrowable(NewPooledConnection.java:490)] com.mchange.v2.c3p0.impl.NewPooledConnection@226c2a35 handling a throwable.
com.mysql.jdbc.exceptions.jdbc4.MySQLSyntaxErrorException: Unknown column 'p31K' in 'where clause'
    at sun.reflect.GeneratedConstructorAccessor15.newInstance(Unknown Source)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
    at com.mysql.jdbc.Util.handleNewInstance(Util.java:389)
    at com.mysql.jdbc.Util.getInstance(Util.java:372)
    at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:980)
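
For reference, the substituted statement from the log can be reproduced directly in the mysql client: the stored index value is injected into the WHERE clause unquoted, so MySQL parses it as a column name (the LIMIT value here is assumed to be the max.rows setting, 10):

SELECT * FROM customers WHERE id > p31K ORDER BY id ASC LIMIT 10;
-- ERROR 1054 (42S22): Unknown column 'p31K' in 'where clause'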

I could avoid this problem by increasing max.rows, like this:

.....
agent.sources.sql1.table = customers
agent.sources.sql1.run.query.delay=10000
agent.sources.sql1.status.file.path = /var/log/sqlflume/flume-out
agent.sources.sql1.status.file.name = sql1.status
agent.sources.sql1.custom.query = SELECT * FROM customers WHERE id > $@$ order by id asc
agent.sources.sql1.start.from = 0
agent.sources.sql1.batch.size = 1000
agent.sources.sql1.max.rows = 1000000
agent.sources.sql1.delimiter.entry = ~
agent.sources.sql1.enclose.by.quotes = false

We will provide a fix as soon as possible.

Best, Luis.

lazaromedina commented 5 years ago

Bug description: when a custom query is set, SourceHelper updates the file status expecting the first column to be an integer (the primary key). I am updating my previous comment: this is not related to the "max.rows" property. Fix in develop: https://github.com/keedio/flume-ng-sql-source/commit/45274f0bf2a0b799b4fbb2ecc16b268d3e248526
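
As a minimal sketch of what that means (illustrative Java only, with hypothetical names, not the project's actual SourceHelper code or the real fix): the value written to the status file is taken from the first column of the last fetched row, so it only makes sense when that column is the integer primary key; otherwise an arbitrary string ends up unquoted in the $@$ substitution.

import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration of the indexing behaviour behind this bug.
public class IndexUpdateSketch {

    // Buggy path: blindly take the first field of the last fetched row as the new index.
    static String naiveLastIndex(List<String[]> rows, String currentIndex) {
        if (rows.isEmpty()) return currentIndex;
        return rows.get(rows.size() - 1)[0];           // e.g. "qoUv" when the PK is not column 0
    }

    // Expected precondition: the first column is the integer primary key,
    // so only a numeric value should ever become the new index.
    static String numericLastIndex(List<String[]> rows, String currentIndex) {
        if (rows.isEmpty()) return currentIndex;
        String candidate = rows.get(rows.size() - 1)[0];
        try {
            Long.parseLong(candidate);                 // "10" for an id-first result set
            return candidate;
        } catch (NumberFormatException e) {
            return currentIndex;                       // do not corrupt the status file
        }
    }

    public static void main(String[] args) {
        List<String[]> rows = new ArrayList<>();
        rows.add(new String[]{"CzmRy7J", "dT5U0rQ", "9"});   // first_name, last_name, id
        rows.add(new String[]{"qoUv", "Kl4fF7D", "10"});
        System.out.println(naiveLastIndex(rows, "0"));       // qoUv -> "WHERE id > qoUv" breaks
        System.out.println(numericLastIndex(rows, "0"));     // 0    -> index left unchanged
    }
}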

best

ghost commented 5 years ago

(quotes zorowonder's original report above in full)

I have the same problem. Have you solved it?

riccosir commented 4 years ago

@zorowonder Please check the explanation of [custom.query] carefully, and ensure that the incremental field is returned in the first position of the query result.

Custom Query: A custom query is supported to bring the possibility of using the entire SQL language. This is powerful but risky; be careful with the custom queries you use.

To avoid row export repetitions, use the $@$ special character in the WHERE clause to incrementally export not-yet-processed rows and newly inserted ones.

IMPORTANT: For proper operation of Custom Query, ensure that the incremental field will be returned in the first position of the query result.

Example:

agent.sources.sql-source.custom.query = SELECT incrementalField,field2 FROM table1 WHERE incrementalField > $@$
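
Applied to the user table from this issue (columns id, name, age), that guidance would look something like the following; this is just an illustration adapted from the configuration above, not a tested setting:

agent1.sources.ss1.custom.query = SELECT id, name, age FROM user WHERE id > $@$ ORDER BY id ASC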

zorowonder commented 4 years ago

(quotes riccosir's custom.query guidance above)

@riccosir The problem is in versions 1.5.0 and 1.5.2. I then tested version 1.4.1 and it is OK in my case.

zorowonder commented 4 years ago

@imsoou I tested version 1.4.1 and it works.

riccosir commented 4 years ago

(quotes the custom.query guidance and zorowonder's reply above)

The old version (1.4.1) uses counters instead of ID values for comparison, while the new version uses real ID values. Counter-based matching may produce duplicate records when IDs are not consecutive, so I suggest you adjust the order of the columns in your query results.
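
As a hypothetical illustration of that difference, suppose the table holds ids 1, 2, 5 and 9 and four rows have already been exported:

-- Counter-based index (1.4.1): the stored index is the row count, 4, so the next
-- poll re-exports ids 5 and 9 even though they were already processed:
SELECT * FROM user WHERE id > 4 ORDER BY id ASC;

-- ID-based index (1.5.x): the stored index is the last id actually read, 9, so the
-- next poll only picks up rows inserted after it:
SELECT * FROM user WHERE id > 9 ORDER BY id ASC;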

zorowonder commented 4 years ago

(quotes the preceding exchange)

@riccosir Thanks for the suggestion! I will try it later.