jasonrbriggs / stomp.py

“stomp.py” is a Python client library for accessing messaging servers (such as ActiveMQ or RabbitMQ) using the STOMP protocol (versions 1.0, 1.1 and 1.2). It can also be run as a standalone, command-line client for testing.
Apache License 2.0

Consumer only receiving 45 messages at a time #325

Closed rtista closed 4 years ago

rtista commented 4 years ago

Hello, I'm having a problem with a stomp.py 6.0.1 + Artemis 2.12.0 implementation. I created the following consumer class to provide a synchronous interface for my consuming entities:

import queue
import uuid

import stomp

# Note: MessageConsumer is the project's own abstract base class (defined elsewhere).
class StompyMessageConsumer(MessageConsumer, stomp.ConnectionListener):
    """
    Retrieves messages from the queue using the stomp.py library,
    exposing a synchronous get/ack/nack interface.
    """

    def __init__(self, conn, bqueue, prefetch=10):
        """
        Creates an instance of the queue listener.

        Args:
            conn: The connection to the broker system.
            bqueue: The broker queue from which to consume.
            prefetch: The prefetch value.
        """
        super().__init__(conn, bqueue, prefetch)
        self._localq = queue.Queue()
        self._conn.set_listener('QueueListener', self)

    def subscribe(self):
        """
        Subscribes to the queue.
        """
        self._conn.subscribe(
            destination=self._bqueue, id=uuid.uuid4().hex, ack='client-individual',
            headers={
                'subscription-type': 'ANYCAST',
                'consumer.prefetchSize': self._prefetch
            }
        )

    def get(self, timeout=1):
        """
        Retrieves a message from the queue.

        Args:
            timeout: The time in seconds to wait before returning None.

        Returns:
            tuple(headers, body): The broker message headers and body tuple.
        """
        return self._localq.get(block=True, timeout=timeout)

    def ack(self, message):
        """
        Acknowledge a received message.

        Args:
            message: The message object.
        """
        self._conn.ack(message)

    def nack(self, message):
        """
        Negatively acknowledge (NACK) a received message.

        Args:
            message: The message object.
        """
        self._conn.nack(message)

    def on_connected(self, headers, body):
        """
        Subscribe on connection establishment.

        Args:
            headers (dict): The message headers.
            body (str): The message body.
        """
        self.subscribe()

    def on_message(self, headers, message):
        """
        Executes for each message received.

        Args:
            headers (dict): Message headers as a dict.
            message (str): The message body.
        """
        self._localq.put((headers, message))
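For context, the constructor above calls `super().__init__(conn, bqueue, prefetch)` on a `MessageConsumer` base class that is not shown in the post. A hypothetical reconstruction, purely to make the snippet self-contained, might look like:

```python
import abc


class MessageConsumer(abc.ABC):
    """Hypothetical abstract base, inferred from the constructor call above."""

    def __init__(self, conn, bqueue, prefetch=10):
        self._conn = conn          # broker connection
        self._bqueue = bqueue      # destination queue name
        self._prefetch = prefetch  # requested prefetch window

    @abc.abstractmethod
    def get(self, timeout=1):
        """Retrieve one message, blocking up to `timeout` seconds."""
```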

It is then used like this:

# Create consumer
consumer = StompyMessageConsumer(conn, self._logqueue, self._handler.buffersize())

# Subscribe
consumer.subscribe()

# Will gather several messages
messages = []

# Consumer Loop
while not self._stop:

    # Ignore when no message available
    with contextlib.suppress(queue.Empty):

        # Retrieve message from broker
        messages.append(consumer.get(timeout=3))

        # Fill buffer for bulk message handling
        if len(messages) < self._handler.buffersize():
            continue

    # If empty continue
    if not messages:
        continue

    # Handle logs by calling handler method
    try:
        self._handler.handle([json.loads(body) for _, body in messages])
    except Exception as e:
        logger.error(f'Caught {e.__class__}: {str(e)}')
        break

    # Acknowledge all the messages
    for h, _ in messages:
        consumer.ack(h['ack'])

    # Clear messages after ACK or NACK
    messages.clear()

The problem I'm facing is that the consumer only receives 45 messages at a time, even though I set the prefetch limit to 1000 via the "consumer.prefetchSize" header when subscribing. This consumer is supposed to retrieve 1000 messages (there are 800K messages in the queue) and bulk-write them to a database table, but since it only receives 45 at a time it is far too slow. Does anyone know what might be happening?

rtista commented 4 years ago

I've found the answer to this: it was an Artemis server limitation, although the documentation is still pretty scarce on these topics. There is a stompConsumerCredits parameter which can be set on STOMP acceptors in the Artemis broker.xml configuration; it defines the maximum amount of message data, in bytes (not in messages), that a consumer may receive before acknowledging it. The default is 10240 bytes (10K), which at my message sizes worked out to roughly 45 messages per batch.

Once I modified the server configuration, it worked perfectly:

<acceptor name="stomp">tcp://0.0.0.0:61613?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=STOMP;useEpoll=true;stompConsumerCredits=256000</acceptor>
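A quick back-of-the-envelope check shows how the credit window translates into a batch size. This sketch assumes Artemis's documented default stompConsumerCredits of 10240 bytes and a hypothetical average STOMP frame size of about 227 bytes (inferred from the observed 45-message batches, not stated in the thread):

```python
# Assumption: Artemis's default credit window for STOMP consumers is 10240 bytes.
DEFAULT_CREDITS = 10240

# Hypothetical average size of one STOMP MESSAGE frame, in bytes.
avg_frame_size = 227

# Unacknowledged messages the broker will deliver before it stops sending.
batch_size = DEFAULT_CREDITS // avg_frame_size
print(batch_size)  # -> 45
```

Raising stompConsumerCredits to 256000, as in the acceptor above, widens that window enough to cover the 1000-message buffer the consumer requests.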