amplab / SparkNet

Distributed Neural Networks for Spark
MIT License

In ImageNetApp, leave JPEGs compressed so the full dataset fits in memory more easily #71

Open robertnishihara opened 8 years ago

robertnishihara commented 8 years ago

See discussion in #63.

robertnishihara commented 8 years ago

Decompressing the JPEGs may be slow (we should benchmark this). We can potentially get around this by beginning to decompress the next minibatch of JPEGs while we are calling ForwardBackward on the current minibatch. If decompressing and ForwardBackward take around the same time, this buys us a 2x speedup at the cost of making the code a bit more complex.
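
A minimal sketch of that double-buffering idea; the decompress and forwardBackward parameters below are placeholders, not the actual SparkNet/Caffe API:

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.duration.Duration
import scala.concurrent.ExecutionContext.Implicits.global

// Decode minibatch i+1 on a background thread while ForwardBackward runs on
// minibatch i. Both function arguments are hypothetical stand-ins.
def trainWithPrefetch(compressedBatches: Iterator[Array[Array[Byte]]],
                      decompress: Array[Array[Byte]] => Array[Array[Byte]],
                      forwardBackward: Array[Array[Byte]] => Unit): Unit = {
  if (!compressedBatches.hasNext) return
  // Decode the first minibatch eagerly.
  var current = decompress(compressedBatches.next())
  while (compressedBatches.hasNext) {
    val rawNext = compressedBatches.next()
    // Start decoding the next minibatch before computing on the current one.
    val nextDecoded = Future { decompress(rawNext) }
    forwardBackward(current)
    current = Await.result(nextDecoded, Duration.Inf)
  }
  forwardBackward(current)
}
```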

Another possibility is to change the degree to which we compress the JPEGs so that decompressing is faster.

robertnishihara commented 8 years ago

The main change will be to modify ImageNetPreprocessor in SparkNet/src/main/scala/libs/Preprocessor.scala to take a JPEG and decompress it. This will be similar to the way we do the other preprocessing there (like subtracting the mean image and cropping the image).
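
A rough sketch of what that change could look like, reusing the same Thumbnailator call the loader uses today; the function name and signature here are hypothetical and the real ImageNetPreprocessor interface may differ:

```scala
import java.io.ByteArrayInputStream
import net.coobird.thumbnailator.Thumbnails

// Keep compressed JPEG bytes in the RDD and only decode them inside the
// preprocessor, right before the existing steps (mean subtraction, cropping).
def convertJpeg(jpegBytes: Array[Byte],
                meanImage: Array[Float],
                height: Int,
                width: Int): Array[Float] = {
  // JPEG -> resized BufferedImage.
  val image = Thumbnails.of(new ByteArrayInputStream(jpegBytes))
    .forceSize(width, height)
    .asBufferedImage()
  // BufferedImage -> channel-major float array, with the mean image subtracted.
  val out = new Array[Float](3 * height * width)
  for (y <- 0 until height; x <- 0 until width) {
    val rgb = image.getRGB(x, y)
    val idx = y * width + x
    out(idx) = ((rgb >> 16) & 0xFF) - meanImage(idx)                                    // R
    out(height * width + idx) = ((rgb >> 8) & 0xFF) - meanImage(height * width + idx)   // G
    out(2 * height * width + idx) = (rgb & 0xFF) - meanImage(2 * height * width + idx)  // B
  }
  // Cropping would follow here, exactly as in the existing preprocessor.
  out
}
```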

rahulbhalerao001 commented 8 years ago

Hello Robert,

I wanted to confirm my understanding of the App and ask two questions about the proposed solution. I really appreciate your support so far and want to thank you for your prompt responses. As I understand it, the following are the high-level activities that occur in the ImageNet App. Please correct me if I am wrong:

1. Preprocessing
   a. The master directs the workers to download the tars from S3.
   b. The images are untarred.
   c. A data frame is created and partitioned among all workers.
   d. The mean image is computed.
   e. Caffe models are initialized on all workers, and the images are converted to NDArrays with the mean image subtracted.

2. Training (repeated)
   a. Receive updated parameters from the master.
   b. Choose a valid random index r and take the batch of indices r to r+256 from trainPartition.
   c. Compute the gradient using this batch and update the parameters.
   d. Send the updated parameters back to the master.
   (See the rough sketch of this loop below.)
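
A rough sketch of the training loop in step 2; all names here are hypothetical placeholders standing in for the actual SparkNet master/worker communication and Caffe call:

```scala
import scala.util.Random

// One worker-side iteration of the loop summarized above.
def workerStep(trainPartition: Array[Array[Float]],
               fetchParamsFromMaster: () => Array[Float],
               pushParamsToMaster: Array[Float] => Unit,
               forwardBackward: (Array[Array[Float]], Array[Float]) => Array[Float],
               batchSize: Int = 256): Unit = {
  // a. Receive the current parameters from the master.
  val params = fetchParamsFromMaster()
  // b. Choose a valid random start index r and take the batch [r, r + batchSize).
  val r = Random.nextInt(trainPartition.length - batchSize + 1)
  val batch = trainPartition.slice(r, r + batchSize)
  // c. Compute the gradient on this batch and update the parameters locally.
  val updatedParams = forwardBackward(batch, params)
  // d. Send the updated parameters back to the master.
  pushParamsToMaster(updatedParams)
}
```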

Questions:

1. Gzipping the JPEG images does not reduce their size. I picked up the first tar (train.00000.tar) from the ImageNet training set and found the following:
   a. Size of the tar: 18 MB
   b. Size after gzipping: 17 MB
   c. Size of the raw images: 20 MB
   As can be seen, compression does not reduce the size of the images. So I feel that if the uncompressed JPEGs do not fit in memory, the compressed ones might not fit either, and therefore this solution might not solve the problem.

2. The total size of all the unprocessed training images is 138 GB. I was using a 5-worker g2.8xlarge cluster where each worker has 60 GB of memory. Out of that, 30 GB per worker can easily be allocated to Spark, so a total of 150 GB can be made available to the RDDs. Given that the unprocessed size is 138 GB, and resizing reduces it further, the RDDs should not have spilled to disk. Do you feel there might be a problem in some other area of the system/App? To recall, the App ran smoothly on 1/10th of the data but failed on the full dataset, as described in #63.

Thanks, Rahul

robertnishihara commented 8 years ago

That looks mostly correct. The "decompression" that I'm referring to is not "tarred -> untarred" but rather "JPEG -> Array[Byte]", where the values in the Array[Byte] are the pixel values in the image.* That decompression step increases the size of the data a bunch, I think. Concretely, this decompression step happens in SparkNet/src/main/scala/loaders/ImageNetLoader.scala in the line

val decompressedResizedImage = ScaleAndConvert.decompressImageAndResize(content, height, width)

Looking at decompressImageAndResize in SparkNet/src/main/scala/preprocessing/ScaleAndConvert.scala, the decompression itself happens in the line

val resizedImage = Thumbnails.of(im).forceSize(width, height).asBufferedImage()
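
For reference, the size blow-up comes from the step after that, flattening the resized BufferedImage into raw per-pixel bytes. A hedged sketch of that conversion (the actual ScaleAndConvert code may differ in channel order and details):

```scala
import java.awt.image.BufferedImage

// Flatten a decoded, resized image into 3 bytes per pixel. At 256x256 that is
// 3 * 256 * 256 = 196,608 bytes per image, which is why the decompressed
// dataset is so much larger than the compressed JPEGs.
def imageToBytes(image: BufferedImage): Array[Byte] = {
  val width = image.getWidth
  val height = image.getHeight
  val out = new Array[Byte](3 * width * height)
  var i = 0
  for (y <- 0 until height; x <- 0 until width) {
    val rgb = image.getRGB(x, y)
    out(i) = ((rgb >> 16) & 0xFF).toByte     // R
    out(i + 1) = ((rgb >> 8) & 0xFF).toByte  // G
    out(i + 2) = (rgb & 0xFF).toByte         // B
    i += 3
  }
  out
}
```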

Some points:

* Since the images are RGB 256x256, each image is 3x256x256 bytes, which is almost 200KB, therefore the full dataset (roughly 200KB x 1.2 million images) is about 235GB. Seems like this should still fit in memory... did I do that calculation correctly?
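
A quick check of that arithmetic (1.2 million is the training-set image count used above):

```scala
// Verifying the footnote's numbers.
val bytesPerImage = 3L * 256 * 256          // 196,608 bytes, i.e. almost 200 KB
val totalBytes = bytesPerImage * 1200000L   // ~2.36e11 bytes
println(totalBytes / 1e9)                   // ~236 GB, so "about 235GB" checks out
```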

rahulbhalerao001 commented 8 years ago

Thank you for the detailed and thorough explanation. I plan to do some measurements in the next 2-3 days and support your calculation with measurement data. I am interested in looking at the data size after preprocessing and how much of it spills to disk. I also wanted to ask: if we pre-shuffle the data, can we simply pick up the batches sequentially at train time? That is, with a good pre-shuffle we can choose the images with index numbers 0-255, 256-511, ... as our batches. That way we read the RDD partition sequentially, and even if it spills to disk, each block is paged in and out only once per pass over, say, 30 GB of data. This essentially ensures that our working set stays in memory. Such a predictable workload might even allow an intelligent RDD swap algorithm to do the swapping proactively in the background, further reducing the penalty.
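
A minimal sketch of that sequential-batch idea, with runMinibatch as a hypothetical stand-in for the existing gradient step:

```scala
// If the partition has already been shuffled once up front, walk through it in
// order instead of picking a random start index, so the partition (and any
// blocks spilled to disk) is read sequentially, exactly once per epoch.
def trainSequentially[T](trainPartition: Array[T],
                         runMinibatch: Array[T] => Unit,
                         batchSize: Int = 256): Unit = {
  trainPartition.grouped(batchSize).foreach { batch =>
    if (batch.length == batchSize) runMinibatch(batch)  // drop a short final batch
  }
}
```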

robertnishihara commented 8 years ago

You're absolutely right that in principle, spilling to disk shouldn't be a problem at all. After all, Caffe reads data from disk. Deep learning is computationally expensive enough that there is enough time to prefetch the next minibatch while you're computing on your current minibatch. And since deep learning cycles predictably through the data, you know exactly what data you need next.

The same is true of decompression. We should preemptively decompress the next minibatch while we are computing on the current minibatch. I tried something similar a while ago using Scala's Future feature, and it helped a bit, but Spark was still too slow when reading RDDs from disk.

rahulbhalerao001 commented 8 years ago

Thank you again for your continued help and support. With these clarifications I will independently try out some ideas, and if I get good results, I will propose a code change. For the purpose of documentation, I want to expand on two points that you mentioned.

1. "After all, Caffe reads data from disk" - This is true for standalone Caffe, but in this integration the data is fed from SparkNet in memory, e.g. val tops = caffeNet.Forward(inputs), and there are no intermediate disk writes in the calls from SparkNet to Caffe.

2. "but Spark was still too slow when reading RDDs from disk" - This can arise when the compressed data itself does not fit in memory, but if it does fit, then we eliminate disk access from the critical path.

robertnishihara commented 8 years ago

Thanks for clarifying! Yes, I was referring to standalone Caffe.

michaelklachko commented 7 years ago

Has anyone benchmarked decompressing JPEGs? With multiple fast video cards we might need to prepare several mini-batches at once while a single mini-batch is being forwarded through the network (on each card); can this become a bottleneck? I guess if each GPU has a dedicated CPU core servicing it, and multiple cores work in parallel, it shouldn't be much worse. Is that the case?
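
For anyone who wants to measure this, a rough benchmark sketch of the pure JPEG decode cost with javax.imageio (no resizing or copying into Caffe); the paths and batch handling are placeholders:

```scala
import java.io.ByteArrayInputStream
import java.nio.file.{Files, Paths}
import javax.imageio.ImageIO

// Average decode time per JPEG, in milliseconds.
def benchmarkDecode(jpegPaths: Seq[String], repeats: Int = 10): Double = {
  val jpegs = jpegPaths.map(p => Files.readAllBytes(Paths.get(p)))
  val start = System.nanoTime()
  for (_ <- 0 until repeats; bytes <- jpegs) {
    val img = ImageIO.read(new ByteArrayInputStream(bytes))
    assert(img != null)  // make sure the decode isn't skipped
  }
  val elapsedMs = (System.nanoTime() - start) / 1e6
  elapsedMs / (repeats * jpegs.length)
}
```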