apache / pulsar-client-python

Apache Pulsar Python client library
https://pulsar.apache.org/
Apache License 2.0

Negative acknowledge doesn't work with message_id #178

Closed Samreay closed 6 months ago

Samreay commented 6 months ago

The docs state:

    def negative_acknowledge(self, message):
        """
        Acknowledge the failure to process a single message.

        When a message is "negatively acked" it will be marked for redelivery after
        some fixed delay. The delay is configurable when constructing the consumer
        with {@link ConsumerConfiguration#setNegativeAckRedeliveryDelayMs}.

        This call is not blocking.

        Parameters
        ----------

        message:
            The received message or message id.
        """

Specifically, the docs say that you can pass either the message or the message id into the function. However, I don't believe this is correct: passing the message id does not work. To reproduce the issue, first run a local Pulsar standalone instance via

docker run -it -p 6650:6650 -p 8080:8080 --tmpfs /pulsar/data apachepulsar/pulsar:3.1.0 bin/pulsar standalone

And then run the following code:

import pulsar

client = pulsar.Client("pulsar://localhost:6650")
producer = client.create_producer("persistent://public/default/tmp_example")
consumer = client.subscribe("persistent://public/default/tmp_example", "sub", negative_ack_redelivery_delay_ms=10)
producer.send(b"hello")
while True:
    msg = consumer.receive()
    message_id = msg.message_id()
    print(msg, message_id)
    # consumer.negative_acknowledge(msg) # This works and causes redelivery
    consumer.negative_acknowledge(msg.message_id())  # This does not cause redelivery

When you pass in the message_id, no redeliveries occur. When you comment out the last line and swap to passing in the message as a whole, redeliveries occur.

Why using the message id matters: Our services consume the Pulsar messages and convert them into non-Pulsar structures (like pydantic objects or other Python classes). We attach the message id to those objects so we can ack/nack them as needed without storing the entire original Pulsar message.
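
A minimal sketch of that pattern, for illustration only. `Order`, `to_order`, and `settle` are hypothetical names, not part of the client library or of our actual services. The sketch assumes that `acknowledge` and `negative_acknowledge` accept a message id, as the docstring says, which is exactly what fails in the version reported here.

```python
from dataclasses import dataclass
from typing import Any


@dataclass
class Order:
    """Hypothetical application object, decoupled from the Pulsar message."""

    payload: str
    message_id: Any  # the value returned by msg.message_id()


def to_order(msg: Any) -> Order:
    # Keep only the decoded payload and the message id, not the whole message.
    return Order(payload=msg.data().decode("utf-8"), message_id=msg.message_id())


def settle(consumer: Any, order: Order, succeeded: bool) -> None:
    # Ack or nack using only the stored message id.
    # Assumption: the consumer honours message ids here, per the docstring.
    if succeeded:
        consumer.acknowledge(order.message_id)
    else:
        consumer.negative_acknowledge(order.message_id)
```

With a real consumer this would be used as `order = to_order(consumer.receive())`, followed later by `settle(consumer, order, succeeded=False)` if processing fails.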

I suspect the general acknowledge method has the same bug in it.

BewareMyPower commented 6 months ago

Yeah, it's a bug, and acknowledge has the same issue.

https://github.com/apache/pulsar-client-python/blob/bf8524a858eae1eced14a283bbe7dffdee534742/pulsar/__init__.py#L1508

We should check more types here.

BewareMyPower commented 6 months ago

Oh, it's not the issue with type checking. It's a regression that was introduced by https://github.com/apache/pulsar-client-python/pull/121.


I will push a fix ASAP and include it in the upcoming 3.4.0 release.