keedio / flume-ng-sql-source

Flume Source to import data from SQL Databases
Apache License 2.0

Issue while fetching the records added later to SQL #15

Closed prashanttct07 closed 9 years ago

prashanttct07 commented 9 years ago

Hi Team,

We have integrated this plugin with Flume. At startup, when my SQL DB has, say, 1000 records initially, it sends the data to Flume.

Then, 5 seconds later, another 200 records arrive. I waited 5 minutes, but the plugin did not send the new data to Flume. As a workaround, I added just a `#` to flume.conf (forcing a config reload), and then it processed the 200 newly added records.

Kindly let me know whether this is an issue or a known constraint, and whether there is a workaround to overcome this problem. (Among the parameters documented for this plugin, I was not able to find any that relates to this use case.)

~Prashant

mvalleavila commented 9 years ago

Hello Prashant,

After some tests we reproduced the issue in our environment. The bug is fixed in the new release, 1.3.2.

Thanks for the warning!

prashanttct07 commented 9 years ago

Hi mvalleavila, could you please let me know when that release will be available?

mvalleavila commented 9 years ago

Prashant,

Of course, it's now available :)

Cheers

Marcelo

prashanttct07 commented 9 years ago

Oh, OK thanks.

I will check and let you know if the issue persists.

prashanttct07 commented 9 years ago

One more thing: were the changes made only to the source code of flume-ng-sql-source, or to mysql-connector-java-5.1.35 as well?

mvalleavila commented 9 years ago

Only to the source code; it should work with version 5.1.35 of mysql-connector.

prashanttct07 commented 9 years ago

Hi, while running the same with the latest release I am getting this error:

```
2015-08-07 15:11:40,855 (PollableSourceRunner-SQLSource-sql-source) [ERROR - org.apache.flume.source.PollableSourceRunner$PollingRunner.run(PollableSourceRunner.java:156)] Unhandled exception, logging and sleeping for 5000ms
org.hibernate.SessionException: Session is closed!
	at org.hibernate.internal.AbstractSessionImpl.errorIfClosed(AbstractSessionImpl.java:133)
	at org.hibernate.internal.SessionImpl.createSQLQuery(SessionImpl.java:1841)
	at org.keedio.flume.source.HibernateHelper.executeQuery(HibernateHelper.java:81)
	at org.keedio.flume.source.SQLSource.process(SQLSource.java:87)
	at org.apache.flume.source.PollableSourceRunner$PollingRunner.run(PollableSourceRunner.java:139)
	at java.lang.Thread.run(Thread.java:745)
```

So why is the session getting closed when there is no input for some time?
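For context, the failure mode in the stack trace above — a long-idle source whose Hibernate session has been closed (e.g. by a connection timeout) and is then reused on the next poll — is commonly guarded by checking `Session.isOpen()` and reopening the session before each query, rather than holding one session forever. The sketch below models that guard in plain Java; `FakeSession` and `FakeFactory` are hypothetical stand-ins loosely modeled on Hibernate's `Session`/`SessionFactory`, not the real Hibernate API or the actual keedio `HibernateHelper` code:

```java
// Minimal stand-ins for Hibernate's Session/SessionFactory, used only to
// illustrate the reopen-on-closed guard. Not the real Hibernate API.
class FakeSession {
    private boolean open = true;
    boolean isOpen() { return open; }
    void close() { open = false; }
    String createSQLQuery(String sql) {
        // Mimics Hibernate's behavior of refusing work on a closed session.
        if (!open) throw new IllegalStateException("Session is closed!");
        return "result of: " + sql;
    }
}

class FakeFactory {
    FakeSession openSession() { return new FakeSession(); }
}

public class SessionGuardSketch {
    static FakeFactory factory = new FakeFactory();
    static FakeSession session = factory.openSession();

    // Guard invoked before every poll: if something (timeout, error handling)
    // closed the session, reopen it instead of throwing SessionException.
    static String executeQuery(String sql) {
        if (session == null || !session.isOpen()) {
            session = factory.openSession();
        }
        return session.createSQLQuery(sql);
    }

    public static void main(String[] args) {
        System.out.println(executeQuery("SELECT * FROM abcd"));
        session.close(); // simulate the session being closed during idle time
        System.out.println(executeQuery("SELECT * FROM abcd"));
    }
}
```

With the guard in place, the second call after the simulated closure succeeds instead of raising `Session is closed!`, which is the behavior the reporter is asking for here.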

prashanttct07 commented 9 years ago

Also, the messages below did not appear when processing with the earlier version, but they are appearing now:

```
2015-08-07 15:14:09,916 (PollableSourceRunner-SQLSource-sql-source) [INFO - org.hibernate.hql.internal.ast.ASTQueryTranslatorFactory.(ASTQueryTranslatorFactory.java:47)] HHH000397: Using ASTQueryTranslatorFactory
2015-08-07 15:14:09,916 (PollableSourceRunner-SQLSource-sql-source) [WARN - org.hibernate.type.TypeFactory$TypeScopeImpl.injectSessionFactory(TypeFactory.java:69)] HHH000233: Scoping types to session factory org.hibernate.internal.SessionFactoryImpl@401e1f8d after already scoped org.hibernate.internal.SessionFactoryImpl@33466929
```

mvalleavila commented 9 years ago

I'm seeing the WARN message in the logs too, but with the 1.3.2 release the new records are processed correctly.

Can you give me some details of your agent configuration and the steps to reproduce the issue?

Thanks

prashanttct07 commented 9 years ago

Hi, could you please ignore the warning message for the time being and check the error posted earlier, quoted again below:

```
2015-08-07 15:11:40,855 (PollableSourceRunner-SQLSource-sql-source) [ERROR - org.apache.flume.source.PollableSourceRunner$PollingRunner.run(PollableSourceRunner.java:156)] Unhandled exception, logging and sleeping for 5000ms
org.hibernate.SessionException: Session is closed!
	at org.hibernate.internal.AbstractSessionImpl.errorIfClosed(AbstractSessionImpl.java:133)
	at org.hibernate.internal.SessionImpl.createSQLQuery(SessionImpl.java:1841)
	at org.keedio.flume.source.HibernateHelper.executeQuery(HibernateHelper.java:81)
	at org.keedio.flume.source.SQLSource.process(SQLSource.java:87)
	at org.apache.flume.source.PollableSourceRunner$PollingRunner.run(PollableSourceRunner.java:139)
	at java.lang.Thread.run(Thread.java:745)
```

Consider the scenario:

- Start Flume and connect to a DB that has 100 records; Flume processes the 100 records.
- Insert 5 more records and observe: Flume processes these as well.
- Now let Flume run for some time, say 5 minutes, without inserting any new records into SQL.
- At this point we get the error (and if I insert new records now, Flume is not able to process them).

Let me know if you are still not able to reproduce it. BTW, the flume.conf details are below.

```
agent1.sources = sql-source
agent1.channels = ch1
agent1.sinks = avroSink

# Source
agent1.sources.sql-source.type = org.keedio.flume.source.SQLSource
agent1.sources.sql-source.channels = ch1
agent1.sources.sql-source.connection.url = jdbc:mysql://localhost:3306/test
agent1.sources.sql-source.user = root
agent1.sources.sql-source.password = xxxxxxxx
agent1.sources.sql-source.table = abcd
agent1.sources.sql-source.database = test
agent1.sources.sql-source.columns.to.select = *
agent1.sources.sql-source.incremental.column.name = id
agent1.sources.sql-source.incremental.value = 0
agent1.sources.sql-source.run.query.delay = 1000
agent1.sources.sql-source.status.file.path = /var/lib/flume
agent1.sources.sql-source.status.file.name = sql-source.status

# Channel
agent1.channels.ch1.type = memory
agent1.channels.ch1.capacity = 1000
agent1.channels.ch1.transactionCapacity = 1000

# Sink
agent1.sinks.avroSink.type = avro
agent1.sinks.avroSink.channel = ch1
agent1.sinks.avroSink.hostname = localhost
agent1.sinks.avroSink.port = 55555
```

prashanttct07 commented 9 years ago

Are you able to reproduce the same?

mvalleavila commented 9 years ago

Yes, I reproduced it. The issue has been reopened; a new version fixing the bug will be released soon.

prashanttct07 commented 9 years ago

Any update on the issue? Or when can I expect the release?

mvalleavila commented 9 years ago

We can't give a specific date for the new release fixing the problem. Anyway, if you want to contribute a solution to the issue, you are free to fork the project and submit a pull request.