basiszwo / flak-flink-geomesa

Apache Flink RichSinkFunction for Apache Accumulo
GNU General Public License v3.0

Validate geo position #1

Open basiszwo opened 7 years ago

basiszwo commented 7 years ago

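We need to validate the geo position before writing, to avoid out-of-bounds errors such as this one, where a latitude of 114.03863525390625 falls outside [-90.0, 90.0]: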

java.lang.IllegalArgumentException: requirement failed: Value out of bounds ([-90.0 90.0]): 114.03863525390625
    at scala.Predef$.require(Predef.scala:219)
    at org.locationtech.geomesa.curve.NormalizedDimension$class.normalize(NormalizedDimension.scala:17)
    at org.locationtech.geomesa.curve.NormalizedLat.normalize(NormalizedDimension.scala:23)
    at org.locationtech.geomesa.curve.Z3SFC.index(Z3SFC.scala:27)
    at org.locationtech.geomesa.utils.uuid.Z3UuidGenerator$.createUuid(Z3FeatureIdGenerator.scala:93)
    at org.locationtech.geomesa.utils.uuid.Z3UuidGenerator$.createUuid(Z3FeatureIdGenerator.scala:70)
    at org.locationtech.geomesa.utils.uuid.Z3FeatureIdGenerator.createId(Z3FeatureIdGenerator.scala:32)
    at org.locationtech.geomesa.index.geotools.GeoMesaFeatureWriter$.featureWithFid(GeoMesaFeatureWriter.scala:64)
    at org.locationtech.geomesa.index.geotools.GeoMesaFeatureWriter.writeFeature(GeoMesaFeatureWriter.scala:128)
    at org.locationtech.geomesa.index.geotools.GeoMesaAppendFeatureWriter$class.write(GeoMesaFeatureWriter.scala:194)
    at org.locationtech.geomesa.accumulo.data.AccumuloAppendFeatureWriter.write(AccumuloFeatureWriter.scala:19)
    at org.locationtech.geomesa.index.geotools.GeoMesaFeatureStore.addFeatures(GeoMesaFeatureStore.scala:44)
    at one.flak.flinkgeomesa.sinks.BaseGeomesaSink.flushFeatureCollection(BaseGeomesaSink.java:117)
    at one.flak.flinkgeomesa.sinks.BaseGeomesaSink.invoke(BaseGeomesaSink.java:70)
    at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:41)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:528)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:503)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:483)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:891)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:869)
    at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
    at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:206)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
    at java.lang.Thread.run(Thread.java:748)

basiszwo commented 7 years ago

This can be fixed in the short term by catching Exception or IllegalArgumentException in https://github.com/basiszwo/flak-flink-geomesa/blob/7be6b6eecd076ed491b5b9376f9dcc61605baa26/src/main/java/one/flak/flinkgeomesa/sinks/BaseGeomesaSink.java#L120. In the long term, invalid positions should be rejected per sample, not per batch (which is what the short-term fix would do).
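A minimal sketch of what per-sample rejection could look like, assuming each sample exposes a latitude and a longitude as doubles. The class `GeoPositionValidator`, the `Position` type, and the method names are hypothetical and not part of this repository. The latitude bounds come from the exception above; the longitude bounds of [-180, 180] are assumed from the usual WGS84 convention.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of flak-flink-geomesa.
public class GeoPositionValidator {

    /** Hypothetical sample type; the sink's real feature type is not shown in this issue. */
    public static final class Position {
        final double latitude;
        final double longitude;

        public Position(double latitude, double longitude) {
            this.latitude = latitude;
            this.longitude = longitude;
        }
    }

    /**
     * Returns true only if the latitude is within [-90, 90] and the
     * longitude is within [-180, 180]. NaN and infinite values fail
     * these comparisons and are therefore rejected as well.
     */
    public static boolean isValid(double latitude, double longitude) {
        return latitude >= -90.0 && latitude <= 90.0
            && longitude >= -180.0 && longitude <= 180.0;
    }

    /**
     * Drops invalid samples one by one, so a single bad position
     * does not cause the whole batch to fail on write.
     */
    public static List<Position> filterValid(List<Position> samples) {
        List<Position> valid = new ArrayList<>();
        for (Position p : samples) {
            if (isValid(p.latitude, p.longitude)) {
                valid.add(p);
            }
        }
        return valid;
    }
}
```

A check like this could run before a sample is added to the batch (for example at the start of the sink's `invoke`, or in a filter step ahead of the sink), so that only the offending sample is dropped or logged.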

ankitK1987 commented 6 years ago

Hello Basiszwo,

Apologies for posting this comment here. I want to design a GIS solution with the help of GeoMesa. I came across this repo, but I don't know exactly what it does. Could you please shed some light on it, and on whether you are using Flink with GeoMesa instead of Accumulo?

Thanks a lot,
Ankit Khandelwal