vmlaker / mpipe

Python API for writing multiprocessing pipelines
http://vmlaker.github.io/mpipe
MIT License

Pipeline.results() returns partial results when there are multiple end stages #9

Open jumpoutofworld opened 9 years ago

jumpoutofworld commented 9 years ago

Pipeline.get returns only one result even when there are several stages in its _output_stages. I can't tell whether this is by design or a bug.

def get(self, timeout=None):
    """Return result from the pipeline."""
    result = None
    for stage in self._output_stages:
        result = stage.get(timeout)
    return result
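To make the behavior concrete, here is a minimal sketch that does not import mpipe at all. `FakeStage` and `FakePipeline` are hypothetical stand-ins invented for illustration; only the body of `get` is taken from the snippet in this comment. Because `result` is reassigned on every pass through the loop, only the value from the last output stage survives.

```python
class FakeStage:
    """Hypothetical stand-in for an output stage (not mpipe's real class)."""

    def __init__(self, value):
        self._value = value

    def get(self, timeout=None):
        # A real stage would presumably block on a queue; this one
        # simply returns a fixed value so the example is deterministic.
        return self._value


class FakePipeline:
    """Hypothetical stand-in that reuses the body of get() quoted in this issue."""

    def __init__(self, output_stages):
        self._output_stages = output_stages

    def get(self, timeout=None):
        """Return result from the pipeline."""
        result = None
        for stage in self._output_stages:
            result = stage.get(timeout)  # overwritten on every iteration
        return result


pipe = FakePipeline([FakeStage("from stage A"), FakeStage("from stage B")])
last_only = pipe.get()
print(last_only)  # the value from stage A is fetched, then discarded
```

With two output stages, the result of the first stage is consumed inside the loop but never reaches the caller.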

vmlaker commented 9 years ago

Hi, if you have a pipeline implementation that illustrates the problem you're seeing, I will gladly take a look. If this is related to #8, then fan-in is the culprit.

CowBoy4mH3LL commented 4 years ago

Not sure if this has already been addressed, but I noticed the same thing. It seems like the following may solve the issue:

def get(self, timeout=None):
    """Return result from the pipeline."""
    result = None
    for stage in self._output_stages:
        result = stage.get(timeout)
        yield result

Also, there seems to be no need for the pipe.results method if the above is altered and get becomes an iterator.
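A rough sketch of how the yield-based variant would behave, using hypothetical stand-ins (`FakeStage`, `FakePipeline`) rather than mpipe's real classes, so this is an illustration under those assumptions and not a tested patch. One thing to keep in mind: once `get` contains `yield`, calling it returns a generator object instead of a single value, so existing callers that expect one result would need to iterate over it.

```python
class FakeStage:
    """Hypothetical stand-in for an output stage (not mpipe's real class)."""

    def __init__(self, value):
        self._value = value

    def get(self, timeout=None):
        # Returns a fixed value so the example is deterministic.
        return self._value


class FakePipeline:
    """Hypothetical stand-in showing the proposed generator-style get()."""

    def __init__(self, output_stages):
        self._output_stages = output_stages

    def get(self, timeout=None):
        """Yield one result per output stage."""
        for stage in self._output_stages:
            yield stage.get(timeout)


pipe = FakePipeline([FakeStage("from stage A"), FakeStage("from stage B")])

# Iterating the generator produces one result per output stage.
all_results = list(pipe.get())
print(all_results)
```

The caller now decides how to consume the per-stage results, for example with a `for` loop or `list(...)`.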

For the time being I am using a debugger without making any changes to the code. I will happily make the change if needed.

Cool application, @vmlaker. It saved me the pain of Kafka and Storm, using Python.

CowBoy4mH3LL commented 4 years ago

I meant generator :)