i2mint / creek

Simple streams
Apache License 2.0

Util func to segment indices given intervals (or other segmentation specification) #3

Open thorwhalen opened 2 years ago

thorwhalen commented 2 years ago

What we need is a function segment_indices(indices: Iterable, intervals: Iterable[Tuple[BT, TT]]) that yields groups of indices according to which interval contains them. To be clear, both indices and intervals should be iterated over only once.

The target use case: Segmenting a flattened stream (i.e. making slabs out of merged/flattened streams).

Say we have:

from creek.multi_streams import MergedStreams

streams_map = {
    'hello': [(2, 'two'), (3, 'three'), (5, 'five')],
    'world': [(0, 'zero'), (1, 'one'), (3, 'three'), (6, 'six')]
}
flattened_multi_stream = list(MergedStreams(streams_map))
assert flattened_multi_stream == [
    ('world', (0, 'zero')),
    ('world', (1, 'one')),
    ('hello', (2, 'two')),
    ('hello', (3, 'three')),
    ('world', (3, 'three')),
    ('hello', (5, 'five')),
    ('world', (6, 'six'))
]

Given a segmentation definition over the index/timestamp (e.g. the 0 of (0, 'zero')), such as chunked intervals or a (monotone) iterable of intervals, we want to get an iterator/generator of groups of data items, one per segment.
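
To make the link between the two kinds of segmentation definition concrete: chunking can be seen as a special case of an iterable of intervals. A minimal sketch, assuming integer indices and half-open (bt, tt) pairs; the name chunk_intervals is hypothetical and not part of creek:

```python
from itertools import count, islice

def chunk_intervals(chk_size, start=0):
    """Lazily yield (bt, tt) pairs covering [start, start + chk_size), and so on.

    Hypothetical helper for illustration only (not part of creek).
    The generator is infinite, so it has to be consumed lazily.
    """
    for bt in count(start, chk_size):
        yield bt, bt + chk_size

# Take only the first four intervals of the infinite sequence
print(list(islice(chunk_intervals(2), 4)))  # [(0, 2), (2, 4), (4, 6), (6, 8)]
```

Since such a generator can only be iterated over once, it also illustrates why the segmenting function should consume intervals in a single pass.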

This works for chunking:

from itertools import groupby

def make_slabs_from_chunked_indices(flattened_multi_stream, chk_size):
    # group consecutive items by the chunk number of their index (x[1][0])
    grouped_by_chunk = groupby(flattened_multi_stream,
                               key=lambda x: x[1][0] // chk_size)
    return map(lambda x: list(x[1]), grouped_by_chunk)

assert list(make_slabs_from_chunked_indices(flattened_multi_stream, 2)) == [
    [('world', (0, 'zero')), ('world', (1, 'one'))],
    [('hello', (2, 'two')), ('hello', (3, 'three')), ('world', (3, 'three'))],
    [('hello', (5, 'five'))],
    [('world', (6, 'six'))]
]
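
One property of the groupby approach worth keeping in mind: it only emits a group for chunks that contain at least one item, so empty chunks are silently skipped. A small sketch with plain integers (chunk_groups is a made-up name for illustration):

```python
from itertools import groupby

def chunk_groups(indices, chk_size):
    """Group sorted integer indices by chunk number (index // chk_size)."""
    return [list(group) for _, group in groupby(indices, key=lambda i: i // chk_size)]

# Chunk 2 (indices 4 and 5) has no items, so no empty list is emitted for it.
print(chunk_groups([0, 1, 2, 3, 3, 6], 2))  # [[0, 1], [2, 3, 3], [6]]
```

Whether skipping empty segments is desirable probably depends on the use case; an interval-driven approach can choose to yield an empty group instead.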

Something like this could work for intervals in general, but is not very clean.

from creek.tools import dynamically_index, IndexUpdater, segment_overlaps
from functools import partial
from typing import Iterator
inf = float('inf')

intervals_index_update: IndexUpdater

def intervals_index_update(current_index, data_item, intervals):
    # make sure intervals is an iterator (needs to be "consumed")
    assert isinstance(
        intervals, Iterator), "intervals needs to be an iterator (feed me iter(intervals))"
    bt, tt = current_index  # we're indexing with intervals here!
    if bt <= data_item < tt:
        return current_index
    else:
        return next(intervals, None)

flattened_multi_stream = list(MergedStreams(streams_map))

indices = list(map(lambda x: x[1][0], flattened_multi_stream))
intervals = [(0, 2), (2, 4), (4, 6), (6, 8)]

assert list(dynamically_index(indices, start=(inf, inf),
                              idx_updater=partial(intervals_index_update,
                                                  intervals=iter(intervals)))) == [
    ((inf, inf), 0),
    ((0, 2), 1),
    ((0, 2), 2),
    ((2, 4), 3),
    ((2, 4), 3),
    ((2, 4), 5),
    ((4, 6), 6)
]

Perhaps the dynamic indexing setup should be revised?
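
For comparison, here is one possible alternative that avoids dynamically_index altogether: a stateful key function handed to groupby. This is only a sketch under the assumptions that indices and intervals are both increasing and that tt is exclusive; interval_key is a hypothetical name, not an existing creek function.

```python
from itertools import groupby

def interval_key(intervals):
    """Return a key function mapping each index to the interval containing it.

    Hypothetical helper (not part of creek). Assumes indices and intervals are
    both increasing; `intervals` is consumed once. Indices that fall in no
    interval map to None.
    """
    intervals = iter(intervals)
    current = next(intervals, None)

    def key(idx):
        nonlocal current
        # skip intervals that end at or before idx (tt taken as exclusive)
        while current is not None and idx >= current[1]:
            current = next(intervals, None)
        if current is not None and current[0] <= idx:
            return current
        return None

    return key

indices = [0, 1, 2, 3, 3, 5, 6]
intervals = [(0, 2), (2, 4), (4, 6), (6, 8)]
groups = [(k, list(g)) for k, g in groupby(indices, key=interval_key(intervals))]
print(groups)
# [((0, 2), [0, 1]), ((2, 4), [2, 3, 3]), ((4, 6), [5]), ((6, 8), [6])]
```

This relies on groupby calling the key function once per element, in order, which is how it consumes its input.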


thorwhalen commented 2 years ago

The following seems to work but should be cleaned up:


from creek.scrap.multi_streams import MergedStreams
from typing import Iterable, Tuple
from numbers import Number

BT = Number
TT = Number
Interval = Tuple[BT, TT]
Intervals = Iterable[Interval]

no_more_data = type('NoMoreData', (), {})()

def make_slabs(iterable, intervals: Intervals, key=None):
    """iterable and intervals assumed monotone increasing"""
    iterator = iter(iterable)
    if key is None:
        key = lambda x: x
    item = next(iterator, no_more_data)
    while item is not no_more_data:
        idx = key(item)
        for bt, tt in intervals:
            if item is no_more_data:
                break
            accum = list()
            while idx <= tt:
                accum.append(item)
                item = next(iterator, no_more_data)
                if item is no_more_data:
                    break
                idx = key(item)
            yield accum

# A simple example

intervals = [(0, 2), (2, 4), (4, 6), (6, 8)]
iterable = [0, 1, 2, 3, 3, 5, 6]
assert list(make_slabs(iterable, intervals)) == [
    [0, 1, 2], [3, 3], [5, 6]
]

# An example involving a key

streams_map = {
    'hello': [(2, 'two'), (3, 'three'), (5, 'five')],
    'world': [(0, 'zero'), (1, 'one'), (3, 'three'), (6, 'six')]
}
flattened_multi_stream = list(MergedStreams(streams_map))
assert flattened_multi_stream == [
    ('world', (0, 'zero')),
    ('world', (1, 'one')),
    ('hello', (2, 'two')),
    ('hello', (3, 'three')),
    ('world', (3, 'three')),
    ('hello', (5, 'five')),
    ('world', (6, 'six'))
]

assert list(make_slabs(flattened_multi_stream, intervals, key=lambda x: x[1][0])) == [
    [('world', (0, 'zero')), ('world', (1, 'one')), ('hello', (2, 'two'))],
    [('hello', (3, 'three')), ('world', (3, 'three'))],
    [('hello', (5, 'five')), ('world', (6, 'six'))]
]

cavart28 commented 2 years ago

Here is what I think is a better version: it is easier to follow, has simpler logic, and makes the more typical choice of interpreting tt as exclusive.

from creek.scrap.multi_streams import MergedStreams
from typing import Iterable, Tuple
from numbers import Number

BT = Number
TT = Number
Interval = Tuple[BT, TT]
Intervals = Iterable[Interval]

no_more_data = type('NoMoreData', (), {})()

def make_slabs(iterable, intervals: Intervals, key=None):

    if key is None:
        key = lambda x: x

    iterable = iter(iterable)
    item = next(iterable, no_more_data)

    for interval in intervals:
        # check whether there are more items to process
        if item is not no_more_data:
            # start accumulating items for the current interval
            accum = []
            # discard item if "under" the bt of the interval
            while item is not no_more_data and key(item) < interval[0]:
                item = next(iterable, no_more_data)
            # now the item is above the bt, accumulate if under the tt
            while item is not no_more_data and key(item) < interval[1]:
                accum.append(item)
                item = next(iterable, no_more_data)
            # yield accumulated items for the current interval, and proceed to the next interval
            yield accum
        # if there are no more items, no need to keep looping through the intervals
        else:
            break

# A simple example

intervals = [(0, 2), (2, 4), (4, 6), (6, 8)]
iterable = [0, 1, 2, 3, 3, 5, 6]
assert list(make_slabs(iterable, intervals)) == [
    [0, 1], [2, 3, 3], [5], [6]
]

# An example involving a key

streams_map = {
    'hello': [(2, 'two'), (3, 'three'), (5, 'five')],
    'world': [(0, 'zero'), (1, 'one'), (3, 'three'), (6, 'six')]
}
flattened_multi_stream = list(MergedStreams(streams_map))
assert flattened_multi_stream == [
    ('world', (0, 'zero')),
    ('world', (1, 'one')),
    ('hello', (2, 'two')),
    ('hello', (3, 'three')),
    ('world', (3, 'three')),
    ('hello', (5, 'five')),
    ('world', (6, 'six'))
]

assert list(make_slabs(flattened_multi_stream, intervals, key=lambda x: x[1][0])) == [
    [('world', (0, 'zero')), ('world', (1, 'one'))],
    [('hello', (2, 'two')), ('hello', (3, 'three')), ('world', (3, 'three'))],
    [('hello', (5, 'five'))],
    [('world', (6, 'six'))]
]

thorwhalen commented 2 years ago

Yes, definitely better.

Further ideas for improvement:

Regarding the definition of interval containment (which is indeed commonly defined as bt <= x < tt), we can make that choice both explicit and open(-closed) to change by using the design of infinite_sequence.py. In a nutshell:

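from operator import le, lt, ge, gt
from typing import Callable

def make_slabs(
    iterable, intervals: Intervals, key=None,  # the current arguments
    # additional arguments:
    above_bt: Callable = ge,  # item index vs. bt: ge means bt is inclusive
    below_tt: Callable = lt,  # item index vs. tt: lt means tt is exclusive
):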
    ...

If needed, we can get the complementary operator through this map:

opposite_op = {
    le: gt,
    lt: ge,
    ge: lt,
    gt: le,
}
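
To give an idea of what the body could look like, here is a sketch that plugs the two comparison arguments into the loop structure of the previous version. It is only one possible reading of the proposal, not a settled implementation; the sentinel and the float type hints are assumptions made to keep the block self-contained.

```python
from operator import ge, le, lt
from typing import Callable, Iterable, Tuple

no_more_data = object()  # sentinel marking the end of the data (assumed name)

def make_slabs(iterable, intervals: Iterable[Tuple[float, float]], key=None,
               above_bt: Callable = ge, below_tt: Callable = lt):
    """Yield one list of items per interval; items and intervals assumed increasing."""
    if key is None:
        key = lambda x: x
    iterator = iter(iterable)
    item = next(iterator, no_more_data)
    for bt, tt in intervals:
        if item is no_more_data:
            break
        # discard items that are not (yet) above the bottom of the interval
        while item is not no_more_data and not above_bt(key(item), bt):
            item = next(iterator, no_more_data)
        # accumulate items as long as they are below the top of the interval
        accum = []
        while item is not no_more_data and below_tt(key(item), tt):
            accum.append(item)
            item = next(iterator, no_more_data)
        yield accum

intervals = [(0, 2), (2, 4), (4, 6), (6, 8)]
iterable = [0, 1, 2, 3, 3, 5, 6]
print(list(make_slabs(iterable, intervals)))               # [[0, 1], [2, 3, 3], [5], [6]]
print(list(make_slabs(iterable, intervals, below_tt=le)))  # [[0, 1, 2], [3, 3], [5, 6]]
```

With the defaults this reproduces the tt-exclusive results above, and with below_tt=le it reproduces the tt-inclusive results of the first version. Note that the discard loop negates above_bt directly, so the opposite_op map is not strictly needed here.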

On a different note, I wonder whether using the walrus operator in the two while loops would simplify the code or, on the contrary, complicate it. Perhaps it depends on how used to the walrus the reader is. But this does seem to be a common use case for those who do use it.
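
To help judge, here is one way the two loops might look with the walrus operator (Python 3.8+). This is just a sketch of the tt-exclusive version; the sentinel name is kept from above and the restructuring is my own guess at what a walrus variant would need.

```python
no_more_data = object()  # sentinel marking the end of the data

def make_slabs(iterable, intervals, key=None):
    """Walrus-based variant; items and intervals assumed increasing, tt exclusive."""
    if key is None:
        key = lambda x: x
    iterator = iter(iterable)
    item = next(iterator, no_more_data)
    for bt, tt in intervals:
        if item is no_more_data:
            break
        # the carried-over item must be checked before fetching a new one
        if key(item) < bt:
            while (item := next(iterator, no_more_data)) is not no_more_data and key(item) < bt:
                pass  # discard items under the bt of the interval
        accum = []
        if item is not no_more_data and key(item) < tt:
            accum.append(item)
            while (item := next(iterator, no_more_data)) is not no_more_data and key(item) < tt:
                accum.append(item)
        yield accum

intervals = [(0, 2), (2, 4), (4, 6), (6, 8)]
iterable = [0, 1, 2, 3, 3, 5, 6]
print(list(make_slabs(iterable, intervals)))  # [[0, 1], [2, 3, 3], [5], [6]]
```

Because the first item of each interval is carried over from the previous iteration, each walrus loop needs a leading if to handle that item, which arguably makes this variant harder to read than the plain while loops.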