
geospark geomesa interoperation #253

Closed geoHeil closed 1 year ago

geoHeil commented 6 years ago

As a data scientist I want to be able to mix and match spatial libraries for Spark. Currently it is an either/or choice, as the libraries do not integrate with each other and have overlapping classes and UDF function names.

In particular, I want to be able to easily integrate GeoSpark and GeoMesa.

One possibility could be to write my own UDF registrator: https://github.com/DataSystemsLab/GeoSpark/blob/master/sql/src/main/scala/org/datasyslab/geosparksql/UDF/UdfRegistrator.scala

def registerAll(sparkSession: SparkSession): Unit = {
  // register scalar expressions under a "geospark_" prefix to avoid name clashes
  Catalog.expressions.foreach(f =>
    FunctionRegistry.builtin.registerFunction("geospark_" + f.getClass.getSimpleName.dropRight(1), f))
  // register aggregate functions with the same prefix
  Catalog.aggregateExpressions.foreach(f =>
    sparkSession.udf.register("geospark_" + f.getClass.getSimpleName, f))
}

However, this still does not handle the overlapping classes (JTS, GeoTools).

geoHeil commented 6 years ago

The suggested workaround works only partially. When the UDFs are not renamed, the calls fall back to GeoMesa's functions.

But when the UDFs are renamed as well (I actually want the speedup of GeoSpark), the functions do not seem to be registered properly:

Exception in thread "main" org.apache.spark.sql.AnalysisException: Undefined function: 'geospark_ST_Point'. This function is neither a registered temporary function nor a permanent function registered in the database 'default'.; line 1 pos 0
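
One quick way to check what actually got registered (a sketch, assuming the sparkSession passed to the registrator above) is to ask the session catalog:

// list everything the catalog knows under the chosen prefix
sparkSession.catalog.listFunctions()
  .filter(_.name.startsWith("geospark_"))
  .show(false)

// or probe a single name
println(sparkSession.catalog.functionExists("geospark_ST_Point"))

One possible cause (unverified): the session clones FunctionRegistry.builtin when it is created, so functions added to the builtin registry after the SparkSession already exists may never become visible to that session.
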
geoHeil commented 6 years ago

A reproducible example can be found at https://github.com/geoHeil/geomesa-geospark

geoHeil commented 6 years ago

I get the following problems:

A clash of classes:

18-07-17 21:36:03 WARN UDTRegistration: Cannot register UDT for com.vividsolutions.jts.geom.Geometry, which is already registered.

This happens when changing the dependency scope from compileOnly to compile and executing in IDEA. Executing the fat jar from the build tool in a shell fails with a timeout.

jiayuasu commented 6 years ago

@geoHeil This is probably because GeoMesa also has its own custom Geometry Kryo serializer, just like GeoSpark. GeoSpark wrote a bunch of code to put spatial indexes and geometries into an array. Since both projects use JTS geometries, this could be a conflict.
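
Both libraries ship a Kryo registrator that claims the same JTS classes, so they can silently override each other. A minimal sketch of pinning GeoSpark's registrator explicitly (class name taken from the GeoSpark docs; swap in GeoMesa's registrator if you prefer its serialization):

import org.apache.spark.SparkConf
import org.apache.spark.serializer.KryoSerializer

// exactly one geometry-aware Kryo registrator for the whole application
val conf = new SparkConf()
  .set("spark.serializer", classOf[KryoSerializer].getName)
  .set("spark.kryo.registrator",
    "org.datasyslab.geospark.serde.GeoSparkKryoRegistrator")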

geoHeil commented 6 years ago

See the latest updates to https://github.com/geoHeil/geomesa-geospark

One problem remains:

Exception in thread "main" java.lang.ClassCastException: org.apache.spark.sql.catalyst.expressions.UnsafeArrayData cannot be cast to org.apache.spark.sql.catalyst.InternalRow

Comparison

GeoSpark & GeoMesa: regular join

== Physical Plan ==
*HashAggregate(keys=[], functions=[count(1)], output=[count#120L])
+- Exchange SinglePartition
   +- *HashAggregate(keys=[], functions=[partial_count(1)], output=[count#124L])
      +- *Project
         +- BroadcastNestedLoopJoin BuildRight, Inner, org.apache.spark.sql.geosparksql.expressions.ST_Contains$
            :- LocalTableScan [geom_polygons#72]
            +- BroadcastExchange IdentityBroadcastMode
               +- LocalTableScan [geom_points#60]

GeoSpark solo: optimized range join

== Physical Plan ==
*HashAggregate(keys=[], functions=[count(1)], output=[count#81L])
+- Exchange SinglePartition
   +- *HashAggregate(keys=[], functions=[partial_count(1)], output=[count#85L])
      +- *Project
         +- RangeJoin geom_polygons#43: geometry, geom_points#31: geometry, false
            :- LocalTableScan [geom_polygons#43]
            +- LocalTableScan [geom_points#31]
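
For reference, a hypothetical reconstruction of the query behind both plans (table names are assumptions; the column names come from the plan output):

// count points falling inside polygons via a spatial predicate join
val cnt = sparkSession.sql(
  """SELECT count(*)
    |FROM polygons, points
    |WHERE ST_Contains(polygons.geom_polygons, points.geom_points)""".stripMargin)
cnt.explain()
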
geoHeil commented 6 years ago

@jiayuasu do you believe this conflict is what causes Spark to fall back to regular, i.e. no longer optimized, joins?

jiayuasu commented 6 years ago

@geoHeil You can probably try to register the GeoSpark join strategy manually: https://github.com/DataSystemsLab/GeoSpark/blob/master/sql/src/main/scala/org/datasyslab/geosparksql/utils/GeoSparkSQLRegistrator.scala

In other words, add the following line:

sparkSession.experimental.extraStrategies = JoinQueryDetector :: Nil
geoHeil commented 6 years ago

@jiayuasu thanks a lot. This is correct; that line was missing from my registrator. Now optimized range joins are used as well.
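
For completeness, a sketch of the registrator with that line added (package names as found in the GeoSpark 1.x source tree; treat them as assumptions):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.analysis.FunctionRegistry
import org.apache.spark.sql.geosparksql.strategy.join.JoinQueryDetector
import org.datasyslab.geosparksql.UDF.Catalog

def registerAll(sparkSession: SparkSession): Unit = {
  // enable GeoSpark's optimized spatial join planning
  sparkSession.experimental.extraStrategies = JoinQueryDetector :: Nil
  // prefixed UDF registration as before
  Catalog.expressions.foreach(f =>
    FunctionRegistry.builtin.registerFunction(
      "geospark_" + f.getClass.getSimpleName.dropRight(1), f))
  Catalog.aggregateExpressions.foreach(f =>
    sparkSession.udf.register("geospark_" + f.getClass.getSimpleName, f))
}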

Do you have an opinion regarding UDT registration / clashing class names? Or a better idea than my shading approach above?

GeoSpark:

import com.vividsolutions.jts.geom.Geometry
import com.vividsolutions.jts.index.SpatialIndex
UDTRegistration.register(classOf[Geometry].getName, classOf[GeometryUDT].getName)
UDTRegistration.register(classOf[SpatialIndex].getName, classOf[IndexUDT].getName)

GeoMesa:

import com.vividsolutions.jts.geom._
val typeMap: Map[Class[_], Class[_ <: UserDefinedType[_]]] = Map(
    classOf[Geometry]            -> classOf[GeometryUDT],
    classOf[Point]               -> classOf[PointUDT],
    classOf[LineString]          -> classOf[LineStringUDT],
    classOf[Polygon]             -> classOf[PolygonUDT],
    classOf[MultiPoint]          -> classOf[MultiPointUDT],
    classOf[MultiLineString]     -> classOf[MultiLineStringUDT],
    classOf[MultiPolygon]        -> classOf[MultiPolygonUDT],
    classOf[GeometryCollection]  -> classOf[GeometryCollectionUDT]
  )
  1. Is there a good way to merge the registrations (i.e. prevent double registrations), or can these simply be ignored? (See the sketch after this list.)
  2. Is GeoSpark adding custom code (https://github.com/jiayuasu/JTSplus) under the namespace com.vividsolutions.jts.*? I.e., when the registration order is
    • GeoSpark
    • GeoMesa
    is any functionality lost, or can warnings regarding double registration of the types be ignored?
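
A minimal sketch for question 1: skip classes that are already bound, so whichever library registers first wins (UDTRegistration.exists is part of Spark's developer API; the GeometryUDT class name is taken from the error message further below):

import org.apache.spark.sql.types.UDTRegistration
import com.vividsolutions.jts.geom.Geometry

// register a UDT only when no UDT is bound to the class yet
def registerIfAbsent(userClass: String, udtClass: String): Unit =
  if (!UDTRegistration.exists(userClass)) {
    UDTRegistration.register(userClass, udtClass)
  }

registerIfAbsent(
  classOf[Geometry].getName,
  "org.apache.spark.sql.geosparksql.UDT.GeometryUDT")

Note that this only avoids the double-registration warning; it does not make the two UDTs interchangeable.
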
geoHeil commented 6 years ago

GeoMesa serializes geometries as WKB via JTS:

override def serialize(obj: T): InternalRow = {
  new GenericInternalRow(Array[Any](WKBUtils.write(obj)))
}

override def sqlType: DataType = StructType(Seq(
  StructField("wkb", DataTypes.BinaryType)
))

override def deserialize(datum: Any): T = {
  val ir = datum.asInstanceOf[InternalRow]
  WKBUtils.read(ir.getBinary(0)).asInstanceOf[T]
}

GeoSpark uses a Kryo-based serializer:

def serialize(geometry: Geometry): Array[Byte] = {
  val out = new ByteArrayOutputStream()
  val kryo = new Kryo()
  val geometrySerde = new GeometrySerde()
  val output = new Output(out)
  geometrySerde.write(kryo, output, geometry)
  output.close()
  out.toByteArray
}

def deserialize(values: ArrayData): Geometry = {
  val in = new ByteArrayInputStream(values.toByteArray())
  val kryo = new Kryo()
  val geometrySerde = new GeometrySerde()
  val input = new Input(in)
  val geometry = geometrySerde.read(kryo, input, classOf[Geometry])
  input.close()
  geometry.asInstanceOf[Geometry]
}

Is there any problem if JTS geometries (from GeoMesa) are serialized via the GeoSpark serializer? Any concerns regarding efficiency?
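
For context, both serializers operate on plain com.vividsolutions.jts.geom.Geometry objects, so it should not matter which library created a geometry. A tiny illustration with vanilla JTS and WKB (the encoding GeoMesa's UDT writes):

import com.vividsolutions.jts.geom.Geometry
import com.vividsolutions.jts.io.{WKBReader, WKBWriter, WKTReader}

// WKB round trip: the bytes carry no trace of which library made the geometry
val geom: Geometry = new WKTReader().read("POINT (1 2)")
val bytes: Array[Byte] = new WKBWriter().write(geom)
val back: Geometry = new WKBReader().read(bytes)
assert(geom.equalsExact(back))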

geoHeil commented 6 years ago

According to James (from the GeoMesa Gitter chat):

One strategy that might work would be for GeoSpark and GeoMesa to agree on the classnames for UDT registrations and then for end users to register ONE AND ONLY ONE set of the UDTs... the UDFs could be based on those classnames, and there's a fighting chance that'd let someone 'mix and match' (as well as combine UDFs between packages)

Is there interest from both projects to collaborate here?

geoHeil commented 6 years ago
cannot resolve 'CAST(`hw_aggreagtion_area` AS ARRAY<TINYINT>)' due to data type mismatch: cannot cast org.apache.spark.sql.jts.PointUDT@449554e8 to org.apache.spark.sql.geosparksql.UDT.GeometryUDT@3f2c1eb5;

This is then the problem of clashing UDTs. How could this be resolved (quickly)?
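
One untested idea for bridging the clashing UDTs at the column level: round-trip through WKT so each library parses the value into its own geometry type. This assumes GeoMesa's st_asText, the prefixed GeoSpark UDFs from the registrator above, and a DataFrame registered as the temp view df:

// hypothetical: GeoMesa geometry -> WKT string -> GeoSpark geometry
val bridged = sparkSession.sql(
  "SELECT geospark_ST_GeomFromWKT(st_asText(hw_aggreagtion_area)) AS area FROM df")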

jnh5y commented 5 years ago

@geoHeil your blog post about this work is great!

@jiayuasu any thoughts on integration points?

geoHeil commented 4 years ago

With the upgrade to GeoMesa 2.4.x, GeoTools was upgraded to version 21, which also internally switches to the LocationTech-based JTS.

https://github.com/DataSystemsLab/GeoSpark/issues/410

Unfortunately, having two versions of GeoTools on the classpath causes trouble for me.

geoHeil commented 4 years ago

Instead of scattered discussions in the Gitter channels of GeoSpark and GeoMesa, perhaps this is the better place to continue the discussion: https://github.com/DataSystemsLab/GeoSpark/issues/253

Tasks to be done:

Short term

Long term

@jiayuasu, @jnh5y what do you think about this? @jnh5y, I believe in one of the Gitter discussions you (or maybe Emilio, at least someone) mentioned possibly having some interns work on this. Do you know of any progress there? I believe this was in the GeoMesa Gitter channel.

elahrvivaz commented 4 years ago

Just as a clarification: shading refers to packaging the transitive dependencies in an uber-jar, but what you are referring to is shading + relocation, which hides the transitive dependencies from everything else on the classpath. That seems like a good medium-term solution, although I will note that you may run into issues in your end project if you try to use the shade plugin there while also having a dependency that is shaded/relocated (i.e. two levels of shading will likely not work).
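
For illustration, a sketch of what shading + relocation could look like with sbt-assembly (assumptions: an sbt build, illustrative artifact coordinates), relocating the JTS classes only inside the GeoSpark jar so GeoMesa's JTS stays untouched:

// build.sbt, with the sbt-assembly plugin enabled
assemblyShadeRules in assembly := Seq(
  ShadeRule.rename("com.vividsolutions.jts.**" -> "shaded.geospark.jts.@1")
    .inLibrary("org.datasyslab" % "geospark" % "1.1.3")
)

As noted above, applying the shade plugin again on top of a dependency that is already shaded/relocated is where things tend to break.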