azavea / osmesa

OSMesa is an OpenStreetMap processing stack based on GeoTrellis and Apache Spark
Apache License 2.0

Augmented Diff Generation #52

Open mojodna opened 6 years ago

mojodna commented 6 years ago

Since #25 covers a number of different, intertwined pieces, I wanted to break out the augmented diff generation component.

Overview

Augmented diffs are intended to provide both elements that were actually modified during a time period (the timestamp attribute within the <new> element should fall within the requested window; see Caveats below) and elements that refer to them. In the case of modifications (direct or indirect) to ways or relations, member element metadata (way nds or relation members: coordinates, authorship, etc.) is inlined to avoid the need for additional lookups.

Output Format(s)

Row-based

The simplest format that likely meets our needs is row-based, such that it can be combined with the intermediate format containing reconstructed way geometries.

Fields needed are:

Overpass Augmented OsmChange

For compatibility with Overpass-generated augmented diffs (which extend the OsmChange format, as used by minutely diffs, swapping the root <osmChange> for <osm>), OSMesa should match that format.

Overpass augmented diff sequences can be converted to Unix timestamps using <sequence> * 60 + 1347432900 (e.g., sequence 2801986 → 2018-01-10T02:41:00.000Z).
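
As a quick check of that arithmetic (a short Python sketch):

```python
from datetime import datetime, timezone

def sequence_to_timestamp(sequence: int) -> int:
    """Convert an Overpass augmented diff sequence to a Unix timestamp."""
    return sequence * 60 + 1347432900

# Sequence 2801986 covers the minute starting at 2018-01-10T02:41:00Z.
ts = sequence_to_timestamp(2801986)
print(datetime.fromtimestamp(ts, tz=timezone.utc).isoformat())
# 2018-01-10T02:41:00+00:00
```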

Example

Simplified version of sequence 2801986:

<?xml version="1.0" encoding="UTF-8"?>
<osm version="0.6" generator="Overpass API 0.7.54.13 ff15392f">
<note>The data included in this document is from www.openstreetmap.org. The data is made available under ODbL.</note>
<meta osm_base="2018-03-19T22:34:02Z"/>

<action type="modify">
<old>
  <node id="182611666" lat="38.5466070" lon="-75.7592310" version="2" timestamp="2009-09-29T12:01:08Z" changeset="2675490" uid="147510" user="woodpeck_fixbot"/>
</old>
<new>
  <node id="182611666" lat="38.5465913" lon="-75.7593007" version="3" timestamp="2018-01-10T02:41:50Z" changeset="55309654" uid="1231505" user="fortchagos"/>
</new>
</action>
<!-- ... -->
<action type="delete">
<old>
  <node id="3357186352" lat="-7.2006844" lon="113.2426484" version="1" timestamp="2015-02-18T03:36:36Z" changeset="28924003" uid="617220" user="raniedwianugrah"/>
</old>
<new>
  <node id="3357186352" visible="false" version="2" timestamp="2018-01-10T02:41:43Z" changeset="55309655" uid="2294556" user="anisa berliana"/>
</new>
</action>
<!-- ... -->
<action type="create">
  <node id="5330324023" lat="-27.4891386" lon="-58.7847889" version="1" timestamp="2018-01-10T02:41:36Z" changeset="55309653" uid="7435203" user="Mirkox30">
    <tag k="amenity" v="pharmacy"/>
    <tag k="name" v="Farmacia"/>
  </node>
</action>
<!-- ... -->
<action type="modify">
<old>
  <way id="17632925" version="2" timestamp="2014-11-30T18:13:33Z" changeset="27137865" uid="105946" user="ElliottPlack">
    <bounds minlat="38.5463900" minlon="-75.7608930" maxlat="38.5476470" maxlon="-75.7592310"/>
    <nd ref="182611666" lat="38.5466070" lon="-75.7592310"/>
    <nd ref="182611669" lat="38.5465580" lon="-75.7593610"/>
    <nd ref="182611672" lat="38.5465010" lon="-75.7594450"/>
    <nd ref="182611676" lat="38.5464360" lon="-75.7595060"/>
    <nd ref="182611679" lat="38.5463980" lon="-75.7595820"/>
    <nd ref="182611683" lat="38.5463900" lon="-75.7596350"/>
    <nd ref="182611687" lat="38.5464090" lon="-75.7597800"/>
    <nd ref="182611690" lat="38.5465390" lon="-75.7599710"/>
    <nd ref="182611694" lat="38.5468330" lon="-75.7602460"/>
    <nd ref="182611697" lat="38.5470580" lon="-75.7604820"/>
    <nd ref="182611699" lat="38.5471190" lon="-75.7605200"/>
    <nd ref="182611701" lat="38.5472940" lon="-75.7605660"/>
    <nd ref="182611702" lat="38.5475500" lon="-75.7606650"/>
    <nd ref="182611704" lat="38.5476070" lon="-75.7607260"/>
    <nd ref="182611707" lat="38.5476470" lon="-75.7608930"/>
    <tag k="tiger:reviewed" v="no"/>
    <tag k="highway" v="service"/>
    <tag k="service" v="driveway"/>
    <tag k="tiger:cfcc" v="A41"/>
    <tag k="tiger:county" v="Dorchester, MD"/>
  </way>
</old>
<new>
  <way id="17632925" version="3" timestamp="2018-01-10T02:41:51Z" changeset="55309654" uid="1231505" user="fortchagos">
    <bounds minlat="38.5463900" minlon="-75.7608930" maxlat="38.5476470" maxlon="-75.7593007"/>
    <nd ref="182611666" lat="38.5465913" lon="-75.7593007"/>
    <nd ref="182611669" lat="38.5465580" lon="-75.7593610"/>
    <nd ref="182611672" lat="38.5465010" lon="-75.7594450"/>
    <nd ref="182611676" lat="38.5464360" lon="-75.7595060"/>
    <nd ref="182611679" lat="38.5463980" lon="-75.7595820"/>
    <nd ref="182611683" lat="38.5463900" lon="-75.7596350"/>
    <nd ref="182611687" lat="38.5464090" lon="-75.7597800"/>
    <nd ref="182611690" lat="38.5465390" lon="-75.7599710"/>
    <nd ref="182611694" lat="38.5468330" lon="-75.7602460"/>
    <nd ref="182611697" lat="38.5470580" lon="-75.7604820"/>
    <nd ref="182611699" lat="38.5471190" lon="-75.7605200"/>
    <nd ref="182611701" lat="38.5472940" lon="-75.7605660"/>
    <nd ref="182611702" lat="38.5475500" lon="-75.7606650"/>
    <nd ref="182611704" lat="38.5476070" lon="-75.7607260"/>
    <nd ref="182611707" lat="38.5476470" lon="-75.7608930"/>
    <tag k="highway" v="service"/>
    <tag k="service" v="driveway"/>
    <tag k="tiger:cfcc" v="A41"/>
    <tag k="tiger:county" v="Dorchester, MD"/>
    <tag k="tiger:reviewed" v="yes"/>
  </way>
</new>
</action>
<!-- ... -->
<action type="delete">
<old>
  <way id="328897117" version="1" timestamp="2015-02-18T03:36:37Z" changeset="28924003" uid="617220" user="raniedwianugrah">
    <bounds minlat="-7.2014516" minlon="113.2419865" maxlat="-7.2012318" maxlon="113.2423079"/>
    <nd ref="3357186374" lat="-7.2012318" lon="113.2420205"/>
    <nd ref="3357186375" lat="-7.2012922" lon="113.2423079"/>
    <nd ref="3357186382" lat="-7.2014516" lon="113.2422739"/>
    <nd ref="3357186381" lat="-7.2013912" lon="113.2419865"/>
    <nd ref="3357186374" lat="-7.2012318" lon="113.2420205"/>
    <tag k="M:hazard_prone" v="yes"/>
    <tag k="access:roof" v="no"/>
    <tag k="addr:full" v="Jalan Samsul arifin"/>
    <tag k="amenity" v="school"/>
    <tag k="building" v="yes"/>
    <tag k="building:condition" v="good"/>
    <tag k="building:levels" v="1"/>
    <tag k="building:roof" v="tile"/>
    <tag k="building:structure" v="reinforced_masonry"/>
    <tag k="building:walls" v="bata (brick)1"/>
    <tag k="capacity:persons" v="500"/>
    <tag k="name" v="SMKN2 SAMPANG"/>
    <tag k="operator:type" v="goverment"/>
    <tag k="school:type_idn" v="SMK"/>
    <tag k="water_supply" v="pipeline"/>
  </way>
</old>
<new>
  <way id="328897117" visible="false" version="2" timestamp="2018-01-10T02:41:43Z" changeset="55309655" uid="2294556" user="anisa berliana"/>
</new>
</action>
<!-- ... -->
<action type="create">
  <way id="552017489" version="1" timestamp="2018-01-10T02:41:03Z" changeset="55309649" uid="83188" user="dannmer">
    <bounds minlat="39.9990851" minlon="-75.3383729" maxlat="39.9997818" maxlon="-75.3372020"/>
    <nd ref="5329383215" lat="39.9997818" lon="-75.3372020"/>
    <nd ref="5330326034" lat="39.9997692" lon="-75.3374422"/>
    <nd ref="5330326035" lat="39.9997486" lon="-75.3376219"/>
    <nd ref="5330326036" lat="39.9997219" lon="-75.3378284"/>
    <nd ref="5330326037" lat="39.9996870" lon="-75.3380510"/>
    <nd ref="5330326038" lat="39.9996438" lon="-75.3382254"/>
    <nd ref="5330326039" lat="39.9995925" lon="-75.3383112"/>
    <nd ref="5330326040" lat="39.9995185" lon="-75.3383380"/>
    <nd ref="5330326041" lat="39.9994034" lon="-75.3383729"/>
    <nd ref="5330326042" lat="39.9992863" lon="-75.3383568"/>
    <nd ref="5330326043" lat="39.9991692" lon="-75.3383434"/>
    <nd ref="5330326044" lat="39.9990851" lon="-75.3383380"/>
    <tag k="highway" v="path"/>
    <tag k="surface" v="unpaved"/>
  </way>
</action>
<!-- ... -->
<action type="create">
  <relation id="7889640" version="1" timestamp="2018-01-10T02:41:49Z" changeset="55309654" uid="1231505" user="fortchagos">
    <bounds minlat="38.5457446" minlon="-75.7590167" maxlat="38.5461621" maxlon="-75.7586478"/>
    <member type="way" ref="521907495" role="inner">
      <nd lat="38.5460902" lon="-75.7589233"/>
      <nd lat="38.5458915" lon="-75.7588504"/>
      <nd lat="38.5458696" lon="-75.7589480"/>
      <nd lat="38.5458491" lon="-75.7589405"/>
      <nd lat="38.5458519" lon="-75.7589281"/>
      <nd lat="38.5457889" lon="-75.7589050"/>
      <nd lat="38.5458391" lon="-75.7586817"/>
      <nd lat="38.5458603" lon="-75.7586895"/>
      <nd lat="38.5458569" lon="-75.7587045"/>
      <nd lat="38.5461179" lon="-75.7588003"/>
      <nd lat="38.5460902" lon="-75.7589233"/>
    </member>
    <member type="way" ref="552017524" role="outer">
      <nd lat="38.5461621" lon="-75.7587751"/>
      <nd lat="38.5458156" lon="-75.7586478"/>
      <nd lat="38.5458047" lon="-75.7586963"/>
      <nd lat="38.5458008" lon="-75.7586949"/>
      <nd lat="38.5457918" lon="-75.7587349"/>
      <nd lat="38.5457743" lon="-75.7587285"/>
      <nd lat="38.5457505" lon="-75.7588347"/>
      <nd lat="38.5457674" lon="-75.7588410"/>
      <nd lat="38.5457591" lon="-75.7588780"/>
      <nd lat="38.5457636" lon="-75.7588797"/>
      <nd lat="38.5457446" lon="-75.7589640"/>
      <nd lat="38.5458880" lon="-75.7590167"/>
      <nd lat="38.5459136" lon="-75.7589026"/>
      <nd lat="38.5461168" lon="-75.7589772"/>
      <nd lat="38.5461621" lon="-75.7587751"/>
    </member>
    <tag k="highway" v="pedestrian"/>
    <tag k="surface" v="concrete"/>
    <tag k="type" v="multipolygon"/>
  </relation>
</action>
</osm>

Element Ordering

Elements should be ordered such that any references to other elements in the same diff appear after the element itself. In practice, this means 1) nodes, 2) ways, 3) relations referencing only nodes + ways, 4) relations referencing relations that previously appeared, 5) ...
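
Concretely, that ordering rule can be sketched as follows (a Python illustration, not OSMesa code; elements are represented here as plain dicts):

```python
def order_elements(elements):
    """Order elements so that referenced elements precede referring ones:
    nodes first, then ways, then relations arranged so that a relation
    appears after any relation (in the same diff) that it references."""
    nodes = [e for e in elements if e["type"] == "node"]
    ways = [e for e in elements if e["type"] == "way"]
    relations = {e["id"]: e for e in elements if e["type"] == "relation"}

    ordered, emitted = [], set()

    def emit(rel):
        if rel["id"] in emitted:
            return
        emitted.add(rel["id"])  # mark before recursing, so cycles terminate
        for m in rel.get("members", []):
            if m["type"] == "relation" and m["ref"] in relations:
                emit(relations[m["ref"]])  # member relation comes first
        ordered.append(rel)

    for rel in relations.values():
        emit(rel)
    return nodes + ways + ordered
```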

Overpass appears to further order by 1) modify, 2) delete, 3) create, although this isn't strictly followed (see the full augmented diff); timestamps may also be taken into account.

Caveats

Overpass aims to answer the question "what changed in the requested time period?" When provided with sequences, they're converted into timestamps (see above) and the minute following is used as the time period being queried.

Due to the way that OSM minutely diffs are created (more context), they won't contain all changes made during the minute they "represent" if database transactions are open. Rather than retroactively updating elements that changed during a time period, the question we should be asking is "what changes did we find out about during the requested time period?" (breaking the assumption of a match between <new> timestamp values and the time associated with a sequence).

Another way to think about this is "what elements were affected by a given minutely diff" and genuinely just augmenting that diff (populating referenced elements and adding referred-to ones) rather than trying to be clever with time ranges.

In the existing Overpass implementation, changeset values for referring elements match the changeset in which the referring element was modified. This introduces subtle bugs, e.g. mapbox/osm-adiff-parser#2, when attempting to aggregate changes by changeset (which is another process that requires updating past knowledge, because changesets are replicated separately and may remain open, collecting additional changes, for 24 hours after first appearing in the minutely diff replication stream).

GeoJSON

Bonus points: a GeoJSON FeatureCollection containing 1 or 2 Features (re-assembled geometries) representing the old and new versions of modified elements (indirect or direct).

JSON

Newline-delimited JSON representation of OsmChange, for easy consumption? (The action may be omitted, as it can be inferred from the presence of <old> or from the version / visible attributes, at least for non-referring elements.)
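
A sketch of that inference (Python; the attribute names follow the OsmChange example above, but treating a missing prior version as a create is an assumption):

```python
def infer_action(old, new):
    """Infer the OsmChange action for a directly-modified element from its
    attributes: deletions are marked visible="false", creations have no
    prior version (or are version 1), everything else is a modification."""
    if not new.get("visible", True):
        return "delete"
    if old is None or new.get("version") == 1:
        return "create"
    return "modify"
```

This only works for directly-modified elements; referring elements carried along for context would still need some other marker.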

Related Work

@geohacker was previously producing JSON augmented diffs aggregated by changeset. The post goes into a bit more detail about edge cases that make augmented diff aggregation difficult.

osm-replication-streams contains a parser for Overpass augmented diffs that outputs GeoJSON as described above.

@kamicut used osm-replication-streams to build a Lambda function that consumes minutely diffs and outputs them as individual messages on a Kinesis stream.

mojodna commented 6 years ago

To facilitate these, we need the ability to make lookups against the following inverted indices:

(inclusion of role is speculative)

mojodna commented 6 years ago

It struck me that while generating XML is beneficial for consumers that already understand OsmChange, a simpler row-based format would suffice for our needs. I've updated the description accordingly.

mojodna commented 6 years ago

I also attempted to clarify my proposal re: date ranges in the Caveats section by simplifying "augmented diff" to actually mean "augmentation of a specific diff" rather than a window of time.

jamesmcclain commented 6 years ago

Much appreciated.

jamesmcclain commented 6 years ago

Work in progress can be found here.

Summary of ORC-backed Dataframe Approach

This approach uses an evolving dataframe backed by ORC files, rather than a traditional database, to hold the additional index information ("reverse lookup" information) needed to support creation of augmented diffs.

The present fragments that I have include four tables:

  1. osm, which contains the bulk OSM data; it is initialized from an OSM ORC file and appended to over time in response to change information.
  2. node_to_ways, which contains pointers from each <node id, timestamp> pair to the ways that it participates in.
  3. way_to_relations, which contains pointers from each <way id, timestamp> pair to the relations that it participates in.
  4. relation_to_relations, which contains pointers from each <relation id, timestamp> pair to the relations that it participates in.
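
As an illustration of how the reverse indices grow: when a new way version arrives, node_to_ways gains one row per nd. A plain-Python sketch of the rows produced (reading valid_from as the way version's timestamp is my assumption about the schema):

```python
def node_to_ways_rows(way_id, valid_from, nds):
    """Rows to append to node_to_ways for one way version:
    one <node id, valid_from, way_id> triple per member node."""
    return [{"id": ref, "valid_from": valid_from, "way_id": way_id}
            for ref in nds]

# Two of the nds from way 17632925 (version 3, 2018-01-10T02:41:51Z):
rows = node_to_ways_rows(17632925, 1515552111, [182611666, 182611669])
```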

The schemas are as follows:

scala> val osm = spark.table("osm")
osm: org.apache.spark.sql.DataFrame = [id: bigint, tags: map<string,string> ... 11 more fields]

scala> osm.printSchema
root
 |-- id: long (nullable = true)
 |-- tags: map (nullable = true)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)
 |-- lat: decimal(9,7) (nullable = true)
 |-- lon: decimal(10,7) (nullable = true)
 |-- nds: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- ref: long (nullable = true)
 |-- members: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- type: string (nullable = true)
 |    |    |-- ref: long (nullable = true)
 |    |    |-- role: string (nullable = true)
 |-- changeset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- uid: long (nullable = true)
 |-- user: string (nullable = true)
 |-- version: long (nullable = true)
 |-- visible: boolean (nullable = true)
 |-- type: string (nullable = true)
scala> val nodeToWays = spark.table("node_to_ways")
nodeToWays: org.apache.spark.sql.DataFrame = [id: bigint, valid_from: bigint ... 1 more field]

scala> nodeToWays.printSchema
root
 |-- id: long (nullable = true)
 |-- valid_from: long (nullable = true)
 |-- way_id: long (nullable = true)
scala> val wayToRelations = spark.table("way_to_relations")
wayToRelations: org.apache.spark.sql.DataFrame = [id: bigint, valid_from: bigint ... 1 more field]

scala> wayToRelations.printSchema
root
 |-- id: long (nullable = true)
 |-- valid_from: long (nullable = true)
 |-- relation_id: long (nullable = true)
scala> val relationToRelations = spark.table("relation_to_relations")
relationToRelations: org.apache.spark.sql.DataFrame = [id: bigint, valid_from: bigint ... 1 more field]

scala> relationToRelations.printSchema
root
 |-- id: long (nullable = true)
 |-- valid_from: long (nullable = true)
 |-- relation_id: long (nullable = true)

The last three tables have been kept separate for the sake of simplicity, but it might eventually be helpful to combine them. The first table is there so that the results of the query code shown below can be elaborated into a full augmented diff.

Rump Query

As I understand the problem, the core primitive needed to construct augmented diffs is what the Overpass API refers to as the recurse operation. That operation takes an ID and returns its combinatorial closure: all of the ways that it participates in, all of the relations that it and its ways participate in, all of the relations that it, its ways, and those relations participate in, &c. An incomplete example of how that is accomplished with the indices above is below:

import scala.collection.mutable
import org.apache.spark.sql.functions.col
import spark.implicits._ // encoders needed for .map over Datasets

private def recurseNode(nodeId: Long) = {
  var keepGoing = true
  val relations = mutable.Set.empty[Long]

  // Ways that the node participates in.
  val ways = nodeToWays
    .filter(col("id") === nodeId)
    .map({ r => r.getAs[Long]("way_id") })
    .collect
    .toSet

  // Relations that those ways participate in (the seed set).
  relations ++=
  (if (ways.isEmpty)
    Set.empty[Long]
  else {
    wayToRelations
      .filter(col("id").isin(ways.toSeq:_*))
      .map({ r => r.getAs[Long]("relation_id") })
      .collect
      .toSet
  })

  // Iterate to a fixed point over relation -> relation references.
  while (keepGoing) {
    keepGoing = false
    val newRelations =
      if (relations.isEmpty) Set.empty[Long]
      else {
        relationToRelations
          .filter(col("id").isin(relations.toSeq:_*))
          .map({ r => r.getAs[Long]("relation_id") })
          .collect
          .toSet
      }
    if (!newRelations.subsetOf(relations)) {
      keepGoing = true
      relations ++= newRelations
    }
  }

  (ways, relations)
}

The code above is incomplete in that it only handles queries starting from a single node id, does not pay attention to timestamps (needed for queries about state before the present moment), should perhaps accept initial batches not singletons, &c, but it shows a basic outline. (In fact, the code above not only ignores timestamps, it actually assumes that no updates have been made, so it is not to be taken literally.)

I will have more to say about query efficiency below, but it can be seen that one of the basic operations that needs to be efficiently supported is searching for an id within a particular set. Looking at that in isolation gives:

scala> val list = List(1L, 2L, 3L)
list: List[Long] = List(1, 2, 3)

scala> wayToRelations.filter(col("id").isin(list:_*)).explain
== Physical Plan ==
*(1) Filter id#9L IN (1,2,3)
+- *(1) FileScan orc default.way_to_relations[id#9L,valid_from#10L,relation_id#11L] Batched: false, Format: ORC, Location: InMemoryFileIndex[file:/home/jmcclain/Desktop/spark-warehouse/way_to_relations], PartitionFilters: [], PushedFilters: [In(id, [1,2,3])], ReadSchema: struct<id:bigint,valid_from:bigint,relation_id:bigint>

where we see that the list (1,2,3) has been pushed down as a filter, indicating that efficient operation is at least not impossible.

Comparison to e.g. RocksDB-based Approach

I did some experimenting with RocksDB. Unfortunately, what I found was that the size of the data on disk inflated by a factor of about 100 when I tried to store the equivalent of the OSM table above. The attempt was naive and can be criticized in various ways (I was just serializing spark sql row objects into values, I included all of the columns, &c), but even if the inflation came down by a factor of 10 it seems like it would still be too large. This implies (to me) that if the search index itself is stored in RocksDB, some other database would be needed to store the bulk data needed to produce actual augmented diffs (if this is not correct, I am happy to hear it).

That still leaves open the possibility of using RocksDB for the reverse index and storing the bulk data elsewhere, but that is a complication that we hope to avoid.

I can say more about query efficiency of the spark sql based approach versus that of RocksDB if there is interest (this comment is getting long).

Cons

  1. Inherent latency of Spark SQL queries. Hopefully this can be managed by batching work together, e.g. performing the recurse operation on a set of node ids instead of individual ids.
  2. Fragmentation of ORC files. Depending on the rate at which the dataframes are persisted to disk, we could end up with a highly fragmented database. The metastore contains information about the ranges covered by each file, so it is hoped that a great many files can be pruned from consideration before being opened. It is understood that a very large number of very tiny files could turn the metastore into a bottleneck.
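
The batching idea in (1) amounts to seeding the fixed point with a set of node ids rather than a single one. An in-memory Python sketch (plain dicts of sets standing in for the three index tables, with timestamps ignored, as in the Scala fragment above):

```python
def recurse_nodes(node_ids, node_to_ways, way_to_relations, relation_to_relations):
    """Combinatorial closure for a batch of node ids: every way the nodes
    participate in, plus every transitively-referencing relation."""
    ways = {w for n in node_ids for w in node_to_ways.get(n, ())}
    relations = {r for w in ways for r in way_to_relations.get(w, ())}

    # Iterate to a fixed point over relation -> relation references.
    while True:
        new = {r2 for r in relations for r2 in relation_to_relations.get(r, ())}
        if new <= relations:
            break
        relations |= new
    return ways, relations
```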

Pros

  1. Simpler, fewer moving parts.
  2. Easy to try something else if this fails. As a proportion, the actual database-specific part of the project seems fairly small.

lossyrob commented 6 years ago

Thanks for the great writeup, @jamesmcclain.

Do we need to store a copy of the history ORC file already created, or is there a way to point to the history that is already generated and write updates to a separate bucket? Not necessarily for v1, but it's something to think about as far as data storage goes: if we could utilize the main ORC file and push updates to it with the fragments, that might be a good process for keeping the history file up to date / be the compaction process.

My opinion is that this approach could very well be good enough to handle the stream of OSM replication files, and we should try to disprove it by trying to break it in prototypes to see if it holds up. Some ways I think it could potentially break:

This is looking good, I'm hoping this approach holds up to scrutiny because it seems like the most straightforward one/quickest to prove out.

jamesmcclain commented 6 years ago

Do we need to store a copy of the history ORC file already created, or is there a way to point to the history that is already generated and write updates to a separate bucket?

Yes, I believe that that can be done. I don't have the examples to hand right now, but I have seen examples of creating tables that span multiple URIs.

All three of the questions above are indeed open questions. I hope that answers will be forthcoming soon.

kamicut commented 6 years ago

Thanks for the write up @jamesmcclain, very useful and can't wait to see the results of the experiments!

It's unclear to me how this system batches up Augmented Diffs based on changesets that could potentially be still open.

I don't think this should be the job of this process. This can be a downstream process (similar to planet-stream) that takes in augmented diff updates and keeps track of the changeset replication files. I think @mojodna has improved on the planet-stream process for Missing Maps.

mojodna commented 6 years ago

I don't think this should be the job of this process. This can be a downstream process (similar to planet-stream) that takes in augmented diff updates and keeps track of the changeset replication files.

Agreed. Within Spark, a stream-stream join could handle the problem of varying windows of validity.

My approach for Missing Maps was to avoid generating complete roll-ups and update changeset stats as changes arrived that were associated with them.