robinhood / faust

Python Stream Processing

send_soon under concurrent.futures.ProcessPoolExecutor #702

Closed: albertalexandrov closed this issue 3 years ago

albertalexandrov commented 3 years ago

Hi and many thanks for the lib!

I'm not sure whether this is a bug or not. When I run the example, the send_soon method works as expected, but in my app (see the code below) it does not. As far as I can see, the problem is that messages never leave the queue: they are put into it, and the queue size keeps increasing.
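A plausible reading of this symptom, offered as an assumption rather than a diagnosis confirmed in this thread: a function passed to a `ProcessPoolExecutor` runs in a separate worker process with its own copy of every module-level object, including the app and its producer. Anything `send_soon` buffers there would never reach the parent process that actually runs the faust worker. The stdlib-only sketch below shows that isolation, using a hypothetical module-level `PENDING` list as a stand-in for the producer's buffer. It forces the `fork` start method, which assumes a POSIX host such as Linux.

```python
import multiprocessing
from concurrent.futures import ProcessPoolExecutor

# Hypothetical stand-in for the buffer that send_soon() fills;
# this is NOT faust's real data structure.
PENDING = []


def buffer_message(directory):
    """Runs in a worker process and appends to that process's own copy of PENDING."""
    PENDING.append({'dir': directory})
    return len(PENDING)  # size of the buffer as seen inside the child


def run_demo():
    # Assumption: a POSIX host; 'fork' lets the sketch run from any entry point.
    ctx = multiprocessing.get_context('fork')
    with ProcessPoolExecutor(max_workers=3, mp_context=ctx) as executor:
        child_sizes = list(executor.map(buffer_message, ['dir1', 'dir2', 'dir3']))
    # The parent's PENDING was never touched by the children.
    return child_sizes, len(PENDING)


if __name__ == '__main__':
    sizes, parent_size = run_demo()
    print('buffer sizes inside the workers:', sizes)
    print('buffer size in the parent:', parent_size)  # 0
```

Each worker sees its own buffer grow while the parent's stays empty, which would be consistent with a queue that keeps growing but is never drained.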

Steps to reproduce

Run this app:

import asyncio
from concurrent.futures import ProcessPoolExecutor

import faust

app = faust.App('my-app-name', broker='kafka://localhost:9092')
sink = app.topic('topic')

@app.task()
async def check():
    tasks = []

    with ProcessPoolExecutor(max_workers=3) as executor:
        # here I want to process the files of each folder in parallel
        for dir_ in ['dir1', 'dir2', 'dir3']:
            task = asyncio.create_task(run_dir_handling(executor, dir_))
            tasks.append(task)

        await asyncio.gather(*tasks)

async def run_dir_handling(executor, dir_):
    # use the loop the worker is running on instead of a module-level one
    loop = asyncio.get_running_loop()
    await loop.run_in_executor(executor, handle, dir_)

def handle(directory):
    print('Handle directory')

    # finding archives in directory
    # unpacking 7z with mdb-files
    # converting mdb tables to csv
    # reading csv to dataframe
    # some data manipulating
    # and at last sending dataframe records to kafka

    # `send_soon` is a regular (non-async) function, whereas `send` is a coroutine.
    # An async version cannot be used here: with an `async def handle`,
    #    `await loop.run_in_executor(executor, handle, dir_)` fails with `TypeError: cannot pickle 'coroutine' object`

    sink.send_soon(value={'dir': directory})  # always <FutureMessage pending>


if __name__ == '__main__':
    app.main()
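The pickling error quoted in the comments can be reproduced without faust or a process pool. A minimal sketch, assuming CPython: calling an `async def` function only creates a coroutine object, and a `ProcessPoolExecutor` worker has to pickle whatever the callable returns in order to send it back to the parent. Coroutine objects cannot be pickled. `handle_async` is a hypothetical async variant of `handle`.

```python
import pickle


async def handle_async(directory):
    """Hypothetical async variant of handle(); calling it only creates a coroutine."""
    return {'dir': directory}


def pickle_error_for_coroutine():
    coro = handle_async('dir1')  # what a worker process would try to send back
    try:
        pickle.dumps(coro)  # the pool serializes results with pickle as well
    except (TypeError, pickle.PicklingError) as exc:
        return str(exc)
    finally:
        coro.close()  # avoid the "coroutine ... was never awaited" warning
    return None


if __name__ == '__main__':
    print(pickle_error_for_coroutine())
```

Because the coroutine would be created inside the worker and never awaited there, the callable passed to `run_in_executor` with a process pool has to be a regular function.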

Expected behavior

The messages appear in the topic.

Actual behavior

The messages stay in the queue and are never sent to the topic.
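One common way around this, sketched here under assumptions rather than taken from the faust documentation or from a resolution in this thread: keep the CPU-bound work in the process pool, but have `handle` return plain, picklable records and do the sending in the parent coroutine with the awaitable `send`, on the event loop that drives the producer. `FakeSink` is a hypothetical stand-in for the faust topic so the sketch runs without Kafka; in the real app `sink` would be the `app.topic('topic')` object and `check` the `@app.task`. The `fork` start method is forced only so the sketch runs from any entry point, and it assumes a POSIX host.

```python
import asyncio
import multiprocessing
from concurrent.futures import ProcessPoolExecutor


def handle(directory):
    """CPU-bound work in a worker process; RETURNS records instead of sending them."""
    # ... unpack archives, convert tables, build the dataframe ...
    return [{'dir': directory}]  # must be picklable to travel back to the parent


class FakeSink:
    """Hypothetical stand-in for a faust topic: only mimics `await send(value=...)`."""

    def __init__(self):
        self.sent = []

    async def send(self, *, value):
        self.sent.append(value)


async def run_dir_handling(executor, sink, dir_):
    loop = asyncio.get_running_loop()
    records = await loop.run_in_executor(executor, handle, dir_)
    for record in records:
        # Sending happens in the parent process, on the running event loop.
        await sink.send(value=record)


async def check(sink):
    ctx = multiprocessing.get_context('fork')  # assumption: POSIX host
    with ProcessPoolExecutor(max_workers=3, mp_context=ctx) as executor:
        await asyncio.gather(
            *(run_dir_handling(executor, sink, d) for d in ['dir1', 'dir2', 'dir3'])
        )


if __name__ == '__main__':
    demo_sink = FakeSink()
    asyncio.run(check(demo_sink))
    print(demo_sink.sent)  # one record per directory, in completion order
```

The design choice is that the worker processes never touch the app or its producer at all, so it no longer matters that each process has its own copy of them.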

Versions