Open nyurik opened 3 years ago
Hi @nyurik, this is really interesting. I'm glad to help you.
I couldn't quite work out the requirements for the output files.
Is each element considered separately (as they are stored in the PBF file)? So, for example, does an OSM way need to know the geographic locations of its nodes or is it ok to just store the node ids?
Storing them separately would really simplify things, because we would not have to create a (potentially huge) index to perform these id lookups.
The most efficient strategy depends on the size and the amount of preprocessing that each element needs. Elements in osmpbf need a reference to their PrimitiveBlock to access a shared stringtable. To be able to put elements in a queue, you would have to make owned versions of them because the PrimitiveBlock is not guaranteed to exist anymore when writing to a gzip file in another thread. The best way to avoid this overhead is to let each worker thread deal with one Blob at a time and write directly to the gzip files when iterating over the elements.
So the strategy would be to create a BlobReader and let each thread handle decoding, iterating and writing the elements. There might be better solutions if decoding is not the bottleneck :)
@b-r-u yes and no: each element is considered separately (i.e. it can be recorded in any of the resulting files), but each way element must resolve its node positions (to compute its center). This is not as critical for the relations. My old Python implementation used the osmium lib, which handled the node index internally.
Agreed about gzip threading -- it seems it would make things far easier if each thread simply keeps a thread-local output stream open and writes to it. The only concern is that I would have to keep track of all open files in shared state and close them after the processing is done -- since I don't control thread creation when using your library, I cannot rely on some automagical stream flushing and closing when parsing finishes.
I was thinking about something like this as a starting point:
use osmpbf::{BlobReader, BlobDecode, Element};
use rayon::prelude::*;

fn main() -> anyhow::Result<()> {
    let reader = BlobReader::from_path("bremen-latest.osm.pbf")?;
    reader
        .par_bridge()
        .try_for_each(|blob| -> anyhow::Result<()> {
            if let BlobDecode::OsmData(block) = blob?.decode()? {
                block.for_each_element(|elem| {
                    match elem {
                        Element::Node(_node) => {
                            // handle a plain node
                        },
                        Element::DenseNode(_node) => {
                            // handle a dense node
                        },
                        Element::Way(_way) => {
                            // handle a way
                        },
                        Element::Relation(_rel) => {
                            // handle a relation
                        },
                    }
                })
            }
            Ok(())
        })?;
    Ok(())
}
So this uses rayon for parallelization. You would also need to keep track of the open streams; maybe try_for_each_init is helpful because it can retain state between different blobs (https://docs.rs/rayon/1.5.0/rayon/iter/trait.ParallelIterator.html#method.try_for_each_init).
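A rough sketch of how that could look, assuming the flate2 crate for the gzip side; open_sink and the out-NNNN.gz naming are just placeholders, and note that rayon may call the init closure more than once per worker, so this can produce more files than there are threads:

use std::fs::File;
use std::io::Write;
use std::sync::atomic::{AtomicUsize, Ordering};

use flate2::write::GzEncoder;
use flate2::Compression;
use osmpbf::{BlobDecode, BlobReader};
use rayon::prelude::*;

// Each per-worker state opens its own numbered gzip file (naming is a placeholder).
static NEXT_FILE: AtomicUsize = AtomicUsize::new(0);

fn open_sink() -> GzEncoder<File> {
    let n = NEXT_FILE.fetch_add(1, Ordering::SeqCst);
    let file = File::create(format!("out-{:04}.gz", n)).expect("create output file");
    GzEncoder::new(file, Compression::default())
}

fn main() -> anyhow::Result<()> {
    let reader = BlobReader::from_path("bremen-latest.osm.pbf")?;
    reader
        .par_bridge()
        .try_for_each_init(open_sink, |sink, blob| -> anyhow::Result<()> {
            if let BlobDecode::OsmData(block) = blob?.decode()? {
                // One line per dense node, as a stand-in for the real output format.
                for node in block.groups().flat_map(|g| g.dense_nodes()) {
                    writeln!(sink, "{} {} {}", node.id(), node.decimicro_lat(), node.decimicro_lon())?;
                }
            }
            Ok(())
        })?;
    // Dropping the encoders flushes them; calling finish() explicitly would surface I/O errors,
    // which is basically the open-files concern mentioned above.
    Ok(())
}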
Edit: There are still some difficult issues. I'm not sure how to best parse the file in parallel while creating an index of the node positions and using this (possibly incomplete) index at the same time. When doing two passes over the whole file it becomes easy but might not be as efficient. First pass is reading/writing the nodes and creating the node index. Second pass is reading/writing ways and relations using the node index.
To expand this small example and find usability issues with osmpbf I created a repo: https://github.com/b-r-u/osm2gzip
Maybe it can evolve to a useful tool.
Thank you @b-r-u, this is exactly what I was looking for!!! One last thing -- how should I deal with resolving the ways' node positions? I need to compute a center point for each way somehow (I won't need to do it for relations). Thanks!
A node index is still missing in the code; I can add it later today. It should be a HashMap from node id to position, which can be used in a second pass to compute centroids.
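Just to illustrate the second pass (way_center here is a made-up helper, and averaging node coordinates only approximates a real centroid):

use std::collections::HashMap;

// The first pass would fill this from dense nodes: node id -> (decimicro_lat, decimicro_lon).
type NodeIndex = HashMap<i64, (i32, i32)>;

// Second pass: average a way's node positions (e.g. the ids from osmpbf's Way::refs()).
// Returns None if any referenced node is missing from the index.
fn way_center(node_ids: &[i64], index: &NodeIndex) -> Option<(f64, f64)> {
    if node_ids.is_empty() {
        return None;
    }
    let (mut lat, mut lon) = (0.0, 0.0);
    for id in node_ids {
        let (la, lo) = index.get(id)?;
        lat += *la as f64 * 1e-7;
        lon += *lo as f64 * 1e-7;
    }
    let n = node_ids.len() as f64;
    Some((lat / n, lon / n))
}

fn main() {
    let mut index = NodeIndex::new();
    index.insert(1, (531_000_000, 87_000_000)); // ~53.1, 8.7 in decimicro degrees
    index.insert(2, (532_000_000, 88_000_000));
    println!("{:?}", way_center(&[1, 2], &index)); // approximately (53.15, 8.75)
}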
I'm afraid a hash map won't work when processing a planet file unless I have tons of RAM. Is there a memmapped implementation somewhere we can use?
Oh, I guess that's true. I was thinking that it may work with a 64 bit key (the node id) and a 64 bit value (two i32 coordinates), but with the planet file this would still be more than 100 GB. I also couldn't find a memmapped HashMap.
There are some helpful notes by the Osmium project on their use of indexes (https://osmcode.org/osmium-concepts/#indexes). For the planet file they suggest a dense array which is directly indexed by the node id. Unfortunately, the memmap crate does not support growable memory maps, so we would need to guess the maximum node id or split the index into multiple memmapped files.
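The bookkeeping for splitting such a dense index across multiple fixed-size files would be simple enough; a tiny sketch, with an arbitrary chunk size:

// Each node occupies 8 bytes (packed lat/lon), directly addressed by node id.
const ENTRY_BYTES: u64 = 8;
// Arbitrary chunk size for this sketch; each chunk would be one memmapped file.
const CHUNK_BYTES: u64 = 8 * 1024 * 1024 * 1024;

// Map a node id to (which chunk file, byte offset inside that chunk).
fn locate(node_id: u64) -> (usize, usize) {
    let byte_offset = node_id * ENTRY_BYTES;
    ((byte_offset / CHUNK_BYTES) as usize, (byte_offset % CHUNK_BYTES) as usize)
}

fn main() {
    // e.g. a node id of 9 billion would land in chunk 8 at offset 3280523264
    println!("{:?}", locate(9_000_000_000));
}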
Another interesting option is the fst crate. But it requires that the node ids are sorted: https://docs.rs/fst/0.4.5/fst/#example-stream-to-a-file-and-memory-map-it-for-searching
Thanks, I didn't know about fst; I left a question with some rough stats. It looks like about 20% of the space would be wasted because of deleted nodes (when storing densely without the node ID). Have any other Rust-based implementations built such an index?
Nope. Per an awesome conversation, fst would not be a good general solution here unless the data is never updated -- fst must be read-only after creation.
I decided to give it another stab using memmap2 to implement just the caching part, but it is going slowly because Rust is still new to me. I don't think the fixed mmap size is a problem -- if the file has to grow, simply close the mmap while keeping the file open, call file.set_len(new_size), and re-create the mmap. Obviously the file would grow in some large increments, e.g. 100 MB, to make sure this doesn't happen too often. Also, it would be good to remain compatible with libosmium's storage as implemented in location.hpp.
Sorry for my late reply! Following our discussion, I actually implemented a proof of concept in May but didn't get around to including it in this library. It also re-creates the memory map in fixed increments (1 GB):
use std::fs::{File, OpenOptions};

use memmap::MmapMut;
use osmpbf::{BlobDecode, BlobReader, DenseNode};
// Pack two i32 into one u64
fn pack(a: i32, b: i32) -> u64 {
    ((a as u32) as u64) << 32 | ((b as u32) as u64)
}

// Unpack two i32 from one u64
fn unpack(x: u64) -> (i32, i32) {
    ((x >> 32) as i32, x as i32)
}

// Convert a packed value back to (lat, lon) in degrees
fn unpack_to_coords(x: u64) -> (f64, f64) {
    let (lat, lon) = unpack(x);
    (lat as f64 * 1e-7, lon as f64 * 1e-7)
}
struct FlatIndex<'a> {
    file: &'a File,
    mmap: MmapMut,
}

impl<'a> FlatIndex<'a> {
    const SIZE_STEP: usize = 1024 * 1024 * 1024;

    fn new(file: &'a File) -> Result<Self, std::io::Error> {
        Ok(Self {
            file,
            mmap: unsafe { MmapMut::map_mut(file)? },
        })
    }
    fn add_node(mut self, node: &DenseNode) -> anyhow::Result<Self> {
        let id = node.id();
        if id < 0 {
            anyhow::bail!("negative node id: {}", id);
        }
        let id: u64 = id as u64;
        let index: usize = id as usize * 8;
        if index < self.mmap.len() {
            let coords = pack(node.decimicro_lat(), node.decimicro_lon());
            self.mmap[index..(index + 8)].copy_from_slice(&coords.to_be_bytes());
            Ok(self)
        } else {
            // Grow the file in SIZE_STEP increments and re-create the memory map.
            let file = self.file;
            self.finish()?;
            let new_size = (index / Self::SIZE_STEP + 1) * Self::SIZE_STEP;
            println!("Growing index file to {} bytes ({} steps)", new_size, index / Self::SIZE_STEP + 1);
            file.set_len(new_size as u64)?;
            let mut ne = Self::new(file)?;
            if index >= ne.mmap.len() {
                anyhow::bail!("index {} is still out of bounds after resizing", index);
            }
            let coords = pack(node.decimicro_lat(), node.decimicro_lon());
            ne.mmap[index..(index + 8)].copy_from_slice(&coords.to_be_bytes());
            Ok(ne)
        }
    }
    fn add_node_batch(self, values: &mut [(u64, u64)]) -> anyhow::Result<Self> {
        if values.is_empty() {
            return Ok(self);
        }
        values.sort_by_key(|x| x.0);
        let mut fi = self;
        let max_index: usize = values.last().unwrap().0 as usize * 8;
        if max_index >= fi.mmap.len() {
            // Grow the file in SIZE_STEP increments and re-create the memory map.
            let file = fi.file;
            fi.finish()?;
            let new_size = (max_index / Self::SIZE_STEP + 1) * Self::SIZE_STEP;
            println!("Growing index file to {} bytes ({} steps)", new_size, max_index / Self::SIZE_STEP + 1);
            file.set_len(new_size as u64)?;
            fi = Self::new(file)?;
        }
        for &(id, coords) in values.iter() {
            let index = id as usize * 8;
            fi.mmap[index..(index + 8)].copy_from_slice(&coords.to_be_bytes());
        }
        Ok(fi)
    }
    fn finish(self) -> anyhow::Result<()> {
        self.mmap.flush()?;
        Ok(())
    }
}
fn main() -> anyhow::Result<()> {
    // Open *.osm.pbf file
    let args: Vec<String> = std::env::args().collect();
    if args.len() < 2 {
        println!("Need *.osm.pbf file as first argument.");
        return Ok(());
    }
    let reader = BlobReader::from_path(&args[1])?;

    let file = OpenOptions::new().read(true).write(true).create(true).open("flat_index")?;
    file.set_len(FlatIndex::SIZE_STEP as u64)?;
    file.sync_all()?;

    // This is where we'll write our index to.
    let mut fi = FlatIndex::new(&file)?;

    // Insert each node.
    let mut num_nodes: u64 = 0;
    for blob in reader {
        if let BlobDecode::OsmData(block) = blob?.decode()? {
            let mut batch = vec![];
            for node in block.groups().flat_map(|g| g.dense_nodes()) {
                batch.push((node.id() as u64, pack(node.decimicro_lat(), node.decimicro_lon())));
                num_nodes += 1;
            }
            fi = fi.add_node_batch(batch.as_mut_slice())?;
        }
    }

    // Finish construction of the index and flush its contents to disk.
    fi.finish()?;
    println!("Stored {} nodes", num_nodes);
    file.sync_all()?;
    let len = file.metadata()?.len();
    println!("file len {}", len);
    Ok(())
}
Thanks @b-r-u, your code gave me a huge boost in understanding Rust :) I did some more work on your example, trying to make it into a reusable caching library.
The biggest challenge is how to make it multithreaded without frequent locking. First I wanted to find out how to write to the mmap in parallel without syncing -- I posted a Stack Overflow question and also got awesome feedback from the memmap2 author. The solution provided seems great except for one minor issue: handling file growth, which requires all threads to be paused.
The current approach of set_value(index, value) resizing the file when index > file_size might not work, because there would need to be some locking mechanism around each set_value call to avoid a race condition -- to ensure that the memmap is not being re-created while it is written to.
An alternative approach might be for each thread to process PBF blocks until it encounters the index > file_size condition. Once that happens, the thread would pause processing, e.g. send the partially-processed block via a channel and stop. Once all threads have stopped (either hitting the size limit or running out of work), the main thread would resize the file, re-create the memmap, and resume PBF parsing.
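Yet another way to sidestep per-call locking entirely (just a sketch of an alternative, not the pausing scheme above): let the decoding threads batch (id, packed position) pairs per blob and send them over a channel to a single writer thread that owns the flat file, so only that thread ever grows the file and re-creates the memmap:

use std::sync::mpsc;
use std::thread;

// Packed lat/lon pairs, as produced by the earlier pack() helper.
type Batch = Vec<(u64, u64)>;

fn main() {
    let (tx, rx) = mpsc::channel::<Batch>();

    // Single writer thread: the only owner of the (hypothetical) flat index,
    // so it can grow the file and re-create the memmap without coordination.
    let writer = thread::spawn(move || {
        let mut stored = 0usize;
        for batch in rx {
            // A real implementation would call something like the earlier
            // FlatIndex::add_node_batch here instead of just counting.
            stored += batch.len();
        }
        stored
    });

    // Decoder threads (stand-ins for the per-blob workers) just send batches.
    let handles: Vec<_> = (0..4u64)
        .map(|t| {
            let tx = tx.clone();
            thread::spawn(move || {
                let batch: Batch = (0..1000u64).map(|i| (t * 1000 + i, 0)).collect();
                tx.send(batch).unwrap();
            })
        })
        .collect();
    for h in handles {
        h.join().unwrap();
    }
    drop(tx); // close the channel so the writer loop ends

    println!("stored {} entries", writer.join().unwrap());
}

The writer thread could become a bottleneck if decoding is fast, but sending one batch per blob keeps the channel overhead low.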
To be honest, I have no idea if the current osmpbf would fit into this paradigm, or if a different one would be required.
~After some more hacking, I arrived at https://gist.github.com/nyurik/daea497c122e92ef2637904fc284fd6c but sadly it doesn't compile yet (fighting Rust lifetime challenges). Once I figure out how to handle that, I will just need to solve the possible thread slowdown on file growth, and hopefully that can be made into a proper cache lib.~
@b-r-u I think I got my lib to work properly. I would love your feedback; I will try to publish it soon. When I ran it against a recent planet file, saving just the nodes, it generated a 65 GB file in 23 minutes. Not ideal (the CPU rarely exceeded 200%), so some optimizations are still in order. https://github.com/nyurik/osm-node-cache
Hey @nyurik, that's a great project! I will try to find some time to look into it.
Thanks, I have already published a few revisions of the lib -- https://crates.io/crates/osmnodecache -- do let me know what you think, and thanks for all your help!
@b-r-u I created a few PRs, please let me know what you think of them. In the meantime, should this issue be closed? (And thank you so much for all the help getting started with Rust and geo!) :) Oh, and one more thing -- I think your lib is perfectly suited for https://github.com/georust -- moving it there would raise the community's awareness of the project and help more people collaborate on it easily. What do you think?
Hi, I just started down the Rusty road, and would love to get some suggestions on the best usage of osmpbf.
I would like to re-implement the Sophox parser -- a tool that converts PBF into hundreds of gzip files of similar size in the most efficient way possible (the current Python code takes ~3 days).
My current plan is to use par_map_reduce(), process each element, and add it to some queue. Several gzip sinks would listen to the queue, pick up entries, and save them to gzip files, compressing in the process. Once the number of elements in a file exceeds a threshold, the file would rotate. I have no idea how appropriate this is with Rust paradigms, or if some other pattern would be better.
P.S. I saw something similar in peermaps/ingest, but I'm not sure if that's the right approach.
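For the rotation part, the sink I'm imagining is roughly this (a sketch assuming the flate2 crate for gzip; the names and the line-per-element format are placeholders, and I'm not sure it's idiomatic):

use std::fs::File;
use std::io::Write;

use flate2::write::GzEncoder;
use flate2::Compression;

// A gzip sink that starts a new numbered file once `limit` elements were written.
struct RotatingGzSink {
    prefix: String,
    limit: usize,
    count: usize,
    file_no: usize,
    encoder: GzEncoder<File>,
}

impl RotatingGzSink {
    fn new(prefix: &str, limit: usize) -> std::io::Result<Self> {
        Ok(Self {
            prefix: prefix.to_string(),
            limit,
            count: 0,
            file_no: 0,
            encoder: Self::open(prefix, 0)?,
        })
    }

    fn open(prefix: &str, file_no: usize) -> std::io::Result<GzEncoder<File>> {
        let file = File::create(format!("{}-{:05}.gz", prefix, file_no))?;
        Ok(GzEncoder::new(file, Compression::default()))
    }

    fn write_element(&mut self, line: &str) -> std::io::Result<()> {
        if self.count >= self.limit {
            // Finish the current gzip stream and rotate to the next file.
            let next = Self::open(&self.prefix, self.file_no + 1)?;
            std::mem::replace(&mut self.encoder, next).finish()?;
            self.file_no += 1;
            self.count = 0;
        }
        writeln!(self.encoder, "{}", line)?;
        self.count += 1;
        Ok(())
    }

    fn close(self) -> std::io::Result<()> {
        self.encoder.finish()?;
        Ok(())
    }
}

fn main() -> std::io::Result<()> {
    let mut sink = RotatingGzSink::new("elements", 1_000_000)?;
    sink.write_element("node 1 53.1 8.7")?;
    sink.close()
}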
cc: @substack
Thanks!