keedio / flume-ng-sql-source

Flume Source to import data from SQL Databases
Apache License 2.0
264 stars 164 forks source link

How to change the delimiter and remove the double quotation marks on both sides of the field #44

Closed starbow727 closed 6 years ago

starbow727 commented 7 years ago

hello, I want to change the delimiter and remove the double quotation marks on both sides of the field。 1504865038350¦"11683","10501","HSJYLYWLXGJ","?????????????? ","hswlxgj ","0559-5522319","0559-5548820","??? ","","","","240","public","expe?%£&?"11882","d","T","F","HSJYLYWLXGJ ","hongcun","2012-06-20 15:08:50.0","cwq","2016-03-18 09:48:09.0","HSJYLY","F","","","2","","" How to do it。 I use these two parameters, and did not get the results。 a1.sources.s1.delimiter.entry=~ a1.sources.s1.enclose.by.quotes=false

starbow727 commented 7 years ago
Change these two values to no effect

private static final String DEFAULT_DELIMITER_ENTRY = "~"; private static final Boolean DEFAULT_ENCLOSE_BY_QUOTES = false;

ChenShuai1981 commented 6 years ago

Does flume-ng-sql-source support output JSON format?

mvalleavila commented 6 years ago

Hello,

No it isn't supported, but is a good feature for the future roadmap.

2017-11-13 6:43 GMT+01:00 ChenShuai1981 notifications@github.com:

Does flume-ng-sql-source support output JSON format?

— You are receiving this because you are subscribed to this thread. Reply to this email directly, view it on GitHub https://github.com/keedio/flume-ng-sql-source/issues/44#issuecomment-343819867, or mute the thread https://github.com/notifications/unsubscribe-auth/AERdiAwfYbgmvkTJYMkqlxu-qR2CJjeMks5s19cHgaJpZM4PSQfc .

starbow727 commented 6 years ago

flume-ng-sql-source可以输出json格式,flume查询数据库的时候,sql拼接成json格式。结合过滤器使用。我们有把数据拼成json格式再放到kafka传入kylin中。@ChenShuai1981

yucy commented 6 years ago

@starbow727 ,你们用的flume版本是多少?我用的flume 1.8,和这个插件不太匹配,1.8版本里的PollableSource接口多了两个方法 getBackOffSleepIncrement(); getMaxBackOffSleepInterval();

mvalleavila commented 6 years ago

Hello @starbow727 @yucy,

Thank you for bringing solutions for using json output format. But please, write in english so your comments could be read by everybody.

Thanks

ChenShuai1981 commented 6 years ago

Could you show me a simple example? Thanks! @starbow727

trockxi commented 6 years ago

@starbow727 我正在尝试mysql同步到kafka,但是我这边未看到值的更新(新增和值改变)的,下面是我的的配置,麻烦帮看看是哪里为设置正确,谢谢! a1.channels = ch-1 a1.sources = src-1 a1.sinks = k1 ###########sql source#################

For each one of the sources, the type is defined

a1.sources.src-1.type = org.keedio.flume.source.SQLSource a1.sources.src-1.hibernate.connection.url = jdbc:mysql://10.72.11.111:3306/m_db

Hibernate Database connection properties

a1.sources.src-1.hibernate.connection.user = root a1.sources.src-1.hibernate.connection.password = root a1.sources.src-1.hibernate.connection.autocommit = true a1.sources.src-1.hibernate.dialect = org.hibernate.dialect.MySQL5Dialect a1.sources.src-1.hibernate.connection.driver_class = com.mysql.jdbc.Driver a1.sources.src-1.run.query.delay=5000 a1.sources.src-1.status.file.path = /var/log/flume a1.sources.src-1.status.file.name = sqlSource.status

Custom query

a1.sources.src-1.start.from = 0 a1.sources.src-1.enclose.by.quotes = true a1.sources.src-1.columns.to.select = a1.sources.src-1.custom.query = SELECT from tb_member a1.sources.src-1.batch.size = 1000 a1.sources.src-1.max.rows = 1000 a1.sources.src-1.hibernate.connection.provider_class = org.hibernate.connection.C3P0ConnectionProvider a1.sources.src-1.hibernate.c3p0.min_size=1 a1.sources.src-1.hibernate.c3p0.max_size=10

################################################################ a1.channels.ch-1.type = memory a1.channels.ch-1.capacity = 10000 a1.channels.ch-1.transactionCapacity = 10000 a1.channels.ch-1.byteCapacityBufferPercentage = 20 a1.channels.ch-1.byteCapacity = 800000

################################################################ a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink a1.sinks.k1.topic = test218 a1.sinks.k1.brokerList = 10.72.8.87:9092,10.72.8.88:9092,10.72.8.89:9092 a1.sinks.k1.requiredAcks = 1 a1.sinks.k1.batchSize = 20 a1.sinks.k1.channel = ch-1 a1.sources.src-1.channels=ch-1

starbow727 commented 6 years ago

我用的版本Flume 1.6.0-cdh5.8.4 我用的flume-sql-source flume-ng-sql-source-1.4.3.jar 这两个接口对你有什么影响吗?

在 2017-11-16 15:43:56,"yucy" notifications@github.com 写道:

@starbow727 ,你们用的flume版本是多少?我用的flume 1.8,和这个插件不太匹配,1.8版本里的PollableSource接口多了两个方法 getBackOffSleepIncrement(); getMaxBackOffSleepInterval();

— You are receiving this because you were mentioned. Reply to this email directly, view it on GitHub, or mute the thread.

starbow727 commented 6 years ago

hello @trockxi use incremental fields like this WHERE ID >$@$ ORDER BY ID ASC

wujlmochasoft commented 6 years ago

hi @starbow727 ,How do you solve the problem with quotes around json strings,I use SQL statements stitching json string, but flume report error log like this org.hibernate.QueryException: Not all named parameters have been set: [""] [select CONCAT('{"id":"',id,"","productName":"",productName,""}") from call_log_2017_08 where id > 0 order by id] at org.hibernate.internal.AbstractQueryImpl.verifyParameters(AbstractQueryImpl.java:401) at org.hibernate.internal.SQLQueryImpl.verifyParameters(SQLQueryImpl.java:195) at org.hibernate.internal.SQLQueryImpl.list(SQLQueryImpl.java:134) at org.keedio.flume.source.HibernateHelper.executeQuery(HibernateHelper.java:122) at org.keedio.flume.source.SQLSource.process(SQLSource.java:89) at org.apache.flume.source.PollableSourceRunner$PollingRunner.run(PollableSourceRunner.java:133) at java.lang.Thread.run(Thread.java:748) Have you encountered this problem, how to solve it?

wujlmochasoft commented 6 years ago

@starbow727 By the way, can you give an example of a SQL statement stitching a json string?

starbow727 commented 6 years ago

use interceptors like this

Filter the braces before the start of the field and quotation marks

tier1.sources.s1.interceptors.i3.type = search_replace tier1.sources.s1.interceptors.i3.searchPattern = "-?[1-9]\d*","\{" tier1.sources.s1.interceptors.i3.replaceString = \{

At 2017-11-27 14:50:44, "WUJL" notifications@github.com wrote:

hi @starbow727 ,How do you solve the problem with quotes around json strings,I use SQL statements stitching json string, but flume report error log like this org.hibernate.QueryException: Not all named parameters have been set: [""] [select CONCAT('{"id":"',id,"","productName":"",productName,""}") from call_log_2017_08 where id > 0 order by id] at org.hibernate.internal.AbstractQueryImpl.verifyParameters(AbstractQueryImpl.java:401) at org.hibernate.internal.SQLQueryImpl.verifyParameters(SQLQueryImpl.java:195) at org.hibernate.internal.SQLQueryImpl.list(SQLQueryImpl.java:134) at org.keedio.flume.source.HibernateHelper.executeQuery(HibernateHelper.java:122) at org.keedio.flume.source.SQLSource.process(SQLSource.java:89) at org.apache.flume.source.PollableSourceRunner$PollingRunner.run(PollableSourceRunner.java:133) at java.lang.Thread.run(Thread.java:748) Have you encountered this problem, how to solve it?

— You are receiving this because you were mentioned. Reply to this email directly, view it on GitHub, or mute the thread.

trockxi commented 6 years ago

@starbow727 辛苦直接给个可用的配置吧 我不是很懂配置 不知如何下手 谢谢

lucarosellini commented 6 years ago

I kindly ask everyone to speak in english here. Thanks

ChenShuai1981 commented 6 years ago

Why not upgrade apache flume to streamsets data collector? https://streamsets.com/blog/upgrading-apache-flume-streamsets-data-collector/

wujlmochasoft commented 6 years ago

@starbow727 thank you very much. I tried to the configuration items you gave, I may be wrong or other reasons, anyway, I did not succeed. So, if it is convenient to give a contact, I would like to ask some details of the problem. You can send contact information, such as QQ, to my Gmail at wujl.vince@gmail.com. Grateful

wujlmochasoft commented 6 years ago

Thank you @starbow727 help me out of the mud, my problem has been solved. In order to give a reference to the buddy who has not yet solved the problem, I put my thoughts into a document. Chinese and English versions are available, Chinese version please click here, English version please click here. If you have any problems reading the document, send the question to my Gmail at wujl.vince@gmail.com, thanks

lazaromedina commented 6 years ago

Hi everyone, i'm closing this issue because from what I have been able to discern and understand, the initial problem posed by starbow727: "cofigurable parameters delimiter.entry=~ and enclose.by.quotes=false not working" is not repetible becatuse i can get it working, please check:

agent.sources.sql1.delimiter.entry = ~ agent.sources.sql1.enclose.by.quotes = false

and some rows sinking to file_roll:

1493~jpX0b6b
1494~3zMa
1495~KJwE
1496~XTGv
1497~GxEsqk
1498~HVp6
1499~1OnVD
1500~fEWwqP
1501~YnWtWt

Please feel free to reopen if necessary with more information about the intial problem. best, Luis