keedio / flume-ng-sql-source

Flume Source to import data from SQL Databases
Apache License 2.0
264 stars, 164 forks

org.apache.flume.EventDeliveryException while running flume and sending data to spark #21

Closed prashanttct07 closed 8 years ago

prashanttct07 commented 8 years ago

Hi Team.

I am using the flume-ng-sql-source plugin to push data from a MySQL DB to Spark. While running it I get the exception below. Kindly let me know how to avoid it.

Exception which I am getting is mentioned below.

2015-10-14 18:00:43,838 (SinkRunner-PollingRunner-DefaultSinkProcessor) [ERROR - org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:160)] Unable to deliver event. Exception follows.
org.apache.flume.EventDeliveryException: Failed to send events
    at org.apache.flume.sink.AbstractRpcSink.process(AbstractRpcSink.java:403)
    at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
    at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
    at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.flume.FlumeException: NettyAvroRpcClient { host: localhost, port: 55555 }: RPC connection error
    at org.apache.flume.api.NettyAvroRpcClient.connect(NettyAvroRpcClient.java:182)
    at org.apache.flume.api.NettyAvroRpcClient.connect(NettyAvroRpcClient.java:121)
    at org.apache.flume.api.NettyAvroRpcClient.configure(NettyAvroRpcClient.java:638)
    at org.apache.flume.api.RpcClientFactory.getInstance(RpcClientFactory.java:88)
    at org.apache.flume.sink.AvroSink.initializeRpcClient(AvroSink.java:127)
    at org.apache.flume.sink.AbstractRpcSink.createConnection(AbstractRpcSink.java:222)
    at org.apache.flume.sink.AbstractRpcSink.verifyConnection(AbstractRpcSink.java:283)
    at org.apache.flume.sink.AbstractRpcSink.process(AbstractRpcSink.java:360)
    ... 3 more
Caused by: java.io.IOException: Error connecting to localhost/127.0.0.1:55555
    at org.apache.avro.ipc.NettyTransceiver.getChannel(NettyTransceiver.java:261)
    at org.apache.avro.ipc.NettyTransceiver.<init>(NettyTransceiver.java:203)
    at org.apache.avro.ipc.NettyTransceiver.<init>(NettyTransceiver.java:152)
    at org.apache.flume.api.NettyAvroRpcClient.connect(NettyAvroRpcClient.java:168)
    ... 10 more
Caused by: java.net.ConnectException: Connection refused
    at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
    at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:744)
    at org.jboss.netty.channel.socket.nio.NioClientSocketPipelineSink$Boss.connect(NioClientSocketPipelineSink.java:496)
    at org.jboss.netty.channel.socket.nio.NioClientSocketPipelineSink$Boss.processSelectedKeys(NioClientSocketPipelineSink.java:452)
    at org.jboss.netty.channel.socket.nio.NioClientSocketPipelineSink$Boss.run(NioClientSocketPipelineSink.java:365)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    ... 1 more
2015-10-14 18:00:43,945 (PollableSourceRunner-SQLSource-sql-source) [INFO - org.hibernate.engine.jdbc.connections.internal.DriverManagerConnectionProviderImpl.stop(DriverManagerConnectionProviderImpl.java:281)] HHH000030: Cleaning up connection pool [jdbc:mysql://localhost:3306/demodb_1]

mvalleavila commented 8 years ago

Hello Prashant,

Can you share your flume configuration?

prashanttct07 commented 8 years ago

Hi, please find the configuration below.

agent1.channels.ch1.type = memory
agent1.sources.sql-source.channels = ch1
agent1.channels = ch1
agent1.sinks = avroSink

agent1.sources = sql-source
agent1.sources.sql-source.type = org.keedio.flume.source.SQLSource

# URL to connect to database (currently only mysql is supported)
agent1.sources.sql-source.connection.url = jdbc:mysql://localhost:3306/demodb_1

# Database connection properties
agent1.sources.sql-source.user = root
agent1.sources.sql-source.password = test123
agent1.sources.sql-source.table = table1
agent1.sources.sql-source.database = db1

agent1.sources.sql-source.columns.to.select = id, name, location

# Increment column properties (to be explored further)
agent1.sources.sql-source.incremental.column.name = id

# Incremental value from which to start taking data from the table (0 will import the entire table)
# The value is maintained as a row number: if 3800 is given, rows are extracted from 3801 onwards
agent1.sources.sql-source.incremental.value = 0

# Query delay: the query is sent every configured number of milliseconds (can be considered a performance parameter in later stages)
agent1.sources.sql-source.run.query.delay=1000
agent1.sources.sql-source.batch.size=10

# Status file is used to save the last row read
agent1.sources.sql-source.status.file.path = /var/lib/flume
agent1.sources.sql-source.status.file.name = sql-source.status

# Channel
agent1.channels.ch1.capacity = 10000
agent1.channels.ch1.transactionCapacity = 10000

agent1.sinks.avroSink.type = avro
agent1.sinks.avroSink.channel = ch1
agent1.sinks.avroSink.batch-size=10
agent1.sinks.avroSink.hostname = 192.168.0.121
agent1.sinks.avroSink.port = 55555

Note: I have also tried setting agent1.sinks.avroSink.hostname to localhost instead of 192.168.0.121.

Let me know if you require anything else.
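The incremental settings in the configuration above (incremental.column.name, incremental.value, batch.size) can be pictured with a small sketch. This is only an illustration of the general idea of incremental polling, not the plugin's actual implementation; the functions `fetch_batch` and `drain` are hypothetical, and SQLite stands in for MySQL so the example is self-contained.

```python
import sqlite3


def fetch_batch(conn, last_id, batch_size):
    """Return up to batch_size rows whose id is greater than last_id."""
    cur = conn.execute(
        "SELECT id, name, location FROM table1 "
        "WHERE id > ? ORDER BY id LIMIT ?",
        (last_id, batch_size),
    )
    return cur.fetchall()


def drain(conn, start_value=0, batch_size=10):
    """Yield batches until no new rows remain.

    start_value plays the role of incremental.value: 0 reads the whole
    table, 3800 would start at the first row with id above 3800.
    """
    last_id = start_value
    while True:
        rows = fetch_batch(conn, last_id, batch_size)
        if not rows:
            return
        yield rows
        # Remember the last id seen, as a status file would between runs.
        last_id = rows[-1][0]
```

A real source would also wait between queries (the run.query.delay setting) and persist the last id to disk instead of keeping it in memory.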

mvalleavila commented 8 years ago

Prashant, it seems that your Avro sink is not able to connect to an Avro source at localhost/192.168.0.121, port 55555:

Caused by: java.io.IOException: Error connecting to localhost/127.0.0.1:55555

Closing the issue because it is not related to the Flume SQL source.

prashanttct07 commented 8 years ago

Hey,

Thanks for the reply. By the way, any idea why the connection error occurs when both are running on the same machine?

mvalleavila commented 8 years ago

Maybe the other Flume agent, the one configured with the Avro source, is not running properly. You can check whether a Flume service is running and listening on port 55555 by executing netstat -atunp | grep 55555
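The same check can be made from the client side, which is closer to what the Avro sink does when it connects. The sketch below is a minimal example using only the Python standard library; the helper name `is_listening` is hypothetical.

```python
import socket


def is_listening(host: str, port: int, timeout: float = 2.0) -> bool:
    """Return True if a TCP connection to host:port can be opened."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers "connection refused", timeouts and unreachable hosts.
        return False
```

Calling `is_listening("192.168.0.121", 55555)` from the Flume host and getting False would be consistent with the "Connection refused" at the bottom of the stack trace: nothing is accepting connections on that address and port.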

prashanttct07 commented 8 years ago

When Flume and Spark are both running and I run netstat -atunp | grep 55555, I get no output, but the exceptions in Flume keep coming.

prashanttct07 commented 8 years ago

I checked this; below are my observations and understanding.

Environment:
1) We are running a 3-node Spark cluster (machines with IPs 121, 189 and 185) and a 1-node Flume on 121.
2) Configured the receiver input IP and port as 121:55555 in Spark.
3) Configured the sink host and port as 121:55555 in Flume.

After starting Spark and Flume, we observed that Spark sometimes listens on 121:55555, sometimes on 185:55555 and sometimes on 189:55555.

Spark assigns this on a random basis. When the assigned IP is 121 it works, and when it is 185 or 189 Flume starts throwing the exception.

So do you have any idea whether we can give Flume a list of IPs, so that events are sent to whichever one accepts the connection? Or can we specify in Spark that receiving of input should not be distributed and should happen on a specific node?

Any help on this would be really helpful.
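On the Flume side, one option that might come close to "a list of IPs" is a sink group with the failover sink processor, which tries the configured sinks in priority order. The sketch below generates such a properties fragment for a list of candidate hosts. It is a sketch under assumptions: the function name, the sink names avroSink1..N and the group name g1 are made up for illustration, the full addresses of the 185 and 189 nodes are not given in this thread, and whether failover suits a receiver that moves between nodes would need testing.

```python
def failover_sink_config(agent, channel, hosts, port, group="g1"):
    """Render Flume properties for one Avro sink per host in a failover group."""
    # Hypothetical sink names: avroSink1, avroSink2, ...
    names = ["avroSink%d" % i for i in range(1, len(hosts) + 1)]
    lines = ["%s.sinks = %s" % (agent, " ".join(names))]
    for name, host in zip(names, hosts):
        prefix = "%s.sinks.%s" % (agent, name)
        lines.append("%s.type = avro" % prefix)
        lines.append("%s.channel = %s" % (prefix, channel))
        lines.append("%s.hostname = %s" % (prefix, host))
        lines.append("%s.port = %d" % (prefix, port))
    group_prefix = "%s.sinkgroups.%s" % (agent, group)
    lines.append("%s.sinkgroups = %s" % (agent, group))
    lines.append("%s.sinks = %s" % (group_prefix, " ".join(names)))
    lines.append("%s.processor.type = failover" % group_prefix)
    # Earlier hosts get larger priority values, so they are tried first.
    for rank, name in enumerate(names):
        priority = 10 * (len(names) - rank)
        lines.append("%s.processor.priority.%s = %d" % (group_prefix, name, priority))
    return "\n".join(lines)
```

For example, `print(failover_sink_config("agent1", "ch1", ["192.168.0.121", "<second-node-ip>", "<third-node-ip>"], 55555))` would emit three Avro sinks on channel ch1 plus the sink group definition, replacing the single avroSink block in the configuration posted earlier.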

mvalleavila commented 8 years ago

Hello Prashant,

It seems that you are using Flume + Spark directly. Is it Spark in batch mode or Spark Streaming?

You'll need a middleware solution for either; a direct write from Flume to a Spark job is not possible.

One of the following may be useful for your scenario:

0x333333 commented 8 years ago

@mvalleavila, I ran into the same issue here. Thanks for your answer.

However, I found that it is possible to connect Flume to Spark Streaming directly: http://spark.apache.org/docs/latest/streaming-flume-integration.html

mvalleavila commented 8 years ago

zp-j,

Interesting! The second way to do the connection sounds good: http://spark.apache.org/docs/latest/streaming-flume-integration.html#approach-2-pull-based-approach-using-a-custom-sink

Anyway, for streaming in production environments, I still prefer a message broker like Kafka between Flume and Spark. Kafka can guarantee:

On the other hand:

In my opinion, connecting Flume and Spark directly is useful for testing or development environments. For a production scenario, using a message broker such as Kafka as middleware can be a good idea.

Regards

0x333333 commented 8 years ago

Hi @mvalleavila ,

I couldn't agree more; in a production environment, availability and flexibility are very important. Thanks for your advice! :)

hassanmehmud commented 5 years ago

Did somebody find a solution to this problem? I am trying to stream directly from Flume to Spark, as there is documentation on it, but I am not able to do it. It returns the error "unable to deliver event".