Bogdanp / dramatiq

A fast and reliable background task processing library for Python 3.
https://dramatiq.io
GNU Lesser General Public License v3.0
4.35k stars 312 forks

Dramatiq workers hang very often, and nothing appears in the logs even in verbose mode. #578

Open Harinib-Kore opened 1 year ago

Harinib-Kore commented 1 year ago

What OS are you using?

CentOS

What version of Dramatiq are you using?

Dramatiq version 1.14.2

What did you do?

Processing parallel async requests using dramatiq

What did you expect would happen?

It should work properly: the workers should keep receiving messages and running tasks. Instead, they hang.

What happened?

We are processing parallel async requests. Everything works fine for about 3 days, then the same issue occurs again. If we restart the workers, it works again. How can we fix this?

dramatiq dramatiq_app.BatchOperationsDramatiqTasks --processes 1 --verbose

I can't see anything in the log files. Even logging stops when the workers hang.

This is the error I captured when logging stopped:

Exception in thread Thread-1:
Traceback (most recent call last):
  File "/data/py3.9.7/lib/python3.9/threading.py", line 973, in _bootstrap_inner
    self.run()
  File "/data/py3.9.7/lib/python3.9/threading.py", line 910, in run
    self._target(*self._args, **self._kwargs)
  File "/data/py3.9.7/Findly/lib/python3.9/site-packages/dramatiq/cli.py", line 328, in watch_logs
    data = event.recv_bytes()
  File "/data/py3.9.7/lib/python3.9/multiprocessing/connection.py", line 221, in recv_bytes
    buf = self._recv_bytes(maxlength)
  File "/data/py3.9.7/lib/python3.9/multiprocessing/connection.py", line 426, in _recv_bytes
    return self._recv(size)
  File "/data/py3.9.7/lib/python3.9/multiprocessing/connection.py", line 384, in _recv
    chunk = read(handle, remaining)
MemoryError

spumer commented 1 year ago

We got something similar with --processes 1 --threads 2, when a worker consumes from one queue and produces messages to another. We reproduced it yesterday and will investigate what actually happened.

Quick fix was --processes 1 --threads 1.

UPD: we produce messages to another queue through kombu, and kombu.Connection is not thread-safe. In that case you get an infinite wait on _read, because both threads call _read and each reads a different part of the incoming data, so neither can construct a complete frame and both wait forever for more data.
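One common way to avoid sharing a non-thread-safe connection between worker threads is to give every thread its own. The following is only a minimal sketch of that idea using the standard library. `PerThreadConnection` and its `factory` argument are hypothetical names introduced for illustration, and a bare `object()` stands in for the real connection; with kombu the factory would be something like `lambda: kombu.Connection(url)`.

```python
import threading


class PerThreadConnection:
    """Hand each thread its own connection so no two threads share a socket."""

    def __init__(self, factory):
        # `factory` is a hypothetical zero-argument callable that opens a new
        # connection, e.g. `lambda: kombu.Connection(url)`.
        self._factory = factory
        self._local = threading.local()

    def get(self):
        # threading.local gives every thread a separate attribute namespace,
        # so the connection is created once per thread and never shared.
        conn = getattr(self._local, "conn", None)
        if conn is None:
            conn = self._local.conn = self._factory()
        return conn


if __name__ == "__main__":
    # Stand-in factory: a bare object() plays the role of a connection here.
    connections = PerThreadConnection(factory=object)
    seen = []

    def worker():
        # Calling get() twice in the same thread returns the same object.
        first = connections.get()
        assert first is connections.get()
        seen.append(first)

    threads = [threading.Thread(target=worker) for _ in range(2)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    print(len(seen), "threads,", len(set(map(id, seen))), "distinct connections")
```

This costs one connection per worker thread, but no two threads ever read from the same socket, which is the situation described above.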

Alsheh commented 1 year ago

I've observed similar behavior when the CPU reaches 100%.

menezes- commented 1 year ago

I have the same issue with 1.15 (Python 3.11) using the AsyncIO middleware. Using --processes 1 --threads 1 works.

agamrp commented 1 year ago

I am seeing this as well, running with --processes 4 --threads 1 through django_dramatiq. I'll try switching to 1 process and see how that goes. Also using dramatiq 1.5 (Python 3.10).

spumer commented 4 months ago

I've released a thread-safe broker implementation: https://github.com/spumer/dramatiq-kombu-broker/ We no longer have hangs with it.

Harinib-Kore commented 4 months ago

> I've released a thread-safe broker implementation: https://github.com/spumer/dramatiq-kombu-broker/ We no longer have hangs with it.

Hey, which combination works well: dramatiq-kombu-broker with Redis as the backend, or should I use kombu for both?

spumer commented 4 months ago

It works well with kombu + RabbitMQ. Right now dramatiq-kombu-broker supports only RabbitMQ.

Harinib-Kore commented 4 months ago

Is there a way to check the current status of a Dramatiq task by its jobId? Specifically, to determine if it is still running or has completed? Also, is it possible to assign a custom name to the job, similar to a functionality available in Celery?

In Celery, you can achieve this with the following code:

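celery_task = celery.AsyncResult(id=job_id, app=celery)  # `celery` here is the Celery app instance
if celery_task.state != 'REVOKED':
    ...  # the rest of the check was not included in the original comment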

My primary intention is to manage long-running tasks by being able to terminate them if necessary. How can I remove such tasks from the queue in Dramatiq?

spumer commented 4 months ago

> Is there a way to check the current status of a Dramatiq task by its jobId?

We solve it with https://github.com/Bogdanp/django_dramatiq. It has a Task model and a middleware that writes the message status.
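For readers not using Django, the same idea can be sketched as a small custom middleware. This is an illustration, not django_dramatiq's implementation: `StatusTracker` and `status_of` are hypothetical names, and the in-memory dictionary is an assumption made to keep the example short. The hook names and signatures are those of Dramatiq's `Middleware` base class.

```python
import threading
import time

try:
    from dramatiq import Middleware
except ImportError:  # fallback so the sketch can be run without dramatiq installed
    Middleware = object


class StatusTracker(Middleware):
    """Record the latest status of every message, keyed by message_id."""

    def __init__(self):
        # In-memory store for illustration only: it is NOT shared between the
        # process that enqueues a message and the worker that runs it.
        self._lock = threading.Lock()
        self.statuses = {}

    def _set(self, message, status):
        with self._lock:
            self.statuses[message.message_id] = (status, time.time())

    def after_enqueue(self, broker, message, delay):
        self._set(message, "enqueued")

    def before_process_message(self, broker, message):
        self._set(message, "running")

    def after_process_message(self, broker, message, *, result=None, exception=None):
        self._set(message, "failed" if exception is not None else "done")

    def after_skip_message(self, broker, message):
        self._set(message, "skipped")

    def status_of(self, message_id):
        # Return only the status string; "unknown" if the ID was never seen.
        with self._lock:
            entry = self.statuses.get(message_id)
        return entry[0] if entry else "unknown"
```

The tracker would be registered with `broker.add_middleware(StatusTracker())`. Because producers and workers are separate processes, a real setup has to write statuses to shared storage such as a database table or Redis, which is what the Task model mentioned above provides.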

> Also, is it possible to assign a custom name to the job, similar to a functionality available in Celery?

Do you mean display_name in Celery? Where would you expect to see that in Dramatiq? :) I think the actor name is enough.

Harinib-Kore commented 4 months ago

> > Is there a way to check the current status of a Dramatiq task by its jobId?
>
> We solve it with https://github.com/Bogdanp/django_dramatiq. It has a Task model and a middleware that writes the message status.
>
> > Also, is it possible to assign a custom name to the job, similar to a functionality available in Celery?
>
> Do you mean display_name in Celery? Where would you expect to see that in Dramatiq? :) I think the actor name is enough.

1) How can I get the process ID of a task from its message ID, or from any other parameter, in Dramatiq?

2) How can I revoke a task, the way Celery allows with calls like these:

result.revoke()

AsyncResult(id).revoke()

app.control.revoke('d9078da5-9915-40a0-bfa1-392c7bde42ed')

app.control.revoke('d9078da5-9915-40a0-bfa1-392c7bde42ed', terminate=True)

app.control.revoke('d9078da5-9915-40a0-bfa1-392c7bde42ed', terminate=True, signal='SIGKILL')

spumer commented 4 months ago

You need to implement it manually. Dramatiq has no built-in functionality to revoke a task.

I think it can be done the way Celery does it: create a queue for events, have workers dispatch events when they complete an action, and have them listen for events from elsewhere. Then you can implement a "revoke" event and discard the message when a worker receives it.
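A much simpler variant of this idea, shown here only as a sketch under stated assumptions, replaces the event queue with a set of revoked message IDs that a middleware checks before each message runs. `Revocations` and `revoke` are hypothetical names; `SkipMessage` is the exception Dramatiq middleware can raise in `before_process_message` to skip a message. The sketch only stops messages that have not started yet and does nothing to terminate a task that is already running.

```python
import threading

try:
    from dramatiq import Middleware
    from dramatiq.middleware import SkipMessage
except ImportError:  # fallback so the sketch runs without dramatiq installed
    Middleware = object

    class SkipMessage(Exception):
        pass


class Revocations(Middleware):
    """Skip messages whose IDs were revoked before a worker started them."""

    def __init__(self):
        # Hypothetical in-memory store. Producers and workers run in different
        # processes, so a real setup needs a shared store (for example a Redis set).
        self._lock = threading.Lock()
        self._revoked = set()

    def revoke(self, message_id):
        with self._lock:
            self._revoked.add(message_id)

    def before_process_message(self, broker, message):
        with self._lock:
            revoked = message.message_id in self._revoked
        if revoked:
            # Raising SkipMessage here tells the worker not to run the actor.
            raise SkipMessage(f"message {message.message_id} was revoked")
```

The ID to revoke is the `message_id` of the message object returned when the actor is sent, so the caller has to keep it around, much like a Celery task ID.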

Harinib-Kore commented 3 months ago

Hi, I am facing an issue. Can you tell me how to solve it? @agamrp @spumer @Bogdanp

Here are the Dramatiq configuration and the error:

Actor Configuration

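import traceback

import dramatiq


# sa_gateway and debug_logger are project-specific objects not shown in this report.
@dramatiq.actor(queue_name="batch_index_operations_queue", store_results=True)
def batch_index(request_payload):
    try:
        sa_gateway.batch_index_documents(request_payload)
        return True
    except Exception:
        # Log the failure with its traceback; the stored result becomes False.
        debug_logger.error('Error in creating an async task for Batch Index')
        debug_logger.error(traceback.format_exc())
        return False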

Dramatiq Setup

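import dramatiq
from dramatiq.brokers.redis import RedisBroker
from dramatiq.results import Results
from dramatiq.results.backends import RedisBackend


def setup_dramatiq():
    redis_url = get_redis_connection_url()
    redis_broker = RedisBroker(url=redis_url)
    dramatiq.set_broker(redis_broker)
    backend = RedisBackend(url=redis_url)
    redis_broker.add_middleware(Results(backend=backend))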

CrawlerDramatiq Configuration

{
  "name": "CrawlerDramatiq",
  "script": "/bin/bash",
  "args": "-c 'cd /data/www/Findly/search && source /data/sa_py3.9.7/Findly/bin/activate && dramatiq dramatiq_app.CrawlDramatiqTasks --processes 4'",
  "autorestart": true,
  "log_file": "/data/logs/findly/crawler_dramatiq.log",
  "watch": false
}

Error

Exception in thread Thread-1:
Traceback (most recent call last):
  File "/data/sa_py3.9.7/lib/python3.9/threading.py", line 973, in _bootstrap_inner
    self.run()
  File "/data/sa_py3.9.7/lib/python3.9/threading.py", line 910, in run
    self._target(*self._args, **self._kwargs)
  File "/data/sa_py3.9.7/Findly/lib/python3.9/site-packages/dramatiq/cli.py", line 337, in watch_logs
    log_file.write(data + "\n")
MemoryError

Issue

I am encountering a MemoryError when running the Dramatiq task. How can this be resolved?
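The traceback by itself does not show what is consuming the memory. As a diagnostic aid only, and not a fix, a small helper like the hypothetical `peak_rss_mib` below could be logged at the end of an actor such as batch_index, to see whether a worker's memory keeps growing between restarts. It uses only the standard library `resource` module, which is available on Unix systems such as CentOS.

```python
import resource
import sys


def peak_rss_mib():
    """Return this process's peak resident set size in MiB."""
    peak = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
    # ru_maxrss is reported in kilobytes on Linux and in bytes on macOS.
    divisor = 1024 * 1024 if sys.platform == "darwin" else 1024
    return peak / divisor


if __name__ == "__main__":
    print(f"peak RSS: {peak_rss_mib():.1f} MiB")
```

A value that climbs steadily over many tasks would point at a leak in the task code or its dependencies, while a flat value would suggest looking elsewhere, for example at overall memory pressure on the host.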