locationtech / geotrellis

GeoTrellis is a geographic data processing engine for high performance applications.
http://geotrellis.io
Other
1.34k stars 360 forks source link

Ingesting large geotiff from s3 doesn't seem to be partitioning data properly #2469

Closed jmelching closed 6 years ago

jmelching commented 6 years ago

I've been testing out the 1.2.0-RC1 with some existing code that just ingests one year of the USDA's cropland dataset and seeing a strange behavior... It appears that the entire geotiff (13 GB's) might be being read by a single task as it tries to crop the tiff into tiles. Here's the thread dump of the only running executor:

java.net.SocketInputStream.socketRead0(Native Method) java.net.SocketInputStream.socketRead(SocketInputStream.java:116) java.net.SocketInputStream.read(SocketInputStream.java:171) java.net.SocketInputStream.read(SocketInputStream.java:141) sun.security.ssl.InputRecord.readFully(InputRecord.java:465) sun.security.ssl.InputRecord.read(InputRecord.java:503) sun.security.ssl.SSLSocketImpl.readRecord(SSLSocketImpl.java:983) => holding Monitor(java.lang.Object@1820339677}) sun.security.ssl.SSLSocketImpl.readDataRecord(SSLSocketImpl.java:940) sun.security.ssl.AppInputStream.read(AppInputStream.java:105) => holding Monitor(sun.security.ssl.AppInputStream@1648873021}) org.apache.http.impl.io.SessionInputBufferImpl.streamRead(SessionInputBufferImpl.java:139) org.apache.http.impl.io.SessionInputBufferImpl.read(SessionInputBufferImpl.java:200) org.apache.http.impl.io.ContentLengthInputStream.read(ContentLengthInputStream.java:178) org.apache.http.conn.EofSensorInputStream.read(EofSensorInputStream.java:137) com.amazonaws.internal.SdkFilterInputStream.read(SdkFilterInputStream.java:82) com.amazonaws.event.ProgressInputStream.read(ProgressInputStream.java:180) com.amazonaws.internal.SdkFilterInputStream.read(SdkFilterInputStream.java:82) com.amazonaws.services.s3.internal.S3AbortableInputStream.read(S3AbortableInputStream.java:125) com.amazonaws.internal.SdkFilterInputStream.read(SdkFilterInputStream.java:82) com.amazonaws.internal.SdkFilterInputStream.read(SdkFilterInputStream.java:82) com.amazonaws.internal.SdkFilterInputStream.read(SdkFilterInputStream.java:82) com.amazonaws.event.ProgressInputStream.read(ProgressInputStream.java:180) com.amazonaws.internal.SdkFilterInputStream.read(SdkFilterInputStream.java:82) com.amazonaws.util.LengthCheckInputStream.read(LengthCheckInputStream.java:107) com.amazonaws.internal.SdkFilterInputStream.read(SdkFilterInputStream.java:82) java.io.FilterInputStream.read(FilterInputStream.java:107) org.apache.commons.io.IOUtils.copyLarge(IOUtils.java:1792) org.apache.commons.io.IOUtils.copyLarge(IOUtils.java:1769) org.apache.commons.io.IOUtils.copy(IOUtils.java:1744) org.apache.commons.io.IOUtils.toByteArray(IOUtils.java:462) geotrellis.spark.io.s3.AmazonS3Client.readRange(AmazonS3Client.scala:93) geotrellis.spark.io.s3.util.S3RangeReader.readClippedRange(S3RangeReader.scala:48) geotrellis.util.RangeReader$class.readRange(RangeReader.scala:36) geotrellis.spark.io.s3.util.S3RangeReader.readRange(S3RangeReader.scala:38) geotrellis.util.StreamingByteReader$$anonfun$1.apply(StreamingByteReader.scala:90) geotrellis.util.StreamingByteReader$$anonfun$1.apply(StreamingByteReader.scala:90) geotrellis.util.StreamingByteReader$Chunk.data(StreamingByteReader.scala:43) geotrellis.util.StreamingByteReader.getBytes(StreamingByteReader.scala:98) geotrellis.raster.io.geotiff.LazySegmentBytes.getBytes(LazySegmentBytes.scala:104) geotrellis.raster.io.geotiff.LazySegmentBytes.readChunk(LazySegmentBytes.scala:81) geotrellis.raster.io.geotiff.LazySegmentBytes$$anonfun$getSegments$1.apply(LazySegmentBytes.scala:99) geotrellis.raster.io.geotiff.LazySegmentBytes$$anonfun$getSegments$1.apply(LazySegmentBytes.scala:99) scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434) scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440) scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:461) scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408) scala.collection.Iterator$class.foreach(Iterator.scala:893) scala.collection.AbstractIterator.foreach(Iterator.scala:1336) geotrellis.raster.io.geotiff.GeoTiffTile.crop(GeoTiffTile.scala:541) geotrellis.spark.io.RasterReader$$anon$1$$anonfun$readWindows$2.apply(RasterReader.scala:191) geotrellis.spark.io.RasterReader$$anon$1$$anonfun$readWindows$2.apply(RasterReader.scala:191) scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186) scala.collection.TraversableLike$class.map(TraversableLike.scala:234) scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186) geotrellis.spark.io.RasterReader$$anon$1.readWindows(RasterReader.scala:191) geotrellis.spark.io.RasterReader$$anon$1.readWindows(RasterReader.scala:173) geotrellis.spark.io.s3.S3GeoTiffRDD$$anonfun$1.apply(S3GeoTiffRDD.scala:182) geotrellis.spark.io.s3.S3GeoTiffRDD$$anonfun$1.apply(S3GeoTiffRDD.scala:181) scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434) scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440) scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408) scala.collection.Iterator$class.foreach(Iterator.scala:893) scala.collection.AbstractIterator.foreach(Iterator.scala:1336) scala.collection.TraversableOnce$class.reduceLeft(TraversableOnce.scala:185) scala.collection.AbstractIterator.reduceLeft(Iterator.scala:1336) org.apache.spark.rdd.RDD$$anonfun$reduce$1$$anonfun$15.apply(RDD.scala:1011) org.apache.spark.rdd.RDD$$anonfun$reduce$1$$anonfun$15.apply(RDD.scala:1009) org.apache.spark.SparkContext$$anonfun$33.apply(SparkContext.scala:1980) org.apache.spark.SparkContext$$anonfun$33.apply(SparkContext.scala:1980) org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87) org.apache.spark.scheduler.Task.run(Task.scala:99) org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282) java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) java.lang.Thread.run(Thread.java:748)

I have ran this before with a 1.2.0 milestone version successfully. I originally used 1.2.0-M1 to fix some of the issues i had with bigtiffs. I have tried on spark 2.1 and 2.2. I am using the Spark ETL standard stuff and just calling : Etl.ingestProjectedExtent, SpatialKey, Tile with input: { "format": "geotiff", "name": "cropland_crop_raw_melch.tif", "cache": "NONE", "maxTileSize": 512, "numPartitions": 10000, "backend": { "type": "s3", "path": "s3://---.analytics/dsw/data/staging/v2.1/cropland/tiff/melch.tif" }

and output

{ "backend": { "type": "s3", "path": "s3://---.services/goliath/scratch/melching/catalog" }, "encoding":"geotiff", "reprojectMethod": "buffered", "pyramid": false, "tileSize": 256, "keyIndexMethod": { "type": "zorder" }, "resampleMethod": "nearest-neighbor", "layoutScheme": "floating", "crs": "EPSG:5070" }

pomadchin commented 6 years ago

Changes since M1:

pomadchin commented 6 years ago

Bug investigation results:

The regression was introduced in #2402 and a bit corrected in #2439 though probably it made the problem even harder. I didn't notice that #2402 introduced some additional partitioning logic instead of this, M1 logic where we were trying to pack segments into some window partitioning logic taking into account segments location and their sizes.

link where this code was applied to track down the issue

// how bytes calculated now
val windowBytes = gb.sizeLong * depth // depth depends on the cell type
//> 523264

// what happens indeed (bytes required to perform crop on a such window)
val segmentBytes =
  md.segmentLayout.intersectingSegments(gb).map { i =>
    md.segmentBytes.getSegmentByteCount(i) * md.bandCount
  } sum
//> 157502464

// all in all 111 partitions
// 258 windows in each
// each window ~ 157502464 bytes
// size of each partition would be 
// 157502464 * 256 = 40320630784 bytes = 40320.5 mb ~ 40 gigs to fetch per partition

// how this function worked before #2402 (and had to work even in theory)
// all in all 111 partitions
// 873 windows each (each window) is a segment
// each segment size 153811
// 153811 * 873 = 134277003 bytes = 134 mb per partition
// means it reads ~ 13-14 gigs into spark memory and reads only 134 mb per partition 

What happens: partitioning is not optimal. It picks up some segments, and just generates windows without taking care about tiff segments and without taking into account segment sizes. Also the problem with double segment reads was introduced in #2402 again.

The solution is to rollback to M1 function implementation or to improve logic with @jamesmcclain powers.

13Gb tile provided by @jmelching: s3://bigtiffs-test/2469/2013_30m_cdls.tif

pomadchin commented 6 years ago

Hope it's enough for the proof.

master: m3.xlarge slaves (2): m3.xlarge

Tests on M1 function:

import geotrellis.spark._
import geotrellis.spark.io._
import geotrellis.spark.io.s3._

implicit val _sc = sc
val rdd = S3GeoTiffRDD.spatial("bigtiffs-test", "2469/2013_30m_cdls.tif")

rdd.count()

// ==================================
// Repartition into 111 partitions.
// ==================================
// res7: Long = 96523
screen shot 2017-11-07 at 16 03 28

Tests on RC1 / master:

// could not wait until finish, but we can notice that it works really long

screen shot 2017-11-07 at 18 07 34
echeipesh commented 6 years ago

@jmelching Thank you for testing and the report, we've cut RC2 that should resolve this issue. Please let us know if it behaves for you.