bmwcarit / barefoot

Java map matching library for integrating the map into software and services with state-of-the-art online and offline map matching that can be used stand-alone and in the cloud.
Apache License 2.0

Problems about scalable offline map matching #104

Closed CallumWang closed 6 years ago

CallumWang commented 6 years ago

Hello, I have recently been trying to move this wonderful library onto a Spark cluster. I mainly changed your example code to load the road map from an existing file, for example xx.bfmap. So I changed this

"val matcher = sc.broadcast(new BroadcastMatcher(host, port, database, user, pass, config))"

into

"val matcher = sc.broadcast(new BroadcastMatcher(bfmappath))"

where bfmappath is the file path of xx.bfmap, and I changed the BroadcastMatcher object as follows:

val map: RoadMap = Loader.roadmap(bfmapPath,true).construct

With a path such as “root/data/xx.bfmap”, this works in local mode, which means I get the RoadMap object. However, when I use

spark-submit --master yarn-cluster --class com.bmwcarit.barefoot.taxi.PointMM test.jar

it seems it cannot find the xx.bfmap path and returns a null road map. I am considering putting xx.bfmap onto HDFS, but how should I then pass bfmappath as a parameter? I am new to Spark and Hadoop, so please forgive me if I have not explained this clearly. Looking forward to your reply. Thanks.

CallumWang commented 6 years ago

Well, in fact, I am wondering about

"as an alternative map data can be stored/loaded as serialized object via HDFS"

as you write in this section. How do I do this?

jongiddy commented 6 years ago

Since you have the file serialized locally, you can just hadoop fs -put it into HDFS. Add the HDFS URI as a parameter to your Spark job.

https://github.com/jongiddy/barefoot-hdfs-reader.git has a class that you can add to your Spark map-matching code to read the map file from HDFS using:

val map = RoadMap.Load(new HdfsMapReader(new URI(map_uri))).construct()

where map_uri is the HDFS URI for the serialized map.
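Since the map location now arrives as a job argument, it can help to check it before handing it to the reader. The following is a minimal sketch, not part of barefoot or barefoot-hdfs-reader: `MapUriArg` and `parse` are hypothetical names, and the `hdfs://namenode:8020/...` location is a made-up example. It only uses `java.net.URI` and rejects values that are not `hdfs://` URIs, such as the relative local path from the original question.

```scala
import java.net.{URI, URISyntaxException}

object MapUriArg {
  // Hypothetical helper (not a barefoot API): validate the map URI
  // that was passed to the Spark job as a command-line argument.
  def parse(arg: String): Either[String, URI] =
    try {
      val uri = new URI(arg)
      // A relative path such as "root/data/xx.bfmap" has no scheme at all.
      if (uri.getScheme == "hdfs" && uri.getPath != null && uri.getPath.nonEmpty) Right(uri)
      else Left(s"expected an hdfs:// URI with a path, got: $arg")
    } catch {
      case e: URISyntaxException => Left(s"not a valid URI: ${e.getMessage}")
    }

  def main(args: Array[String]): Unit = {
    // Made-up example location; substitute the URI of your own cluster.
    println(parse("hdfs://namenode:8020/data/xx.bfmap"))
    // The local relative path from the question is rejected.
    println(parse("root/data/xx.bfmap"))
  }
}
```

Failing early on the driver gives a clearer error than a null road map on an executor.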

CallumWang commented 6 years ago

Thank you for your HdfsMapReader. I managed to read the map file from HDFS, but now I have another problem. The output of the Spark job differs from what I get when I run the program in IntelliJ IDEA. The first picture is the log of the Spark job; the second is the log on my PC (Windows 10, IntelliJ IDEA). (images: proonspark, proonpc)

As you can see, the Spark job log is full of HMM breaks, while on my local PC it works well. Since the number of roads and the index are created identically, I do not think this is caused by the bfmap file. So what is the problem? Could you give me some advice? Thanks a lot.

smattheis commented 6 years ago

After you've read the road data, you should construct the map with map.construct() which kind of finalizes the read step and creates the spatial index and the routing graph.

CallumWang commented 6 years ago

Thanks for your reply. In fact, I did call map.construct() after reading the road data. In detail, the code I use for Spark is as follows (the Spark version is 1.6.2). In the main class, I changed your Spark example code like this:

val matcher = sc.broadcast(new BroadcastMatcher(bfmapPath)).value
// ... some code to read trace data into tracerdd
val matches = tracerdd.groupBy(x => x._1).map(x => {
        val trip = x._2.map({
          x => new MatcherSample(x._1,x._2,x._3,x._4)
        }).toList
        matcher.mmatch(trip,10,500)
      })

In BroadcastMatcher, the code looks like this:

object BroadcastMatcher {
  private var instance = null: Matcher

  private def initialize(bfmappath:String) {
    if (instance != null) return
    this.synchronized {
      if (instance == null) { // initialize map matcher once per Executor (JVM process/cluster node)

        val map = RoadMap.Load(new HdfsMapReader(new URI(bfmappath))).construct

        val router = new Dijkstra[Road, RoadPoint]()
        val cost = new TimePriority()
        val spatial = new Geography()

        instance = new Matcher(map, router, cost, spatial)
      }
    }
  }
}

@SerialVersionUID(1L)
class BroadcastMatcher(bfmappath:String) extends Serializable {

  def mmatch(samples: List[MatcherSample]): MatcherKState = {
    mmatch(samples, 0, 0)
  }

  def mmatch(samples: List[MatcherSample], minDistance: Double, minInterval: Int): MatcherKState = {
    BroadcastMatcher.initialize(bfmappath)
    BroadcastMatcher.instance.mmatch(new ArrayList[MatcherSample](samples.asJava), minDistance, minInterval)
  }
}

As you can see, I did call map.construct(), and it works well when I run it locally in IntelliJ IDEA. But when I move it to Spark (YARN cluster), it returns HMM breaks. If you can think of other reasons I might be getting the HMM breaks, I'd really appreciate your thoughts. If there's further debug information I can enable to help find the problem, I'm happy to do that. Thanks a lot.
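The once-per-executor initialization in the object above is a form of double-checked locking. As a minimal, self-contained sketch of that pattern (an illustration only: `ExpensiveHolder`, its `builds` counter, and the placeholder string are hypothetical stand-ins for the matcher and the map load), the shared field is marked `@volatile` so that a thread taking the unsynchronized fast path sees a fully built instance:

```scala
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical stand-in for the BroadcastMatcher object:
// builds a value at most once per JVM, even with concurrent callers.
object ExpensiveHolder {
  val builds = new AtomicInteger(0) // counts how often the build actually ran

  @volatile private var instance: Option[String] = None

  def get(path: String): String = instance match {
    case Some(v) => v // fast path without locking
    case None =>
      this.synchronized {
        instance match {
          case Some(v) => v // another thread finished the build first
          case None =>
            builds.incrementAndGet()
            val v = s"loaded:$path" // placeholder for the expensive map load
            instance = Some(v)
            v
        }
      }
  }

  def main(args: Array[String]): Unit = {
    val threads = (1 to 8).map { _ =>
      new Thread(new Runnable { def run(): Unit = get("xx.bfmap") })
    }
    threads.foreach(_.start())
    threads.foreach(_.join())
    println(s"build count: ${builds.get}")
  }
}
```

Note that the first path passed in wins; later calls with a different path return the already built value, which matches how the object above ignores bfmappath once the instance exists.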

smattheis commented 6 years ago

Sorry, I hadn't seen that the map construction is actually shown in your log. Anyway, it is difficult to tell remotely what the problem could be. Regarding "IntelliJ" vs. "Spark" execution: do you use the same reading/parsing of your trace data? (Usually it differs, because in Spark you also read your data from HDFS.) Have you checked for typical errors, e.g., swapped lat/lon, or decimals of floating-point values cut off due to erroneous implicit number types (integer vs. float)?
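The checks suggested above can be run on the parsed trace data before matching. This is a rough sketch under stated assumptions: `RawPoint` and its field names are hypothetical and are not barefoot's MatcherSample, and the heuristics are deliberately crude. A swap can only be detected when the longitude's magnitude exceeds 90 degrees, and whole-number coordinates are only a hint that decimals were lost.

```scala
object TraceSanity {
  // Hypothetical record type for a parsed trace row (not a barefoot class).
  case class RawPoint(id: String, timeMs: Long, lon: Double, lat: Double)

  // Coordinates outside the valid WGS84 ranges.
  def outOfRange(p: RawPoint): Boolean =
    math.abs(p.lon) > 180.0 || math.abs(p.lat) > 90.0

  // A "latitude" beyond 90 with a "longitude" within 90 suggests swapped columns.
  def looksSwapped(p: RawPoint): Boolean =
    math.abs(p.lat) > 90.0 && math.abs(p.lat) <= 180.0 && math.abs(p.lon) <= 90.0

  // Both values being whole numbers hints at integer parsing or cut-off decimals.
  def looksTruncated(p: RawPoint): Boolean =
    p.lon == math.rint(p.lon) && p.lat == math.rint(p.lat)

  // Count suspicious points per category.
  def report(points: Seq[RawPoint]): Map[String, Int] = Map(
    "outOfRange" -> points.count(outOfRange),
    "swapped" -> points.count(looksSwapped),
    "truncated" -> points.count(looksTruncated)
  )

  def main(args: Array[String]): Unit = {
    val points = Seq(
      RawPoint("a", 0L, 116.397, 39.909), // plausible point
      RawPoint("a", 1000L, 39.909, 116.397), // lat/lon swapped
      RawPoint("a", 2000L, 116.0, 39.0) // decimals lost
    )
    println(report(points))
  }
}
```

Running the same report on the locally parsed data and on the data parsed in the Spark job makes differences between the two parsing paths visible.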

CallumWang commented 6 years ago

Thank you. I finally managed to run it on Spark. In fact, the "IntelliJ" and "Spark" executions are essentially the same, judging from the results they generate. The only difference is in the logs, which in fact does not matter. Thanks again. It is really nice of you to provide this wonderful program.

biallen commented 5 years ago

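Thank you. In the end I managed to run it on Spark. In fact, the "IntelliJ" and "Spark" executions are essentially the same, judging from the results they produce. The only difference is in the logs, which does not really matter. Thanks again. It is really nice of you to provide this wonderful program.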

Hey, can I ask you a question? I downloaded some of the Spark code, but it shows errors in IDEA. How can I solve this?

Thanks.