python-bonobo / bonobo

Extract Transform Load for Python 3.5+
https://www.bonobo-project.org/
Apache License 2.0
1.58k stars 143 forks source link

Graceful termination hangs when filtering data #391

Open BalzySte opened 3 years ago

BalzySte commented 3 years ago

Hi, I've encountered a possible bug while trying to filter out data passed along the pipeline. I've built a short demo to demonstrate what I'm experiencing in a more complex scenario.

import bonobo
from time import sleep

data = 10

def extract():
    for _ in range(100):
        print('Yielding ', data)
        yield data
        sleep(1)

def filter_data(value):
    if value > 0:
        return value

def load(value):
    print('Received ', value)

if __name__ == '__main__':
    graph = bonobo.Graph()
    graph.add_chain(extract, filter_data, load)
    bonobo.run(graph)
    print('Done')

Filtering works out fine. The issue arise when I try to gracefully terminate the pipeline execution with a single Ctrl + C.
When the extractor yields 10 and data is not filtered a single Ctrl + C terminates the execution. When instead data is 0 and is filtered out by filter_data, the pipeline halts the execution but the bonobo.run method hangs and does not return. I need to force quit with a repeated SIGKILL.

Am I filtering data in the way bonobo expects me to do it? I'm quite new to the library and I haven't found any tip regarding filtering/removing data from the pipeline. Is having a processor return None the correct way to deal with filtering operations?

I'm running python 3.7.9 with bonobo 0.6.4.

Any help is appreciated, thanks!