keedio / flume-ng-sql-source

Flume Source to import data from SQL Databases
Apache License 2.0

Updated records from Oracle DB are not streamed #48

Closed ganikris86 closed 6 years ago

ganikris86 commented 6 years ago

Hello,

I'm trying to stream the data from an Oracle DB table into a Kafka sink on a specific topic. I'm able to stream newly inserted records, but when existing records are updated (some fields change), the updated data is not streamed.

My flume.conf looks like this,

agent.channels=ch1 ch2
agent.sinks=kafkaSink1 kafkaSink2
agent.sources=sql-source1 sql-source2

agent.channels.ch1.type=memory
agent.channels.ch1.capacity=10000
agent.channels.ch1.transactionCapacity=10000

agent.sources.sql-source1.channels=ch1

agent.sources.sql-source1.type=org.keedio.flume.source.SQLSource

agent.sources.sql-source1.hibernate.connection.url=jdbc:oracle:thin:@XXX.XXX.XX.XXX:1521:
agent.sources.sql-source1.hibernate.connection.user=XLS_ADMIN
agent.sources.sql-source1.hibernate.connection.password=xls
agent.sources.sql-source1.hibernate.connection.autocommit=true
agent.sources.sql-source1.hibernate.dialect=org.hibernate.dialect.Oracle10gDialect
agent.sources.sql-source1.hibernate.connection.driver_class=oracle.jdbc.driver.OracleDriver

agent.sources.sql-source1.table=XLS_ADMIN.MD_MEDIA
agent.sources.sql-source1.columns.to.select=*

agent.sources.sql-source1.incremental.column.name=CREATED_DATETIME

agent.sources.sql-source1.incremental.value=0

agent.sources.sql-source1.run.query.delay=5000

agent.sources.sql-source1.status.file.path=/var/lib/flume
agent.sources.sql-source1.status.file.name=sql-source1.status

agent.sinks.kafkaSink1.type=org.apache.flume.sink.kafka.KafkaSink
agent.sinks.kafkaSink1.brokerList=192.168.77.99:9092
agent.sinks.kafkaSink1.topic=md_media
agent.sinks.kafkaSink1.channel=ch1
agent.sinks.kafkaSink1.batchSize=10

In the above configuration, the field CREATED_DATETIME will be updated to the latest timestamp whenever there is an update to any of the records in the table.

Any clue why the updated data is not getting streamed?

lucarosellini commented 6 years ago

Hi @ganikris86, for your use case I think it's better to use the custom query approach. In your example it should be something like:

agent.sources.sql-source.custom.query = SELECT * FROM XLS_ADMIN.MD_MEDIA WHERE CREATED_DATETIME > $@$
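Before each poll, the source substitutes the $@$ placeholder with the last index persisted in the status file (the real logic lives in SQLSourceHelper). A minimal sketch of that substitution, with a hypothetical helper name:

```python
def build_query(custom_query, last_index):
    """Hypothetical sketch of how flume-ng-sql-source resolves the
    $@$ placeholder: plain text substitution with the persisted index."""
    return custom_query.replace("$@$", str(last_index))

query = build_query(
    "SELECT * FROM XLS_ADMIN.MD_MEDIA WHERE CREATED_DATETIME > $@$",
    "TO_DATE('2017-01-01','YYYY-MM-DD')",  # hypothetical starting offset
)
# The substitution is purely textual, so whatever is stored as the last
# index is pasted into the SQL verbatim.
```

Because the substitution is textual, the stored index must be of a type Oracle can compare against CREATED_DATETIME.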

Let us know if this works for you.

ganikris86 commented 6 years ago

Hi @lucarosellini ,

I updated the flume.conf to use a custom query. My conf now looks like this:

agent.sources.sql-source1.start.from = SYSDATE-300
agent.sources.sql-source1.custom.query = SELECT * FROM XLS_ADMIN.MD_MEDIA WHERE CREATED_DATETIME > $@$

agent.sources.sql-source1.table=XLS_ADMIN.MD_MEDIA

agent.sources.sql-source1.columns.to.select=*

agent.sources.sql-source1.incremental.column.name = CREATED_DATETIME

agent.sources.sql-source1.incremental.value=0

In the above, although I set 'start.from' to 'SYSDATE-300' (which in Oracle DB means the current date minus 300 days), the custom query seems to use the table's primary key, or the first column, as its index. I get this error in the console:

2017-12-19 11:52:22,096 (PollableSourceRunner-SQLSource-sql-source1) [WARN - org.hibernate.engine.jdbc.spi.SqlExceptionHelper.logExceptions(SqlExceptionHelper.java:144)] SQL Error: 932, SQLState: 42000
2017-12-19 11:52:22,097 (PollableSourceRunner-SQLSource-sql-source1) [ERROR - org.hibernate.engine.jdbc.spi.SqlExceptionHelper.logExceptions(SqlExceptionHelper.java:146)] ORA-00932: inconsistent datatypes: expected TIMESTAMP got NUMBER

2017-12-19 11:52:22,098 (PollableSourceRunner-SQLSource-sql-source1) [ERROR - org.keedio.flume.source.HibernateHelper.executeQuery(HibernateHelper.java:124)] Exception thrown, resetting connection.
org.hibernate.exception.SQLGrammarException: could not extract ResultSet
        at org.hibernate.exception.internal.SQLExceptionTypeDelegate.convert(SQLExceptionTypeDelegate.java:80)
        at org.hibernate.exception.internal.StandardSQLExceptionConverter.convert(StandardSQLExceptionConverter.java:49)
        at org.hibernate.engine.jdbc.spi.SqlExceptionHelper.convert(SqlExceptionHelper.java:126)
        at org.hibernate.engine.jdbc.spi.SqlExceptionHelper.convert(SqlExceptionHelper.java:112)
        at org.hibernate.engine.jdbc.internal.ResultSetReturnImpl.extract(ResultSetReturnImpl.java:91)
        at org.hibernate.loader.Loader.getResultSet(Loader.java:2066)
        at org.hibernate.loader.Loader.executeQueryStatement(Loader.java:1863)
        at org.hibernate.loader.Loader.executeQueryStatement(Loader.java:1839)
        at org.hibernate.loader.Loader.doQuery(Loader.java:910)
        at org.hibernate.loader.Loader.doQueryAndInitializeNonLazyCollections(Loader.java:355)
        at org.hibernate.loader.Loader.doList(Loader.java:2554)
        at org.hibernate.loader.Loader.doList(Loader.java:2540)
        at org.hibernate.loader.Loader.listIgnoreQueryCache(Loader.java:2370)
        at org.hibernate.loader.Loader.list(Loader.java:2365)
        at org.hibernate.loader.custom.CustomLoader.list(CustomLoader.java:353)
        at org.hibernate.internal.SessionImpl.listCustomQuery(SessionImpl.java:1909)
        at org.hibernate.internal.AbstractSessionImpl.list(AbstractSessionImpl.java:311)
        at org.hibernate.internal.SQLQueryImpl.list(SQLQueryImpl.java:141)
        at org.keedio.flume.source.HibernateHelper.executeQuery(HibernateHelper.java:122)
        at org.keedio.flume.source.SQLSource.process(SQLSource.java:89)
        at org.apache.flume.source.PollableSourceRunner$PollingRunner.run(PollableSourceRunner.java:139)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.sql.SQLSyntaxErrorException: ORA-00932: inconsistent datatypes: expected TIMESTAMP got NUMBER

        at oracle.jdbc.driver.SQLStateMapping.newSQLException(SQLStateMapping.java:91)
        at oracle.jdbc.driver.DatabaseError.newSQLException(DatabaseError.java:133)
        at oracle.jdbc.driver.DatabaseError.throwSqlException(DatabaseError.java:206)
        at oracle.jdbc.driver.T4CTTIoer.processError(T4CTTIoer.java:455)
        at oracle.jdbc.driver.T4CTTIoer.processError(T4CTTIoer.java:413)
        at oracle.jdbc.driver.T4C8Oall.receive(T4C8Oall.java:1034)
        at oracle.jdbc.driver.T4CPreparedStatement.doOall8(T4CPreparedStatement.java:194)
        at oracle.jdbc.driver.T4CPreparedStatement.executeForDescribe(T4CPreparedStatement.java:791)
        at oracle.jdbc.driver.T4CPreparedStatement.executeMaybeDescribe(T4CPreparedStatement.java:866)
        at oracle.jdbc.driver.OracleStatement.doExecuteWithTimeout(OracleStatement.java:1186)
        at oracle.jdbc.driver.OraclePreparedStatement.executeInternal(OraclePreparedStatement.java:3387)
        at oracle.jdbc.driver.OraclePreparedStatement.executeQuery(OraclePreparedStatement.java:3431)
        at oracle.jdbc.driver.OraclePreparedStatementWrapper.executeQuery(OraclePreparedStatementWrapper.java:1491)
        at org.hibernate.engine.jdbc.internal.ResultSetReturnImpl.extract(ResultSetReturnImpl.java:82)
        ... 17 more

The sql-source status file has this content:

{"SourceName":"sql-source1","URL":"jdbc:oracle:thin:@192.168.77.106:1521:X763DEV1","LastIndex":"480002","Query":"SELECT * FROM XLS_ADMIN.MD_MEDIA WHERE CREATED_DATETIME > $@$"}

Look at the 'LastIndex' field: it shows a number (the first column in the table, which is also the primary key). Why is the custom query not using the 'start.from' I specified in the conf?
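The status file is plain JSON, so it is easy to inspect which index the source actually persisted (a quick sketch; the content is the status file quoted above):

```python
import json

# Status file content as pasted above
status = ('{"SourceName":"sql-source1",'
          '"URL":"jdbc:oracle:thin:@192.168.77.106:1521:X763DEV1",'
          '"LastIndex":"480002",'
          '"Query":"SELECT * FROM XLS_ADMIN.MD_MEDIA WHERE CREATED_DATETIME > $@$"}')

info = json.loads(status)
print(info["LastIndex"])  # -> 480002: a NUMBER, not a TIMESTAMP, so the
                          # next poll compares CREATED_DATETIME > 480002
                          # and Oracle raises ORA-00932
```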

lucarosellini commented 6 years ago

Hi @ganikris86: did you delete the status file before executing the agent with the changes in the config? The agent does not make any assumption on the type of the incremental column you're using and it does use the start.from option in order to set the offset for the first execution (as in: https://github.com/keedio/flume-ng-sql-source/blob/develop/src/main/java/org/keedio/flume/source/SQLSourceHelper.java#L116).

It seems to me that on your second execution of the agent, where you switched on the custom query behaviour, the agent found an existing status file, generated by a previous execution, where the pk of your table was being used as the incremental column. Since the types of the actual and the previous incremental columns differ, you get the ORA-00932.

Can you please try again deleting the status file before running the agent?
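If the type mismatch persists because LastIndex is stored as a plain string, one common workaround (a sketch only, not verified against this exact schema) is to cast the Oracle timestamp to a sortable string in the custom query, so both sides of the comparison have the same type:

```properties
# Sketch: compare TO_CHAR(timestamp) against the persisted index.
# Format mask and start value are illustrative assumptions.
agent.sources.sql-source1.start.from = 19700101000000
agent.sources.sql-source1.custom.query = SELECT * FROM XLS_ADMIN.MD_MEDIA WHERE TO_CHAR(CREATED_DATETIME,'YYYYMMDDHH24MISS') > '$@$' ORDER BY CREATED_DATETIME
```

With this shape, whatever value ends up in the status file stays directly comparable on the next poll.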

ganikris86 commented 6 years ago

Hi @lucarosellini ,

I did remove the status file before I started the flume agent. In fact, before each start of the flume agent I carefully delete the status files from '/var/lib/flume'. In this case the status file is generated in '/var/lib/flume' with the name 'sql-source1.status'. As you mentioned, I understand that the pk of the table is being compared with the CREATED_DATETIME field of the table. But I wonder why it doesn't use the start.from given in the conf.

lazaromedina commented 6 years ago

Hi @ganikris86, I will try to highlight some issues that seem confusing:

Using agent.sources.sql1.custom.query = SELECT id,firstname FROM customers WHERE id > 1620, then deleting the status file and restarting flume-sql, I get (by sinking to file_roll) the latest rows, and just those rows:

1621~8Kyp
1622~t66S3
1623~MPVds1P
1624~GqJ31sk
1625~CzbhaPA
1626~rgvVm
1627~WATU
1628~VvOK
1620~IAQE
1621~8Kyp
1622~t66S3
1623~MPVds1P
1624~GqJ31sk
1625~CzbhaPA
1626~rgvVm
1627~WATU
1628~VvOK
| IAQE       | Bc5mJG5   | 1620 |
| 8Kyp       | n9uZhu    | 1621 |
| t66S3      | 60WNK     | 1622 |
| MPVds1P    | sTMsCp    | 1623 |
| GqJ31sk    | L2j3      | 1624 |
| rgvVm      | hPP1qoD   | 1626 |
| WATU       | 2URDmdF   | 1627 |
| VvOK       | mrx8y     | 1628 |
| CzbhaPA    | uQPWN     | 1700 |
+------------+-----------+------+

and I get this "new data" processed from an existing row whose id was updated:

1620~IAQE
1621~8Kyp
1622~t66S3
1623~MPVds1P
1624~GqJ31sk
1625~CzbhaPA <----
1626~rgvVm
1627~WATU 
1628~VvOK
1700~CzbhaPA <----
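In other words, a row only re-enters the stream when its incremental column moves past the persisted LastIndex. A toy sketch of that filter, using the id values from the run above:

```python
def rows_to_stream(rows, last_index):
    # Sketch of the incremental filter: only rows whose incremental
    # column exceeds the persisted LastIndex are emitted on a poll.
    return [r for r in rows if r["id"] > last_index]

rows = [
    {"id": 1625, "firstname": "CzbhaPA"},  # original row
    {"id": 1700, "firstname": "CzbhaPA"},  # same row after its id was updated
]
# With LastIndex already at 1628, only the updated row is picked up again:
print(rows_to_stream(rows, 1628))  # -> [{'id': 1700, 'firstname': 'CzbhaPA'}]
```

This matches the behaviour above: the update moved the row's id past 1628, so it was streamed a second time; an update that does not advance the incremental column would not be.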

Please reopen the issue if you want to add something else or consider it appropriate. Thanks, best, Luis.