Bogdanp / dramatiq

A fast and reliable background task processing library for Python 3.
https://dramatiq.io
GNU Lesser General Public License v3.0

dramatiq.composition.pipeline is not working as expected with reference to the documentation. #524

Open priyanshu-mayank-tech opened 1 year ago

priyanshu-mayank-tech commented 1 year ago

Issues

I think dramatiq.composition.pipeline is not working as expected: the result of the first actor is not passed to the second actor.

Checklist

What OS are you using?

macOS 13.1

What version of Dramatiq are you using?

dramatiq == 1.13.0

What did you do?

I tried to use composition.pipeline([actor1.message(input), actor2.message()], broker=broker).run()

What did you expect would happen?

According to the documentation, the result of the first actor (actor1) should be passed to the second actor (actor2).

What happened?

The result of the first actor (actor1) is not passed to the second actor (actor2).
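
For reference, the contract the documentation describes can be modeled in plain Python: each step's return value is appended as the last positional argument of the next step. The sketch below only illustrates that expectation. `run_pipeline` is a hypothetical helper, not part of dramatiq, and no broker or worker is involved.

```python
from typing import Any, Callable, Sequence, Tuple

# A step is a function plus the positional arguments it was "enqueued" with.
Step = Tuple[Callable[..., Any], Tuple[Any, ...]]


def run_pipeline(steps: Sequence[Step]) -> Any:
    """Run steps in order, feeding each result to the next step.

    Hypothetical stand-in for the documented pipeline contract: the
    previous result becomes the last positional argument of the next step.
    """
    result = None
    for index, (func, args) in enumerate(steps):
        if index > 0:
            # Every step after the first receives the previous result.
            args = args + (result,)
        result = func(*args)
    return result


def act_one(message: dict) -> dict:
    return {**message, "artist": "Myself"}


def act_two(received_msg: dict) -> dict:
    return {**received_msg, "sub_artist": "everyone"}


final = run_pipeline([
    (act_one, ({"artist_docs": {"type": "track"}},)),
    (act_two, ()),  # no explicit arguments: it only gets act_one's result
])
print(final)
```

In this model act_two is declared with no arguments of its own, which mirrors the act_two.message() call in the pipeline above.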

synweap15 commented 1 year ago

Can you show a minimal reproducible example? Make sure you're not overriding the default middleware or, if that's intended, add the Pipelines middleware manually.

priyanshu-mayank-tech commented 1 year ago

@synweap15 - Following is the code for your reference:

import dramatiq  # needed for dramatiq.set_broker and @dramatiq.actor
from dramatiq.brokers.rabbitmq import RabbitmqBroker
from dramatiq.composition import pipeline
from dramatiq.middleware import Pipelines
from dramatiq.middleware.retries import Retries
from dramatiq.results import Results
from dramatiq.results.backends import RedisBackend
from redis import Redis

result_backend = RedisBackend(client=Redis(host="redis", port=6379))
host = "rabbitmq"
port = 5672
user = "guest"
password = "guest"
rabbit_url = f"amqp://{user}:{password}@{host}:{port}"

# Passing `middleware` replaces the default middleware list,
# so Pipelines is added explicitly.
broker = RabbitmqBroker(
    url=rabbit_url,
    middleware=[Retries(), Results(backend=result_backend), Pipelines()],
)
dramatiq.set_broker(broker)

msg = {
    "artist_docs": {
        "type": "track",
    },
}

@dramatiq.actor()
def act_one(message):
    artist = "Myself"
    message["artist"] = artist
    return message

@dramatiq.actor(store_results=True)
def act_two(received_msg):
    # received_msg is expected to be the return value of act_one
    sub_artist = "everyone"
    received_msg["sub_artist"] = sub_artist
    return received_msg

def main():
    # Enqueue act_one; act_two should be enqueued with act_one's result
    pipe = pipeline([
        act_one.message(msg),
        act_two.message()
    ], broker=broker).run()

Error:

2023-01-10 11:54.16 [error    ] Received message for undefined actor 'act_one'. Moving it to the DLQ. [dramatiq.worker.ConsumerThread(default)]
Traceback (most recent call last):
  File "/usr/local/lib/python3.10/dist-packages/dramatiq/worker.py", line 325, in handle_message
    actor = self.broker.get_actor(message.actor_name)
  File "/usr/local/lib/python3.10/dist-packages/dramatiq/broker.py", line 230, in get_actor
    raise ActorNotFound(actor_name) from None
dramatiq.errors.ActorNotFound: act_one

After this, I tried replacing act_one.message(msg) with act_one(msg). In that case, I get the following error:

 AttributeError("'dict' object has no attribute 'options'")
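
This second error is consistent with how actors behave: calling an actor directly, as in act_one(msg), runs the function in the current process and returns its return value (here a plain dict), whereas pipeline expects message objects such as those built by .message(...). A rough stand-in for that distinction, using hypothetical SketchActor and SketchMessage classes rather than dramatiq's own:

```python
from dataclasses import dataclass, field
from typing import Any, Callable, Dict, Tuple


@dataclass
class SketchMessage:
    """Hypothetical stand-in for a queued message (not dramatiq's class)."""
    actor_name: str
    args: Tuple[Any, ...]
    options: Dict[str, Any] = field(default_factory=dict)


class SketchActor:
    """Hypothetical stand-in for an actor wrapping a plain function."""

    def __init__(self, fn: Callable[..., Any]) -> None:
        self.fn = fn
        self.actor_name = fn.__name__

    def __call__(self, *args: Any) -> Any:
        # Direct call: run the function now and hand back its return value.
        return self.fn(*args)

    def message(self, *args: Any) -> SketchMessage:
        # Build a message describing the call without running the function.
        return SketchMessage(self.actor_name, args)


def _act_one(message: dict) -> dict:
    return {**message, "artist": "Myself"}


act_one = SketchActor(_act_one)

called = act_one({"artist_docs": {"type": "track"}})         # a plain dict
built = act_one.message({"artist_docs": {"type": "track"}})  # a message object

print(type(called).__name__, hasattr(called, "options"))
print(type(built).__name__, hasattr(built, "options"))
```

Only the object returned by .message(...) carries an options attribute, which is what the AttributeError above complains about.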

synweap15 commented 1 year ago

Are the broker and actor definitions in the same file? If not, you also need to point the worker at the file that contains the actor definitions:

dramatiq [-h] [--processes PROCESSES] [--threads THREADS] [--path [PATH [PATH ...]]] [--queues [QUEUES [QUEUES ...]]] [--pid-file PID_FILE] [--log-file LOG_FILE] [--skip-logging] [--use-spawn] [--fork-function FORKS] [--version] [--verbose] broker [module [module ...]]

For example: dramatiq broker actors, assuming the broker is defined in broker.py and the actors in actors.py.
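
The ActorNotFound error above fits this explanation: a worker can only resolve actor names that were registered in its own process, which happens when the module declaring the actors is imported. A minimal model of that lookup, assuming a hypothetical SketchBroker with a name-to-function registry (not dramatiq's implementation):

```python
class ActorNotFound(Exception):
    """Raised when a message names an actor this process never registered."""


class SketchBroker:
    """Hypothetical broker that only knows the actors declared against it."""

    def __init__(self) -> None:
        self.actors = {}

    def actor(self, fn):
        # Registration runs as a side effect of importing the defining module.
        self.actors[fn.__name__] = fn
        return fn

    def get_actor(self, name):
        try:
            return self.actors[name]
        except KeyError:
            raise ActorNotFound(name) from None


# Models the process that imported the module declaring the actors.
producer = SketchBroker()


@producer.actor
def act_one(message):
    return message


# Models a worker process that was started without importing that module.
worker = SketchBroker()

try:
    worker.get_actor("act_one")
    found_in_worker = True
except ActorNotFound:
    found_in_worker = False

print(found_in_worker)
```

The producer can enqueue a message naming act_one, but the worker's registry is empty, so the lookup fails in the same way as the traceback above.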

priyanshu-mayank-tech commented 1 year ago

As you can see from the code above, both the actors and the broker are defined in the same file.

broker = RabbitmqBroker(
    url=rabbit_url,
    middleware=[Retries(), Results(backend=result_backend), Pipelines()],)
dramatiq.set_broker(broker)