src-d / apollo

Advanced similarity and duplicate source code proof of concept for our research efforts.
GNU General Public License v3.0
52 stars 17 forks source link

[`hash` with a big number of files] Failed to execute: com.datastax.spark.connector.writer.RichBatchStatement #43

Open EgorBu opened 6 years ago

EgorBu commented 6 years ago

Hi, spark cassandra connector / scylla fails when you attempt to make hash step with a big number of files. I tried 1M files - always fails, 300k files - unstable, some experiments can be completed, but after a several trials it fails with error. Before each new run of hash step I used reset_db but memory is not released from scylla (I'm not ure if it's correct behaviour of DB).

Error log ``` 18/04/10 23:04:51 ERROR QueryExecutor: Failed to execute: com.datastax.spark.connector.writer.RichBatchStatement@3ec8da3b com.datastax.driver.core.exceptions.WriteTimeoutException: Cassandra timeout during write query at consistency LOCAL_QUORUM (1 replica were required but only 0 acknowled ged the write) at com.datastax.driver.core.exceptions.WriteTimeoutException.copy(WriteTimeoutException.java:100) at com.datastax.driver.core.Responses$Error.asException(Responses.java:122) at com.datastax.driver.core.RequestHandler$SpeculativeExecution.onSet(RequestHandler.java:506) at com.datastax.driver.core.Connection$Dispatcher.channelRead0(Connection.java:1070) at com.datastax.driver.core.Connection$Dispatcher.channelRead0(Connection.java:993) at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:357) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:343) at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:336) at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:287) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:357) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:343) at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:336) at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:102) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:357) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:343) at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:336) at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:293) at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:267) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:357) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:343) at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:336) at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1294) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:357) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:343) at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:911) at io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:934) at io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:397) at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:302) at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:131) at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144) at java.lang.Thread.run(Thread.java:748) Caused by: com.datastax.driver.core.exceptions.WriteTimeoutException: Cassandra timeout during write query at consistency LOCAL_QUORUM (1 replica were required but only 0 acknowledged the write) at com.datastax.driver.core.Responses$Error$1.decode(Responses.java:59) at com.datastax.driver.core.Responses$Error$1.decode(Responses.java:37) at com.datastax.driver.core.Message$ProtocolDecoder.decode(Message.java:289) at com.datastax.driver.core.Message$ProtocolDecoder.decode(Message.java:269) at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:88) ... 18 more ```
vmarkovtsev commented 6 years ago

Having a write timeout is totally logical on huge batches (though says something about the connector/protocol/server quality without splitting and heartbeat). Can you please test on Cassandra instead of Scylla?

EgorBu commented 6 years ago

I will try, but it's still strange. Because several attempts are successful, then it starts to fail. + It was possible to make hash step on 300k files after increasing the default parallelism from 300 to 1200

r0mainK commented 6 years ago

Had similar problem writing a large number of bag batches to scylla, see error log below, seems the solution is simply to increase spark.default.parallelism as Egor did (in my case increasing value from 360 to 720 was enough). I think we could simply add a troubleshooting section in the doc to warn about this, with the solution to this bug.

WARN TaskSetManager: Lost task 233.0 in stage 18.0 (TID 3855, 10.2.7.95, executor 13): java.io.IOException: Failed to write statements to apollo.bags.
    at com.datastax.spark.connector.writer.TableWriter$$anonfun$writeInternal$1.apply(TableWriter.scala:207)
    at com.datastax.spark.connector.writer.TableWriter$$anonfun$writeInternal$1.apply(TableWriter.scala:175)
    at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo$1.apply(CassandraConnector.scala:112)
    at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo$1.apply(CassandraConnector.scala:111)
    at com.datastax.spark.connector.cql.CassandraConnector.closeResourceAfterUse(CassandraConnector.scala:145)
    at com.datastax.spark.connector.cql.CassandraConnector.withSessionDo(CassandraConnector.scala:111)
    at com.datastax.spark.connector.writer.TableWriter.writeInternal(TableWriter.scala:175)
    at com.datastax.spark.connector.writer.TableWriter.insert(TableWriter.scala:162)
    at com.datastax.spark.connector.writer.TableWriter.write(TableWriter.scala:149)
    at com.datastax.spark.connector.RDDFunctions$$anonfun$saveToCassandra$1.apply(RDDFunctions.scala:36)
    at com.datastax.spark.connector.RDDFunctions$$anonfun$saveToCassandra$1.apply(RDDFunctions.scala:36)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:108)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
r0mainK commented 6 years ago

So since yesterday I saw that I actually was excluding a lot of files, and when rerunning I had to change the number of partitions to a much higher value (13680 partitions) which caused the error below (nothing fatal but still) and slowed down execution. So while I still think we should inform somewhere that the number of partitions has to be increased when encountering this error, I think that we should repartition the data before writing to DB (in bags and hashes commands) to a value proportional to spark.default.parallelism. Tests could be done to optimize the coefficient (at first guess value should be between 5~10). That way we should get best of both worlds

ERROR Utils: Uncaught exception in thread heartbeat-receiver-event-loop-thread
java.lang.NullPointerException
    at org.apache.spark.util.CollectionAccumulator.value(AccumulatorV2.scala:469)
    at org.apache.spark.util.CollectionAccumulator.value(AccumulatorV2.scala:444)
    at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$8$$anonfun$9.apply(TaskSchedulerImpl.scala:458)
    at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$8$$anonfun$9.apply(TaskSchedulerImpl.scala:458)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.Iterator$class.foreach(Iterator.scala:893)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
    at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
    at scala.collection.AbstractTraversable.map(Traversable.scala:104)
    at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$8.apply(TaskSchedulerImpl.scala:458)
    at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$8.apply(TaskSchedulerImpl.scala:457)
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
    at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
    at scala.collection.mutable.ArrayOps$ofRef.flatMap(ArrayOps.scala:186)
    at org.apache.spark.scheduler.TaskSchedulerImpl.executorHeartbeatReceived(TaskSchedulerImpl.scala:457)
    at org.apache.spark.HeartbeatReceiver$$anonfun$receiveAndReply$1$$anon$2$$anonfun$run$2.apply$mcV$sp(HeartbeatReceiver.scala:129)
    at org.apache.spark.util.Utils$.tryLogNonFatalError(Utils.scala:1317)
    at org.apache.spark.HeartbeatReceiver$$anonfun$receiveAndReply$1$$anon$2.run(HeartbeatReceiver.scala:128)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
r0mainK commented 6 years ago

I modified a bit the yaml files provided here by IBM: https://github.com/IBM/Scalable-Cassandra-deployment-on-Kubernetes to check if using Cassandra also gets rid of the problem on K8, if thats the case I think we ought to close this issue and advise people to avoid Scylla