geodocker / geodocker-geomesa

Containers for GeoMesa-enabled Accumulo

Help on geomesa spark with hbase datasource #15

Open qubin-ben opened 6 years ago

qubin-ben commented 6 years ago

I know this is not the GeoMesa project itself, but I could not find an issue tracker over there, so I am hoping to get some help under this project.

Back to the topic:

I am running into an issue when launching spark-shell remotely against a GeoMesa Spark cluster. Everything works fine when spark-shell, HBase, and Spark are all configured on the same node, and it also works when I launch spark-shell on the same host as the HBase server (the Spark cluster has 1 master + 2 workers, and the HBase data source runs in standalone mode). It looks like spark-shell can read the metadata, since the SQL "desc gdelt" works fine, but it fails when reading the gdelt_gdelt_z3_v2 table. Could you help take a look?
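To show what I mean by the catalog being reachable from the remote node, here is a sketch of the kind of non-Spark sanity check I can run there, using the same connection parameters as the dsParams further below (just a sketch, not the exact code I ran):

import org.geotools.data.DataStoreFinder

// Same connection parameters as the dsParams used for the Spark data source below.
val params = new java.util.HashMap[String, java.io.Serializable]()
params.put("hbase.zookeepers", "x.x.x.x")
params.put("hbase.catalog", "gdelt")

val ds = DataStoreFinder.getDataStore(params)
// If the GeoMesa catalog is reachable, "gdelt" should be listed here,
// even though the Spark scan of gdelt_gdelt_z3_v2 fails.
println(ds.getTypeNames.mkString(", "))
ds.dispose()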

Thanks in advance!

Here are the spark-shell commands and the error message.

======================== Screen capture:

scala> dataFrame.createOrReplaceTempView("gdelt")
2018-10-26 14:28:19 WARN Utils:66 - Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.debug.maxToStringFields' in SparkEnv.conf.

scala> val sql = " desc gdelt "
sql: String = " desc gdelt "

scala> val result = sparkSession.sql(sql)
result: org.apache.spark.sql.DataFrame = [col_name: string, data_type: string ... 1 more field]

scala> result.show(50, false)
+---------------------+---------+-------+
|col_name             |data_type|comment|
+---------------------+---------+-------+
|fid                  |string   |null   |
|GLOBALEVENTID        |string   |null   |
|SQLDATE              |int      |null   |
|MonthYear            |int      |null   |
|Year                 |int      |null   |
|FractionDate         |double   |null   |
|Actor1Code           |string   |null   |
|Actor1Name           |string   |null   |
|Actor1CountryCode    |string   |null   |
...
|Actor2Geo_ADM2Code   |string   |null   |
|Actor2Geo_Lat        |double   |null   |
+---------------------+---------+-------+
only showing top 50 rows

scala> val sql = "select from gdelt limit 100000" sql: String = select from gdelt limit 100000

scala> val result = sparkSession.sql(sql)
result: org.apache.spark.sql.DataFrame = [fid: string, GLOBALEVENTID: string ... 62 more fields]

scala> result.show org.apache.spark.sql.catalyst.errors.package$TreeNodeException: execute, tree: Exchange SinglePartition +- (1) LocalLimit 100000 +- (1) Scan GeoMesaRelation(org.apache.spark.sql.SQLContext@26fda5ee,SimpleFeatureTypeImpl gdelt identified extends Feature(GLOBALEVENTID:GLOBALEVENTID,SQLDATE:SQLDATE,MonthYear:MonthYear,Year:Year,FractionDate:FractionDate,Actor1Code:Actor1Code,Actor1Name:Actor1Name,Actor1CountryCode:Actor1CountryCode,Actor1KnownGroupCode:Actor1KnownGroupCode,Actor1EthnicCode:Actor1EthnicCode,Actor1Religion1Code:Actor1Religion1Code,Actor1Religion2Code:Actor1Religion2Code,Actor1Type1Code:Actor1Type1Code,Actor1Type2Code:Actor1Type2Code,Actor1Type3Code:Actor1Type3Code,Actor2Code:Actor2Code,Actor2Name:Actor2Name,Actor2CountryCode:Actor2CountryCode,Actor2KnownGroupCode:Actor2KnownGroupCode,Actor2EthnicCode:Actor2EthnicCode,Actor2Religion1Code:Actor2Religion1Code,Actor2Religion2Code:Actor2Religion2Code,Actor2Type1Code:Actor2Type1Code,Actor2Type2Code:Actor2Type2Code,Actor2Type3Code:Actor2Type3Code,IsRootEvent:IsRootEvent,EventCode:EventCode,EventBaseCode:EventBaseCode,EventRootCode:EventRootCode,QuadClass:QuadClass,GoldsteinScale:GoldsteinScale,NumMentions:NumMentions,NumSources:NumSources,NumArticles:NumArticles,AvgTone:AvgTone,Actor1Geo_Type:Actor1Geo_Type,Actor1Geo_FullName:Actor1Geo_FullName,Actor1Geo_CountryCode:Actor1Geo_CountryCode,Actor1Geo_ADM1Code:Actor1Geo_ADM1Code,Actor1Geo_ADM2Code:Actor1Geo_ADM2Code,Actor1Geo_Lat:Actor1Geo_Lat,Actor1Geo_Long:Actor1Geo_Long,Actor1Geo_FeatureID:Actor1Geo_FeatureID,Actor2Geo_Type:Actor2Geo_Type,Actor2Geo_FullName:Actor2Geo_FullName,Actor2Geo_CountryCode:Actor2Geo_CountryCode,Actor2Geo_ADM1Code:Actor2Geo_ADM1Code,Actor2Geo_ADM2Code:Actor2Geo_ADM2Code,Actor2Geo_Lat:Actor2Geo_Lat,Actor2Geo_Long:Actor2Geo_Long,Actor2Geo_FeatureID:Actor2Geo_FeatureID,ActionGeo_Type:ActionGeo_Type,ActionGeo_FullName:ActionGeo_FullName,ActionGeo_CountryCode:ActionGeo_CountryCode,ActionGeo_ADM1Code:ActionGeo_ADM1Code,ActionGeo_ADM2Code:ActionGeo_ADM2Code,ActionGeo_Lat:ActionGeo_Lat,ActionGeo_Long:ActionGeo_Long,ActionGeo_FeatureID:ActionGeo_FeatureID,DATEADDED:DATEADDED,SOURCEURL:SOURCEURL,dtg:dtg,geom:geom),StructType(StructField(fid,StringType,false), StructField(GLOBALEVENTID,StringType,true), StructField(SQLDATE,IntegerType,true), StructField(MonthYear,IntegerType,true), StructField(Year,IntegerType,true), StructField(FractionDate,DoubleType,true), StructField(Actor1Code,StringType,true), StructField(Actor1Name,StringType,true), StructField(Actor1CountryCode,StringType,true), StructField(Actor1KnownGroupCode,StringType,true), StructField(Actor1EthnicCode,StringType,true), StructField(Actor1Religion1Code,StringType,true), StructField(Actor1Religion2Code,StringType,true), StructField(Actor1Type1Code,StringType,true), StructField(Actor1Type2Code,StringType,true), StructField(Actor1Type3Code,StringType,true), StructField(Actor2Code,StringType,true), StructField(Actor2Name,StringType,true), StructField(Actor2CountryCode,StringType,true), StructField(Actor2KnownGroupCode,StringType,true), StructField(Actor2EthnicCode,StringType,true), StructField(Actor2Religion1Code,StringType,true), StructField(Actor2Religion2Code,StringType,true), StructField(Actor2Type1Code,StringType,true), StructField(Actor2Type2Code,StringType,true), StructField(Actor2Type3Code,StringType,true), StructField(IsRootEvent,IntegerType,true), StructField(EventCode,StringType,true), StructField(EventBaseCode,StringType,true), StructField(EventRootCode,StringType,true), 
StructField(QuadClass,IntegerType,true), StructField(GoldsteinScale,DoubleType,true), StructField(NumMentions,IntegerType,true), StructField(NumSources,IntegerType,true), StructField(NumArticles,IntegerType,true), StructField(AvgTone,DoubleType,true), StructField(Actor1Geo_Type,IntegerType,true), StructField(Actor1Geo_FullName,StringType,true), StructField(Actor1Geo_CountryCode,StringType,true), StructField(Actor1Geo_ADM1Code,StringType,true), StructField(Actor1Geo_ADM2Code,StringType,true), StructField(Actor1Geo_Lat,DoubleType,true), StructField(Actor1Geo_Long,DoubleType,true), StructField(Actor1Geo_FeatureID,StringType,true), StructField(Actor2Geo_Type,IntegerType,true), StructField(Actor2Geo_FullName,StringType,true), StructField(Actor2Geo_CountryCode,StringType,true), StructField(Actor2Geo_ADM1Code,StringType,true), StructField(Actor2Geo_ADM2Code,StringType,true), StructField(Actor2Geo_Lat,DoubleType,true), StructField(Actor2Geo_Long,DoubleType,true), StructField(Actor2Geo_FeatureID,StringType,true), StructField(ActionGeo_Type,IntegerType,true), StructField(ActionGeo_FullName,StringType,true), StructField(ActionGeo_CountryCode,StringType,true), StructField(ActionGeo_ADM1Code,StringType,true), StructField(ActionGeo_ADM2Code,StringType,true), StructField(ActionGeo_Lat,DoubleType,true), StructField(ActionGeo_Long,DoubleType,true), StructField(ActionGeo_FeatureID,StringType,true), StructField(DATEADDED,StringType,true), StructField(SOURCEURL,StringType,true), StructField(dtg,TimestampType,true), StructField(geom,org.apache.spark.sql.jts.PointUDT@4cd6143c,true)),Map(geomesa.feature -> gdelt, hbase.catalog -> gdelt, hbase.config.paths -> /home/qubin.qb/rpm/hbase-1.4.8/conf/hbase-site.xml),Filter.INCLUDE,None,null,null,null,null) [fid#6,GLOBALEVENTID#7,SQLDATE#8,MonthYear#9,Year#10,FractionDate#11,Actor1Code#12,Actor1Name#13,Actor1CountryCode#14,Actor1KnownGroupCode#15,Actor1EthnicCode#16,Actor1Religion1Code#17,Actor1Religion2Code#18,Actor1Type1Code#19,Actor1Type2Code#20,Actor1Type3Code#21,Actor2Code#22,Actor2Name#23,Actor2CountryCode#24,Actor2KnownGroupCode#25,Actor2EthnicCode#26,Actor2Religion1Code#27,Actor2Religion2Code#28,Actor2Type1Code#29,... 40 more fields] PushedFilters: [], ReadSchema: struct<fid:string,GLOBALEVENTID:string,SQLDATE:int,MonthYear:int,Year:int,FractionDate:double...

at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:56) at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.doExecute(ShuffleExchangeExec.scala:119) at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131) at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127) at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152) at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127) at org.apache.spark.sql.execution.InputAdapter.inputRDDs(WholeStageCodegenExec.scala:371) at org.apache.spark.sql.execution.BaseLimitExec$class.inputRDDs(limit.scala:62) at org.apache.spark.sql.execution.GlobalLimitExec.inputRDDs(limit.scala:107) at org.apache.spark.sql.execution.ProjectExec.inputRDDs(basicPhysicalOperators.scala:41) at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:605) at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131) at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127) at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152) at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127) at org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:247) at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:337) at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38) at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:3278) at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2489) at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2489) at org.apache.spark.sql.Dataset$$anonfun$52.apply(Dataset.scala:3259) at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77) at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3258) at org.apache.spark.sql.Dataset.head(Dataset.scala:2489) at org.apache.spark.sql.Dataset.take(Dataset.scala:2703) at org.apache.spark.sql.Dataset.showString(Dataset.scala:254) at org.apache.spark.sql.Dataset.show(Dataset.scala:723) at org.apache.spark.sql.Dataset.show(Dataset.scala:682) at org.apache.spark.sql.Dataset.show(Dataset.scala:691) ... 
50 elided Caused by: java.io.IOException: Expecting at least one region for table : gdelt_gdelt_z3_v2 at org.apache.hadoop.hbase.mapreduce.MultiTableInputFormatBase.getSplits(MultiTableInputFormatBase.java:197) at org.locationtech.geomesa.hbase.jobs.GeoMesaHBaseInputFormat.getSplits(GeoMesaHBaseInputFormat.scala:51) at org.apache.spark.rdd.NewHadoopRDD.getPartitions(NewHadoopRDD.scala:127) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:253) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:251) at scala.Option.getOrElse(Option.scala:121) at org.apache.spark.rdd.RDD.partitions(RDD.scala:251) at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:46) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:253) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:251) at scala.Option.getOrElse(Option.scala:121) at org.apache.spark.rdd.RDD.partitions(RDD.scala:251) at org.locationtech.geomesa.spark.SpatialRDD.getPartitions(GeoMesaSpark.scala:69) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:253) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:251) at scala.Option.getOrElse(Option.scala:121) at org.apache.spark.rdd.RDD.partitions(RDD.scala:251) at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:46) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:253) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:251) at scala.Option.getOrElse(Option.scala:121) at org.apache.spark.rdd.RDD.partitions(RDD.scala:251) at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:46) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:253) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:251) at scala.Option.getOrElse(Option.scala:121) at org.apache.spark.rdd.RDD.partitions(RDD.scala:251) at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:46) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:253) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:251) at scala.Option.getOrElse(Option.scala:121) at org.apache.spark.rdd.RDD.partitions(RDD.scala:251) at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:46) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:253) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:251) at scala.Option.getOrElse(Option.scala:121) at org.apache.spark.rdd.RDD.partitions(RDD.scala:251) at org.apache.spark.ShuffleDependency.(Dependency.scala:91) at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$.prepareShuffleDependency(ShuffleExchangeExec.scala:321) at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.prepareShuffleDependency(ShuffleExchangeExec.scala:91) at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$$anonfun$doExecute$1.apply(ShuffleExchangeExec.scala:128) at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$$anonfun$doExecute$1.apply(ShuffleExchangeExec.scala:119) at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:52) at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:52) ... 83 more
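The root cause seems to be MultiTableInputFormatBase.getSplits finding no regions for gdelt_gdelt_z3_v2 from the Spark side. To rule out a plain connectivity/visibility problem, here is a sketch of a small HBase client check I can run on the node where I launch spark-shell, pointing at the same ZooKeeper address as dsParams (x.x.x.x is a placeholder):

import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.ConnectionFactory

// Point the client at the same ZooKeeper quorum as dsParams.
val conf = HBaseConfiguration.create()
conf.set("hbase.zookeeper.quorum", "x.x.x.x")

val connection = ConnectionFactory.createConnection(conf)
try {
  val locator = connection.getRegionLocator(TableName.valueOf("gdelt_gdelt_z3_v2"))
  // If this prints 0, the shell is talking to an HBase instance that has no
  // regions for the table, which would explain the getSplits failure.
  println(s"regions: ${locator.getAllRegionLocations.size}")
} finally {
  connection.close()
}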

Spark commands list:

import org.locationtech.geomesa.spark
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.datasources

val dsParams = Map(
  "hbase.zookeepers" -> "x.x.x.x",
  "hbase.catalog" -> "gdelt")

// Create SparkSession
val sparkSession = SparkSession.builder()
  .appName("testSpark")
  .config("spark.sql.crossJoin.enabled", "true")
  .enableHiveSupport()
  .master("spark://x.x.x.x:7077")
  .getOrCreate()

// Create DataFrame using the "geomesa" format
val dataFrame = sparkSession.read
  .format("geomesa")
  .options(dsParams)
  .option("geomesa.feature", "gdelt")
  .load()
dataFrame.createOrReplaceTempView("gdelt")

val sql = " desc gdelt "
val result = sparkSession.sql(sql)
//result.show(50, false)

val sql = "select * from gdelt limit 100000"
val result = sparkSession.sql(sql)
result.show

BTW: how can I register for the geomesa-users mailing list? The official website says I need to send mail to geomesa-users-join@locationtech.org manually, but I still have not been given the right to post to the mailing list after sending that request.