Stratio / ingestion

Flume - Ingestion, an Apache Flume distribution
https://stratio.atlassian.net/wiki/display/PLATFORM/STRATIO+INGESTION
Apache License 2.0

Cassandra Sink: Failed to commit transaction when the channel is full #152

Closed thun8392 closed 8 years ago

thun8392 commented 8 years ago

If the memory or file channel are full, cassandra sink fails when it tries to commit transaction. Error logs:

2015-12-28 11:06:59 DEBUG CassandraSink:181 - Executing CassandraSink.process()...
2015-12-28 11:06:59 ERROR CassandraSink:231 - Failed to commit transaction. Transaction rolled back.
org.apache.flume.ChannelException: Take list for MemoryTransaction, capacity 40 full, consider committing more frequently, increasing capacity, or increasing thread count
        at org.apache.flume.channel.MemoryChannel$MemoryTransaction.doTake(MemoryChannel.java:96)
        at org.apache.flume.channel.BasicTransactionSemantics.take(BasicTransactionSemantics.java:113)
        at org.apache.flume.channel.BasicChannelSemantics.take(BasicChannelSemantics.java:95)
        at com.stratio.ingestion.sink.cassandra.CassandraSink.process(CassandraSink.java:190)
        at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
        at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
        at java.lang.Thread.run(Thread.java:745)
2015-12-28 11:06:59 ERROR SinkRunner:160 - Unable to deliver event. Exception follows.
org.apache.flume.ChannelException: Take list for MemoryTransaction, capacity 40 full, consider committing more frequently, increasing capacity, or increasing thread count
        at org.apache.flume.channel.MemoryChannel$MemoryTransaction.doTake(MemoryChannel.java:96)
        at org.apache.flume.channel.BasicTransactionSemantics.take(BasicTransactionSemantics.java:113)
        at org.apache.flume.channel.BasicChannelSemantics.take(BasicChannelSemantics.java:95)
        at com.stratio.ingestion.sink.cassandra.CassandraSink.process(CassandraSink.java:190)
        at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
        at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
        at java.lang.Thread.run(Thread.java:745)

Flume configuration:

flume-conf.properties.txt

Complete flume-ng logs:

flume-ng-logs.txt

aaitor commented 8 years ago

Hi @thun8392, can you try increasing the channel's capacity and transactionCapacity values? For example:

postal.channels.memch.capacity = 1000
postal.channels.memch.transactionCapacity = 100
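For context, here is a minimal sketch of what the relevant channel section might look like with those values. The agent name `postal` and channel name `memch` come from the thread; the rest are standard memory-channel keys, shown as an illustration rather than a copy of the attached config:

```properties
# Hypothetical excerpt of flume-conf.properties (names from the thread,
# values as suggested above).
postal.channels = memch
postal.channels.memch.type = memory
# Maximum number of events the channel can hold at once
postal.channels.memch.capacity = 1000
# Maximum number of events per put/take transaction; must be <= capacity
postal.channels.memch.transactionCapacity = 100
```

Note that in Flume the sink's batch size must not exceed the channel's transactionCapacity, and transactionCapacity must not exceed capacity; the original 100/40 pair violates neither bound on its own, but leaves very little headroom for bursts.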

thun8392 commented 8 years ago

Hi @aaitor, thank you for your response. I changed the values to 1000 and 100 respectively. When I tested it again, ingestion returned the following error:

2016-01-07 11:14:34 ERROR SpoolDirectorySource:256 - FATAL: Spool Directory source srcdir: { spoolDir: /var/log/postal/source/ }: Uncaught exception in SpoolDirectorySource thread. Restart or reconfigure Flume to continue processing.
java.nio.charset.MalformedInputException: Input length = 1
        at java.nio.charset.CoderResult.throwException(CoderResult.java:281)
        at org.apache.flume.serialization.ResettableFileInputStream.readChar(ResettableFileInputStream.java:195)
        at org.apache.flume.serialization.LineDeserializer.readLine(LineDeserializer.java:133)
        at org.apache.flume.serialization.LineDeserializer.readEvent(LineDeserializer.java:71)
        at org.apache.flume.serialization.LineDeserializer.readEvents(LineDeserializer.java:90)
        at org.apache.flume.client.avro.ReliableSpoolingFileEventReader.readEvents(ReliableSpoolingFileEventReader.java:252)
        at org.apache.flume.source.SpoolDirectorySource$SpoolDirectoryRunnable.run(SpoolDirectorySource.java:228)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)

This happens because the source log file contains non-UTF-8 characters; it is solved by setting 'postal.sources.srcdir.inputCharset = ISO8859-1'.
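A sketch of the spooldir source section with that charset override. The source name `srcdir` and the spoolDir path appear in the logs above; the other keys are standard Flume spooldir options shown for illustration:

```properties
# Hypothetical excerpt: spooling directory source decoding Latin-1 input.
postal.sources = srcdir
postal.sources.srcdir.type = spooldir
postal.sources.srcdir.spoolDir = /var/log/postal/source/
# Decode input as ISO-8859-1 instead of the default UTF-8, so bytes that
# are invalid UTF-8 no longer raise MalformedInputException
postal.sources.srcdir.inputCharset = ISO8859-1
```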

With the values set to 1000 and 100, the 'CassandraSink - Failed to commit transaction' error does not appear again.

Flume-ng logs (truncated at line 9656 because the full file has more than 100,000 lines):

flume-ng-logs.txt

Is it normal to get the warning line "WARN CassandraTable:74 - Event [Event headers = {}, body.length = 83 ] could not be mapped. Suggestion: Cassandra is case sensitive, so maybe you can check field names" for every log line that does not match the interceptor's regular expression?

Regards.

aaitor commented 8 years ago

Hi @thun8392, another recommendation, in order to find any Cassandra sink configuration issue, is to add a test logger sink:

postal.sinks.logsink.type = logger

When you run your Ingestion agent, if you can see the trace with the data in the log files, the problem may be in the Cassandra sink configuration. I would take a look at your /opt/sds/ingestion/conf/init_cassandra.cql. The transformed data should have the same structure in your CQL and in the event headers in order to map the data properly to the Cassandra table.

If you can't see the data using the logger sink, the problem may be in the interceptor: perhaps it is not applying the data transformation properly.
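The debugging setup described above could look roughly like this; the sink name `logsink` comes from the suggestion, while the channel name and wiring are illustrative assumptions:

```properties
# Hypothetical excerpt: attach a logger sink to inspect events.
# Logged event bodies appear in the Flume log at INFO level.
postal.sinks = logsink
postal.sinks.logsink.type = logger
postal.sinks.logsink.channel = memch
```

Keep in mind that two sinks reading the same channel compete for events, so for a clean test it may be simpler to temporarily point only the logger sink at the channel instead of running it alongside the Cassandra sink.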

Hope it helps

thun8392 commented 8 years ago

Hi @aaitor, the warning line happened because regex_extractor does not support dropping events. Adding a regex_filter interceptor with excludeEvents solved it. Thanks.
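For reference, a sketch of what that interceptor chain might look like. Only the regex_filter/excludeEvents mechanism comes from the thread; the interceptor names and the example pattern are placeholders:

```properties
# Hypothetical interceptor chain: drop non-matching lines first,
# then extract fields from the events that survive.
postal.sources.srcdir.interceptors = filt ext
# regex_filter with excludeEvents = true drops events whose body
# matches the pattern (here, an illustrative comment-line pattern)
postal.sources.srcdir.interceptors.filt.type = regex_filter
postal.sources.srcdir.interceptors.filt.regex = ^#.*
postal.sources.srcdir.interceptors.filt.excludeEvents = true
# regex_extractor then populates headers from the remaining events
# (its regex and serializers keys are elided here; use the original config)
postal.sources.srcdir.interceptors.ext.type = regex_extractor
```

With this ordering, lines that would have failed the extractor's pattern never reach it, so the "could not be mapped" warning is no longer emitted for them.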

About the channel's transactionCapacity and capacity: why does it not work with 100 and 40?

aaitor commented 8 years ago

Hi @thun8392, judging from your first log messages, it looks like the channel was full. Now that everything is working, you could experiment with the initial transactionCapacity and capacity values to find the minimum that holds up.