adamfranco / curvature

Find roads that are the most curvy or twisty based on Open Street Map (OSM) data.
http://roadcurvature.com/

Improved post processing structure #13

Closed: Fonsan closed this 8 years ago

Fonsan commented 8 years ago

This change should be considered a draft, and I would like some feedback on whether it is going in the right direction. My Python is still rudimentary, and any input on best practices is highly appreciated.

The aim is to improve the way we scale the post-processing pipeline. I have moved each post processor into a proper class under a separate sub-package and written tests for them.

This still enables us to write complex expressions such as:

./curvature-calculate andorra-latest.osm.pbf \
 | ./curvature-pp add_length \
 | ./curvature-pp add_points \
 | ./curvature-pp filter_radius --max 200 \
 | ./curvature-pp remove_fields --fields county \
 | ./curvature-pp filter_curvature --min 300 \
 | ./curvature-pp sort --key curvature --direction ASC \
 | ./curvature-pp head -n 2 \
 | ./msgpack-reader

But we can also express them in pure Python and expose them to end consumers of this library without the need to serialize to MessagePack between each post processor. In addition, I designed the interface of the post-processing classes to support streaming, removing the need to read all paths into memory.

from functools import reduce  # on Python 3, reduce lives in functools

head = Head(num=1)
filter_curvature = FilterCurvature(min=5)
data = [
  {'curvature': 2},
  {'curvature': 5},
  {'curvature': 6},
  ...
  ...

] # Could also be a messagepack Unpacker
chain = reduce(lambda acc, processor: processor.process(acc), [data, filter_curvature, head])
next(iter(chain)) # => {'curvature': 5} (chain is a generator, so we pull rather than index)

The chain above might be hard to wrap one's head around at first, but it supplies a fully streamable data source. Note that the third element would never be yielded by any underlying Python code, reducing the memory needed to process the paths, much like ordinary piping of commands in a *nix shell.
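To make that laziness concrete, here is a self-contained sketch using plain generator functions (the function names are illustrative, not the classes from this PR) with a source that logs what it hands downstream:

# Illustrative only: simple generator stages standing in for the PR's classes.
def filter_curvature(items, min=5):
    for item in items:
        if item['curvature'] >= min:
            yield item

def head(items, num=1):
    for i, item in enumerate(items):
        if i >= num:
            break  # stop pulling from upstream once we have enough items
        yield item

def source():
    for item in [{'curvature': 2}, {'curvature': 5}, {'curvature': 6}]:
        print('pulled %r' % item)
        yield item

print(next(head(filter_curvature(source()))))
# pulled {'curvature': 2}
# pulled {'curvature': 5}
# {'curvature': 5}  <- the third element is never pulled from the source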

I have moved all of the post processors into a single program in order to keep the exposed interface clean.

I have added a Head post processor, which is excellent for looking at example output.

In order to get py.test to work, the project needed to be a "package", so I added a setup.py.
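For reference, a minimal setup.py along these lines could look like the following (the name and version here are assumptions, not necessarily the actual file in this PR):

# A minimal sketch; the package name and version are assumptions.
from setuptools import setup, find_packages

setup(
    name='curvature',
    version='0.1',
    packages=find_packages(),
)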

We might consider giving the output scripts the same love, but perhaps in a later branch.

TODO

adamfranco commented 8 years ago

Hi Erik, I've read through the patch changes as well as done some reading on functional techniques in Python, and I feel like this is a useful direction to head. As you indicated when we first started this refactoring process, retaining the ability to output the raw data as a MessagePack stream at any point is still a useful feature, since it allows additional programs to be injected into the pipeline or intermediate data to be used. That said, serializing to MessagePack at every step is extra overhead that isn't needed in many cases. This new method, which allows all processing to stay within a single executable script, can save that overhead and possibly provide some flexibility. Also, I appreciate the ability to write tests for each stage; these will come in handy.

For my current usage, I'm interested in a processing chain that does several steps of post-processing, then sends the results to multiple output files, as shown in the curvature.sh shell script. If doing this all in Python, I could probably just send the full final iterator to each output sequentially, or use something like itertools.tee() to split the iterator and send it to each filtering/output step in parallel, as sketched below.
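A rough sketch of those two options (the outputs and their write() method are hypothetical output stages, not the project's actual API):

import itertools

def send_sequentially(chain, outputs):
    # Option 1: materialize the final iterator once, then feed each output in turn.
    results = list(chain)
    for output in outputs:
        output.write(iter(results))

def send_in_parallel(chain, outputs):
    # Option 2: split the iterator with itertools.tee(). Note that tee() buffers
    # each item until every branch has consumed it, so the branches should be
    # read roughly in step to keep memory bounded.
    for output, branch in zip(outputs, itertools.tee(chain, len(outputs))):
        output.write(branch)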

Overall, this looks like a good direction to continue on. :+1: In the post-processing and output stages, the only thing I see so far that requires loading the full result set into memory is sorting -- all of the other post-processing and output modules should be able to work on a stream of items. The initial collecting and curvature calculation is the next place I'll need to look at for reducing how much data gets loaded into memory, since the process is currently very memory-heavy, precluding me from processing large inputs (like Germany) on small-memory machines. :sweat_smile:

One other thing I've just come across that might be helpful is stream.py. I'm not sure if it is worth the added dependency, but it might provide cleaner syntax to replace the reduce(...) call.

adamfranco commented 8 years ago

Hi @Fonsan, I was just wondering if you've had a chance to make any progress on this new post-processing structure. I'm circling back around to some more Curvature improvements and was hoping to finish up the main structure of the refactoring process in the next few weeks so that I can start using the refactor branch in my processing pipeline that generates the world-wide KML output I host and move on to other Curvature projects.

One thing I just noticed as I was looking into incorporating this PR: it seems that your post_processors branch has all of the commits of your filter_surface branch plus a few additions, but rebased onto the master branch in a way that duplicates all of the commits on the refactor branch. To stay sane, I'd suggest resetting your post_processors branch to the current HEAD of refactor and then cherry-picking 1ab602fd, d722a2d0, eba55ff4, and 47a96b97 onto it. This will allow a clean merge onto refactor after you finish up what you're working on. :smiley:

Fonsan commented 8 years ago

Hi @adamfranco, a simple rebase against refactor had the equivalent result.

"Updating the readme" in this pull request is the only step I consider necessary before merging into refactor

Merging refactor to master would be awesome!

Fonsan commented 8 years ago

After poring through https://docs.python.org/3/library/itertools.html I realise that there might be a more elegant way of expressing a few of the post processors, but I am fine with their current state for now.
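For example (an assumption about what a more itertools-flavored rewrite might look like, not code from this PR), Head collapses to a single itertools.islice call:

import itertools

class Head(object):
    def __init__(self, num=1):
        self.num = num

    def process(self, iterable):
        # islice stops pulling from upstream after num items,
        # so the streaming behaviour is preserved.
        return itertools.islice(iterable, self.num)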

Fonsan commented 8 years ago

We could merge refactor and choose not to officially expose the post processors' API in the readme for now, as their API is subject to change.

Fonsan commented 8 years ago

I just realised a major issue with itertools.tee(): if one consumer reads ahead on one of the iterators, Python needs to keep every object in memory until it has been consumed by all of the iterators. If one applies filters it becomes near impossible to keep them in sync, and memory becomes a huge issue.

from itertools import tee

# tee() returns a tuple, so wrap it in a list if we want to pop() iterators off it
long_roads_list = list(tee(filter_length.FilterLength(min=2500).process(roads_list.pop()), 3))
long_roads = long_roads_list.pop()
soft_long_roads = filter_surface.FilterSurface(exclude_surfaces=['asphalt', 'paved', 'unknown']).process(long_roads_list.pop())
hard_long_roads = filter_surface.FilterSurface(include_surfaces=['asphalt', 'paved']).process(long_roads_list.pop())

I see two solutions: either we ditch the stream and pass filenames to every process, which can then read the file multiple times (also inefficient), or we allow a callback interface.

This will in turn force us away from the pull interface of Python iterators, since everything will become push.

class Streamer(object):
  def __init__(self, iterable):
    self.iterable = iterable
    self.callbacks = []

  def add_callback(self, callback):
    self.callbacks.append(callback)

  def run(self):
    # Push each item to every registered callback
    for item in self.iterable:
      for callback in self.callbacks:
        callback(item)

import sys
import msgpack

streamer = Streamer(msgpack.Unpacker(sys.stdin, use_list=True))

def foo(item):
  if item['curvature'] > 5:
    print('foo')

streamer.add_callback(foo)
streamer.run()

Fonsan commented 8 years ago

Perhaps we could support both versions by exposing a different method as well:


class Processor(object):
    def process(self, iterable):
        for item in iterable:
            results = self.run(item)
            if results:
                for result in results:
                    yield result

class FilterLength(Processor):
    def __init__(self, min=None, max=None):
        self.min = min
        self.max = max

    def run(self, item):
        # Return a list of results to keep, or None to drop the item
        if self.min is not None and item['length'] < self.min:
            return None
        if self.max is not None and item['length'] > self.max:
            return None
        return [item]
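
A quick sketch of how one class could then serve both styles (the data here is made up for illustration):

f = FilterLength(min=2500)

# Pull style: process() is a generator, so downstream stages drive it.
for item in f.process(iter([{'length': 1000}, {'length': 3000}])):
    print(item)  # => {'length': 3000}

# Push style: a Streamer callback forwards run()'s results item by item,
# so nothing is buffered.
def on_item(item):
    for result in f.run(item) or []:
        print(result)
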
adamfranco commented 8 years ago

Hi @Fonsan, I've done a bunch of README clean-up and reworked much of my normal usage into a few scripts in processing_changes/ as a way to validate that this new code can still generate the output I need. 👍