robinhood / faust

Python Stream Processing

concurrent.futures.ProcessPoolExecutor only runs sequentially and errors testing #730

Closed Midnighter closed 2 years ago

Midnighter commented 2 years ago

Checklist

Steps to reproduce

I have defined a processor that looks somewhat like the following (simplified).

import asyncio
import time
from concurrent.futures import ProcessPoolExecutor
from typing import AsyncIterable

import faust

app = faust.App("long-jobs-processor")

class JobRequestedEvent(faust.Record, serializer="json", validation=True):

    expected_time: float

class JobCompletedEvent(faust.Record, serializer="json", validation=True):

    success: bool

job_requested_topic = app.topic("job-requested")
job_completed_topic = app.topic("job-completed")

def compute_tough_job(event: JobRequestedEvent) -> JobCompletedEvent:
    time.sleep(event.expected_time)
    return JobCompletedEvent(success=True)

@app.agent(job_requested_topic)
async def run_job(
    stream: faust.StreamT[JobRequestedEvent],
) -> AsyncIterable[JobCompletedEvent]:
    loop = asyncio.get_running_loop()
    with ProcessPoolExecutor(max_workers=10) as pool:
        async for event in stream:
            result = await loop.run_in_executor(pool, compute_tough_job, event)
            yield result

While constructing a more minimal example, I also found that I could not test the agent because the event loop gets closed.

import faust
import pytest

from process_pool_executor import app, run_job, JobRequestedEvent

@pytest.fixture
def app_instance(event_loop) -> faust.App:
    app.finalize()
    app.conf.store = "memory://"
    app.flow_control.resume()
    return app

@pytest.mark.asyncio
async def test_run_job(app_instance):
    async with run_job.test_context() as agent:
        results = await agent.join([
            JobRequestedEvent(expected_time=3) for _ in range(10)
        ])
        assert len(results) == 10
        assert all(event.success for event in results)

I appreciate any insights.

Expected behavior

A key component is the ProcessPoolExecutor, which I hoped would let me run long-running tasks in multiple processes concurrently.

Actual behavior

In the real processor, the pool processes only one job at a time, and I have no idea why.
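A note on the likely cause, offered as a sketch rather than a confirmed diagnosis: in the agent from the steps to reproduce, `await loop.run_in_executor(...)` sits inside the `async for` loop, so the coroutine is suspended until that one job finishes and the next event is not submitted until then. The following standalone example uses plain asyncio without Faust. `simulate_job`, `run_sequentially`, `run_concurrently`, and `timed` are hypothetical names introduced for illustration. It contrasts awaiting each job in turn with submitting all jobs first and gathering them.

```python
import asyncio
import time
from concurrent.futures import Executor, ProcessPoolExecutor
from typing import List


def simulate_job(expected_time: float) -> bool:
    """Hypothetical stand-in for a long job; blocks the worker while it sleeps."""
    time.sleep(expected_time)
    return True


async def run_sequentially(pool: Executor, durations: List[float]) -> List[bool]:
    """Await each job before submitting the next, like the agent loop."""
    loop = asyncio.get_running_loop()
    results = []
    for duration in durations:
        # Suspended here until this single job completes, so the pool
        # never holds more than one job at a time.
        results.append(await loop.run_in_executor(pool, simulate_job, duration))
    return results


async def run_concurrently(pool: Executor, durations: List[float]) -> List[bool]:
    """Submit every job first, then wait for all of them together."""
    loop = asyncio.get_running_loop()
    futures = [loop.run_in_executor(pool, simulate_job, d) for d in durations]
    return list(await asyncio.gather(*futures))


async def timed(coro) -> float:
    """Return the wall-clock seconds the given coroutine took to finish."""
    start = time.perf_counter()
    await coro
    return time.perf_counter() - start


if __name__ == "__main__":
    durations = [0.5] * 4
    with ProcessPoolExecutor(max_workers=4) as pool:
        print("sequential:", asyncio.run(timed(run_sequentially(pool, durations))))
        print("concurrent:", asyncio.run(timed(run_concurrently(pool, durations))))
```

A stream is unbounded, so gathering everything up front does not carry over to an agent directly. Within Faust, the `concurrency` argument of `@app.agent` is one avenue that might give a similar effect, though it has not been tried against this processor.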

Full traceback

The traceback relates only to the test; the processor itself runs fine, just not concurrently.

/home/moritz/.pyenv/versions/3.8.5/envs/samples/lib/python3.8/site-packages/faust/agents/agent.py:926: in join
    return await self.kvjoin(
/home/moritz/.pyenv/versions/3.8.5/envs/samples/lib/python3.8/site-packages/faust/agents/agent.py:944: in kvjoin
    posindex: MutableMapping[str, int] = {
/home/moritz/.pyenv/versions/3.8.5/envs/samples/lib/python3.8/site-packages/faust/agents/agent.py:944: in <dictcomp>
    posindex: MutableMapping[str, int] = {
/home/moritz/.pyenv/versions/3.8.5/envs/samples/lib/python3.8/site-packages/mode/utils/aiter.py:35: in aenumerate
    async for item in it:
/home/moritz/.pyenv/versions/3.8.5/envs/samples/lib/python3.8/site-packages/faust/agents/agent.py:982: in _barrier_send
    await app.maybe_start_client()
/home/moritz/.pyenv/versions/3.8.5/envs/samples/lib/python3.8/site-packages/faust/app/base.py:1327: in maybe_start_client
    await self.start_client()
/home/moritz/.pyenv/versions/3.8.5/envs/samples/lib/python3.8/site-packages/faust/app/base.py:1317: in start_client
    await self.maybe_start()
/home/moritz/.pyenv/versions/3.8.5/envs/samples/lib/python3.8/site-packages/mode/services.py:795: in maybe_start
    await self.start()
/home/moritz/.pyenv/versions/3.8.5/envs/samples/lib/python3.8/site-packages/mode/services.py:736: in start
    await self._default_start()
/home/moritz/.pyenv/versions/3.8.5/envs/samples/lib/python3.8/site-packages/mode/services.py:743: in _default_start
    await self._actually_start()
/home/moritz/.pyenv/versions/3.8.5/envs/samples/lib/python3.8/site-packages/mode/services.py:767: in _actually_start
    await child.maybe_start()
/home/moritz/.pyenv/versions/3.8.5/envs/samples/lib/python3.8/site-packages/mode/services.py:795: in maybe_start
    await self.start()
/home/moritz/.pyenv/versions/3.8.5/envs/samples/lib/python3.8/site-packages/mode/services.py:736: in start
    await self._default_start()
/home/moritz/.pyenv/versions/3.8.5/envs/samples/lib/python3.8/site-packages/mode/services.py:743: in _default_start
    await self._actually_start()
/home/moritz/.pyenv/versions/3.8.5/envs/samples/lib/python3.8/site-packages/mode/services.py:760: in _actually_start
    await self.on_start()
/home/moritz/.pyenv/versions/3.8.5/envs/samples/lib/python3.8/site-packages/faust/transport/drivers/aiokafka.py:1018: in on_start
    await super().on_start()
/home/moritz/.pyenv/versions/3.8.5/envs/samples/lib/python3.8/site-packages/faust/transport/producer.py:134: in on_start
    await self.add_runtime_dependency(self.buffer)
/home/moritz/.pyenv/versions/3.8.5/envs/samples/lib/python3.8/site-packages/mode/services.py:560: in add_runtime_dependency
    await service.maybe_start()
/home/moritz/.pyenv/versions/3.8.5/envs/samples/lib/python3.8/site-packages/mode/services.py:795: in maybe_start
    await self.start()
/home/moritz/.pyenv/versions/3.8.5/envs/samples/lib/python3.8/site-packages/mode/services.py:736: in start
    await self._default_start()
/home/moritz/.pyenv/versions/3.8.5/envs/samples/lib/python3.8/site-packages/mode/services.py:743: in _default_start
    await self._actually_start()
/home/moritz/.pyenv/versions/3.8.5/envs/samples/lib/python3.8/site-packages/mode/services.py:764: in _actually_start
    self.add_future(task.fun(self))
/home/moritz/.pyenv/versions/3.8.5/envs/samples/lib/python3.8/site-packages/mode/services.py:592: in add_future
    fut = asyncio.ensure_future(self._execute_task(coro), loop=self.loop)
/home/moritz/.pyenv/versions/3.8.5/lib/python3.8/asyncio/tasks.py:661: in ensure_future
    task = loop.create_task(coro_or_future)
/home/moritz/.pyenv/versions/3.8.5/lib/python3.8/asyncio/base_events.py:429: in create_task
    self._check_closed()
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = <_UnixSelectorEventLoop running=False closed=True debug=False>

    def _check_closed(self):
        if self._closed:
>           raise RuntimeError('Event loop is closed')
E           RuntimeError: Event loop is closed

/home/moritz/.pyenv/versions/3.8.5/lib/python3.8/asyncio/base_events.py:508: RuntimeError
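The final frames of the traceback show `loop.create_task` being called on a loop that is already closed. The snippet below reproduces only that last step with plain asyncio. `noop` and `schedule_on_closed_loop` are hypothetical names. That the Faust app holds on to a different loop than the one the test runs on is an assumption, not something the traceback proves.

```python
import asyncio


async def noop() -> None:
    """Trivial coroutine, used only to have something to schedule."""


def schedule_on_closed_loop() -> str:
    """Try to create a task on a closed loop and return the error message."""
    loop = asyncio.new_event_loop()
    loop.close()
    coro = noop()
    try:
        # create_task checks whether the loop is closed before scheduling.
        loop.create_task(coro)
    except RuntimeError as exc:
        return str(exc)
    finally:
        # Close the coroutine to avoid a "never awaited" warning.
        coro.close()
    return "no error"


if __name__ == "__main__":
    print(schedule_on_closed_loop())
```

If that assumption holds, the thing to check would be which loop the app and its services captured before the test started.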

Versions

bobh66 commented 2 years ago

This project appears to have been abandoned.

You might want to check out the fork of this project: https://github.com/faust-streaming/faust

It has a bunch of fixes merged for problems that were in the base project.

Midnighter commented 2 years ago

Oh, thank you for the heads up.