xpdAcq / rapidz

Fork of Streamz
https://xpdacq.github.io/rapidz
BSD 3-Clause "New" or "Revised" License
5 stars 7 forks source link

Handle parallel filter better #14

Open CJ-Wright opened 5 years ago

CJ-Wright commented 5 years ago

Parallel filter causes a whole host of problems:

  1. It causes join nodes to break. In a serial pipeline the data simply doesn't flow to the next node, so zips don't emit and combine_latest nodes don't update/emit. This is no longer true in parallel, since we must now emit on every piece of data.
  2. This causes problems with SHED since when we inspect the data to make a descriptor we could get a string because we inspected the sentinel.
CJ-Wright commented 5 years ago

combine_latest would need to make a future which represents the latest actually good future or the current data if good.

def if_good(x, y):
    xx = result_maybe(x)
    if xx != sentinel:
        return xx
    else:
        return result_maybe(y) 

def update(self, x, who=None):
    if not self.cache_future:
        self.cache_future[who] = x
    else:
        self.cache_future[who] = submit(if_good, x, self.cache_future[who])
        ... # emit out the futures if all exist

This might work? It will loose any incoming data that is before all channels get a good data set, but we might do that already?