locationtech / geowave

GeoWave provides geospatial and temporal indexing on top of Accumulo, HBase, BigTable, Cassandra, Kudu, Redis, RocksDB, and DynamoDB.
Apache License 2.0

overlay analysis #1474

Open scially opened 5 years ago

scially commented 5 years ago

I ingested two shapefiles into GeoWave. How do I perform overlay analysis based on the SimpleFeatures (I can get the Geometry) from GeoWave?

rfecher commented 5 years ago

One tool that we provide is a spatial join Spark operation. Using the CLI it is geowave analytic spatialjoin, or you can follow along with our Jupyter notebook example that performs the operation. While at a small scale you can loop over N geometries from one dataset and compare them with M geometries from the other, those N * M comparisons are infeasible at larger scales. The spatial join Spark operation we provide is an indexed operation with, as you might expect, much better performance at larger scales. After you get the pairs of overlapping geometries you can use JTS to manipulate the result if you prefer the intersections rather than the pairs (e.g. geom1.intersection(geom2)).
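
For reference, a minimal JTS sketch of that last step (assuming the org.locationtech.jts packages that GeoWave 1.0 depends on; the WKT polygons are placeholders standing in for one joined pair):

    import org.locationtech.jts.geom.Geometry;
    import org.locationtech.jts.io.ParseException;
    import org.locationtech.jts.io.WKTReader;

    public class IntersectionExample {
        public static void main(String[] args) throws ParseException {
            WKTReader reader = new WKTReader();
            // two overlapping polygons standing in for one pair returned by the spatial join
            Geometry geom1 = reader.read("POLYGON ((0 0, 4 0, 4 4, 0 4, 0 0))");
            Geometry geom2 = reader.read("POLYGON ((2 2, 6 2, 6 6, 2 6, 2 2))");

            if (geom1.intersects(geom2)) {
                // turn the joined pair into the actual overlay geometry
                Geometry overlap = geom1.intersection(geom2);
                System.out.println(overlap); // prints the shared 2x2 square as a POLYGON
            }
        }
    }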

scially commented 5 years ago

Thank you very much, this is exactly what I need. I will give you feedback on the results.

scially commented 5 years ago

I submitted my job to YARN, but there is one error: java.lang.NullPointerException at org.locationtech.geowave.core.index.persist.PersistenceUtils.fromBinary(PersistenceUtils.java:148)


18/12/17 18:21:51 ERROR yarn.ApplicationMaster: User class threw exception: java.lang.NullPointerException
java.lang.NullPointerException
    at org.locationtech.geowave.core.index.persist.PersistenceUtils.fromBinary(PersistenceUtils.java:148)
    at org.locationtech.geowave.core.store.index.PrimaryIndex.fromBinary(PrimaryIndex.java:93)
    at org.locationtech.geowave.core.store.index.CustomNameIndex.fromBinary(CustomNameIndex.java:61)
    at org.locationtech.geowave.core.index.persist.PersistenceUtils.fromBinary(PersistenceUtils.java:148)
    at org.locationtech.geowave.core.store.metadata.AbstractGeoWavePersistence.fromValue(AbstractGeoWavePersistence.java:341)
    at org.locationtech.geowave.core.store.metadata.AbstractGeoWavePersistence.entryToValue(AbstractGeoWavePersistence.java:347)
    at org.locationtech.geowave.core.store.metadata.AbstractGeoWavePersistence.internalGetObject(AbstractGeoWavePersistence.java:293)
    at org.locationtech.geowave.core.store.metadata.AbstractGeoWavePersistence.getObject(AbstractGeoWavePersistence.java:240)
    at org.locationtech.geowave.core.store.metadata.IndexStoreImpl.getIndex(IndexStoreImpl.java:54)
    at org.locationtech.geowave.core.store.AdapterToIndexMapping.getIndices(AdapterToIndexMapping.java:69)
    at org.locationtech.geowave.analytic.spark.spatial.SpatialJoinRunner.getIndicesForAdapter(SpatialJoinRunner.java:136)
    at org.locationtech.geowave.analytic.spark.spatial.SpatialJoinRunner.createRDDFromOptions(SpatialJoinRunner.java:291)
    at org.locationtech.geowave.analytic.spark.spatial.SpatialJoinRunner.loadDatasets(SpatialJoinRunner.java:316)
    at org.locationtech.geowave.analytic.spark.spatial.SpatialJoinRunner.run(SpatialJoinRunner.java:107)
    at com.wh.GOverLay.main(GOverLay.java:58)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:635)
18/12/17 18:21:51 INFO yarn.ApplicationMaster: Final app status: FAILED, exitCode: 15, (reason: User class threw exception: java.lang.NullPointerException)

And below is my code:

public class GOverLay {
    public static void main(String[] args)
            throws InterruptedException, ExecutionException, IOException {

        // input accumulo
        AccumuloOptions options = new AccumuloOptions();
        AccumuloRequiredOptions inputOperations = new AccumuloRequiredOptions("server7:2181,server8:2181,server9:2181",
                "accumulo",
                "geowave",
                "geowave",
                "geowave.over",
                options);

        // output accumulo
        AccumuloRequiredOptions outputOperations = new AccumuloRequiredOptions("server7:2181,server8:2181,server9:2181",
                "accumulo",
                "geowave",
                "geowave",
                "geowave.lgcy",
                options);

        SparkSession spark = SparkSession.builder()
                .config("spark.serializer","org.apache.spark.serializer.KryoSerializer")
                .config("spark.kryo.registrator", "org.locationtech.geowave.analytic.spark.GeoWaveRegistrator")
                //.config("spark.default.parallelism", "6000")
                .appName("Overlay")
                .getOrCreate();
        SpatialJoinRunner joinRunner = new SpatialJoinRunner(spark);

        // set layer1
        joinRunner.setLeftStore(inputOperations.createPluginOptions());
        joinRunner.setLeftAdapterTypeName("csztgh");
        // set layer2
        joinRunner.setRightStore(inputOperations.createPluginOptions());
        joinRunner.setRightAdapterTypeName("qmdltb");
        // set out layer
        joinRunner.setOutputStore(outputOperations.createPluginOptions());
        joinRunner.setOutputLeftAdapterTypeName("leftjoin");
        joinRunner.setOutputLeftAdapterTypeName("rightjoin");

        joinRunner.setPredicate(new GeomIntersects());
        //joinRunner.setPartCount(6000);

        joinRunner.run();
    }
}

And I packaged it into a jar and use SparkLauncher to submit it:

public class SparkRun {
    public static void main(String[] args)
            throws IOException, InterruptedException {

        CountDownLatch countDownLatch = new CountDownLatch(1);

        HashMap<String, String> env = new HashMap<>();

        env.put("HADOOP_CONF_DIR", "/home/hwang/hadoop-conf");
        SparkAppHandle handle = new SparkLauncher(env)
                .setAppResource("/home/hwang/eclipse-workspace/geowaveoverlay/overlay/target/overlay-1.0-SNAPSHOT.jar")
                .addJar("/home/hwang/eclipse-workspace/geowaveoverlay/overlay/target/overlay-1.0-SNAPSHOT.jar")
                .setMainClass("com.wh.GOverLay")

                .setAppName(new Date().toString())
                .setSparkHome("/home/hwang/spark-2.2.0-bin-hadoop2.6")
                .setMaster("yarn-cluster")
                .setVerbose(true)
                .setConf(SparkLauncher.DRIVER_MEMORY, "4g")
                .startApplication();
        // Use handle API to monitor / control application.
        handle.addListener(new SparkAppHandle.Listener() {
            public void stateChanged(SparkAppHandle sparkAppHandle) {
                System.out.println("state:" + sparkAppHandle.getState().toString());
            }

            public void infoChanged(SparkAppHandle sparkAppHandle) {
                System.out.println("Info:" + sparkAppHandle.getState().toString());
            }

        });
        countDownLatch.await();
    }
}
rfecher commented 5 years ago

How do you package it? Do you use the Maven Shade plugin? If so, make sure to use the ServicesResourceTransformer, such as this. And if not, make sure the META-INF/services/* files are included and appended together when multiple libraries contribute the same file.
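
For reference, the Shade plugin configuration for that transformer looks roughly like this (the plugin version shown is just an example):

    <plugin>
      <groupId>org.apache.maven.plugins</groupId>
      <artifactId>maven-shade-plugin</artifactId>
      <version>3.2.1</version>
      <executions>
        <execution>
          <phase>package</phase>
          <goals>
            <goal>shade</goal>
          </goals>
          <configuration>
            <transformers>
              <!-- append META-INF/services/* files from all dependencies instead of overwriting them -->
              <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
            </transformers>
          </configuration>
        </execution>
      </executions>
    </plugin>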

scially commented 5 years ago

Do you mean this: "/home/hwang/eclipse-workspace/geowaveoverlay/overlay/target/overlay-1.0-SNAPSHOT.jar"? I use the Maven Shade plugin to package the jar, which includes geowave-analytic-spark-SNAPSHOT-1.0.0.jar and geowave-accumulo-datastore-SNAPSHOT-1.0.0.jar and so on. Do you mean that when I package my jar, I need to use the ServicesResourceTransformer?

rfecher commented 5 years ago

Yes, by default the Shade plugin would just overwrite when multiple Maven modules use the same file, but this behavior makes no sense for Java SPI files. The ServicesResourceTransformer is useful to append SPI files for this reason.

scially commented 5 years ago

Thanks... I will try it, and I hope I can consult you in the future if I have any problems.

scially commented 5 years ago

This is my key code, but when I submit it to YARN with spark-submit, my cluster resources run out and there are some errors:

       SparkConf conf = GeoWaveSparkConf.getDefaultConfig()
                .set("spark.yarn.jars","hdfs://10.66.150.5:8020/spark-jars/*.jar")
                //.setJars(new String[]{"/home/hwang/eclipse-workspace/geowaveoverlay/overlay/target/overlay-1.0-SNAPSHOT.jar"})
                .setMaster("yarn-client");

        SparkSession spark = GeoWaveSparkConf.createDefaultSession(conf);

        GeomFunctionRegistry.registerGeometryFunctions(spark);
        GeoWaveSpatialEncoders.registerUDTs();

        SpatialJoinRunner joinRunner = new SpatialJoinRunner(spark);
        // set layer1
        joinRunner.setLeftStore(inputOperations1.createPluginOptions());
        joinRunner.setLeftAdapterTypeName("qmdltb"); //csztgh
        // set layer2
        joinRunner.setRightStore(inputOperations2.createPluginOptions());
        joinRunner.setRightAdapterTypeName("csztgh"); //qmdltb
        // set out layer
        joinRunner.setOutputStore(outputOperations.createPluginOptions());
        joinRunner.setOutputLeftAdapterTypeName("leftjoin");
        joinRunner.setOutputLeftAdapterTypeName("rightjoin");

        joinRunner.setPredicate(new GeomIntersects());
        // is it too small?
        joinRunner.setPartCount(1000);

18/12/20 18:34:34 INFO scheduler.DAGScheduler: Job 2 failed: isEmpty at TieredSpatialJoin.java:371, took 714.866837 s
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 148 in stage 53.0 failed 4 times, most recent failure: Lost task 148.3 in stage 53.0 (TID 101253, server7, executor 84): ExecutorLostFailure (executor 84 exited caused by one of the running tasks) Reason: Executor heartbeat timed out after 138297 ms
Driver stacktrace:
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1499)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1487)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1486)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
        at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1486)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814)
        at scala.Option.foreach(Option.scala:257)
        at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:814)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1714)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1669)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1658)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
        at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:630)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2024)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2045)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2064)
        at org.apache.spark.rdd.RDD$$anonfun$take$1.apply(RDD.scala:1354)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
        at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
        at org.apache.spark.rdd.RDD.take(RDD.scala:1327)
        at org.apache.spark.rdd.RDD$$anonfun$isEmpty$1.apply$mcZ$sp(RDD.scala:1462)
        at org.apache.spark.rdd.RDD$$anonfun$isEmpty$1.apply(RDD.scala:1462)
        at org.apache.spark.rdd.RDD$$anonfun$isEmpty$1.apply(RDD.scala:1462)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
        at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
        at org.apache.spark.rdd.RDD.isEmpty(RDD.scala:1461)
        at org.apache.spark.api.java.JavaRDDLike$class.isEmpty(JavaRDDLike.scala:544)
        at org.apache.spark.api.java.AbstractJavaRDDLike.isEmpty(JavaRDDLike.scala:45)
        at org.locationtech.geowave.analytic.spark.spatial.TieredSpatialJoin.join(TieredSpatialJoin.java:371)
        at org.locationtech.geowave.analytic.spark.spatial.SpatialJoinRunner.run(SpatialJoinRunner.java:114)
        at com.wh.GOverLay.main(GOverLay.java:74)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:755)
        at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180)
        at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:119)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

My cluster configuration is in the screenshot snipaste_2018-12-20_20-37-38, and the intersection of the two shapefiles that have been ingested into Accumulo is shown in the images below:

snipaste_2018-12-20_20-41-58

And I have a question: when I use ArcGIS for this overlay analysis it takes only a few minutes, so why does it need such large resources and run so slowly with GeoWave and Spark? Is there a problem with my code? Thanks...

scially commented 5 years ago

The YARN running situation:

snipaste_2018-12-20_21-04-02

image

snipaste_2018-12-20_21-07-49

Is it necessary to increase server memory? But in that case, isn't the memory required by the program too much?

rfecher commented 5 years ago

Our expert in the spatial join implementation, @JWileczek, probably has some ideas too, but one thing I think you should try is to explicitly set min and max splits to some reasonable value (it should be an option on the GeoWave spatial join runner). I know it defaults to spark.default.parallelism if it's not explicitly provided and that is set in your Spark config. But if that's not set, it roughly uses the number of ranges as the number of splits, which is often way too many input splits and results in a lot of unnecessary overhead.
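
For example, a minimal sketch of how the GOverLay.main above could pin this down explicitly (the value 300 is only a placeholder, not a recommendation):

    SparkSession spark = SparkSession.builder()
            .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
            .config("spark.kryo.registrator", "org.locationtech.geowave.analytic.spark.GeoWaveRegistrator")
            // cap the default parallelism so the split count does not fall back to the number of ranges
            .config("spark.default.parallelism", "300")
            .appName("Overlay")
            .getOrCreate();

    SpatialJoinRunner joinRunner = new SpatialJoinRunner(spark);
    // keep the RDD partition count in the same ballpark
    joinRunner.setPartCount(300);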

rfecher commented 5 years ago

also, I should say that distributed processing isn't necessarily faster than local - if the size of the job is small enough that it can be done locally, there is overhead associated with distributing it. But leveraging the distributed technologies does allow you to scale the job well beyond anything that would even be feasible within a reasonable time locally. That being said, for this to fail, something must be going on (again my inclination is too many splits, but I'm really just glancing at this and could be missing something important).

JWileczek commented 5 years ago

Hi @scially, sorry you are having such issues with the spatial join - hopefully I'll be able to help you track down what the issue is and get you moving forward. I'll need some more information to better help you in this scenario: mainly the size of the cluster you are working with for Spark and the size of the datasets you are attempting to join.

When it comes to spatial joins in Spark there are many things that will affect the job itself, and its performance is largely driven by the configuration of the Spark cluster. Like Rich mentioned above, one thing that greatly affects the performance of the join is the number of splits (a.k.a. partitions) used for the data when it is loaded into Spark. However many splits you specify when loading the RDD, or via the setPartCount function of the runner, will be the number of partitions the resulting RDD uses when loading the data. This partition count is very important to Spark because it defines the base block size and parallelism Spark will use during the join.

The partition count can be either too large or too small depending on the size of the data we are working with and the size of the cluster we are trying to join on. Figuring out an adequate partition count is largely what these memory issues boil down to. In an ideal world we would be able to calculate this for you, but the join algorithm is not at that point yet in its current implementation, as it is a fairly complicated problem to solve.

When thinking about partition count, I like to start by considering the total dataset size versus the size/memory of a single executor. In Spark, individual tasks operate on partitions of data, and a single thread of an executor can only operate on one task at a time. Using too few partitions can leave large parts of the cluster unutilized and can still result in OOM errors depending on the data size. Using an extreme example to demonstrate the point: if we had a dataset that takes up 250 GB of space and we split it across 4 partitions, Spark would allow you to do this, but to then reasonably work with this data without OOM errors you would need executors capable of holding roughly 62.5 GB in memory, because tasks that run on the executors will still try to load at least one partition's worth of data to run the task. This example is unrealistic but demonstrates the basic issue. The issue becomes more complicated once you consider that executors will attempt to run multiple tasks at once depending on the number of cores you've configured the executors to use. This, paired with the fact that some partitions will be larger than others, makes coming up with a perfect partition count very difficult. For spatial joins, it's actually recommended to err on the side of too many partitions rather than too few.

I hope this information helps you understand a little more about what is happening. To help you further, if you can provide me with the following information we can hopefully come up with some configuration settings that will help you move forward:

  1. What is the size of the data that we are attempting to join?
  2. What is the size of the YARN cluster we are working with? Master/slave node count, hardware resources of the master and slave nodes (RAM, disk, cores)
  3. What is the configuration for the Spark executors? Memory and Cores mainly

When working with very large datasets (over 200 GB) it's not unheard of to see partition counts >3000 and sometimes up to 6000. In my experience, aiming for max partition sizes of less than 64 MB has yielded the best results, but that again is dependent on executor size as well.
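
To make that sizing rule concrete, here is a rough back-of-the-envelope sketch (the dataset size is illustrative, not a measurement from this issue):

    public class PartitionCountEstimate {
        public static void main(String[] args) {
            // illustrative dataset size and the ~64 MB target partition size mentioned above
            long datasetBytes = 250L * 1024 * 1024 * 1024; // 250 GB
            long targetPartitionBytes = 64L * 1024 * 1024; // 64 MB

            // rough partition count: dataset size divided by target partition size, rounded up
            long partCount = (datasetBytes + targetPartitionBytes - 1) / targetPartitionBytes;
            System.out.println("rough partCount = " + partCount); // 4000 for these numbers

            // the same data split across only 4 partitions would need ~62.5 GB per task
            System.out.println("per-task load with 4 partitions = "
                    + (datasetBytes / 4.0) / (1024 * 1024 * 1024) + " GB");
        }
    }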

scially commented 5 years ago

Firstly, thanks to you and rfecher... Here is some of the configuration:

  1. I don't know how to describe the data size to you, so I will give you two indications:

    1. The sizes of the shapefiles for the two datasets (screenshots: shapefile-size, attribute1, attributes).

    2. The row counts in Accumulo. I ingested the two shapefiles into Accumulo; below are my commands:

      
geowave config addstore -t accumulo -u geowave -p geowave -i accumulo -z server7:2181,server8:2181,server9:2181 --gwNamespace geowave.over1 overlay1

geowave config addindex -c EPSG:2384 -t spatial -ps HASH -np 4 overlay1-spatial

geowave ingest localtogw -t 4 -f geotools-vector ./qmdltb.shp overlay1 overlay1-spatial

geowave config addstore -t accumulo -u geowave -p geowave -i accumulo -z server7:2181,server8:2181,server9:2181 --gwNamespace geowave.over2 overlay2

geowave config addindex -c EPSG:2384 -t spatial -ps HASH -np 4 overlay2-spatial

geowave ingest localtogw -t 4 -f geotools-vector ./csztgh.shp overlay2 overlay2-spatial

![accumulo](https://user-images.githubusercontent.com/22127608/50319819-4f4d8800-0504-11e9-9c71-f2507c59f569.jpg)
And I also published them to GeoServer:
![geoserver](https://user-images.githubusercontent.com/22127608/50320054-94be8500-0505-11e9-9cc0-e9b88c3f90ed.jpg)

2. My Cluster Configuration:
    1. Cluster Server
![cpu](https://user-images.githubusercontent.com/22127608/50320762-d13fb000-0508-11e9-86c6-31a8d8f0a1ef.jpg)

    2. Yarn Cluster
![yarn](https://user-images.githubusercontent.com/22127608/50320577-ff70c000-0507-11e9-96ac-1d7251a10e50.jpg)

3. Spark executors

spark2-submit --master yarn --deploy-mode client --class com.wh.GOverLay ./overlay-1.0-SNAPSHOT.jar 6000 # 6000 is partCount

And I don't set extra parameters, so I guess the executor memory is the default:

--executor-memory MEM Memory per executor (e.g. 1000M, 2G) (Default: 1G). --executor-cores NUM Number of cores per executor. (Default: 1 in YARN mode, or all available cores on the worker in standalone mode)


I don't know if this information meets the requirements? Thanks very much...

scially commented 5 years ago

Here is my code:

        AccumuloOptions options = new AccumuloOptions();
        AccumuloRequiredOptions inputOperations1 = new AccumuloRequiredOptions("server7:2181,server8:2181,server9:2181",
                "accumulo",
                "geowave",
                "geowave",
                "geowave.over1", //xzq
                options);
        AccumuloRequiredOptions inputOperations2 = new AccumuloRequiredOptions("server7:2181,server8:2181,server9:2181",
                "accumulo",
                "geowave",
                "geowave",
                "geowave.over2", //xzq
                options);

        // output accumulo
        AccumuloRequiredOptions outputOperations = new AccumuloRequiredOptions("server7:2181,server8:2181,server9:2181",
                "accumulo",
                "geowave",
                "geowave",
                "geowave.lgcy",
                options);

        SparkConf conf = GeoWaveSparkConf.getDefaultConfig()
                .set("spark.yarn.jars","hdfs://10.66.150.5:8020/spark-jars/*.jar")
                //.setJars(new String[]{"/home/hwang/eclipse-workspace/geowaveoverlay/overlay/target/overlay-1.0-SNAPSHOT.jar"})
                ;//.setMaster("yarn-client");

        SparkSession spark = GeoWaveSparkConf.createDefaultSession(conf);

        GeomFunctionRegistry.registerGeometryFunctions(spark);
        GeoWaveSpatialEncoders.registerUDTs();

        SpatialJoinRunner joinRunner = new SpatialJoinRunner(spark);

        // set layer1
        joinRunner.setLeftStore(inputOperations1.createPluginOptions());
        joinRunner.setLeftAdapterTypeName("qmdltb"); //csztgh
        // set layer2
        joinRunner.setRightStore(inputOperations2.createPluginOptions());
        joinRunner.setRightAdapterTypeName("csztgh"); //qmdltb
        // set out layer
        joinRunner.setOutputStore(outputOperations.createPluginOptions());
        joinRunner.setOutputLeftAdapterTypeName("leftjoin");
        joinRunner.setOutputLeftAdapterTypeName("rightjoin");

        joinRunner.setPredicate(new GeomIntersects());
        joinRunner.setPartCount(Integer.valueOf(args[0]));

        joinRunner.run();

And I set the partCount to 6, 6000, 10000, and 20000, but the program failed to execute.

However, I also tried running the overlay analysis on another shapefile that had been ingested into GeoWave, set the partCount to 1000, and it succeeded.

SPATIAL_IDEX_HASH_64

And this layer only has 9 attributes; I used this layer to overlay with itself (screenshot: xzq).

hsg77 commented 5 years ago

@scially Your test data is too small. Is the number of 6000 partitions too large?

scially commented 5 years ago

@hsg77
But it still doesn't succeed when I set partCount=60.

hsg77 commented 5 years ago

@scially When you ingest layer data with GeoWave, try the NONE partitioning strategy (don't use the HASH or ROUND_ROBIN strategies for now; previously, the data I uploaded with the ROUND_ROBIN strategy could not be found by my spatial query, while data uploaded with the NONE strategy could be found).

scially commented 5 years ago

@hsg77 I'm using HASH_64 now and the queries work fine, but I'm doing overlay analysis with GeoWave's Spark module and there are always problems.

hsg77 commented 5 years ago

@scially Is it the same problem as issue #1469? How about using the EPSG:4326 coordinate system (World 1984)?

scially commented 5 years ago

@hsg77 It really does seem to be the same problem. Thanks, I'll give it a try - could you leave me some contact information? Though I previously used smaller shapefiles in other coordinate systems for analysis and that succeeded... I'll grab the latest code and try it.

scially commented 5 years ago

@hsg77 I updated the code, but it still doesn't work. I also tried WGS84 coordinates, and there were still problems. But with a small dataset of about 2 MB, with only about 10 attributes in total, the overlay analysis succeeds.

hsg77 commented 5 years ago

@scially In joinRunner.setOutputLeftAdapterTypeName("leftjoin"); joinRunner.setOutputLeftAdapterTypeName("rightjoin"); the second call should be changed to joinRunner.setOutputRightAdapterTypeName("rightjoin");

I used version 0.9.8, stored in HBase 1.4.6. My SHP data is uploaded to a store (namespace = tablename = overInputLayer) that contains two adapters (layer_1, layer_2).

Try the following code:

AccumuloOptions options = new AccumuloOptions();
AccumuloRequiredOptions input = new AccumuloRequiredOptions("server7:2181,server8:2181,server9:2181",
        "accumulo",
        "geowave",
        "geowave",
        "overInputLayer",
        options);

SparkConf conf = GeoWaveSparkConf.getDefaultConfig();
conf.setJars(JavaSparkContext.jarOfClass(this.getClass()));
SparkSession spark = GeoWaveSparkConf.createDefaultSession(conf);

GeomFunctionRegistry.registerGeometryFunctions(spark);
GeoWaveSpatialEncoders.registerUDTs();

SpatialJoinRunner joinRunner = new SpatialJoinRunner(spark);

joinRunner.setLeftStore(input.createPluginOptions());
joinRunner.setRightStore(input.createPluginOptions());

joinRunner.setLeftAdapterTypeName("layer_1");
joinRunner.setRightAdapterTypeName("layer_2");

// set out layer
joinRunner.setOutputStore(outputOperations.createPluginOptions());
joinRunner.setOutputLeftAdapterTypeName("leftjoin");
joinRunner.setOutputRightAdapterTypeName("rightjoin");

joinRunner.setMaster("spark://master:7077");

joinRunner.setPredicate(new GeomIntersects());
joinRunner.run();

scially commented 5 years ago

@hsg77 OK, there was an error, thanks... I'll try it.

hsg77 commented 5 years ago

@scially Which version of GeoWave are you using? Is it version 1.0 of GeoWave? I see "org.locationtech" in your code.

scially commented 5 years ago

@hsg77 1.0

scially commented 5 years ago

@hsg77 I have corrected the code, but it still doesn't work... I want to ask, is your program also slow at this step? image

hsg77 commented 5 years ago

@scially @rfecher @JWileczek The results of my run are as follows:

Upload data: partition strategy NONE, partition number 1, partCount = -1.
LeftTier count = 81464, rightTier count = 121830.
Spatial join result: ran 2 minutes 30 seconds; Leftjoin count = 54, Rightjoin count = 16.

LeftTier count = 688462, rightTier count = 1672008. Spatial join results:
(1) Upload data: partition strategy NONE, partition number 1, partCount = -1: insufficient-memory error.
(2) Upload data: partition strategy HASH, partition number 8, partCount = -1: insufficient-memory error.
(3) Upload data: partition strategy HASH, partition number 64, partCount = 1000: ran 53 minutes 56 seconds; Leftjoin count = 11103, Rightjoin count = 17872.

It also runs very slowly for me.

hsg77 commented 5 years ago

@scially @rfecher When creating a layer, how do I set the IndexStrategy of the layer to TieredSFCIndexStrategy?

rfecher commented 5 years ago

When you call DataStore.addType() or DataStore.addIndex(), providing the indices associated with a feature type, you have the opportunity to define whatever indexing you'd like. For example, this creates the default spatial indexing given a set of options. Following similar logic, you could replace this line with a call to TieredSFCIndexFactory such as this factory method.

JWileczek commented 5 years ago

@hsg77 I think the issue with the slowness is likely due to how Spark memory usage and garbage collection are being handled for the job. I would try again with the following settings and see if you have different results:

partition strategy: hash
partition number: 8
partcount: 3000

In addition to these settings, I generally have better results if I use a different garbage collection method for Spark than the default parallel collector. G1GC is available to Spark as a garbage collection strategy and tends to lead to better results with larger spatial join jobs. You can see more about setting up G1GC here: https://databricks.com/blog/2015/05/28/tuning-java-garbage-collection-for-spark-applications.html

For the feature counts you are trying to join I don't see needing more than 8 partitions for the store, but you'll potentially need more for the Spark RDD (partCount) based on how your executors are set up (cores, instances, memory usage). At a partition count of ~3000+ Spark begins to use compressed partitions, so generally the rule of thumb is, if you are needing 1000-2000 partitions for a job, just bump it up to 3000+ so Spark will use less memory in the task tracking itself. Getting the speed up is a combination of finding the right Spark executor/memory configuration + garbage collection configuration + partition count configuration. I would start by modifying the garbage collection method since you have a working but slow result and then move from there. Hopefully this will help you get some better speed out of the spatial join.
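
As a rough sketch of how those pieces could be combined on top of the GeoWaveSparkConf setup already shown in this thread (every value here is an illustrative placeholder, not a tuned recommendation):

    SparkConf conf = GeoWaveSparkConf.getDefaultConfig()
            // executor sizing: adjust to your cluster
            .set("spark.executor.instances", "10")
            .set("spark.executor.memory", "8g")
            .set("spark.executor.cores", "2")
            // switch the executors to G1GC as described in the Databricks post above
            .set("spark.executor.extraJavaOptions",
                    "-XX:+UseG1GC -XX:InitiatingHeapOccupancyPercent=35");

    SparkSession spark = GeoWaveSparkConf.createDefaultSession(conf);
    SpatialJoinRunner joinRunner = new SpatialJoinRunner(spark);
    joinRunner.setPartCount(3000); // 3000+ per the rule of thumb above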

hsg77 commented 5 years ago

@rfecher The generated HBase table names change to Tk_1326426130_662275224 (using TieredSFCIndexStrategy) and Tk_GEOWAVE_METADATA. That is a little strange compared with the original index table names: Tk_SPATIAL_IDX (using XZHierarchicalIndexStrategy) and Tk_GEOWAVE_METADATA.

(1) Do you usually use TieredSFCIndexStrategy or XZHierarchicalIndexStrategy to create layers in your tests?

The input parameter indexStrategy is a TieredSFCIndexStrategy instance. (2) Why does the supportsJoin function always return false?

public boolean supportsJoin(NumericIndexStrategy indexStrategy) {
    return indexStrategy != null
            && indexStrategy.getClass().isInstance(TieredSFCIndexStrategy.class);
}

hsg77 commented 5 years ago

image

hsg77 commented 5 years ago

@JWileczek The main reason is that this line of code is slow to execute and runs out of memory:

this.combinedResults.isEmpty();

Is it related to the Spark operating environment?

hsg77 commented 5 years ago

@JWileczek Using partition strategy: hash, partition number: 8, partCount: 50, and the JVM option:

-XX:+UseG1GC

LeftTier count = 688462, rightTier count = 1672008. Spatial join results: ran 33 minutes 36 seconds; Leftjoin count = 11103, Rightjoin count = 17872.

this.combinedResults.isEmpty() This line of code takes 32 minutes to execute.

There are 10 workers with 10 x 18 GB of available memory.

One of the Spark workers' execution logs:

Spark Executor Command: "/usr/cwgis/app/jdk/bin/java" "-cp" "/usr/cwgis/app/spark/jars/lib/:/usr/cwgis/app/spark/conf/:/usr/cwgis/app/spark/jars/:/usr/cwgis/app/hadoop/etc/hadoop/" "-Xmx8192M" "-Dspark.driver.port=41433" "-XX:+UseG1GC" "-XX:+PrintGCDetails" "-Dkey=value" "-Dnumbers=one two three" "-verbose:gc" "-XX:+UnlockDiagnosticVMOptions" "-XX:+G1SummarizeConcMark" "-XX:InitiatingHeapOccupancyPercent=35" "org.apache.spark.executor.CoarseGrainedExecutorBackend" "--driver-url" "spark://CoarseGrainedScheduler@node114:41433" "--executor-id" "7" "--hostname" "192.168.30.119" "--cores" "2" "--app-id" "app-20190214170628-0007" "--worker-url" "spark://Worker@192.168.30.119:42709"

scially commented 5 years ago

Is GeoWave suitable for overlay analysis similar to ArcGIS, where the user specifies two layers, then GeoWave analyzes them and publishes the result to GeoServer?

hsg77 commented 5 years ago

@scially Based on the intersection algorithm of GeoWave's spatial join, I wrote a function similar to the Intersect tool in ArcGIS. It works for small datasets, but large datasets of around 10 million features still run out of memory.

hsg77 commented 4 years ago

@scially
// ======================================== publish to the GeoServer server
this.gs.AddDataStoreAndLayer(outputStoreName, outputLeftJoinName, newFeatureAdapter);
// ========================================

package com.sapsoft;

import it.geosolutions.geoserver.rest.GeoServerRESTManager;
import it.geosolutions.geoserver.rest.GeoServerRESTPublisher;
import it.geosolutions.geoserver.rest.GeoServerRESTReader;
import it.geosolutions.geoserver.rest.encoder.GSLayerEncoder;
import it.geosolutions.geoserver.rest.encoder.GSResourceEncoder;
import it.geosolutions.geoserver.rest.encoder.datastore.GSSproutDatastoreEncoder;
import it.geosolutions.geoserver.rest.encoder.feature.GSFeatureTypeEncoder;
import mil.nga.giat.geowave.adapter.vector.FeatureDataAdapter;
import org.apache.commons.httpclient.NameValuePair;

import java.io.File;
import java.net.MalformedURLException;
import java.net.URL;

// Extensions based on the geoserver-manager open-source project
// vp:hsg
// create date: 2019-07-15
public class gsUtil {

public final String RESTURL;

public  final String RESTUSER;

public  final String RESTPW;

public  final String GS_VERSION;

public  java.net.URL URL;

public  GeoServerRESTManager manager;

public  GeoServerRESTReader reader;

public  GeoServerRESTPublisher publisher;

//private static ResourceBundle bundle = ResourceBundle.getBundle("constant");

// Initialize the user name and password; authentication is performed when publishing layers
public gsUtil()
{
    RESTURL = getenv("gsmgr_resturl", Appsproutgis.geoserverUrl); //"http://192.168.30.114:8080/geoserver/"
    RESTUSER = getenv("gsmgr_restuser", Appsproutgis.geoserverUser);  //"admin"
    RESTPW = getenv("gsmgr_restpw", Appsproutgis.geoserverPassword);  //"geoserver"
    GS_VERSION = getenv("gsmgr_version", "2.13.2");
    try {
        URL = new URL(RESTURL);
        manager = new GeoServerRESTManager(URL, RESTUSER, RESTPW);
        reader = manager.getReader();
        publisher = manager.getPublisher();
    } catch (MalformedURLException e) {
        e.printStackTrace();
    }
}

// Get environment settings (system property or environment variable, falling back to a default)
private static String getenv(String envName, String envDefault) {
    String env = System.getenv(envName);
    String prop = System.getProperty(envName, env);
    return prop != null ? prop : envDefault;
}

public  boolean removeLayer(String workspace, String storeName) {
    return publisher.removeLayer(workspace, storeName);
}

public  boolean createWorkspace(String ws) {
    return publisher.createWorkspace(ws);
}

public  boolean removeDatastore(String workspace, String storename) {
    return publisher.removeDatastore(workspace, storename, true);
}

public  boolean createSproutDatastore(String workspace, String storename, String zookeeper, String gwNamespace, boolean IsSpark) {

    GSSproutDatastoreEncoder sds = new GSSproutDatastoreEncoder(storename);
    sds.setZookeeper(zookeeper);
    sds.setNamespace(storename);
    sds.setIsSpark(true);
    return publisher.createSproutDatastore(workspace, sds);
}
public  boolean publishLayer_sprout(String workspace,String storename,String layername,String srs,String defaultStyle)
{
    return publisher.publishLayer_sprout(workspace,storename,layername,srs,defaultStyle);
}

// Automatically add the sprout datastore and publish the layer
public boolean AddDataStoreAndLayer(String outputStoreName, String outputLeftJoinName, FeatureDataAdapter newFeatureAdapter)
        throws Exception
{
    //========================================================= publish to the GeoServer server
    //System.out.println("getFeatureLayer from geoserver="+this.gs.getFeatureLayer(outputLeftJoinName));
    this.publisher.removeLayer(Appsproutgis.geoserverWS,outputLeftJoinName);
    this.removeDatastore(Appsproutgis.geoserverWS,outputLeftJoinName);
    this.createSproutDatastore(Appsproutgis.geoserverWS,outputStoreName,Appsproutgis.zookeeper,outputStoreName,false);
    String defaultStyle="DistributedRender-Polygon";
    String tGeometryType=newFeatureAdapter.getFeatureType().getGeometryDescriptor().getType().getBinding().getName();
    System.out.println(tGeometryType);
    //com.vividsolutions.jts.geom.Point
    //com.vividsolutions.jts.geom.MultiLineString
    //com.vividsolutions.jts.geom.MultiPolygon
    if(tGeometryType.contains("MultiPoint") || tGeometryType.contains("Point"))      defaultStyle="DistributedRender-Point";
    if(tGeometryType.contains("MultiLineString") || tGeometryType.contains("LineString")) defaultStyle="DistributedRender-Line";
    if(tGeometryType.contains("MultiPolygon") || tGeometryType.contains("Polygon")) defaultStyle="DistributedRender-Polygon";
    this.publishLayer_sprout(Appsproutgis.geoserverWS,outputStoreName,outputStoreName,Appsproutgis.crs,defaultStyle);
    //=========================================================
    return true;
}

}