azavea / osmesa

OSMesa is an OpenStreetMap processing stack based on GeoTrellis and Apache Spark
Apache License 2.0

Minutely updates, augmented diffs, and queries #25

Open geohacker opened 6 years ago

geohacker commented 6 years ago

Earlier this week, @lossyrob @moradology @kamicut and I got together in Philly to talk about taking OSMesa forward and kicked around some ideas. I just want to drop notes here, and invite comments from anyone who's interested.

Recap of what we have now

  1. Weekly updated full history ORC files hosted on AWS.
  2. A collection of scripts that run on Apache Spark to perform periodic analysis jobs.

Where we want to get to

  1. Minutely updates.
  2. Augmented diffs for storing minutely changes.
  3. Streaming minutely augmented diffs.
  4. Infrastructure for arbitrary queries using tags and bboxes.
  5. Infrastructure for periodic analytics jobs.

To be clear, we're not proposing the above as the only future of OSMesa; rather, we're thinking about the different parts we need to build to make this work useful for the larger OSM community. We’re using the repo to anchor this discussion for now, and will eventually fork into others when needed.

ORC to augmented diffs

Nodecache for lookup

Minutely processing

Analytics/Query Data Store

Batch jobs

Query server


mojodna commented 6 years ago

How did the RocksDB approach work? Was that a single instance w/ a local DB?

I've been looking a little bit into whether surfacing OSM data through WFS 3 makes any sense (probably not) and how well Elasticsearch would do as a data store (no idea on scaling requirements, but it seems to have decent spatial support, does document searching quickly, and includes a Spark SQL API that incorporates push-down and write support).

Publishing snapshots to S3 (or elsewhere) might be an alternative means to helping others provision similar infrastructure; not quite as ideal as the initial HBase model w/ read-only replicas though.

mojodna commented 6 years ago

Oh, and Elasticsearch percolation is a neat concept that would allow consumers to subscribe to queries of interest as new data is introduced.

geohacker commented 6 years ago

How did the RocksDB approach work? Was that a single instance w/ a local DB?

If I remember right this was a single instance with a local DB, but I'll tag @lukasmartinelli who worked on it closely.

ES certainly sounds cool to me. How does it compare with DynamoDB's lookup times?

lukasmartinelli commented 6 years ago

If I remember right this was a single instance with a local DB, but I'll tag @lukasmartinelli who worked on it closely.

Yes with the advantage that node lookup can happen in memory/ssd instead of a network round trip like with DynamoDB.

The idea was to do in RocksDB on memory+disk what osmium does with the NodeIndex in memory to be able to scale this even as OSM data grows (or in this case all versions of a node which leads to a way bigger data set).

There are some experiments previously using DynamoDB and RocksDB - but collectively we think DynamoDB might be better because it’s managed and hopefully won’t get too expensive.

Using DynamoDB for pure node lookup I am worried that this will get expensive really quickly (x$$$k).

geohacker commented 6 years ago

@kamicut @mojodna - could you post notes from our previous call? Thank you! Let's look for people who might have time to kick the Elasticsearch idea around a bit.

geohacker commented 6 years ago

After a chat with @dereklieu and @kamicut, we think the next short-term goal here is to seed all nodes into an AWS-managed Elasticsearch instance using an osmesa script.

If we can build a proof of concept, test for lookup speed, and room for scale then I think we can start sketching out further steps!

lossyrob commented 6 years ago

One idea I just had that is relevant to this discussion: we had talked about using the Augmented Diff stream as a way to update a distributed database that contains all history, such as HBase on S3, so that analytics can be performed on the most up-to-date version of OSM. This seems like a heavy lift and a lot of maintenance burden for batch and interactive processing e.g. through a Zeppelin or Jupyter notebook. What we have been doing with OSMesa-flavored work is to read from the public ORC files, which are about a week old. This is great because you are simply reading a publicly available file from S3, and so there's no DB or permissions to worry about.

For the batch/analytics case, instead of maintaining a fully up to date history distributed database, what we can instead do is have an update mechanism that reads in AugDiff files off of S3 to bring the Spark DataFrame up to date. So you'd read from the ORC file, and then have an easy call to update to latest, which would go fetch the relevant diffs and apply them in order to have your DF be brought to the most recent update. This adds a line of code to analysis/batch jobs/notebooks (maybe 2 with the import statement), but allows the goal of doing large analytic or batch jobs on the most up to date data with a much more simplified backend.

The work required here would be to ensure the AugDiff process outputs files that can be easily consumed by Spark into DataFrames, and writing the logic to read in the appropriate diff files, and use them to update a history/snapshot DataFrame read in from the ORC files.
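A rough sketch of what such an "update to latest" call might do, written in plain Python rather than Spark so it runs anywhere. Everything here is an assumption for illustration: the `Element` record, the `update_to_latest` name, and the idea that diffs are keyed by a replication sequence number are not OSMesa APIs.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Element:
    """One version of an OSM element in history form (hypothetical schema)."""
    type: str
    id: int
    version: int
    timestamp: int  # epoch seconds
    visible: bool


def update_to_latest(history, diffs, snapshot_sequence):
    """Append every edit from diffs newer than the snapshot.

    history: list of Element read from the (week-old) snapshot.
    diffs: dict mapping a replication sequence number to a list of Element.
    snapshot_sequence: last sequence number already contained in the snapshot.
    """
    seen = {(e.type, e.id, e.version) for e in history}
    updated = list(history)
    for sequence in sorted(diffs):
        if sequence <= snapshot_sequence:
            continue  # already part of the snapshot
        for element in diffs[sequence]:
            key = (element.type, element.id, element.version)
            if key not in seen:  # guard against re-applying a diff
                seen.add(key)
                updated.append(element)
    return updated
```

In a notebook this would be the "one extra line" after reading the ORC file; the Spark version would presumably union DataFrames instead of extending a list.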

kamicut commented 6 years ago

Augdiff

@lossyrob That makes sense. In that case we can work backwards from the dataframe to figure out what the AugDiff format is, and then write the node cache code accordingly.

One thing that I was thinking about is, did we rule out running the OSM API with its own replication process and writing a postgres trigger that writes from the db to AugDiff format? Does it not scale @geohacker @lossyrob?

OSMesa meeting notes

I did find some notes that Seth posted in the Google Hangout a month ago:

traits

Overpass Feature Parity

Past

Options

Problems

Today - node caching / lookups

https://blog.mapillary.com/tech/2017/01/12/scaling-down-an-elasticsearch-cluster.html

select sum(cardinality(nds) + cardinality(members)) from planet
5,125,065,779
full history: 15,465,761,461
way geometries: s3://osm-pds-tmp/way-geoms-20180129/
point geometries: s3://osm-pds-tmp/point-geoms-20180129/

geohacker commented 6 years ago

This seems like a heavy lift and a lot of maintenance burden for batch and interactive processing e.g. through a Zeppelin or Jupyter notebook.

@lossyrob Agree that this is going to be a heavy lift.

For the batch/analytics case, instead of maintaining a fully up to date history distributed database, what we can instead do is have an update mechanism that reads in AugDiff files off of S3 to bring the Spark DataFrame up to date. So you'd read from the ORC file, and then have an easy call to update to latest, which would go fetch the relevant diffs and apply them in order to have your DF be brought to the most recent update.

@lossyrob perhaps I'm not reading this right. Wouldn't this mean that a DF would be outdated in the next minute? How much do you think the overhead of fetching and updating ORC would be? I can imagine this happening for practically all analytical queries. I think we will also need a way to keep track of which adiffs map to a given area/feature?

OSM API with its own replication process and write a postgres trigger

@kamicut I don't think we were considering this seriously, and I'm not very confident.

lossyrob commented 6 years ago

Wouldn't this mean that a DF would be outdated in the next minute?

In an analytics job, this is fine. If we had a totally up-to-date ORC file, and you read it in as part of a Spark job, the expectation wouldn't be that the DF changes underneath in the process of analysis, but that you'd be working with the most up-to-date information from the start of the job.

Updating the ORC is something that @mojodna looked into but found to be untenable.

I don't want "analytics queries" (e.g. a query server like something from overpass) and analytic batch jobs (e.g. generating user and campaign statistics for the entire world, for every user, for every campaign over history) to get confused. For the query server, we would need something that would be up to date in order to respond quickly to queries. For large batch jobs that take a while to run anyway, updating the DF from the ORC snapshot up to recency will add some processing time, but I don't think enough to become an unattractive option.

mojodna commented 6 years ago

Updating the ORC is something that @mojodna looked into but found to be untenable.

For 2 reasons: small files (mitigated if we batch diffs up on a daily basis) and the need to periodically compact. ORC includes methods (but no CLI) to merge files at the stripe level (effectively concatenation, so inefficiency increases over time as non-optimal stripes are added), which could facilitate periodic compaction processes. (But atomicity! Maybe we don't care...)

+1 on creating a tool that can bring an ORC file up to date using minutely diffs. This should be relatively straightforward (and moderately parallelizable since order doesn't matter)--we don't need to reconstruct augmented diffs or anything, just append all of the edits in history form and use the resulting file as input.

kamicut commented 6 years ago

@mojodna @lossyrob so you're saying

  1. Read in ORC file to a dataframe
  2. Bring in minutely diffs into the dataframe to bring it to current time
  3. Do batch analytic jobs

I'm wondering if there's a way to 'pickle' (serialize) the dataframe after (2)? That way we can just boot another batch process by reading the pickled DF and running (2) again.

mojodna commented 6 years ago

Yup, and yup (df.write.format("orc").save(<path>) will write the dataframe out (to S3))!

lossyrob commented 6 years ago

Good point that we wouldn't need augmented diffs to bring an ORC file up to date in a DataFrame - however order would matter, because we'd have to update visibility flags, yeah? So it wouldn't just be an append.

mojodna commented 6 years ago

Nah, I'm pretty sure that visible is specific to an individual version (https://wiki.openstreetmap.org/wiki/OsmChange includes a visible attribute, cf. https://www.openstreetmap.org/api/0.6/changeset/46685591/download, https://www.openstreetmap.org/api/0.6/changeset/45652310/download), so as long as we're windowing correctly and sorting things there to generate validity time ranges, we should be good.

See https://www.openstreetmap.org/node/1/history for an interesting node history (deleted, undeleted, etc.)
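To illustrate the windowing idea, here is a small plain-Python sketch (not Spark, and not OSMesa code). It assumes each version is a dict with `type`, `id`, `version`, `timestamp`, and `visible` fields, sorts within each element, and derives a validity range from the next version's timestamp. The `validity_ranges` name is hypothetical.

```python
from itertools import groupby
from operator import itemgetter


def validity_ranges(versions):
    """Attach an (updated, validUntil) range to every element version.

    versions may arrive in any order; they are sorted per element here,
    which is why plain appends from diffs are enough upstream.
    """
    ordered = sorted(versions, key=itemgetter("type", "id", "version"))
    result = []
    for _, group in groupby(ordered, key=itemgetter("type", "id")):
        group = list(group)
        # Pair each version with its successor; the last one has none.
        for current, following in zip(group, group[1:] + [None]):
            result.append({
                **current,
                "updated": current["timestamp"],
                "validUntil": following["timestamp"] if following else None,
            })
    return result
```

A deleted version simply carries `visible=False` for its own range, and an undelete is just another version that follows it, so no earlier rows need to be rewritten.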

jamesmcclain commented 6 years ago

Hello, I am currently working on this task, but am late to the conversation and am perhaps missing some context.

I have read the thread above and would like to ask for comments on the following.

The basic idea that I have been working on is to read the bulk data and produce an index that maps more primitive objects to sets of aggregations that they participate in (e.g. single nodes to sets of ways, single ways to sets of relations [and I think single relations to sets of relations]). I think that is consistent with previous discussion.
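As a minimal sketch of such a reverse index (plain Python, with a hypothetical `invert` helper and made-up ids), the same inversion works for way → nodes and relation → members:

```python
from collections import defaultdict


def invert(parent_to_children):
    """Map each child id to the set of parent ids that reference it.

    parent_to_children: dict such as {way_id: [node_id, ...]} or
    {relation_id: [member_id, ...]}.
    """
    index = defaultdict(set)
    for parent_id, children in parent_to_children.items():
        for child_id in children:
            index[child_id].add(parent_id)
    return dict(index)


# Illustrative data only.
ways = {100: [1, 2, 3], 101: [3, 4]}
node_to_ways = invert(ways)          # node 3 participates in ways 100 and 101
relations = {900: [100, 101]}
way_to_relations = invert(relations)
```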

If the reverse index is keyed not just by id but id and timestamp, and if queries of the type "largest item < x" are supported by whatever mechanism is used to store it, then I think that would also be sufficient to answer augmented diff queries for any moment in time between the last moment covered by the initial ORC file and up to the moment of last update received. Geometric information (e.g. quad-tree node numbers) could be included to support various types of spatial queries.

The last sentence above leads me to the last comment that I want to throw out for discussion: it could be possible to support all of the desired operations without using a separate ("real") database (e.g. dynamodb, rocksdb, hbase) but instead just use ORC files. The point above about fragmentation and compaction is well taken, and it is also understood that the ORC format was not designed with random access efficiency as its first priority, but nonetheless it might be worth considering just from a simplicity standpoint.

The main outstanding question that I have (aside from compaction) is whether queries of the "largest item < x" type are well supported by dataframes on top of ORC files. I have not confirmed that to be so, and if they aren't then ORC files are obviously not appropriate. In any case, I am using ORC files for my prototyping activity.

mojodna commented 6 years ago

queries of the type "largest item < x"

Can you elaborate on this? What would a specific question be?

Geometric information (e.g. quad-tree node numbers) could be included to support various types of spatial queries.

I've been thinking about this too (within the context of ORC files, predicate pushdown, and ORC's ability to skip stripes based on internal indexes). I'd been envisioning (z, type, id, minX, minY, maxX, maxY) (where z = zcurve(zcurve(minX, minY), zcurve(maxX, maxY)) (dumb?) for locality). Queries could then be WHERE x >= minX AND x <= maxX AND y >= minY AND y <= maxY AND ST_Intersects(...) without needing to trigger a full table scan (since ORC would transparently skip stripes outside the min/max range and probable geometries would be local to one another thanks to the Z curve).
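A toy model of that layout may make the idea easier to poke at. This is plain Python, not ORC: "stripes" are simulated as fixed-size chunks of rows sorted by z, each with min/max statistics, and all function names, the 16-bit quantization, and the sample coordinates are assumptions.

```python
def zcurve(a, b, bits):
    """Interleave the low `bits` bits of a (even positions) and b (odd positions)."""
    z = 0
    for i in range(bits):
        z |= ((a >> i) & 1) << (2 * i)
        z |= ((b >> i) & 1) << (2 * i + 1)
    return z


def quantize(value, lo, hi, bits=16):
    """Scale a coordinate in [lo, hi] to an unsigned integer grid cell."""
    return int((value - lo) / (hi - lo) * ((1 << bits) - 1))


def index_row(type_, id_, min_x, min_y, max_x, max_y):
    """Build a (z, type, id, minX, minY, maxX, maxY) row."""
    lower = zcurve(quantize(min_x, -180, 180), quantize(min_y, -90, 90), 16)
    upper = zcurve(quantize(max_x, -180, 180), quantize(max_y, -90, 90), 16)
    return (zcurve(lower, upper, 32), type_, id_, min_x, min_y, max_x, max_y)


def build_stripes(rows, stripe_size):
    """Sort rows by z and chunk them, keeping per-chunk bounding statistics."""
    rows = sorted(rows)
    stripes = []
    for i in range(0, len(rows), stripe_size):
        chunk = rows[i:i + stripe_size]
        stats = (min(r[3] for r in chunk), min(r[4] for r in chunk),
                 max(r[5] for r in chunk), max(r[6] for r in chunk))
        stripes.append((stats, chunk))
    return stripes


def query(stripes, x, y):
    """Return rows whose bbox contains (x, y) and how many stripes were read."""
    hits, scanned = [], 0
    for (lo_x, lo_y, hi_x, hi_y), chunk in stripes:
        if x < lo_x or x > hi_x or y < lo_y or y > hi_y:
            continue  # statistics rule this stripe out entirely
        scanned += 1
        hits.extend(r for r in chunk
                    if r[3] <= x <= r[5] and r[4] <= y <= r[6])
    return hits, scanned
```

The bbox test only yields candidates; an exact check such as ST_Intersects would still run on the surviving rows. How well real ORC stripe statistics prune in practice would need measuring.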

The last sentence above leads me to the last comment that I want to throw out for discussion: it could be possible to support all of the desired operations without using a separate ("real") database (e.g. dynamodb, rocksdb, hbase) but instead just use ORC files. The point above about fragmentation and compaction is well taken, and it is also understood that the ORC format was not designed with random access efficiency as its first priority, but nonetheless it might be worth considering just from a simplicity standpoint.

:+1:, at least for prototyping (and perhaps beyond). I suspect we can deal with some of the random access inefficiency by batching lookups to coalesce them across scans.
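One way to picture the batching: collect the ids that need resolving and answer all of them in a single pass, instead of one scan per lookup. A minimal plain-Python sketch, where `scan` stands in for whatever reads the ORC-backed table (both names are hypothetical):

```python
def batched_lookup(scan, wanted_ids):
    """Resolve many ids with one pass over the table.

    scan: zero-argument callable returning an iterator of (id, row) pairs.
    wanted_ids: iterable of ids to look up.
    """
    wanted = set(wanted_ids)
    found = {}
    for row_id, row in scan():
        if row_id in wanted:
            found[row_id] = row
    return found
```

With N pending lookups this costs one scan rather than N, at the price of waiting until a batch has accumulated.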

ProcessOSM creates a lookup table from node → way (inverse of way nds) that could be written to an ORC file: https://github.com/azavea/osmesa/blob/5d9a5b63b0219ef15037e7f979f59e5b6e5cdc73/src/ingest/src/main/scala/osmesa/ingest/ProcessOSM.scala#L175-L176

Overall, I think we need:

(I'm proposing updated + validUntil to provide a window to compare the timestamp of the item being looked up and determine which target version to use. I'm proposing role because my intuition suggests that it's important; it may well not be.)

lossyrob commented 6 years ago

I think this approach may not work for some of the streaming applications of this that we're looking for. Perhaps walking through an example would help me understand how it would work...one of the applications we want to apply the augmented diffs to is to stream in minutely changes and apply them to a set of vector tiles. Having a spark cluster up and running, taking in minutely updates, and either updating an ORC file from its daily point in the snapshot, or updating an in-memory DataFrame, seems like a heavy task to do in the streaming context. Whereas, if we have a quick-access node cache that has the reverse index tables Seth mentioned, we could quickly take a replication file, rebuild the geometries, construct the Augmented Diff containing the changes, and send that off to a stream for consumption. Some other process would pick that up, read in the appropriate vector tiles, update them and replace them in the set. How would the ORC-based creation happen for this?
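For reference, the node-cache path described here could look roughly like the following. It is a plain-Python sketch with dicts standing in for the cache and reverse index; `augmented_diff` and the record shape are assumptions, not an existing format.

```python
def augmented_diff(changed_nodes, node_cache, way_nodes, node_to_ways):
    """Rebuild old and new geometries for ways touched by moved nodes.

    changed_nodes: {node_id: (lon, lat)} taken from a replication file.
    node_cache: {node_id: (lon, lat)}; updated in place.
    way_nodes: {way_id: [node_id, ...]}.
    node_to_ways: reverse index {node_id: set of way_ids}.
    """
    affected = sorted({way_id
                       for node_id in changed_nodes
                       for way_id in node_to_ways.get(node_id, ())})
    # Capture geometries before the cache is touched.
    old = {w: [node_cache[n] for n in way_nodes[w]] for w in affected}
    node_cache.update(changed_nodes)
    new = {w: [node_cache[n] for n in way_nodes[w]] for w in affected}
    return [{"way": w, "old": old[w], "new": new[w]} for w in affected]
```

The resulting records are what would be pushed onto the stream for a tile updater to consume; way and relation edits would need similar handling.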

jamesmcclain commented 6 years ago

queries of the type "largest item < x"

Can you elaborate on this? What would a specific question be?

Sorry about not being clear. I am referring to queries of the form "largest number smaller than 50". In this particular case, I am suggesting that if (for example) there is a list of ways that is jointly ordered by id and timestamp, then a query for all ways which meet a particular node at a particular time can be answered by the most recent ("largest") instance of a node that occurs before the given time. Essentially I would like to treat id+timestamp as the node id, not just id.
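On a sorted in-memory list this kind of query is a binary search. A minimal sketch using Python's `bisect`, with a hypothetical `latest_before` helper and made-up data:

```python
import bisect


def latest_before(keys, values, node_id, timestamp):
    """Return the value for the largest (id, timestamp) key below the probe.

    keys: sorted list of (node_id, timestamp) tuples.
    values: list parallel to keys (here, the ways meeting the node).
    Returns None if the node has no entry strictly before `timestamp`.
    """
    # Index of the first key >= (node_id, timestamp).
    position = bisect.bisect_left(keys, (node_id, timestamp))
    if position == 0:
        return None
    found_id, _ = keys[position - 1]
    if found_id != node_id:
        return None  # the predecessor belongs to a different node
    return values[position - 1]


# Illustrative data: which ways meet node 1 and node 2 over time.
keys = [(1, 100), (1, 200), (1, 300), (2, 150)]
values = [[10], [10, 11], [11], [10]]
```

For example, `latest_before(keys, values, 1, 250)` picks the entry keyed `(1, 200)`. Whether a DataFrame over ORC can answer this as cheaply is exactly the open question.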

jamesmcclain commented 6 years ago

Overall, I think we need:

Much appreciated, what I suggested above is certainly not exclusive of any of these.

jamesmcclain commented 6 years ago

Having a spark cluster up and running, taking in minutely updates, and either updating an ORC file from its daily point in the snapshot, or updating an in-memory DataFrame, seems like a heavy task to do in the streaming context.

What I had in mind was to update a dataframe, but my understanding is that the dataframe needs to be backed by something (orc files or something else).

I was just considering whether the orc format can efficiently support the operations that are needed if for whatever reason data are not available in-memory (e.g. after a restart).

Another question that I have is under what circumstances (other than restart) does this even matter; I don't know if dataframes are able to discard parts of themselves (e.g. under memory pressure or some failure) and reload from the updated backing store or if they use the more typical spark mechanism.

These are questions that will be important after getting something initially working, I think.

lossyrob commented 6 years ago

Ah, OK, I think I may have my wires crossed on what tasks we're talking about. There's the task of "reading a DataFrame in from ORC and updating it to the most recent version based on replication files", which has been some of the latter part of the conversation on this issue, and separately the Augmented Diff pipeline, which is different (the ORC update might rely on the AugDiffs, but as was pointed out above it doesn't need to).

In the former case, which I think is maybe what we're talking about, we read in the ORC file that represents up to a certain point in history, and then read in replication files to update the DataFrame from there. The desired output is a whole DataFrame that we can do analytics on moving forward, not something to do AugDiff queries against. These DFs would be ephemeral, and on restart or job failure the job would run again and bring itself to the latest version via the same process. This is encapsulated in issue #48, and if this is what we're talking about I'd suggest we continue the conversation there.

The latter case wouldn't be around updating a DataFrame, or probably use DataFrames at all. It would support the streaming case mentioned above.

Am I on target saying we're discussing the former case and not the latter case?

mojodna commented 6 years ago

Sorry about not being clear. I am referring to queries of the form "largest number smaller than 50". In this particular case, I am suggesting that if (for example) there is a list of ways that is jointly ordered by id and timestamp, then a query for all ways which meet a particular node at a particular time can be answered by the most recent ("largest") instance of a node that occurs before the given time. Essentially I would like to treat id+timestamp as the node id, not just id.

Ah, gotcha. In Presto, I'd been using the max_by(timestamp, version) aggregate function with GROUP BY id, version, timestamp. In Spark SQL, I was using the updated...validUntil range as a filter on the JOIN. In something like RocksDB, where one is iterating over (sorted) keys, it would be what you're describing here.

Much appreciated, what I suggested above is certainly not exclusive of any of these.

Agreed, just wanted to take a stab at concretely enumerating what we need for augmented diffs.

Another question that I have is under what circumstances (other than restart) does this even matter; I don't know if dataframes are able to discard parts of themselves (e.g. under memory pressure or some failure) and reload from the updated backing store or if they use the more typical spark mechanism.

I started looking at Apache Ignite as a backend when pondering this but didn't conclude anything. Good stuff to pay attention to, since it sounds like no one here knows the answer yet.

Am I on target saying we're discussing the former case and not the latter case?

Should we split this into multiple issues? Augmented diff generation (against something) + streaming (to something) replication + one-shot replication updates (covered by #48).

mojodna commented 6 years ago

@jamesmcclain have a look at https://github.com/osmlab/osm-wayback if you haven't already. @jenningsanderson is using RocksDB (for full geometry storage) with id + timestamp keys as you suggest ^^.
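A note on id + timestamp keys in a byte-ordered store: the encoding has to make byte order agree with (id, timestamp) order. The sketch below shows one common way to do that with fixed-width big-endian integers. It is an assumption for illustration and not taken from osm-wayback, whose actual key layout is not described in this thread.

```python
import struct


def encode_key(osm_id, timestamp):
    """Pack (id, timestamp) as two unsigned 64-bit big-endian integers.

    Big-endian fixed-width encoding makes lexicographic byte order match
    numeric (id, timestamp) order, which a sorted key-value store relies on.
    """
    return struct.pack(">QQ", osm_id, timestamp)


def decode_key(key):
    """Inverse of encode_key; returns an (id, timestamp) tuple."""
    return struct.unpack(">QQ", key)


# Illustrative check: sorting the encoded bytes sorts the logical keys.
pairs = [(2, 150), (1, 300), (256, 1), (1, 100)]
encoded = sorted(encode_key(i, t) for i, t in pairs)
decoded = [decode_key(k) for k in encoded]
```

With keys laid out this way, seeking to `encode_key(id, t)` and stepping back one entry gives the latest version of `id` before `t`.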

jenningsanderson commented 6 years ago

:wave: To chime in briefly, I've been very impressed (and shocked) with the performance of rocksdb in this domain; I've listed a few performance stats when run locally here: https://github.com/osmlab/osm-wayback#performance; I've also been experimenting with redis as a node cache for historic node locations, though I am less impressed. osm-wayback is currently focused on differences in tags between versions, but I am currently expanding it to handle historic geometries... though I have realized from @mojodna that w/o "augmented diffs" and changeset information, this question becomes far more complex. Still, rocksdb is lightning fast, especially if you pbf encode OSM objects as values in the DB... definitely don't overlook it as a backend object store for reconstructing geometries or diffs...

jamesmcclain commented 6 years ago

@jamesmcclain have a look at https://github.com/osmlab/osm-wayback if you haven't already. @jenningsanderson is using RocksDB (for full geometry storage) with id + timestamp keys as you suggest ^^.

Okay, will definitely have a look. Thanks for the reference.

jamesmcclain commented 6 years ago

I've been very impressed (and shocked) with the performance of rocksdb

Okay, very well taken. It looks as though there is tremendous impetus behind rocksdb. I would like to get a basic prototype working, then move to that.

moradology commented 6 years ago

It looks as though there is tremendous impetus behind rocksdb

Yeah, my cursory exploration of our options for the node cache suggested that rocksdb was the best price/performance/ease-of-use compromise. We lose the security/ease of a hosted system but gain the ability to use a single SSD for the entire problem.

kamicut commented 6 years ago

When we had this meeting last month we were considering rocksdb or elasticsearch for the node cache. We set up meetings around the elasticsearch experiment with @moradology @geohacker @dereklieu and others, because our initial thought was that rocksdb does not have an easy API or bindings. Is this no longer the case?

I also got very confused as to what we're tackling in this ticket. Should we close it in favor of #52 and #48?

jenningsanderson commented 6 years ago

@kamicut -- rocksdb does not have an easy API or good bindings for anything (except maybe Java?) :( I've been forced to relearn C++. https://github.com/dberesford/rocksdb-node is the best I've found for node, but cannot get it working properly with column families; if you don't use column families, then most leveldb bindings can be used (like levelUp or levelDown), which are much more actively developed.

lossyrob commented 6 years ago

In case people are watching this issue and not the others we broke out, see this comment for the current approach of a spike to create the stream of Augmented Diffs.