python-streamz / streamz

Real-time stream processing for python
https://streamz.readthedocs.io/en/latest/
BSD 3-Clause "New" or "Revised" License

ENH: add interval to window/rolling #246

Open martindurant opened 5 years ago

martindurant commented 5 years ago

Currently we keep a buffer of a certain length and emit on every new item. It might be useful to only emit every Nth item, as Spark Streaming does: https://spark.apache.org/docs/2.2.0/streaming-programming-guide.html#window-operations

This can already be achieved with a normal window-aggregate followed by the Slice operator in #241, but a built-in interval would avoid the unnecessary computations on windows that are then discarded.
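To make the proposal concrete, here is a minimal plain-Python sketch of a sliding window with an emit interval. It is a model of the idea only, not the streamz API: the name `sliding_windows` and its `interval` parameter are hypothetical, and it assumes that only full windows are emitted.

```python
from collections import deque


def sliding_windows(items, n, interval=1):
    """Yield the last n items as a tuple, but only on every interval-th item.

    Hypothetical helper for illustration; partial windows are never emitted.
    """
    buf = deque(maxlen=n)  # keeps only the n most recent items
    for count, item in enumerate(items, start=1):
        buf.append(item)
        if len(buf) == n and count % interval == 0:
            yield tuple(buf)


# With interval=2 the aggregate is computed on half as many windows.
sums = [sum(w) for w in sliding_windows(range(1, 11), n=3, interval=2)]
print(sums)  # [9, 15, 21, 27]
```

With `interval=1` the same call would produce a window (and an aggregate) for every item once the buffer is full, which is the extra work the interval is meant to avoid.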

ea42gh commented 5 years ago

One pattern I need frequently is to skip a certain buffer length initially, followed by a repeating take n, skip m. E.g., given a stream of integers 1, 2, 3, ..., skip 2, take 3, skip 1 would give (3,4,5) (7,8,9) (11,12,13) ...
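As a reference for the intended output, the pattern can be sketched as a plain-Python generator. This is only a model of the behaviour described above, not streamz code; the name `take_skip` and its parameters are hypothetical, and `take` is assumed to be at least 1.

```python
def take_skip(items, initial_skip, take, skip):
    """Drop initial_skip items, then repeatedly yield `take` items and drop `skip`.

    Hypothetical helper for illustration; assumes take >= 1.
    """
    period = take + skip
    group = []
    for index, item in enumerate(items):
        offset = index - initial_skip
        if offset < 0:
            continue  # still inside the initial skip
        if offset % period < take:
            group.append(item)
            if len(group) == take:
                # emit the group as soon as it is complete
                yield tuple(group)
                group = []


print(list(take_skip(range(1, 14), initial_skip=2, take=3, skip=1)))
# [(3, 4, 5), (7, 8, 9), (11, 12, 13)]
```

Each group is emitted as soon as its last element arrives, without waiting for the skipped elements that follow it.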

martindurant commented 5 years ago

This is a bit like the "range" functionality

ea42gh commented 5 years ago

This is the kludge I came up with. There may be a better way:

import streamz

def kill_grab_skip(source, kill, grab, skip, element=0):
    '''Remove an initial number of elements, then repeat a fixed grab, skip pattern.

    Args:
        source  : stream node to append to
        kill    : number of initial elements to ignore
        grab    : number of elements to return
        skip    : number of subsequent elements to ignore
        element : dummy value emitted into source as padding when skip > kill

    Returns:
        the streamz node chain created
    '''
    def skip_first(x, skip):
        return x[skip:]

    if skip < kill:
        # kill is greater than skip: drop the first (kill - skip) elements,
        # then partition into chunks of length (skip + grab)
        p = source.slice(kill - skip, None, None).partition(skip + grab)
        if skip > 0:
            p = p.map(lambda x: skip_first(x, skip))
    elif skip == kill:
        p = source.partition(skip + grab).map(lambda x: skip_first(x, skip))
    else:
        # kill is less than skip: emit startup elements to get to the kill == skip case
        p = source.partition(skip + grab).map(lambda x: skip_first(x, skip))
        for i in range(skip - kill):
            source.emit(element)
    return p

print('3,8,0 pattern')
source = streamz.Stream()
p = kill_grab_skip(source, 3, 8, 0)
p.sink(print)
for i in range(1, 30):
    source.emit(i)

martindurant commented 5 years ago

Are you sure slice does not do the same thing as this?

In [2]: s = streamz.Stream()
In [3]: s2 = s.slice(3, None, 8)
In [4]: s2.sink(print)
Out[4]: <sink: print>
In [6]: for i in range(1, 30):
   ...:     s.emit(i)
   ...:
9
17
25

https://github.com/python-streamz/streamz/blob/master/streamz/core.py#L771

ea42gh commented 5 years ago

The call to slice picks every 8th element. I need to pick a contiguous set of more than 1 element, e.g., [8,9,10], [16,17,18], ...
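The difference can be illustrated with a small plain-Python model of the session earlier in this thread. The counting rule used here (emit when the zero-based position is at least `start` and a multiple of `step`) is an assumption chosen to reproduce the printed output 9, 17, 25; it is not copied from the streamz source, and `model_slice` is a hypothetical name.

```python
def model_slice(items, start=0, step=1):
    """Emit single items whose zero-based position is >= start and divisible by step.

    Assumed model that reproduces the session output; not the streamz implementation.
    """
    for position, item in enumerate(items):
        if position >= start and position % step == 0:
            yield item


picked = list(model_slice(range(1, 30), start=3, step=8))
print(picked)  # [9, 17, 25]
```

Every emission is a single element, so a grouping step such as partition is still needed to obtain contiguous runs.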

So yes, slice comes close, and I did use it in the function I posted above, followed by partition() to make sure I obtain the wanted data set as soon as it is available. I really dislike the emit() calls in the function though.

I'm not sure GitHub issues is a good place to discuss this, though.

martindurant commented 5 years ago

OK, understood. In that case, feel free to add it as a PR with relevant tests.