Open wintonzheng opened 1 week ago
For high-performance task management and asynchronous processing in Python, you can use task-queue and parallel-computing frameworks that are designed for concurrency and scalability. Here are a few options:
Dramatiq is a fast and reliable distributed task processing library for Python. It's designed to handle a high volume of tasks with low latency.
Installation:
pip install dramatiq redis
Setup with FastAPI:
worker.py:
import dramatiq
from dramatiq.brokers.redis import RedisBroker

# Register a Redis-backed broker before any actors are declared.
redis_broker = RedisBroker(host="localhost", port=6379)
dramatiq.set_broker(redis_broker)

@dramatiq.actor
def long_running_task(param1, param2):
    # Your long-running task here
    ...
main.py:
from fastapi import FastAPI
from worker import long_running_task

app = FastAPI()

@app.post("/start-task/")
async def start_task(param1: str, param2: str):
    # .send() only enqueues a message; the worker process runs the task.
    long_running_task.send(param1, param2)
    return {"message": "Task started"}
Start the Dramatiq worker:
dramatiq worker
RQ (Redis Queue) is a simple Python library for queueing jobs and processing them in the background with workers.
Installation:
pip install rq
Setup with FastAPI:
worker.py:
import time
from redis import Redis
from rq import Queue

redis_conn = Redis()
q = Queue(connection=redis_conn)

def long_running_task(param1, param2):
    # Your long-running task here
    time.sleep(10)  # Simulating a long task
    ...
main.py:
from fastapi import FastAPI
from worker import q, long_running_task

app = FastAPI()

@app.post("/start-task/")
async def start_task(param1: str, param2: str):
    # enqueue() stores the job in Redis; the RQ worker picks it up.
    q.enqueue(long_running_task, param1, param2)
    return {"message": "Task started"}
Start the RQ worker:
rq worker
Dask is a flexible parallel computing library for analytics.
Installation:
pip install dask distributed
Setup with FastAPI:
worker.py:
# No Client is created here: this module only defines the task,
# so importing it does not start or connect to a cluster.
def long_running_task(param1, param2):
    # Your long-running task here
    ...
main.py:
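from fastapi import FastAPI
from dask.distributed import Client, fire_and_forget
from worker import long_running_task

# Connect to the scheduler started below. The address is an assumption:
# tcp://localhost:8786 is the dask-scheduler default, adjust for your setup.
# Calling Client() with no address would start a separate local cluster instead.
client = Client("tcp://localhost:8786")
app = FastAPI()

@app.post("/start-task/")
async def start_task(param1: str, param2: str):
    future = client.submit(long_running_task, param1, param2)
    # Keep the task running even though we drop the future.
    fire_and_forget(future)
    return {"message": "Task started"}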
Start the Dask scheduler and workers:
dask-scheduler
dask-worker <scheduler-address>
All these frameworks are designed for high-performance and scalable task processing. Depending on your specific requirements and environment, you can choose the one that best fits your needs. Dramatiq and RQ are simpler and great for straightforward background tasks, while Dask is more powerful and suitable for complex computational tasks.
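To make the shared pattern concrete, here is a minimal standard-library sketch of what all three setups above do at the API layer: the endpoint only enqueues work and returns, and a separate worker does the actual processing. This is an in-process stand-in, not how Dramatiq, RQ, or Dask are implemented. `task_queue`, `results`, and `worker` are hypothetical names for illustration, and a real deployment would use Redis or a Dask scheduler instead of `queue.Queue`.

```python
import queue
import threading

# In-process stand-in for the broker (Redis in the Dramatiq/RQ examples).
task_queue = queue.Queue()
results = []


def long_running_task(param1, param2):
    # Placeholder for the real work.
    return f"{param1}-{param2}"


def worker():
    # Pull jobs until a None sentinel arrives, like a worker process would.
    while True:
        item = task_queue.get()
        try:
            if item is None:
                break
            func, args = item
            results.append(func(*args))
        finally:
            task_queue.task_done()


def start_task(param1, param2):
    # Mirrors the endpoints above: enqueue, then return immediately.
    task_queue.put((long_running_task, (param1, param2)))
    return {"message": "Task started"}
```

To try it, start `worker` in a `threading.Thread`, call `start_task(...)`, and use `task_queue.join()` to wait for the job to finish. Putting `None` on the queue stops the worker.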
@TheSnowGuru Thanks a lot. Listing two other tools I've learnt recently: https://hatchet.run/ https://temporal.io/
When a task is running, Skyvern's API service hangs and blocks incoming requests until the running task is completed.
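One common cause of this kind of hang is a long synchronous call running directly on the asyncio event loop. I have not confirmed that this is what happens in Skyvern, so treat the following as a sketch of the symptom under that assumption. `blocking_task` and `count_ticks_while_task_runs` are hypothetical names, and the heartbeat stands in for other incoming requests. It needs Python 3.9+ for `asyncio.to_thread`.

```python
import asyncio
import contextlib
import time


def blocking_task(seconds: float) -> None:
    # Stand-in for a long synchronous task.
    time.sleep(seconds)


async def count_ticks_while_task_runs(offload: bool) -> int:
    """Return how often a concurrent coroutine got to run during the task."""
    ticks = 0

    async def heartbeat() -> None:
        nonlocal ticks
        while True:
            await asyncio.sleep(0.01)
            ticks += 1

    hb = asyncio.create_task(heartbeat())
    await asyncio.sleep(0)  # let the heartbeat start

    if offload:
        # Runs in a thread, so the event loop stays free.
        await asyncio.to_thread(blocking_task, 0.2)
    else:
        # Runs on the event loop thread, so nothing else can run.
        blocking_task(0.2)
    observed = ticks

    hb.cancel()
    with contextlib.suppress(asyncio.CancelledError):
        await hb
    return observed


if __name__ == "__main__":
    print("inline:", asyncio.run(count_ticks_while_task_runs(offload=False)))
    print("offloaded:", asyncio.run(count_ticks_while_task_runs(offload=True)))
```

The inline call records no ticks while the task runs, which matches the blocked-requests symptom. The offloaded call lets the heartbeat keep running. Moving the work to a separate worker process, as the task queues in this thread do, takes the same idea one step further.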