PickwickSoft / pystreamapi

The Python Stream API Library offering Streams like you know from Java/Kotlin/Scala in Python with some cool extensions
https://pystreamapi.pickwicksoft.org/
GNU General Public License v3.0

`Stream.of(...)` consumes the underlying iterable eagerly #94

Open adrian-herscu opened 3 weeks ago

adrian-herscu commented 3 weeks ago

Describe the bug

Stream.of(...) consumes the underlying iterable eagerly. Is there perhaps another way to wrap an iterable?

To Reproduce

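from typing import Generator

from pystreamapi import Stream

def gen() -> Generator[int, None, None]:
    for i in range(1, 4):
        print(">>")  # printed just before each value is produced
        yield i

def should_stream0():
    # Plain iteration: >> and the numbers alternate
    for i in gen():
        print(i)

def should_stream2():
    # Reported: the >> lines are not interleaved with the numbers here
    Stream.of(gen()) \
        .map(lambda i: str(i)) \
        .for_each(print)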

Expected behavior

should_stream2 should behave the same as should_stream0, printing >> interleaved with the numbers.


adrian-herscu commented 2 weeks ago

BTW, this is from __csv_loader.py:

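from collections import namedtuple
from csv import reader

# LoaderUtils and __get_csv_header are defined elsewhere in pystreamapi

def __load_csv(file_path, cast, delimiter, encoding):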
    """Load a CSV file and convert it into a list of namedtuples"""
    # skipcq: PTC-W6004
    with open(file_path, mode='r', newline='', encoding=encoding) as csvfile:
        csvreader = reader(csvfile, delimiter=delimiter)

        # Create a namedtuple type, casting the header values to int or float if possible
        header = __get_csv_header(csvreader)

        Row = namedtuple('Row', list(header))

        mapper = LoaderUtils.try_cast if cast else lambda x: x

        # Process the data, casting values to int or float if possible
        # Reads every row into one list, so the whole file is held in memory
        data = [Row(*[mapper(value) for value in row]) for row in csvreader]
    return data

What happens if the file is too big to fit in memory?
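One way to avoid materializing the whole file is to turn the loader into a generator that yields one row at a time while the file is still open. The sketch below is hypothetical: `iter_csv_rows` is not part of pystreamapi, and it leaves out the library's value-casting step (`LoaderUtils.try_cast`) so that it stays self-contained.

```python
import csv
from collections import namedtuple
from typing import Iterator


def iter_csv_rows(file_path: str, delimiter: str = ",", encoding: str = "utf-8") -> Iterator[tuple]:
    """Hypothetical lazy loader that yields one namedtuple per CSV data row.

    Only the current row is held in memory; the file stays open until the
    generator is exhausted or closed.
    """
    with open(file_path, mode="r", newline="", encoding=encoding) as csvfile:
        csvreader = csv.reader(csvfile, delimiter=delimiter)
        header = next(csvreader, None)  # first row holds the column names
        if header is None:
            return  # empty file: nothing to yield
        Row = namedtuple("Row", header)
        for row in csvreader:
            yield Row(*row)  # values stay strings; casting is omitted here
```

Consumed through `itertools.islice(iter_csv_rows(path), 10)`, for example, only the first ten data rows are parsed.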

garlontas commented 2 weeks ago

Thanks @adrian-herscu for helping improve pystreamapi!

You are right that generators are not handled the way they should be. The problem is easy to see in this snippet:

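from pystreamapi import Stream

def infinite_gen():
    # int() always returns 0, never the sentinel 1, so this yields 0 forever
    yield from iter(int, 1)

# 💀 Infinite loop: map() consumes the whole generator before limit() is reached
Stream.of(infinite_gen()) \
    .map(lambda x: x * 2) \
    .limit(10) \
    .for_each(print)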

Currently, you have to place limit at the beginning of the stream, as the first intermediate operation. In Java, a pipeline like this one works wherever limit appears, as long as it is used somewhere.

from pystreamapi import Stream

def infinite_gen():
    yield from iter(int, 1)

Stream.of(infinite_gen()) \
    .limit(10) \
    .map(lambda x: x * 2) \
    .for_each(print) # 👍 Working perfectly

Pystreamapi executes all operations lazily, but it uses lists internally, which is why the whole generator is consumed by the first intermediate operation.
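The difference can be reproduced without pystreamapi. In this sketch, `eager_map` and `lazy_map` are hypothetical stand-ins for a list-based and a generator-based intermediate operation, and a shared `log` records the order in which values are produced and consumed.

```python
from typing import Callable, Iterable, Iterator, List, TypeVar

T = TypeVar("T")
R = TypeVar("R")


def source(log: List[str]) -> Iterator[int]:
    """Instrumented generator: logs each value just before yielding it."""
    for i in range(1, 4):
        log.append(f">> {i}")
        yield i


def eager_map(f: Callable[[T], R], items: Iterable[T]) -> List[R]:
    # List-based: drains the entire upstream iterable before returning.
    return [f(x) for x in items]


def lazy_map(f: Callable[[T], R], items: Iterable[T]) -> Iterator[R]:
    # Generator-based: pulls one upstream item per downstream request.
    return (f(x) for x in items)


def run(map_impl: Callable[..., Iterable[str]]) -> List[str]:
    log: List[str] = []
    for value in map_impl(str, source(log)):
        log.append(value)
    return log


print(run(eager_map))  # ['>> 1', '>> 2', '>> 3', '1', '2', '3']
print(run(lazy_map))   # ['>> 1', '1', '>> 2', '2', '>> 3', '3']
```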

The best solution is to use generators internally, so that the source generator is not consumed completely.
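A minimal sketch of that idea, assuming a toy `LazyStream` class (hypothetical; not pystreamapi's actual implementation or API) that holds a single iterator and wraps it in a new generator for each intermediate operation. Because `limit` is built on `itertools.islice`, it can come after `map` even with an infinite source.

```python
import itertools
from typing import Callable, Generic, Iterable, Iterator, List, TypeVar

T = TypeVar("T")
R = TypeVar("R")


class LazyStream(Generic[T]):
    """Toy stream whose intermediate operations chain generators, not lists."""

    def __init__(self, source: Iterable[T]) -> None:
        self._it: Iterator[T] = iter(source)

    @staticmethod
    def of(source: Iterable[T]) -> "LazyStream[T]":
        return LazyStream(source)

    def map(self, f: Callable[[T], R]) -> "LazyStream[R]":
        # Wraps the upstream iterator; nothing is pulled until a terminal op runs.
        return LazyStream(f(x) for x in self._it)

    def limit(self, n: int) -> "LazyStream[T]":
        # islice stops pulling after n items, so an infinite upstream is fine.
        return LazyStream(itertools.islice(self._it, n))

    def for_each(self, action: Callable[[T], None]) -> None:
        # Terminal op: pulls elements through the chain one at a time.
        for x in self._it:
            action(x)

    def to_list(self) -> List[T]:
        # Terminal op: collects whatever the chain yields.
        return list(self._it)


def infinite_gen() -> Iterator[int]:
    yield from iter(int, 1)  # int() returns 0 forever, never the sentinel 1


# map() before limit() now terminates: prints 0 ten times.
LazyStream.of(infinite_gen()).map(lambda x: x * 2).limit(10).for_each(print)
```

The trade-off of this design is that, like any Python generator, such a stream can be consumed only once.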

Note: the missing interleaving in your output is not always a bug, since some intermediate operations (sorted, distinct, etc.) require the whole generator to be consumed.
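To illustrate the note with sorting: in a generator-based pipeline, a stateless step keeps streaming element by element, while a sorting step has to drain its upstream before it can emit anything. The helper names below (`source`, `lazy_sorted`, `run`) are hypothetical, and a `log` list records the order of events.

```python
from typing import Callable, Iterable, Iterator, List


def source(log: List[str]) -> Iterator[int]:
    # Logs each value just before yielding it, like gen() in the issue.
    for i in (3, 1, 2):
        log.append(f">> {i}")
        yield i


def lazy_sorted(items: Iterable[int]) -> Iterator[int]:
    # The body starts on the first next() call; sorted() then drains all input.
    yield from sorted(items)


def run(stage: Callable[[Iterable[int]], Iterable[int]]) -> List[str]:
    log: List[str] = []
    for value in stage(source(log)):
        log.append(str(value))
    return log


streaming = run(lambda items: (x * 10 for x in items))
buffered = run(lazy_sorted)
print(streaming)  # ['>> 3', '30', '>> 1', '10', '>> 2', '20']
print(buffered)   # ['>> 3', '>> 1', '>> 2', '1', '2', '3']
```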