qubole / spark-on-lambda

Apache Spark on AWS Lambda
Apache License 2.0

Spark does not talk to Lambda at all #5

Open · habemusne opened this issue 5 years ago

habemusne commented 5 years ago

I have been struggling to set up this framework on my EC2 server. I did my best to follow the instructions in both this repo and faromero's forked repo, but I get this error message every time I run sudo ./../driver/bin/spark-submit ml_kmeans.py --master lambda://test:

19/04/05 05:11:40 ERROR ShuffleBlockFetcherIterator: Error occurred while fetching local blocks
java.io.FileNotFoundException: No such file or directory: s3://mc-cse597cc/tmp/executor-driver-4775351731/30/shuffle_0_0_0.index
    at org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:1636)
    at org.apache.hadoop.fs.s3a.S3AFileSystem.open(S3AFileSystem.java:684)
    at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:772)
    at org.apache.spark.shuffle.S3ShuffleBlockResolver.getRemoteBlockData(S3ShuffleBlockResolver.scala:240)
    at org.apache.spark.shuffle.S3ShuffleBlockResolver.getBlockData(S3ShuffleBlockResolver.scala:263)
    at org.apache.spark.storage.BlockManager.getBlockData(BlockManager.scala:318)
    at org.apache.spark.storage.ShuffleBlockFetcherIterator.fetchLocalBlocks(ShuffleBlockFetcherIterator.scala:258)
    at org.apache.spark.storage.ShuffleBlockFetcherIterator.initialize(ShuffleBlockFetcherIterator.scala:292)
    at org.apache.spark.storage.ShuffleBlockFetcherIterator.<init>(ShuffleBlockFetcherIterator.scala:120)
    at org.apache.spark.shuffle.BlockStoreShuffleReader.read(BlockStoreShuffleReader.scala:45)
    at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:109)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:99)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
19/04/05 05:11:40 WARN TaskSetManager: Lost task 0.0 in stage 8.0 (TID 8, localhost, executor driver): FetchFailed(BlockManagerId(driver, 172.31.123.183, 33995, None), shuffleId=0, mapId=0, reduceId=0, message=
org.apache.spark.shuffle.FetchFailedException: No such file or directory: s3://mc-cse597cc/tmp/executor-driver-4775351731/30/shuffle_0_0_0.index
    at org.apache.spark.storage.ShuffleBlockFetcherIterator.throwFetchFailedException(ShuffleBlockFetcherIterator.scala:357)
    at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:332)
    at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:54)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
    at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
    at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
    at org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:154)
    at org.apache.spark.Aggregator.combineCombinersByKey(Aggregator.scala:50)
    at org.apache.spark.shuffle.BlockStoreShuffleReader.read(BlockStoreShuffleReader.scala:85)
    at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:109)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:99)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.FileNotFoundException: No such file or directory: s3://mc-cse597cc/tmp/executor-driver-4775351731/30/shuffle_0_0_0.index
    at org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:1636)
    at org.apache.hadoop.fs.s3a.S3AFileSystem.open(S3AFileSystem.java:684)
    at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:772)
    at org.apache.spark.shuffle.S3ShuffleBlockResolver.getRemoteBlockData(S3ShuffleBlockResolver.scala:240)
    at org.apache.spark.shuffle.S3ShuffleBlockResolver.getBlockData(S3ShuffleBlockResolver.scala:263)
    at org.apache.spark.storage.BlockManager.getBlockData(BlockManager.scala:318)
    at org.apache.spark.storage.ShuffleBlockFetcherIterator.fetchLocalBlocks(ShuffleBlockFetcherIterator.scala:258)
    at org.apache.spark.storage.ShuffleBlockFetcherIterator.initialize(ShuffleBlockFetcherIterator.scala:292)
    at org.apache.spark.storage.ShuffleBlockFetcherIterator.<init>(ShuffleBlockFetcherIterator.scala:120)
    at org.apache.spark.shuffle.BlockStoreShuffleReader.read(BlockStoreShuffleReader.scala:45)
    ... 9 more

)
19/04/05 05:11:40 INFO DAGScheduler: Marking ResultStage 8 (countByValue at KMeans.scala:399) as failed due to a fetch failure from ShuffleMapStage 7 (countByValue at KMeans.scala:399)
19/04/05 05:11:40 INFO DAGScheduler: ResultStage 8 (countByValue at KMeans.scala:399) failed in 0.069 s due to org.apache.spark.shuffle.FetchFailedException: No such file or directory: s3://mc-cse597cc/tmp/executor-driver-4775351731/30/shuffle_0_0_0.index
    at org.apache.spark.storage.ShuffleBlockFetcherIterator.throwFetchFailedException(ShuffleBlockFetcherIterator.scala:357)
    at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:332)
    at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:54)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
    at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
    at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
    at org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:154)
    at org.apache.spark.Aggregator.combineCombinersByKey(Aggregator.scala:50)
    at org.apache.spark.shuffle.BlockStoreShuffleReader.read(BlockStoreShuffleReader.scala:85)
    at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:109)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:99)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.FileNotFoundException: No such file or directory: s3://mc-cse597cc/tmp/executor-driver-4775351731/30/shuffle_0_0_0.index
    at org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:1636)
    at org.apache.hadoop.fs.s3a.S3AFileSystem.open(S3AFileSystem.java:684)
    at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:772)
    at org.apache.spark.shuffle.S3ShuffleBlockResolver.getRemoteBlockData(S3ShuffleBlockResolver.scala:240)
    at org.apache.spark.shuffle.S3ShuffleBlockResolver.getBlockData(S3ShuffleBlockResolver.scala:263)
    at org.apache.spark.storage.BlockManager.getBlockData(BlockManager.scala:318)
    at org.apache.spark.storage.ShuffleBlockFetcherIterator.fetchLocalBlocks(ShuffleBlockFetcherIterator.scala:258)
    at org.apache.spark.storage.ShuffleBlockFetcherIterator.initialize(ShuffleBlockFetcherIterator.scala:292)
    at org.apache.spark.storage.ShuffleBlockFetcherIterator.<init>(ShuffleBlockFetcherIterator.scala:120)
    at org.apache.spark.shuffle.BlockStoreShuffleReader.read(BlockStoreShuffleReader.scala:45)
    ... 9 more

Here is the full log

My config file:

spark.dynamicAllocation.enabled                 true
spark.dynamicAllocation.minExecutors            2
spark.shuffle.s3.enabled                        true
spark.lambda.concurrent.requests.max            100
spark.hadoop.fs.s3n.impl                        org.apache.hadoop.fs.s3a.S3AFileSystem
spark.hadoop.fs.s3.impl                         org.apache.hadoop.fs.s3a.S3AFileSystem
spark.hadoop.fs.AbstractFileSystem.s3.impl      org.apache.hadoop.fs.s3a.S3A
spark.hadoop.fs.AbstractFileSystem.s3n.impl     org.apache.hadoop.fs.s3a.S3A
spark.hadoop.fs.AbstractFileSystem.s3a.impl     org.apache.hadoop.fs.s3a.S3A
spark.hadoop.qubole.aws.use.v4.signature        true
spark.hadoop.fs.s3a.fast.upload                 true
spark.lambda.function.name                      spark-lambda
spark.lambda.spark.software.version             149
spark.hadoop.fs.s3a.endpoint                    s3.us-east-1.amazonaws.com
spark.hadoop.fs.s3n.awsAccessKeyId             KEY
spark.hadoop.fs.s3n.awsSecretAccessKey          SECRET
spark.shuffle.s3.bucket                         s3://mc-cse597cc
spark.lambda.s3.bucket                          s3://mc-cse597cc

~/.aws/config is set to us-east-1. The VPC subnets are configured following the forked repo's instructions.

My Lambda function has been tested and can read from and write to S3. The spark-submit command run on EC2 is able to write to S3 (it creates a tmp/ folder in the bucket), but it never invokes the Lambda at all: CloudWatch has no logs for my Lambda. However, I am able to invoke the Lambda from EC2 with something like aws lambda invoke --function-name spark-lambda ~/test.txt. I suspect I configured Spark-on-Lambda incorrectly, even though I have been following the instructions.

I am now trying to dive into the source code. Is there any clue as to what causes this error?

venkata91 commented 5 years ago

@habemusne Thanks for trying out Spark on Lambda. I understand that in its current form it's not easy to set up and try out. Some time back @faromero also had issues running it, but once the AWS setup was fixed it worked fine.

As far as the AWS Lambda, EC2, and VPC setup is concerned, here is what worked for me.

VPC setup - this is what I did:

1. Created a VPC - venkat-vpc.
2. Created a public subnet - venkat-public-subnet - within the above VPC. This automatically comes with an Internet gateway; check the routes and you'll see something like igw-*.
3. Created a private subnet - venkat-private-subnet - within the same VPC.
4. Created a NAT gateway for the private subnet. In the private subnet's route table, added a route through the NAT gateway (nat-*); the NAT gateway itself sits in the public subnet, whose route table points to the Internet gateway (igw-*).

Bring up the Lambda function inside the VPC (venkat-vpc) in the private subnet (venkat-private-subnet), and the EC2 instance where you'll run spark-submit or spark-shell in the same VPC (venkat-vpc) but in the public subnet (venkat-public-subnet). Also create a security group with inbound rules allowing TCP and UDP from the same security group and an outbound rule allowing all traffic (0.0.0.0/0), and associate it with both the EC2 instance (Spark driver node) and the Lambda function (Spark executors).

https://docs.aws.amazon.com/vpc/latest/userguide/VPC_Scenario2.html

The doc above was useful; the picture in it helped with setting up the VPC and subnets.

This works for me. Hope it works for you as well.
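For reference, here is a rough boto3 sketch of the same VPC layout (the region, CIDR blocks, and resource names are my own assumptions, not something this repo prescribes):

import boto3

ec2 = boto3.client("ec2", region_name="us-east-1")  # assumed region

# VPC
vpc_id = ec2.create_vpc(CidrBlock="10.0.0.0/16")["Vpc"]["VpcId"]

# Public subnet + Internet gateway, with a route table sending 0.0.0.0/0 to the IGW
public_subnet = ec2.create_subnet(VpcId=vpc_id, CidrBlock="10.0.0.0/24")["Subnet"]["SubnetId"]
igw_id = ec2.create_internet_gateway()["InternetGateway"]["InternetGatewayId"]
ec2.attach_internet_gateway(InternetGatewayId=igw_id, VpcId=vpc_id)
public_rt = ec2.create_route_table(VpcId=vpc_id)["RouteTable"]["RouteTableId"]
ec2.create_route(RouteTableId=public_rt, DestinationCidrBlock="0.0.0.0/0", GatewayId=igw_id)
ec2.associate_route_table(RouteTableId=public_rt, SubnetId=public_subnet)

# Private subnet for the Lambda executors
private_subnet = ec2.create_subnet(VpcId=vpc_id, CidrBlock="10.0.1.0/24")["Subnet"]["SubnetId"]

# NAT gateway in the public subnet; the private subnet routes 0.0.0.0/0 through it
# (in practice, wait for the NAT gateway to become "available" before adding the route)
eip = ec2.allocate_address(Domain="vpc")["AllocationId"]
nat_id = ec2.create_nat_gateway(SubnetId=public_subnet, AllocationId=eip)["NatGateway"]["NatGatewayId"]
private_rt = ec2.create_route_table(VpcId=vpc_id)["RouteTable"]["RouteTableId"]
ec2.create_route(RouteTableId=private_rt, DestinationCidrBlock="0.0.0.0/0", NatGatewayId=nat_id)
ec2.associate_route_table(RouteTableId=private_rt, SubnetId=private_subnet)

The security group with the inbound/outbound rules described above can then be attached to both the EC2 driver instance and the Lambda function's VPC configuration.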

Regarding the input and output files, use S3 locations; I don't remember using local files. Application jars and other dependent jars, however, can be passed from the local machine (file:///).

I have a couple of questions:

  1. Are you able to run map-only jobs on Spark on Lambda, something like sc.parallelize(1 to 10, 5).map(x => x * 2).collect()? If this works, the Lambda setup is good and we are fine on that part (a PySpark version is sketched after this list).
  2. If the first step works but a Spark job involving both map and reduce fails, then most likely some config is not set up correctly, or possibly it's a bug.
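For what it's worth, a PySpark equivalent of that smoke test (assuming the lambda://test master string used elsewhere in this thread):

from pyspark.sql import SparkSession

# Map-only smoke test: if this prints [2, 4, ..., 20], the Lambda executors
# are reachable and the shuffle-free path works.
spark = SparkSession.builder \
    .master("lambda://test") \
    .appName("lambda-smoke-test") \
    .getOrCreate()
sc = spark.sparkContext

print(sc.parallelize(range(1, 11), 5).map(lambda x: x * 2).collect())

# A job with a reduce stage additionally exercises the S3 shuffle path:
print(sc.parallelize(range(1, 11), 5)
        .map(lambda x: (x % 2, x))
        .reduceByKey(lambda a, b: a + b)
        .collect())

spark.stop()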

Let me try running this during the week and share the set of configs I used to make it work.

venkata91 commented 5 years ago

A quick check shows that these two configs have to be set with the access key and secret key:

spark.hadoop.fs.s3n.awsAccessKeyId
spark.hadoop.fs.s3n.awsSecretAccessKey

Maybe you can check whether it fails in the write phase itself? These keys are used at the Lambda end to write data to S3.
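One quick way to check the write phase is to list the shuffle prefix in the bucket directly; a boto3 sketch, with the bucket name and tmp/ prefix taken from the config and stack trace above:

import boto3

s3 = boto3.client("s3")

# List whatever the executors managed to write under the shuffle prefix.
# If no shuffle_*.index / shuffle_*.data objects appear here, the job is
# failing on the write side rather than the read side.
resp = s3.list_objects_v2(Bucket="mc-cse597cc", Prefix="tmp/")
for obj in resp.get("Contents", []):
    print(obj["Key"], obj["Size"])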

habemusne commented 5 years ago

@venkata91 Thanks a lot for your quick and detailed response! To answer your two questions: before the procedures below, the parallelize() example could run, but the second one could not.

Instead of running my kmeans application, I switched to testing the SparkPi example, so technically my original problem is not yet solved. But running SparkPi surfaced some interesting problems. My SparkPi run is now able to reach Lambda (though not completely --- run_executor() did not go through). It turned out there are a few points to pay attention to:

  1. Since ~/.aws/* holds user-level credentials, I should not have used sudo to run spark-submit.
  2. I needed to set up a VPC Endpoint to allow the Lambda to read from and write to S3. This is not mentioned anywhere in the docs I read. Here is a helpful link. On the configuration page, I checked all the subnet groups.
  3. I needed to increase the Lambda timeout to 1 minute and the memory to 1024 MB to stop the Lambda from failing silently (a boto3 sketch of points 2 and 3 follows this list).
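A rough boto3 sketch of points 2 and 3 (the VPC id, route table id, and region are placeholders; spark-lambda is the function name from my config):

import boto3

ec2 = boto3.client("ec2", region_name="us-east-1")
lam = boto3.client("lambda", region_name="us-east-1")

# Point 2: a gateway VPC endpoint so a Lambda in a private subnet can reach S3.
ec2.create_vpc_endpoint(
    VpcId="vpc-xxxxxxxx",                       # placeholder VPC id
    ServiceName="com.amazonaws.us-east-1.s3",
    RouteTableIds=["rtb-xxxxxxxx"],             # placeholder: the private subnet's route table
)

# Point 3: raise the Lambda timeout and memory so executors don't die silently.
lam.update_function_configuration(
    FunctionName="spark-lambda",
    Timeout=60,        # seconds
    MemorySize=1024,   # MB
)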

Having these points documented would be helpful to other people who want to try this out.

faromero commented 5 years ago

Glad you figured it out, @habemusne. For your second point, I think the combination of this section and this section in my documentation should cover it as well. Also, regarding 3), if I remember correctly you actually benefit from making the Lambda timeout even longer in general, since the master benefits from persisting the workers rather than restarting them constantly; when the job is done, the workers are shut down automatically. This is especially useful for long-running sort jobs, etc. Qubole talks about their experiments with 5-minute Lambdas (back when that was the maximum) here. However, I agree: these limits should be mentioned somewhere, so I updated some of my documentation to capture this.

venkata91 commented 5 years ago

@habemusne Would you mind if I ask you to help in fixing the docs? If you feel comfortable, please file a PR.

habemusne commented 5 years ago

@faromero Thanks for the detailed pointers. I don't have a ton of experience with Lambda, so at the beginning I wasn't too concerned about the memory and timeout settings.

Your documentation was much clearer, and I followed it almost exactly (as mentioned in the issue description). So I did add your "endpoint" line to my config file, but somehow I still needed to add an AWS VPC Endpoint from the AWS console to fix an error.

One thing to point out about your doc is that the build step takes a long time; it would be helpful if the doc made users aware of this. I used a t2.micro EC2 instance and it took forever to compile the spark-lambda package. I simply moved on by downloading the zip from Qubole's S3, and I haven't yet hit an error caused by not compiling from source.

@venkata91 More than happy to contribute! I will help with the docs once I am done with this issue and the whole setup. I think the doc in this repo is pretty minimal, and faromero's covers more details. I wrote my own for my classmates, which has even more detail and may be easier for newcomers (link). To what extent would you like the docs changed?

habemusne commented 5 years ago

Almost have it working after some tough debugging. The first reason for the error was that I had not opened my security group to all inbound traffic.

That was not the only reason I got the error; running with the wrong path also contributed. I wrote my own kmeans script, and it used a data file path under ~/project/data/kmeans/ (i.e. not under the spark-on-lambda folder). The error went away when I switched to running the example scripts (./bin/spark-submit --master lambda://test examples/src/main/python/ml/kmeans_example.py).

It seems tricky to use user-defined Python code and user-defined data with this framework. I guess the framework could be fixed to handle this, but directions on how to run user-defined data would be very useful as an alternative to spending time modifying the framework.
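Until that is documented, the workaround I would expect (a sketch only; the bucket and key below are placeholders) is to upload the data to S3 and point the script at an S3 URI instead of a local path:

from pyspark.sql import SparkSession

# User-defined job reading its own data from S3 instead of a path under
# ~/project/data/; the bucket and key are placeholders.
spark = SparkSession.builder \
    .master("lambda://test") \
    .appName("user-kmeans") \
    .getOrCreate()

# First upload the data, e.g.: aws s3 cp ~/project/data/kmeans/points.txt s3://YOUR_BUCKET/data/kmeans/
df = spark.read.text("s3://YOUR_BUCKET/data/kmeans/points.txt")
df.show(5)

spark.stop()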

achinmishra commented 5 years ago

Hey, I am trying to replicate this but am facing a similar issue. @venkata91 talks about the VPC setup; I have a VPC with both private and public subnets and a default NAT Gateway associated with the VPC. However, I do not see any logs in CloudWatch after running the SparkPi example from my EC2 instance. I have made sure my configuration file has the right key-value pairs, and I can issue "aws lambda invoke --function-name spark-lambda ~/test.txt" from my EC2 instance. Can anyone suggest what I am missing here?

Spark Driver Log.txt