memsql / singlestore-spark-connector

A connector for SingleStore and Spark
Apache License 2.0

Kafka topic with many partitions (on Yarn) #18

Closed 0x0ece closed 8 years ago

0x0ece commented 8 years ago

Hi, I think I've found an issue reading from Kafka when there are too many partitions, specifically when the number of partitions is greater than spark.port.maxRetries (default 16).

Note that I'm using a CDH Spark cluster in yarn-client mode, so I'm not sure whether this also reproduces with the MemSQL Spark distro.

In summary, when a container starts, it tries to bind to a port, starting from an initial port and moving on to the next one each time the bind fails, and it gives up after spark.port.maxRetries retries. If the Kafka topic has n partitions, then at least n containers are created, and if n > spark.port.maxRetries the candidate ports are exhausted and the remaining containers fail to start.
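To make the retry behaviour described above concrete, here is a minimal simulation. It is only a sketch under simplifying assumptions: every container lands on the same host, binds a single service, and starts from the same initial port, and a service gets one initial attempt plus `max_retries` retries (which matches the container log in this report, where ports 10011 through 10027 are tried). The names `bind_with_retries` and `launch_executors` are made up for illustration and are not Spark APIs.

```python
def bind_with_retries(start_port, max_retries, in_use):
    """Try start_port, start_port + 1, ... and return the first free port.

    Returns None once the initial attempt plus max_retries retries have
    all hit ports that are already taken.
    """
    for offset in range(max_retries + 1):
        port = start_port + offset
        if port not in in_use:
            in_use.add(port)  # the port is now held by this container
            return port
    return None


def launch_executors(n, start_port=10011, max_retries=16):
    """Simulate n containers on one host; return (started, failed)."""
    in_use = set()
    started = failed = 0
    for _ in range(n):
        if bind_with_retries(start_port, max_retries, in_use) is None:
            failed += 1
        else:
            started += 1
    return started, failed


if __name__ == "__main__":
    # With the default of 16 retries, a 20-partition topic leaves some
    # containers without a port in this simplified model.
    print(launch_executors(20))
```

In this model the number of failures grows with every container beyond the size of the retry window, which is why raising spark.port.maxRetries only moves the threshold rather than removing it.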

I think in these cases it would be better to shut down the pipeline (as with a fatal error), so that other pipelines can run without issues.

What happens now is that the exceptions are thrown but "ignored", and data is loaded normally. However, I'm not sure that all data is loaded in all cases, as creating a test is kind of laborious... but I'd expect that if you have data in all partitions, some partitions would fail to load.

Interface logs:

16/04/25 16:24:38 ERROR Lost executor 9 on <hostname.redacted>: Container marked as failed: container_1461362160503_0004_01_000010 on host: <hostname.redacted>. Exit status: 1. Diagnostics: Exception from container-launch.
Container id: container_1461362160503_0004_01_000010
Exit code: 1
Stack trace: ExitCodeException exitCode=1:
    at org.apache.hadoop.util.Shell.runCommand(Shell.java:561)
    at org.apache.hadoop.util.Shell.run(Shell.java:478)
    at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:738)
    at org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor.launchContainer(DefaultContainerExecutor.java:211)
    at org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:302)
    at org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:82)
    at java.util.concurrent.FutureTask.run(FutureTask.java:262)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)

Container exited with a non-zero exit code 1

16/04/25 16:24:38 WARN Container marked as failed: container_1461362160503_0004_01_000010 on host: <hostname.redacted>. Exit status: 1. Diagnostics: Exception from container-launch.
Container id: container_1461362160503_0004_01_000010
Exit code: 1
Stack trace: ExitCodeException exitCode=1:
    at org.apache.hadoop.util.Shell.runCommand(Shell.java:561)
    at org.apache.hadoop.util.Shell.run(Shell.java:478)
    at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:738)
    at org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor.launchContainer(DefaultContainerExecutor.java:211)
    at org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:302)
    at org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:82)
    at java.util.concurrent.FutureTask.run(FutureTask.java:262)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)

Container exited with a non-zero exit code 1

16/04/25 16:24:38 WARN Lost task 7.0 in stage 0.0 (TID 12, <hostname.redacted>): ExecutorLostFailure (executor 9 exited caused by one of the running tasks) Reason: Container marked as failed: container_1461362160503_0004_01_000010 on host: <hostname.redacted>. Exit status: 1. Diagnostics: Exception from container-launch.
Container id: container_1461362160503_0004_01_000010
Exit code: 1
Stack trace: ExitCodeException exitCode=1:
    at org.apache.hadoop.util.Shell.runCommand(Shell.java:561)
    at org.apache.hadoop.util.Shell.run(Shell.java:478)
    at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:738)
    at org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor.launchContainer(DefaultContainerExecutor.java:211)
    at org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:302)
    at org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:82)
    at java.util.concurrent.FutureTask.run(FutureTask.java:262)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)

Container exited with a non-zero exit code 1

16/04/25 16:24:38 ERROR Listener SQLListener threw an exception
java.lang.NullPointerException
    at org.apache.spark.sql.execution.ui.SQLListener.onTaskEnd(SQLListener.scala:167)
    at org.apache.spark.scheduler.SparkListenerBus$class.onPostEvent(SparkListenerBus.scala:42)
    at org.apache.spark.scheduler.LiveListenerBus.onPostEvent(LiveListenerBus.scala:31)
    at org.apache.spark.scheduler.LiveListenerBus.onPostEvent(LiveListenerBus.scala:31)
    at org.apache.spark.util.ListenerBus$class.postToAll(ListenerBus.scala:55)
    at org.apache.spark.util.AsynchronousListenerBus.postToAll(AsynchronousListenerBus.scala:37)
    at org.apache.spark.util.AsynchronousListenerBus$$anon$1$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(AsynchronousListenerBus.scala:80)
    at org.apache.spark.util.AsynchronousListenerBus$$anon$1$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(AsynchronousListenerBus.scala:65)
    at org.apache.spark.util.AsynchronousListenerBus$$anon$1$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(AsynchronousListenerBus.scala:65)
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
    at org.apache.spark.util.AsynchronousListenerBus$$anon$1$$anonfun$run$1.apply$mcV$sp(AsynchronousListenerBus.scala:64)
    at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1181)
    at org.apache.spark.util.AsynchronousListenerBus$$anon$1.run(AsynchronousListenerBus.scala:63)
16/04/25 16:24:39 ERROR Lost executor 27 on <hostname.redacted>: Container marked as failed: container_1461362160503_0004_01_000030 on host: <hostname.redacted>. Exit status: 1. Diagnostics: Exception from container-launch.

Container logs from: http://...:8042/node/containerlogs/container_1461362160503_0004_01_000010/stderr/stderr/?start=0

16/04/25 16:24:30 INFO executor.CoarseGrainedExecutorBackend: Started daemon with process name: 34377@<hostname.redacted>
16/04/25 16:24:30 INFO executor.CoarseGrainedExecutorBackend: Registered signal handlers for [TERM, HUP, INT]
16/04/25 16:24:32 INFO spark.SecurityManager: Changing view acls to: yarn,emanuele
16/04/25 16:24:32 INFO spark.SecurityManager: Changing modify acls to: yarn,emanuele
16/04/25 16:24:32 INFO spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(yarn, emanuele); users with modify permissions: Set(yarn, emanuele)
16/04/25 16:24:33 INFO spark.SecurityManager: Changing view acls to: yarn,emanuele
16/04/25 16:24:33 INFO spark.SecurityManager: Changing modify acls to: yarn,emanuele
16/04/25 16:24:33 INFO spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(yarn, emanuele); users with modify permissions: Set(yarn, emanuele)
16/04/25 16:24:34 INFO slf4j.Slf4jLogger: Slf4jLogger started
16/04/25 16:24:34 INFO Remoting: Starting remoting
16/04/25 16:24:34 WARN util.Utils: Service 'sparkExecutorActorSystem' could not bind on port 10014. Attempting port 10015.
16/04/25 16:24:34 ERROR Remoting: Remoting error: [Startup failed] [
akka.remote.RemoteTransportException: Startup failed
    at akka.remote.Remoting.akka$remote$Remoting$$notifyError(Remoting.scala:129)
    at akka.remote.Remoting.start(Remoting.scala:194)
    at akka.remote.RemoteActorRefProvider.init(RemoteActorRefProvider.scala:184)
    at akka.actor.ActorSystemImpl._start$lzycompute(ActorSystem.scala:579)
    at akka.actor.ActorSystemImpl._start(ActorSystem.scala:577)
    at akka.actor.ActorSystemImpl.start(ActorSystem.scala:588)
    at akka.actor.ActorSystem$.apply(ActorSystem.scala:111)
    at akka.actor.ActorSystem$.apply(ActorSystem.scala:104)
    at org.apache.spark.util.AkkaUtils$.org$apache$spark$util$AkkaUtils$$doCreateActorSystem(AkkaUtils.scala:121)
    at org.apache.spark.util.AkkaUtils$$anonfun$1.apply(AkkaUtils.scala:53)
    at org.apache.spark.util.AkkaUtils$$anonfun$1.apply(AkkaUtils.scala:52)
    at org.apache.spark.util.Utils$$anonfun$startServiceOnPort$1.apply$mcVI$sp(Utils.scala:1989)
    at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
    at org.apache.spark.util.Utils$.startServiceOnPort(Utils.scala:1980)
    at org.apache.spark.util.AkkaUtils$.createActorSystem(AkkaUtils.scala:55)
    at org.apache.spark.SparkEnv$.create(SparkEnv.scala:266)
    at org.apache.spark.SparkEnv$.createExecutorEnv(SparkEnv.scala:217)
    at org.apache.spark.executor.CoarseGrainedExecutorBackend$$anonfun$run$1.apply$mcV$sp(CoarseGrainedExecutorBackend.scala:183)
    at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:69)
    at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:68)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:415)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1693)
    at org.apache.spark.deploy.SparkHadoopUtil.runAsSparkUser(SparkHadoopUtil.scala:68)
    at org.apache.spark.executor.CoarseGrainedExecutorBackend$.run(CoarseGrainedExecutorBackend.scala:148)
    at org.apache.spark.executor.CoarseGrainedExecutorBackend$.main(CoarseGrainedExecutorBackend.scala:250)
    at org.apache.spark.executor.CoarseGrainedExecutorBackend.main(CoarseGrainedExecutorBackend.scala)
Caused by: org.jboss.netty.channel.ChannelException: Failed to bind to: <hostname.redacted>/10.19.192.29:10014
    at org.jboss.netty.bootstrap.ServerBootstrap.bind(ServerBootstrap.java:272)
    at akka.remote.transport.netty.NettyTransport$$anonfun$listen$1.apply(NettyTransport.scala:391)
    at akka.remote.transport.netty.NettyTransport$$anonfun$listen$1.apply(NettyTransport.scala:388)
    at scala.util.Success$$anonfun$map$1.apply(Try.scala:206)
    at scala.util.Try$.apply(Try.scala:161)
    at scala.util.Success.map(Try.scala:206)
    at scala.concurrent.Future$$anonfun$map$1.apply(Future.scala:235)
    at scala.concurrent.Future$$anonfun$map$1.apply(Future.scala:235)
    at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
    at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.processBatch$1(BatchingExecutor.scala:67)
    at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:82)
    at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:59)
    at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:59)
    at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
    at akka.dispatch.BatchingExecutor$Batch.run(BatchingExecutor.scala:58)
    at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:42)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.net.BindException: Address already in use
    at sun.nio.ch.Net.bind0(Native Method)
    at sun.nio.ch.Net.bind(Net.java:444)
    at sun.nio.ch.Net.bind(Net.java:436)
    at sun.nio.ch.ServerSocketChannelImpl.bind(ServerSocketChannelImpl.java:214)
    at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:74)
    at org.jboss.netty.channel.socket.nio.NioServerBoss$RegisterTask.run(NioServerBoss.java:193)
    at org.jboss.netty.channel.socket.nio.AbstractNioSelector.processTaskQueue(AbstractNioSelector.java:366)
    at org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:290)
    at org.jboss.netty.channel.socket.nio.NioServerBoss.run(NioServerBoss.java:42)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
]

[...]

16/04/25 16:24:35 INFO Remoting: Starting remoting
16/04/25 16:24:35 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkExecutorActorSystem@<hostname.redacted>:10025]
16/04/25 16:24:35 INFO Remoting: Remoting now listens on addresses: [akka.tcp://sparkExecutorActorSystem@<hostname.redacted>:10025]
16/04/25 16:24:35 INFO util.Utils: Successfully started service 'sparkExecutorActorSystem' on port 10025.
16/04/25 16:24:36 INFO storage.DiskBlockManager: Created local directory at /mnt/jbod_1/yarn/nm/usercache/emanuele/appcache/application_1461362160503_0004/blockmgr-20ef5072-5398-4e44-b38d-710038d26c7f
16/04/25 16:24:36 INFO storage.DiskBlockManager: Created local directory at /mnt/jbod_2/yarn/nm/usercache/emanuele/appcache/application_1461362160503_0004/blockmgr-f7e45ba1-1e24-49fb-a475-74057513f119
16/04/25 16:24:36 INFO storage.DiskBlockManager: Created local directory at /mnt/jbod_3/yarn/nm/usercache/emanuele/appcache/application_1461362160503_0004/blockmgr-d8bdba6b-4296-49fd-ada2-b66b9a379b1c
16/04/25 16:24:36 INFO storage.DiskBlockManager: Created local directory at /mnt/jbod_4/yarn/nm/usercache/emanuele/appcache/application_1461362160503_0004/blockmgr-947725bd-a63a-4f23-ae19-05a0a1722b40
16/04/25 16:24:36 INFO storage.DiskBlockManager: Created local directory at /mnt/jbod_5/yarn/nm/usercache/emanuele/appcache/application_1461362160503_0004/blockmgr-c8d2b2cf-de63-42f5-a2e0-7390066f65ba
16/04/25 16:24:36 INFO storage.DiskBlockManager: Created local directory at /mnt/jbod_6/yarn/nm/usercache/emanuele/appcache/application_1461362160503_0004/blockmgr-3bffbf82-2b57-4f5f-9e60-2a9bfec07120
16/04/25 16:24:36 INFO storage.MemoryStore: MemoryStore started with capacity 530.3 MB
16/04/25 16:24:36 INFO executor.CoarseGrainedExecutorBackend: Connecting to driver: spark://CoarseGrainedScheduler@10.19.192.29:10013
16/04/25 16:24:36 INFO executor.CoarseGrainedExecutorBackend: Successfully registered with driver
16/04/25 16:24:36 INFO executor.Executor: Starting executor ID 9 on host <hostname.redacted>
16/04/25 16:24:36 WARN util.Utils: Service 'org.apache.spark.network.netty.NettyBlockTransferService' could not bind on port 10011. Attempting port 10012.
16/04/25 16:24:36 WARN util.Utils: Service 'org.apache.spark.network.netty.NettyBlockTransferService' could not bind on port 10012. Attempting port 10013.
16/04/25 16:24:36 WARN util.Utils: Service 'org.apache.spark.network.netty.NettyBlockTransferService' could not bind on port 10013. Attempting port 10014.
16/04/25 16:24:36 WARN util.Utils: Service 'org.apache.spark.network.netty.NettyBlockTransferService' could not bind on port 10014. Attempting port 10015.
16/04/25 16:24:36 WARN util.Utils: Service 'org.apache.spark.network.netty.NettyBlockTransferService' could not bind on port 10015. Attempting port 10016.
16/04/25 16:24:36 WARN util.Utils: Service 'org.apache.spark.network.netty.NettyBlockTransferService' could not bind on port 10016. Attempting port 10017.
16/04/25 16:24:36 WARN util.Utils: Service 'org.apache.spark.network.netty.NettyBlockTransferService' could not bind on port 10017. Attempting port 10018.
16/04/25 16:24:36 WARN util.Utils: Service 'org.apache.spark.network.netty.NettyBlockTransferService' could not bind on port 10018. Attempting port 10019.
16/04/25 16:24:36 WARN util.Utils: Service 'org.apache.spark.network.netty.NettyBlockTransferService' could not bind on port 10019. Attempting port 10020.
16/04/25 16:24:36 WARN util.Utils: Service 'org.apache.spark.network.netty.NettyBlockTransferService' could not bind on port 10020. Attempting port 10021.
16/04/25 16:24:36 WARN util.Utils: Service 'org.apache.spark.network.netty.NettyBlockTransferService' could not bind on port 10021. Attempting port 10022.
16/04/25 16:24:36 WARN util.Utils: Service 'org.apache.spark.network.netty.NettyBlockTransferService' could not bind on port 10022. Attempting port 10023.
16/04/25 16:24:36 WARN util.Utils: Service 'org.apache.spark.network.netty.NettyBlockTransferService' could not bind on port 10023. Attempting port 10024.
16/04/25 16:24:36 WARN util.Utils: Service 'org.apache.spark.network.netty.NettyBlockTransferService' could not bind on port 10024. Attempting port 10025.
16/04/25 16:24:36 WARN util.Utils: Service 'org.apache.spark.network.netty.NettyBlockTransferService' could not bind on port 10025. Attempting port 10026.
16/04/25 16:24:36 WARN util.Utils: Service 'org.apache.spark.network.netty.NettyBlockTransferService' could not bind on port 10026. Attempting port 10027.
16/04/25 16:24:36 ERROR netty.Inbox: Ignoring error
java.net.BindException: Address already in use: Service 'org.apache.spark.network.netty.NettyBlockTransferService' failed after 16 retries!
    at sun.nio.ch.Net.bind0(Native Method)
    at sun.nio.ch.Net.bind(Net.java:444)
    at sun.nio.ch.Net.bind(Net.java:436)
    at sun.nio.ch.ServerSocketChannelImpl.bind(ServerSocketChannelImpl.java:214)
    at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:74)
    at io.netty.channel.socket.nio.NioServerSocketChannel.doBind(NioServerSocketChannel.java:125)
    at io.netty.channel.AbstractChannel$AbstractUnsafe.bind(AbstractChannel.java:485)
    at io.netty.channel.DefaultChannelPipeline$HeadContext.bind(DefaultChannelPipeline.java:1089)
    at io.netty.channel.AbstractChannelHandlerContext.invokeBind(AbstractChannelHandlerContext.java:430)
    at io.netty.channel.AbstractChannelHandlerContext.bind(AbstractChannelHandlerContext.java:415)
    at io.netty.channel.DefaultChannelPipeline.bind(DefaultChannelPipeline.java:903)
    at io.netty.channel.AbstractChannel.bind(AbstractChannel.java:198)
    at io.netty.bootstrap.AbstractBootstrap$2.run(AbstractBootstrap.java:348)
    at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:357)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:357)
    at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
    at java.lang.Thread.run(Thread.java:745)
16/04/25 16:24:36 ERROR executor.CoarseGrainedExecutorBackend: Received LaunchTask command but executor was null
16/04/25 16:24:36 INFO storage.DiskBlockManager: Shutdown hook called
16/04/25 16:24:36 INFO util.ShutdownHookManager: Shutdown hook called

choochootrain commented 8 years ago

hmm it looks like those container exits are happening outside of the interface stack so we might have to detect that we're running in yarn to prevent this.

i also remember seeing a config option to tell spark to start trying ports from a different initial value - i don't know if this is still around and it's not the most robust but at least you'll be able to load data. thoughts?

0x0ece commented 8 years ago

Yes, this is kind of weird, it seems pretty hard to detect the failure at app level.

For my case specifically, I can either increase this maxRetries or look into this randomness (although every default value related to port allocation seems to be random, so I guess this might be a specific issue of Kafka integration... have to look into that).

What I'm kind of scared of is how many similar cases Spark/Yarn are hiding. A nice-to-have could be a sort of "transaction" where either all containers start, or all are killed, but I have no idea if this is supported :/

choochootrain commented 8 years ago

for what it's worth this doesn't seem like an issue with kafka - i think yarn containers consume some ports and so starting new executors in an existing yarn cluster decreases the number of available ports for spark and its various web servers to bind to. maybe there is something that can be done in yarn to get around this?

0x0ece commented 8 years ago

I'll try to look into this in more detail.

I was saying Kafka in the sense that maybe the way containers are requested for the DStream computation forces this incremental try... but I have no idea, maybe it's a wider issue and I just noticed it in this case.

choochootrain commented 8 years ago

if what i said is correct, you should be able to repro this issue by creating a vanilla rdd with a large number of partitions (how many partitions are in your kafka topic?) and when it gets processed the same number of executors should be created.

0x0ece commented 8 years ago

Closing for now, this is also caused by dynamicAllocation, which I have finally turned off :)
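For anyone landing here later, a rough sketch of how the two settings mentioned in this thread could be passed along, assuming the job is launched through spark-submit (which may not match how your pipeline is actually started). The helper `spark_conf_args` is hypothetical, and the value 64 for spark.port.maxRetries is just an illustrative number, not a recommendation.

```python
def spark_conf_args(settings):
    """Turn a dict of Spark properties into spark-submit --conf arguments."""
    args = []
    for key, value in sorted(settings.items()):
        args += ["--conf", f"{key}={value}"]
    return args


# Settings discussed in this thread; the maxRetries value is illustrative.
settings = {
    "spark.dynamicAllocation.enabled": "false",
    "spark.port.maxRetries": "64",
}

if __name__ == "__main__":
    # Prints the extra arguments to append to a spark-submit command line.
    print(" ".join(spark_conf_args(settings)))
```

Turning off dynamic allocation addresses the cause I ran into; the larger retry window is only a fallback for hosts that still end up with many executors.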