cgarciae / pypeln

Concurrent data pipelines in Python
https://cgarciae.github.io/pypeln
MIT License

[Bug] maxsize is ignored when chaining multiple stages #111

Open bzamecnik opened 12 months ago

bzamecnik commented 12 months ago

Describe the bug

Although maxsize on a stage such as map() should limit the number of items queued in that stage, it does not work as expected when multiple map() stages are chained: the first stage is drained immediately.

Note: in addition, each stage lets through two more items than maxsize allows.
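For comparison, the behavior the report expects can be sketched with the standard library alone. This is a hypothetical model, not pypeln's API or implementation: bounded_map is a made-up helper that runs fn in one worker thread and bounds that stage's own output queue by maxsize (0 meaning unbounded, as in queue.Queue), so a slow downstream stage stalls the upstream one.

```python
import queue
import threading

_DONE = object()  # end-of-stream sentinel


def bounded_map(fn, source, maxsize):
    """Hypothetical helper, NOT part of pypeln's API.

    Applies fn to every item of source in one worker thread. maxsize bounds
    this stage's own output queue (0 means unbounded, as in queue.Queue), so
    the worker blocks once maxsize results are waiting for the next stage.
    """
    out = queue.Queue(maxsize=maxsize)

    def work():
        for x in source:
            out.put(fn(x))  # blocks while maxsize results are still unconsumed
        out.put(_DONE)      # the sentinel also waits for free space

    threading.Thread(target=work, daemon=True).start()

    def results():
        # Yield results in order until the worker signals the end.
        while (item := out.get()) is not _DONE:
            yield item

    return results()
```

Chained as in the report, bounded_map(process, bounded_map(load, range(10), 1), 0), a blocked process stage lets load run at most three items ahead: one being processed, one waiting in the first stage's queue, and one loaded but blocked in put().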

Minimal code to reproduce

import logging
import threading
import time

import pypeln as pl

# use logging instead of print(): print() calls from several threads can interleave
logger = logging.getLogger('foo')
logging.basicConfig(level=logging.DEBUG)

def load(x):
    logger.debug(f"{threading.get_ident()} loading {x}")
    return x

def process(x):
    time.sleep(0.1)  # simulate a slow computation
    return f"processed {x}"

def show_results(stage):
    for result in stage:
        logger.debug(f"{threading.get_ident()} result '{result}'")

stage = pl.thread.map(load, range(10), workers=1, maxsize=1)
stage = pl.thread.map(process, stage, workers=1)

show_results(stage)

Output (wrong):

DEBUG:foo:140324684944960 loading 0
DEBUG:foo:140324684944960 loading 1
DEBUG:foo:140324684944960 loading 2
DEBUG:foo:140324684944960 loading 3
DEBUG:foo:140324684944960 loading 4
DEBUG:foo:140324684944960 loading 5
DEBUG:foo:140324684944960 loading 6
DEBUG:foo:140324684944960 loading 7
DEBUG:foo:140324684944960 loading 8
DEBUG:foo:140324684944960 loading 9
DEBUG:foo:140325310869696 result 'processed 0'
DEBUG:foo:140325310869696 result 'processed 1'
DEBUG:foo:140325310869696 result 'processed 2'
DEBUG:foo:140325310869696 result 'processed 3'
DEBUG:foo:140325310869696 result 'processed 4'
DEBUG:foo:140325310869696 result 'processed 5'
DEBUG:foo:140325310869696 result 'processed 6'
DEBUG:foo:140325310869696 result 'processed 7'
DEBUG:foo:140325310869696 result 'processed 8'
DEBUG:foo:140325310869696 result 'processed 9'

With only one stage, it works as expected (OK):

stage = pl.thread.map(load, range(10), workers=1, maxsize=1)
show_results(stage)

Output (OK):
DEBUG:foo:140324659766848 loading 0
DEBUG:foo:140324659766848 loading 1
DEBUG:foo:140325310869696 result '0'
DEBUG:foo:140324659766848 loading 2
DEBUG:foo:140325310869696 result '1'
DEBUG:foo:140324659766848 loading 3
DEBUG:foo:140325310869696 result '2'
DEBUG:foo:140324659766848 loading 4
DEBUG:foo:140325310869696 result '3'
DEBUG:foo:140324659766848 loading 5
DEBUG:foo:140325310869696 result '4'
DEBUG:foo:140324659766848 loading 6
DEBUG:foo:140325310869696 result '5'
DEBUG:foo:140324659766848 loading 7
DEBUG:foo:140325310869696 result '6'
DEBUG:foo:140324659766848 loading 8
DEBUG:foo:140325310869696 result '7'
DEBUG:foo:140324659766848 loading 9
DEBUG:foo:140325310869696 result '8'
DEBUG:foo:140325310869696 result '9'

If we set maxsize on the second stage instead, it limits the queue of the first stage, not the second (wrong):

stage = pl.thread.map(load, range(10), workers=1)
stage = pl.thread.map(process, stage, workers=1, maxsize=1)

show_results(stage)

Output (wrong):
DEBUG:foo:140324684944960 loading 0
DEBUG:foo:140324684944960 loading 1
DEBUG:foo:140324684944960 loading 2
DEBUG:foo:140325310869696 result 'processed 0'
DEBUG:foo:140324684944960 loading 3
DEBUG:foo:140324684944960 loading 4
DEBUG:foo:140325310869696 result 'processed 1'
DEBUG:foo:140325310869696 result 'processed 2'
DEBUG:foo:140324684944960 loading 5
DEBUG:foo:140325310869696 result 'processed 3'
DEBUG:foo:140324684944960 loading 6
DEBUG:foo:140325310869696 result 'processed 4'
DEBUG:foo:140324684944960 loading 7
DEBUG:foo:140324684944960 loading 8
DEBUG:foo:140325310869696 result 'processed 5'
DEBUG:foo:140325310869696 result 'processed 6'
DEBUG:foo:140324684944960 loading 9
DEBUG:foo:140325310869696 result 'processed 7'
DEBUG:foo:140325310869696 result 'processed 8'
DEBUG:foo:140325310869696 result 'processed 9'
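Read together, the three logs are consistent with maxsize bounding the queue a stage reads from rather than the one it writes to, with the final queue the consumer iterates sized by the last stage's maxsize. That is an inference from the output above, not something confirmed from pypeln's source. The sketch below is a hypothetical stdlib model of that assumption; run_model and its list of (fn, maxsize) stages are made up for illustration, and at least one stage is assumed.

```python
import queue
import threading

_DONE = object()  # end-of-stream sentinel


def run_model(source, stages):
    """Hypothetical model, NOT pypeln's code.

    stages is a non-empty list of (fn, maxsize) pairs, each run by one worker
    thread. Assumption being illustrated: a stage's maxsize bounds the queue
    it READS from, and the queue the consumer reads is bounded by the last
    stage's maxsize. maxsize=0 means unbounded, as in queue.Queue.
    """
    inboxes = [queue.Queue(maxsize=m) for _, m in stages]
    outbox = queue.Queue(maxsize=stages[-1][1])

    def feed():
        for x in source:
            inboxes[0].put(x)  # throttled only by the FIRST stage's maxsize
        inboxes[0].put(_DONE)

    def work(i, fn):
        dest = inboxes[i + 1] if i + 1 < len(stages) else outbox
        while (x := inboxes[i].get()) is not _DONE:
            dest.put(fn(x))  # throttled only by the NEXT stage's maxsize
        dest.put(_DONE)

    threading.Thread(target=feed, daemon=True).start()
    for i, (fn, _) in enumerate(stages):
        threading.Thread(target=work, args=(i, fn), daemon=True).start()

    def results():
        # Yield final results in order until the last worker signals the end.
        while (y := outbox.get()) is not _DONE:
            yield y

    return results()
```

Under this model, [(load, 1), (process, 0)] lets load drain the whole input while process is still busy, [(load, 1)] alone stays throttled, and [(load, 0), (process, 1)] throttles load, mirroring the three runs above.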

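Expected behavior

maxsize on a stage should limit the number of items queued in that stage, whether or not further stages are chained after it, and setting maxsize on one stage should not throttle a different stage.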

Library Info

import pypeln
print(pypeln.__version__)

0.4.9

