locationtech / geowave

GeoWave provides geospatial and temporal indexing on top of Accumulo, HBase, BigTable, Cassandra, Kudu, Redis, RocksDB, and DynamoDB.
Apache License 2.0

Fix HilbertSFC Concurrency #1750

Closed echeipesh closed 3 years ago

echeipesh commented 4 years ago

This PR is intended to address problems initially uncovered by this event:

Exception in thread "main" java.lang.IllegalArgumentException
    at com.google.common.base.Preconditions.checkArgument(Preconditions.java:127)
    at com.google.uzaygezen.core.BacktrackingQueryBuilder.visit(BacktrackingQueryBuilder.java:111)
    at com.google.uzaygezen.core.ZoomingSpaceVisitorAdapter.visit(ZoomingSpaceVisitorAdapter.java:66)
    at com.google.uzaygezen.core.CompactHilbertCurve.accept(CompactHilbertCurve.java:201)
    at org.locationtech.geowave.core.index.sfc.hilbert.UnboundedHilbertSFCOperations.decomposeRange(UnboundedHilbertSFCOperations.java:323)
    at org.locationtech.geowave.core.index.sfc.hilbert.HilbertSFC.decomposeRange(HilbertSFC.java:199)
    at org.locationtech.geowave.core.index.sfc.binned.BinnedSFCUtils.getQueryRanges(BinnedSFCUtils.java:50)
    at org.locationtech.geowave.core.index.sfc.xz.XZHierarchicalIndexStrategy.getQueryRanges(XZHierarchicalIndexStrategy.java:140)
    at org.locationtech.geowave.core.index.sfc.xz.XZHierarchicalIndexStrategy.getQueryRanges(XZHierarchicalIndexStrategy.java:45)
    at org.locationtech.geowave.core.store.util.DataStoreUtils.constraintsToQueryRanges(DataStoreUtils.java:396)
    at org.locationtech.geowave.core.store.base.BaseConstraintsQuery.getRanges(BaseConstraintsQuery.java:323)
    at org.locationtech.geowave.core.store.base.BaseQuery.getReader(BaseQuery.java:128)
    at org.locationtech.geowave.core.store.base.BaseFilteredIndexQuery.getReader(BaseFilteredIndexQuery.java:129)
    at org.locationtech.geowave.core.store.base.BaseFilteredIndexQuery.query(BaseFilteredIndexQuery.java:81)
    at org.locationtech.geowave.core.store.base.BaseConstraintsQuery.query(BaseConstraintsQuery.java:241)
    at org.locationtech.geowave.core.store.base.BaseDataStore.queryConstraints(BaseDataStore.java:901)
    at org.locationtech.geowave.core.store.base.BaseDataStore.internalQuery(BaseDataStore.java:660)
    at org.locationtech.geowave.core.store.base.BaseDataStore.internalQuery(BaseDataStore.java:241)
    at org.locationtech.geowave.core.store.base.BaseDataStore.internalQuery(BaseDataStore.java:220)
    at org.locationtech.geowave.core.store.base.BaseDataStore.query(BaseDataStore.java:214)
  ... (client code)

This was produced while querying a single GeoWave `DataStore` instance across multiple threads. The likely issue, identified by @rfecher, is that `CompactHilbertCurve` is not [thread safe](https://github.com/GrammarViz2/Uzaygezen/blob/master/uzaygezen-core/src/main/java/com/google/uzaygezen/core/CompactHilbertCurve.java#L33).

https://github.com/locationtech/geowave/blob/0981350aac64e4f74c23c6bc7efe1ac7e7ca8fb3/core/index/src/main/java/org/locationtech/geowave/core/index/sfc/hilbert/HilbertSFC.java#L97

I was able to reproduce the issue in a unit test thanks to the excellent vmlens project. In the `core/index` folder, run `mvn -Dtest=ThreadedHilbertSFCTest test`.

The issue was triggered because, in my instance, the combined index bit size was 63, which forces the use of `UnboundedHilbertSFCOperations`; this is not the normal case.
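To illustrate why 63 bits matters: once the total precision across all dimensions exceeds what fits safely in a Java `long`, the primitive-long code path can no longer be used and the `BigInteger`-backed unbounded operations take over. The sketch below is a simplified illustration of that decision, not the actual GeoWave code; the class, method, and the exact 62-bit threshold are my assumptions about how `HilbertSFC` picks a path.

```java
public class PrecisionPathDemo {
    // Assumption: a total precision above 62 bits no longer fits the
    // sign-safe range of a Java long, so BigInteger math is required.
    static final int MAX_PRIMITIVE_BITS = 62;

    // Hypothetical helper mirroring the primitive-vs-unbounded choice.
    static boolean requiresUnbounded(int[] bitsPerDimension) {
        int total = 0;
        for (int bits : bitsPerDimension) {
            total += bits;
        }
        return total > MAX_PRIMITIVE_BITS;
    }

    public static void main(String[] args) {
        // e.g. three dimensions at 21 bits each -> 63 total bits,
        // which tips the index over into the unbounded path
        int[] dims = {21, 21, 21};
        System.out.println("unbounded=" + requiresUnbounded(dims));
    }
}
```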

In the normal case of `PrimitiveHilbertSFCOperations`, access to `compactHilbertCurve` is already synchronized: https://github.com/locationtech/geowave/blob/0981350aac64e4f74c23c6bc7efe1ac7e7ca8fb3/core/index/src/main/java/org/locationtech/geowave/core/index/sfc/hilbert/PrimitiveHilbertSFCOperations.java#L124-L126

I followed the same pattern and synchronized access to `compactHilbertCurve` in the unbounded case. This is potentially sub-optimal, and maybe the better thing to do here is use a `ThreadLocal`. However, the `compactHilbertCurve` field in `HilbertSFC` is `protected` and thus part of the interface; I believe any change there would break binary compatibility, although I don't know what kind of brave soul would be extending this class or actually using this field. Happy to take direction on the best approach here.
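The shape of the fix is simply to guard every use of the shared curve with a lock on the curve object itself. The sketch below uses a toy stateful "curve" (not the real uzaygezen class; all names here are hypothetical) to show why unsynchronized concurrent use of per-call mutable state is dangerous, and how the synchronized wrapper serializes access:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class SynchronizedCurveDemo {
    // Toy stand-in for CompactHilbertCurve: it mutates internal scratch
    // state during each call, so unsynchronized concurrent use can
    // interleave a write from one thread with a read from another.
    static class StatefulCurve {
        private long scratch; // mutable per-call state, like the real curve's internals

        long transform(long input) {
            scratch = input;    // write shared state
            Thread.yield();     // widen the race window
            return scratch * 2; // read it back
        }
    }

    static final StatefulCurve curve = new StatefulCurve();

    // Same shape as the fix in the PR: serialize all access to the
    // single shared curve instance.
    static long safeTransform(long input) {
        synchronized (curve) {
            return curve.transform(input);
        }
    }

    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(8);
        List<Future<Boolean>> results = new ArrayList<>();
        for (int i = 0; i < 1000; i++) {
            final long input = i;
            results.add(pool.submit(() -> safeTransform(input) == input * 2));
        }
        boolean allCorrect = true;
        for (Future<Boolean> f : results) {
            allCorrect &= f.get();
        }
        pool.shutdown();
        System.out.println("allCorrect=" + allCorrect);
    }
}
```

A `ThreadLocal<CompactHilbertCurve>` would avoid the lock contention at the cost of one curve instance per thread, but as noted above that likely means changing the `protected` field.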

vmlens also identified the `LinkedHashMap` as a potential problem (although it was NOT explicitly in my trace). Since the project already depends on Caffeine, using its `Cache` seemed like the right thing to do.
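Roughly, the concern is that a bounded `LinkedHashMap` LRU rewires its internal linked list on insert (and on access, in access-order mode), so concurrent callers can corrupt it without external locking. The sketch below contrasts the unsafe pattern with a thread-safe lookup; it uses a stdlib `ConcurrentHashMap` as a stand-in, since Caffeine's `Cache` (which the PR actually uses) adds size-bounded eviction on top of similar thread-safety guarantees. All names here are illustrative, not GeoWave's.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class CacheSwapDemo {
    // Unsafe under concurrency: LinkedHashMap mutates its internal linked
    // list on insert, so concurrent writers can corrupt it. Bounded via
    // removeEldestEntry, but NOT thread safe.
    static final Map<Integer, String> unsafeLru =
        new LinkedHashMap<Integer, String>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<Integer, String> eldest) {
                return size() > 100;
            }
        };

    // Thread-safe stand-in: computeIfAbsent on a ConcurrentHashMap.
    // (Caffeine's Cache is also thread safe and, unlike this map,
    // evicts entries once a configured maximum size is reached.)
    static final ConcurrentHashMap<Integer, String> safeCache = new ConcurrentHashMap<>();

    static String lookup(int key) {
        return safeCache.computeIfAbsent(key, k -> "value-" + k);
    }

    public static void main(String[] args) {
        System.out.println(lookup(7));
    }
}
```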

I've confirmed, via a local publish, that at minimum these changes solve the problem in my project. Note: you can step through the commits in this PR to see the vmlens output for each of the described problems; pretty neat.

echeipesh commented 4 years ago

I'll have to come back to this in a day to figure out how I broke CI. At a glance, there is a suspicious warning:

`AbstractGeoWavePersistence:208 - Object 'testIndex' not found. 'GEOWAVE_METADATA' table does not exist` 

so I may have broken serialization somewhere?

rfecher commented 4 years ago

I'll poke around too. It looks like the failure occurs when running the Spark spatial join from gRPC. My assumption was that we're broadcasting the `HilbertSFC` instance and the addition of Caffeine messed up serialization. What's strange, though, is that we have a stand-alone Spark spatial join test that is nearly the same as the test invoking it from within a gRPC service, and the standalone test passes every time; plus, strangely, the gRPC test seems to pass on some datastores but not all (maybe it's just random?). I did re-run some of the tests on master just to make sure something really weird hasn't cropped up, and master passes each time. I'll keep poking around as time permits. Thanks for digging in on this!

Here's the error:

2020-09-17 13:24:45 WARN  BlockManager:66 - Block broadcast_1 could not be removed as it was not found on disk or in memory
2020-09-17 13:24:45 ERROR NIOServerCnxnFactory:44 - Thread Thread[grpc-default-executor-0,5,main] died
java.lang.NoClassDefFoundError: Lorg/eclipse/core/resources/IWorkspaceRoot;
    at java.lang.Class.getDeclaredFields0(Native Method)
    at java.lang.Class.privateGetDeclaredFields(Class.java:2583)
    at java.lang.Class.getDeclaredFields(Class.java:1916)
    at org.apache.spark.util.SizeEstimator$.getClassInfo(SizeEstimator.scala:330)
    at org.apache.spark.util.SizeEstimator$.visitSingleObject(SizeEstimator.scala:222)
    at org.apache.spark.util.SizeEstimator$.org$apache$spark$util$SizeEstimator$$estimate(SizeEstimator.scala:201)
    at org.apache.spark.util.SizeEstimator$.estimate(SizeEstimator.scala:69)
    at org.apache.spark.util.collection.SizeTracker$class.takeSample(SizeTracker.scala:78)
    at org.apache.spark.util.collection.SizeTracker$class.afterUpdate(SizeTracker.scala:70)
    at org.apache.spark.util.collection.SizeTrackingVector.$plus$eq(SizeTrackingVector.scala:31)
    at org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:217)
    at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1092)
    at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1083)
    at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1018)
    at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1083)
    at org.apache.spark.storage.BlockManager.putIterator(BlockManager.scala:841)
    at org.apache.spark.storage.BlockManager.putSingle(BlockManager.scala:1404)
    at org.apache.spark.broadcast.TorrentBroadcast.writeBlocks(TorrentBroadcast.scala:123)
    at org.apache.spark.broadcast.TorrentBroadcast.<init>(TorrentBroadcast.scala:88)
    at org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:34)
    at org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:62)
    at org.apache.spark.SparkContext.broadcast(SparkContext.scala:1482)
    at org.locationtech.geowave.analytic.spark.RDDUtils.broadcastIndexStrategy(RDDUtils.java:261)
    at org.locationtech.geowave.analytic.spark.GeoWaveRDDLoader.loadIndexedRDD(GeoWaveRDDLoader.java:56)
    at org.locationtech.geowave.analytic.spark.spatial.SpatialJoinRunner.createRDDFromOptions(SpatialJoinRunner.java:254)
    at org.locationtech.geowave.analytic.spark.spatial.SpatialJoinRunner.loadDatasets(SpatialJoinRunner.java:261)
    at org.locationtech.geowave.analytic.spark.spatial.SpatialJoinRunner.run(SpatialJoinRunner.java:97)
    at org.locationtech.geowave.analytic.spark.spatial.operations.SpatialJoinCommand.computeResults(SpatialJoinCommand.java:141)
    at org.locationtech.geowave.service.grpc.services.GeoWaveGrpcAnalyticSparkService.spatialJoinCommand(GeoWaveGrpcAnalyticSparkService.java:135)
    at org.locationtech.geowave.service.grpc.protobuf.AnalyticSparkGrpc$MethodHandlers.invoke(AnalyticSparkGrpc.java:411)
    at io.grpc.stub.ServerCalls$UnaryServerCallHandler$UnaryServerCallListener.onHalfClose(ServerCalls.java:171)
    at io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.halfClosed(ServerCallImpl.java:283)
    at io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1HalfClosed.runInContext(ServerImpl.java:707)
    at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
    at io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException: org.eclipse.core.resources.IWorkspaceRoot
    at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
    at org.eclipse.jetty.webapp.WebAppClassLoader.loadClass(WebAppClassLoader.java:543)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
    ... 38 more
rfecher commented 3 years ago

I switched to GitHub Actions in place of Travis CI, which has been finicky, and it actually seems to run fine on GitHub Actions. I took your branch and decided to leave the vmlens test in as a good reference, so I just took your initial 3 commits in PR #1766.