locationtech / geotrellis

GeoTrellis is a geographic data processing engine for high performance applications.
http://geotrellis.io

Failure to stream GeoTiff window #1812

Closed echeipesh closed 7 years ago

echeipesh commented 7 years ago

Basic scenario: I'm trying to do a windowed read of a 300MB GeoTiff and noticed that the Spark job is taking way too long. The attached log is me breaking down the process: StreamingByteReader takes about 2.5 min to read a single 256x256 window of the 300MB file, so something is probably causing it to pull down the whole file.

Two possible cases (see the sketch after this list):

  1. We screwed up. Find it and fix it.

  2. The GeoTiff is somehow malformed. We need to be able to detect this case and find a way to complete the Spark job successfully.
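
For reference, a sketch of the high-level call that drives these windowed reads, using this issue's test object (assumes the S3GeoTiffRDD.spatial entry point and an implicit SparkContext; not part of the original report):

import geotrellis.raster.Tile
import geotrellis.spark.io.s3.S3GeoTiffRDD
import geotrellis.vector.ProjectedExtent
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

// maxTileSize = Some(256) should make each task stream just a 256x256
// window instead of the whole ~300MB file.
def readWindows(implicit sc: SparkContext): RDD[(ProjectedExtent, Tile)] =
  S3GeoTiffRDD.spatial(
    "geotrellis-test",
    "rf-test/356f564e3a0dc9d15553c17cf4583f21-6.tif",
    S3GeoTiffRDD.Options(maxTileSize = Some(256))
  )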

s3> test:console

[info] Starting scala interpreter...
[info]
import geotrellis.raster._
import geotrellis.vector._
import geotrellis.proj4._
import geotrellis.spark._
import geotrellis.spark.util._
import geotrellis.spark.tiling._
Welcome to Scala 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_66).
Type in expressions for evaluation. Or try :help.

scala> import geotrellis.raster.io.geotiff.tags.TiffTags
import geotrellis.raster.io.geotiff.tags.TiffTags

scala> import org.apache.spark._
import org.apache.spark._

scala> import geotrellis.spark.io.s3._
import geotrellis.spark.io.s3._

scala> import geotrellis.spark.io._
import geotrellis.spark.io._

scala> import com.amazonaws.services.s3.model._
import com.amazonaws.services.s3.model._

scala> import geotrellis.spark.io.s3.util._
import geotrellis.spark.io.s3.util._

scala> import geotrellis.util._
import geotrellis.util._

scala> implicit val sc = SparkUtils.createSparkContext("Tree Canopy ETL", new SparkConf(true).setMaster("local[*]"))
13:52:58 NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
13:52:58 log: Logging initialized @63710ms
13:52:59 Server: jetty-9.2.z-SNAPSHOT
13:52:59 ContextHandler: Started o.s.j.s.ServletContextHandler@41743514{/jobs,null,AVAILABLE}
[... 23 similar Jetty ServletContextHandler startup lines elided ...]
13:52:59 ServerConnector: Started ServerConnector@5b235011{HTTP/1.1}{0.0.0.0:4040}
13:52:59 Server: Started @63846ms
13:52:59 ContextHandler: Started o.s.j.s.ServletContextHandler@79c6370e{/metrics/json,null,AVAILABLE}
sc: org.apache.spark.SparkContext = org.apache.spark.SparkContext@185f23b5

scala> val bucket = "geotrellis-test"
bucket: String = geotrellis-test

scala> val prefix = "rf-test/356f564e3a0dc9d15553c17cf4583f21-6.tif"
prefix: String = rf-test/356f564e3a0dc9d15553c17cf4583f21-6.tif

scala> val conf = sc.hadoopConfiguration
conf: org.apache.hadoop.conf.Configuration = Configuration: core-default.xml, core-site.xml, mapred-default.xml, mapred-site.xml, yarn-default.xml, yarn-site.xml

scala> S3InputFormat.setBucket(conf, bucket)

scala> S3InputFormat.setPrefix(conf, prefix)

scala> val dimRdd = sc.newAPIHadoopRDD[GetObjectRequest, TiffTags, TiffTagsS3InputFormat]( conf, classOf[TiffTagsS3InputFormat], classOf[GetObjectRequest], classOf[TiffTags] ).mapValues { tiffTags => (tiffTags.cols, tiffTags.rows) }
dimRdd: org.apache.spark.rdd.RDD[(com.amazonaws.services.s3.model.GetObjectRequest, (Int, Int))] = MapPartitionsRDD[1] at mapValues at <console>:41

scala> val windows = dimRdd.flatMap { case (objectRequest, (cols, rows)) => RasterReader.listWindows(cols, rows, Some(256)).map((objectRequest, _)) }
windows: org.apache.spark.rdd.RDD[(com.amazonaws.services.s3.model.GetObjectRequest, geotrellis.raster.GridBounds)] = MapPartitionsRDD[2] at flatMap at <console>:45

scala> windows.first
res5: (com.amazonaws.services.s3.model.GetObjectRequest, geotrellis.raster.GridBounds) = (com.amazonaws.services.s3.model.GetObjectRequest@31b37e0e,GridBounds(0,0,255,255))

scala> windows.collect
res7: Array[(com.amazonaws.services.s3.model.GetObjectRequest, geotrellis.raster.GridBounds)] = Array((com.amazonaws.services.s3.model.GetObjectRequest@39da1d2a,GridBounds(0,0,255,255)), (com.amazonaws.services.s3.model.GetObjectRequest@39da1d2a,GridBounds(0,256,255,511)), (com.amazonaws.services.s3.model.GetObjectRequest@39da1d2a,GridBounds(0,512,255,767)), (com.amazonaws.services.s3.model.GetObjectRequest@39da1d2a,GridBounds(0,768,255,1023)), (com.amazonaws.services.s3.model.GetObjectRequest@39da1d2a,GridBounds(0,1024,255,1279)), (com.amazonaws.services.s3.model.GetObjectRequest@39da1d2a,GridBounds(0,1280,255,1535)), (com.amazonaws.services.s3.model.GetObjectRequest@39da1d2a,GridBounds(0,1536,255,1791)), (com.amazonaws.services.s3.model.GetObjectRequest@39da1d2a,GridBounds(0,1792,255,...
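
Conceptually, RasterReader.listWindows splits the raster grid into tile-sized GridBounds like the ones above. A rough sketch of that behavior (not the library's actual implementation):

def listWindowsSketch(cols: Int, rows: Int, size: Int): Seq[GridBounds] =
  for {
    colMin <- 0 until cols by size
    rowMin <- 0 until rows by size
  } yield GridBounds(
    colMin,
    rowMin,
    math.min(colMin + size - 1, cols - 1), // colMax, clamped at the right edge
    math.min(rowMin + size - 1, rows - 1)  // rowMax, clamped at the bottom edge
  )
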
scala> val options = S3GeoTiffRDD.Options(maxTileSize=Some(256))
options: geotrellis.spark.io.s3.S3GeoTiffRDD.Options = Options(List(.tif, .TIF, .tiff, .TIFF),None,TIFFTAG_DATETIME,yyyy:MM:dd HH:mm:ss,Some(256),None,None,None,<function0>)

scala> val(objectRequest, pixelWindow) = windows.first
objectRequest: com.amazonaws.services.s3.model.GetObjectRequest = com.amazonaws.services.s3.model.GetObjectRequest@2a409c88
pixelWindow: geotrellis.raster.GridBounds = GridBounds(0,0,255,255)

scala> StreamingByteReader(S3RangeReader(objectRequest, options.getS3Client()))
res11: geotrellis.util.StreamingByteReader = geotrellis.util.StreamingByteReader@125c1c55
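
StreamingByteReader only asks the underlying RangeReader for the chunks it needs, and an S3RangeReader ultimately issues ranged GETs. A minimal sketch of such a ranged read against this session's object, using the AWS SDK v1 directly rather than GeoTrellis code:

import com.amazonaws.services.s3.AmazonS3Client
import com.amazonaws.services.s3.model.GetObjectRequest
import com.amazonaws.util.IOUtils

// Ask S3 for an inclusive byte range: only these 1024 bytes cross the wire.
val ranged = new GetObjectRequest(
  "geotrellis-test",
  "rf-test/356f564e3a0dc9d15553c17cf4583f21-6.tif"
).withRange(0, 1023)

val s3 = new AmazonS3Client()
val obj = s3.getObject(ranged)
val bytes = try IOUtils.toByteArray(obj.getObjectContent) finally obj.close()
// bytes.length == 1024 -- a windowed read built on such range requests
// should transfer far less than the full 300MB.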

scala> val rr = implicitly[RasterReader[S3GeoTiffRDD.Options, (ProjectedExtent, Tile)]]
rr: geotrellis.spark.io.RasterReader[geotrellis.spark.io.s3.S3GeoTiffRDD.Options,(geotrellis.vector.ProjectedExtent, geotrellis.raster.Tile)] = geotrellis.spark.io.RasterReader$$anon$1@5e4d2b1

scala> val reader = StreamingByteReader(S3RangeReader(objectRequest, options.getS3Client()))
reader: geotrellis.util.StreamingByteReader = geotrellis.util.StreamingByteReader@45241265

scala> rr.readWindow(reader, pixelWindow, options)
res12: (geotrellis.vector.ProjectedExtent, geotrellis.raster.Tile) = (ProjectedExtent(Extent(313254.15713798243, 3096167.825055048, 313382.15713798243, 3096295.825055048),geotrellis.proj4.CRS$$anon$3@c180fffb),UByteRawArrayTile([B@1c75c077,256,256))

scala> val tile1 = rr.readWindow(reader, pixelWindow, options)
geotrellis.raster.io.geotiff.reader.MalformedGeoTiffException: incorrect byte order
  at geotrellis.raster.io.geotiff.reader.GeoTiffReader$.readGeoTiffInfo(GeoTiffReader.scala:274)
  at geotrellis.raster.io.geotiff.reader.GeoTiffReader$.readSingleband(GeoTiffReader.scala:98)
  at geotrellis.raster.io.geotiff.SinglebandGeoTiff$.streaming(SinglebandGeoTiff.scala:125)
  at geotrellis.spark.io.RasterReader$$anon$1.readWindow(RasterReader.scala:90)
  at geotrellis.spark.io.RasterReader$$anon$1.readWindow(RasterReader.scala:82)
  ... 42 elided
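
(Aside: the MalformedGeoTiffException appears to be a side effect of reusing the same StreamingByteReader, whose position the previous readWindow left past the TIFF header, so the next read interprets mid-file bytes as the byte-order marker. A fresh reader, created below, reads the window fine; that's an inference from this session, not a confirmed diagnosis.)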

scala> val reader = StreamingByteReader(S3RangeReader(objectRequest, options.getS3Client()))
reader: geotrellis.util.StreamingByteReader = geotrellis.util.StreamingByteReader@4eee253f

scala> val tile1 = rr.readWindow(reader, pixelWindow, options)
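
The time helper used below isn't defined anywhere in this session; a minimal definition matching its output format (an assumption), along with the import that the Duration call further down relies on:

import scala.concurrent.duration._ // for Duration(...).toSeconds below

def time[T](block: => T): T = {
  val start = System.nanoTime
  val result = block
  println(s"Elapsed time: ${System.nanoTime - start}ns")
  result
}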

scala> time{ rr.readWindow(reader, pixelWindow, options) }
Elapsed time: 136208121520ns
res18: (geotrellis.vector.ProjectedExtent, geotrellis.raster.Tile) = (ProjectedExtent(Extent(313254.15713798243, 3096167.825055048, 313382.15713798243, 3096295.825055048),geotrellis.proj4.CRS$$anon$3@c180fffb),UByteRawArrayTile([B@7d811aa7,256,256))

scala> Duration(136208121520l, NANOSECONDS).toSeconds
res24: Long = 136

lossyrob commented 7 years ago

This is an optimization that will go into either a bugfix version of 1.0 or into 1.1.

lossyrob commented 7 years ago

Solved by #1905