janbjorge / pgqueuer

PgQueuer is a Python library leveraging PostgreSQL for efficient job queuing.
https://pgqueuer.readthedocs.io/en/stable/
MIT License

Multiprocessing based concurrency #151

Open GCBallesteros opened 1 week ago

GCBallesteros commented 1 week ago

Hi, thanks for the great library!

I have gone through the docs, past issues, and a good part of the code, but haven't found anything related to multiprocessing-based concurrency. There are jobs that won't benefit much from the current async-based approach, but I would still like to continue using pgqueuer for the convenience it brings me.

Is it already there and I have missed it, or if not, are there any plans to implement such a thing?

This is not a deal breaker, as I imagine that I will be fine if I just spawn multiple process workers. Is this correct? In some ways this is better than multiprocessing: fewer limitations on what can be done and better process isolation are the first benefits that come to mind.

My first idea is to have a shell script create and background N workers.

Edit: I went looking around and found the workers branch!

jumski commented 1 week ago

I tried to run multiple separate instances of pgqueuer manually, but either I don't understand something or they just work kind of wonky for me: when I queue multiple jobs in a batch, they all get picked up by a single worker process.

janbjorge commented 1 week ago

Thanks for the interest and for digging into the details!

This is actually something I've been working on, on and off, but I haven't yet found a solution that I'm completely satisfied with. The current "workers" branch, as you've discovered, just creates more instances of PGQueuer, but doesn't directly submit jobs to a new process.

Adding proper multiprocessing support could indeed be fairly straightforward, but I'm a bit reluctant to implement it right away. My concern is that it could easily lead to what I'd call "process spawn flooding," where too many processes are spawned without adequate management, leading to performance degradation or instability.

trim21 commented 1 week ago

I don't think we need a new worker. If you use @qm.entrypoint("...", concurrency_limit=1) and spawn multiple processes, then each worker will only pick one job at a time and you will get multiprocessing-based concurrency.

If you don't want to start multiple workers, asyncio can also use a ProcessPoolExecutor (from concurrent.futures) via loop.run_in_executor(...); this avoids "process spawn flooding" naturally.
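
A minimal (untested) sketch of that first approach: a worker script that you start in N separate OS processes. The connection string, entrypoint name and job body are placeholders.

import asyncio

import asyncpg
from pgqueuer.db import AsyncpgPoolDriver
from pgqueuer.models import Job
from pgqueuer.qm import QueueManager


async def main() -> None:
    # Each OS process builds its own pool, driver and QueueManager.
    pool = await asyncpg.create_pool("postgresql://...")  # placeholder DSN
    qm = QueueManager(connection=AsyncpgPoolDriver(pool))

    # concurrency_limit=1: this process handles at most one of these jobs at a time.
    @qm.entrypoint("heavy_cpu_func", concurrency_limit=1)
    async def heavy_cpu_func(job: Job) -> None:
        ...  # CPU-bound work goes here

    await qm.run()


if __name__ == "__main__":
    # Start this script N times (shell script, supervisor, etc.) to get N
    # independent worker processes.
    asyncio.run(main())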

janbjorge commented 1 week ago

Depending on how you want the system to behave, it might be worth taking a look at https://pgqueuer.readthedocs.io/en/stable/limits.html#serialized-dispatch as well. This ensures one (1) 'global' running task, while 'concurrency_limit' is local (jobs per process).

EDIT: I might merge something like this: https://github.com/janbjorge/pgqueuer/pull/155. It will allow you to write custom executors. Would that help your case?
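
For reference, a minimal (untested) fragment of what serialized dispatch looks like on an entrypoint, assuming a QueueManager qm set up as usual and the serialized_dispatch flag described in the linked docs:

# serialized_dispatch=True asks PgQueuer to run at most one job for this
# entrypoint at a time globally (across all worker processes), whereas
# concurrency_limit only caps jobs within the current process.
@qm.entrypoint("heavy_cpu_func", serialized_dispatch=True)
async def heavy_cpu_func(job: Job) -> None:
    ...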

trim21 commented 1 week ago

You may also need to set batch_size to 1 for multiple workers.
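
In the worker sketch above that would mean replacing await qm.run() with the following (assuming batch_size is a parameter of QueueManager.run, as in the pgqueuer docs):

# With batch_size=1 each dequeue fetches a single job, so queued jobs spread
# across the running worker processes instead of one worker grabbing the batch.
await qm.run(batch_size=1)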

janbjorge commented 6 days ago

I merged #155; this allows the creation of custom executors. The below (untested) executor will run the decorated function in a separate process (given it's not async).

import asyncio
from concurrent.futures import ProcessPoolExecutor
from datetime import timedelta
from typing import cast

from pgqueuer import models
from pgqueuer.qm import QueueManager

# JobExecutor, Entrypoint, AsyncEntrypoint and is_async_callable come from
# pgqueuer (added with #155); exact import paths depend on the installed version.


class ProcessesExecutor(JobExecutor):
    def __init__(
        self,
        func: Entrypoint,
        requests_per_second: float = float("inf"),
        retry_timer: timedelta = timedelta(seconds=0),
        serialized_dispatch: bool = False,
        concurrency_limit: int = 0,
    ) -> None:
        self.func = func
        self.requests_per_second = requests_per_second
        self.retry_timer = retry_timer
        self.serialized_dispatch = serialized_dispatch
        self.concurrency_limit = concurrency_limit
        self.is_async = is_async_callable(func)
        # Each ProcessesExecutor instance creates its own pool (not shared).
        self.processes_pool = ProcessPoolExecutor()

    async def execute(self, job: models.Job) -> None:
        """
        Execute the job using the wrapped function.

        Args:
            job (models.Job): The job to execute.
        """
        if self.is_async:
            await cast(AsyncEntrypoint, self.func)(job)
        else:
            # The sync function (and the job) must be picklable to cross the
            # process boundary.
            await asyncio.get_running_loop().run_in_executor(
                self.processes_pool,
                self.func,
                job,
            )


qm = QueueManager(...)


@qm.entrypoint(name="heavy_cpu_func", executor=ProcessesExecutor)
def heavy_cpu_func(job):
    ...

trim21 commented 6 days ago

This looks a little bit unnecessary, since it's easy to do this instead:

    executor = ProcessPoolExecutor()

    @qm.entrypoint(name="heavy_cpu_func", concurrency_limit=1)
    async def execute(job):
        await asyncio.get_running_loop().run_in_executor(executor, heavy_cpu_func, ...)
janbjorge commented 6 days ago

@trim21 Great suggestion! Leveraging ProcessPoolExecutor within run_in_executor() is a natural way to manage spawning processes while mitigating process flooding risks.

However, there are cases where a custom ProcessesExecutor might be particularly useful. For example, if you need more structured process management or want to encapsulate specific retry logic, rate limiting, or other execution parameters that are challenging to implement directly with ProcessPoolExecutor, a reusable executor can help reduce boilerplate and improve maintainability.

trim21 commented 5 days ago

However, there are cases where a custom ProcessesExecutor might be particularly useful. For example, if you need more structured process management or want to encapsulate specific retry logic, rate limiting, or other execution parameters that are challenging to implement directly with ProcessPoolExecutor, a reusable executor can help reduce boilerplate and improve maintainability.

They are not challenging, because users can do this:

Also, ProcessesExecutor doesn't share the executor, so it will create a process pool for each job.

    executor = ProcessPoolExecutor()

    @qm.entrypoint(
        name="heavy_cpu_func",
        concurrency_limit=1,
        retry_timer=timedelta(seconds=10),
        requests_per_second=3,
    )
    async def execute(job):
        await asyncio.get_running_loop().run_in_executor(executor, heavy_cpu_func, ...)
GCBallesteros commented 5 days ago

Thanks @janbjorge and everybody for the discussion and the solutions offered. I will try them as soon as I can.

@trim21 I think @janbjorge meant a more general scenario than the specific examples that were presented. Managing GPU resources, for example, comes to mind.

trim21 commented 5 days ago

@trim21 I think @janbjorge meant a more general scenario than the specific examples that were presented. Managing GPU resources, for example, comes to mind.

Yes, I understand, and I'm suggesting doing it in the natural asyncio way; a new executor type isn't really necessary here. It also lacks the ability to share a process pool between entrypoints.

An example I have in mind:

import asyncio
import contextlib
import os
import time
from collections.abc import Callable
from concurrent.futures import Executor
from concurrent.futures.thread import ThreadPoolExecutor
from datetime import timedelta

import asyncpg
from pgqueuer.db import AsyncpgPoolDriver
from pgqueuer.models import Job
from pgqueuer.qm import QueueManager

def run_in_executor(executor: Executor, sem: asyncio.Semaphore | None = None):
    # Wrap a sync job function so it runs inside the given executor; the optional
    # semaphore adds an extra concurrency cap on top of the executor's own limit.
    if sem is None:
        # nullcontext supports `async with` on Python 3.10+.
        sem = contextlib.nullcontext()

    def wrapper(fn: Callable[[Job], None]):
        async def wrapped(job: Job):
            async with sem:
                # For CPU-bound work, pass a ProcessPoolExecutor instead; the
                # submitted function must then be picklable (module level).
                await asyncio.get_running_loop().run_in_executor(executor, fn, job)

        return wrapped

    return wrapper

async def create_qm() -> QueueManager:
    connection = await asyncpg.create_pool("...")
    driver = AsyncpgPoolDriver(connection)
    qm = QueueManager(connection=driver)

    @qm.entrypoint(
        "heavy_cpu_func",
        concurrency_limit=8,
        retry_timer=timedelta(seconds=60),
        requests_per_second=3,
    )
    @run_in_executor(
        ThreadPoolExecutor(max_workers=32),
        asyncio.Semaphore(4),  # give you extra parallel control, for GPU maybe
    )
    def heavy_cpu_func(job: Job) -> None:
        print("run in process {} start".format(os.getpid()))
        time.sleep(10)
        print("run in process {} done".format(os.getpid()))

    return qm
janbjorge commented 5 days ago

A class attribute can be used to share a process pool between entrypoints. But I have to say I like your approach more.
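
For completeness, an (untested) sketch of that idea, reusing the ProcessesExecutor from the earlier comment: move the pool to a class attribute so every entrypoint that uses this executor shares a single ProcessPoolExecutor.

class SharedPoolProcessesExecutor(ProcessesExecutor):
    # One ProcessPoolExecutor shared by every entrypoint that uses this
    # executor, created once as a class attribute.
    shared_pool = ProcessPoolExecutor()

    def __init__(self, *args, **kwargs) -> None:
        super().__init__(*args, **kwargs)
        # Replace the per-instance pool created by the parent __init__ with
        # the shared, class-level one.
        self.processes_pool = self.shared_pool


@qm.entrypoint(name="heavy_cpu_func", executor=SharedPoolProcessesExecutor)
def heavy_cpu_func(job):
    ...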

janbjorge commented 1 day ago

@GCBallesteros Can I close the issue? Or do you have any other questions/comments?