majek / puka

Puka - the opinionated RabbitMQ client
https://github.com/majek/puka

basic_qos how to #74

Open jpmeijers opened 3 years ago

jpmeijers commented 3 years ago

It's a little unclear how basic_qos should be used. I currently have the following code:

def connect_and_receive(sub_to_process_queue, amqp_user, amqp_password, amqp_host, amqp_port, amqp_exchange_sub, amqp_queue_sub):
    # build the DSN for the consumer client
    subscribe_dsn = "amqp://{}:{}@{}:{}/".format(
        amqp_user, amqp_password, amqp_host, amqp_port
    )
    print("Subscribing to server:", subscribe_dsn)
    consumer = puka.Client(subscribe_dsn)

    # connect receiving party
    receive_promise = consumer.connect()
    consumer.wait(receive_promise)

    # Define receiving exchange and queue
    receive_promise = consumer.exchange_declare(exchange=amqp_exchange_sub, type='fanout', durable=True)
    consumer.wait(receive_promise)
    receive_promise = consumer.queue_declare(queue=amqp_queue_sub, durable=True, exclusive=False, auto_delete=False)
    consumer.wait(receive_promise)
    receive_promise = consumer.queue_bind(exchange=amqp_exchange_sub, queue=amqp_queue_sub)
    consumer.wait(receive_promise)

    # start waiting for messages
    receive_promise = consumer.basic_consume(queue=amqp_queue_sub, no_ack=False)
    consumer.basic_qos(receive_promise, prefetch_count=10)

    while True:
        message = consumer.wait(receive_promise)
        # print("Resc", end=" ")
        handle_message(message, sub_to_process_queue)
        consumer.basic_ack(message)

But when I run it I get an Exception:

transform_1             | Exception in thread Thread-1:
transform_1             | Traceback (most recent call last):
transform_1             |   File "/usr/local/lib/python3.8/threading.py", line 932, in _bootstrap_inner
transform_1             |     self.run()
transform_1             |   File "/usr/local/lib/python3.8/threading.py", line 870, in run
transform_1             |     self._target(*self._args, **self._kwargs)
transform_1             |   File "/app/subscribe.py", line 29, in connect_and_receive
transform_1             |     message = consumer.wait(receive_promise)
transform_1             |   File "/app/src/puka/puka/connection.py", line 377, in wait
transform_1             |     self.on_read()
transform_1             |   File "/app/src/puka/puka/connection.py", line 191, in on_read_nohandshake
transform_1             |     self._handle_read(data, offset)
transform_1             |   File "/app/src/puka/puka/connection.py", line 224, in _handle_frame_read
transform_1             |     self.channels.channels[channel].inbound_method(frame)
transform_1             |   File "/app/src/puka/puka/channel.py", line 80, in inbound_method
transform_1             |     self._handle_inbound(frame)
transform_1             |   File "/app/src/puka/puka/channel.py", line 106, in _handle_inbound
transform_1             |     self.promise.recv_method(result)
transform_1             |   File "/app/src/puka/puka/promise.py", line 87, in recv_method
transform_1             |     callback = self.methods[result.method_id]
transform_1             | KeyError: 3932171
transform_1             | 
transform_1             | Thread stopped

How exactly should I be using the basic_qos function?

majek commented 3 years ago

Apologies, I haven't looked at the code for years. Perhaps someone on rabbitmq-users could help: https://groups.google.com/g/rabbitmq-users
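
A note on the traceback in the question: the key in `KeyError: 3932171` looks like a packed AMQP method identifier. The sketch below decodes it, assuming puka stores the id as `(class_id << 16) | method_id` (an assumption the value is consistent with, not something confirmed in this thread). The helper name `decode_method_id` and the partial lookup table are illustrative only; the numeric class and method ids come from the AMQP 0-9-1 specification.

```python
# Partial table of AMQP 0-9-1 method ids for the "basic" class (class id 60).
BASIC_CLASS_ID = 60
BASIC_METHODS = {
    10: "qos",
    11: "qos-ok",
    20: "consume",
    21: "consume-ok",
    60: "deliver",
    80: "ack",
}


def decode_method_id(packed):
    """Split a packed id into (class_id, method_id).

    Assumes the packing is (class_id << 16) | method_id.
    """
    return packed >> 16, packed & 0xFFFF


def describe(packed):
    """Return a readable name such as 'basic.qos-ok' where the table knows it."""
    class_id, method_id = decode_method_id(packed)
    if class_id == BASIC_CLASS_ID and method_id in BASIC_METHODS:
        return "basic." + BASIC_METHODS[method_id]
    return "class {} method {}".format(class_id, method_id)


print(decode_method_id(3932171))  # (60, 11)
print(describe(3932171))          # basic.qos-ok
```

If that reading is right, the promise received a `basic.qos-ok` reply from the broker that it had no handler registered for, which would explain why the error only appears once `basic_qos` is called on the consume promise. One thing that may be worth trying, untested here, is passing `prefetch_count=10` directly to `basic_consume` instead of calling `basic_qos` afterwards; the puka worker example in the RabbitMQ tutorials sets the prefetch that way.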