apache / pulsar-client-python

Apache Pulsar Python client library
https://pulsar.apache.org/
Apache License 2.0

Cannot seek on partitioned topic #213

Open Samreay opened 2 months ago

Samreay commented 2 months ago

According to https://github.com/apache/pulsar/issues/3643, Pulsar should support seeking on partitioned topics.

This functionality seems to be partially missing in the C++ and Python clients.

Reproduction

First, run a pulsar standalone instance:

docker run -it -p 6650:6650 -p 8080:8080 --tmpfs /pulsar/data apachepulsar/pulsar:3.1.0 bin/pulsar standalone

Ensure the Pulsar Python client and httpx are installed, then run the following code:

import httpx
import pulsar

# Create a partitioned topic with a single partition via the admin REST API
r = httpx.put("http://localhost:8080/admin/v2/persistent/public/default/example/partitions", json=1)
r.raise_for_status()

client = pulsar.Client("pulsar://localhost:6650")
consumer = client.subscribe("persistent://public/default/example", "sub")

# Seek to the latest message; this raises OperationNotSupported
consumer.seek(pulsar.MessageId.latest)

Expected Behaviour

Seeking should work across partitions. A seek to latest should move every partition to its latest message. Seeking by a MessageId (which contains a partition number) should seek only that partition.

Curiously, passing in an explicit integer timestamp doesn't raise this exception, which seems to contradict the documentation's statement that seeking is not supported on partitioned topics.

Workarounds

Is there any way to seek on a per-partition basis with the existing Python API as a workaround?

BewareMyPower commented 2 months ago

> Seeking by a MessageId (which contains a partition number) should seek that partition.

That is not the expected behavior. In the Java client, a MessageId that does not contain a topic name cannot be used for seeking either. Messages received from a multi-topics consumer carry the topic name, so the multi-topics consumer can find the matching internal consumer and seek on it. This behavior was introduced in Java client 3.0.0 by https://github.com/apache/pulsar/pull/19158

> Is there any way in the provided python API to seek on a per-partition basis as a workaround?

Currently, you can seek to a very large timestamp to simulate seek(MessageId.latest).
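A minimal sketch of this workaround, assuming the integer passed to `consumer.seek` is interpreted as a publish time in milliseconds since the epoch. The helper names and the one-day offset are hypothetical; the thread does not say how large the timestamp needs to be.

```python
import time

# Assumption: one day ahead of the local clock counts as "very large".
# The thread does not specify a value.
FUTURE_OFFSET_MS = 24 * 60 * 60 * 1000


def far_future_timestamp_ms(now_s=None):
    """Return a timestamp (ms since epoch) one day after `now_s`."""
    if now_s is None:
        now_s = time.time()
    return int(now_s * 1000) + FUTURE_OFFSET_MS


def seek_to_latest(consumer, now_s=None):
    """Hypothetical helper: emulate seek(MessageId.latest) with a timestamp seek.

    `consumer` is any object with a seek(int) method, such as the object
    returned by pulsar.Client.subscribe.
    """
    ts = far_future_timestamp_ms(now_s)
    consumer.seek(ts)
    return ts
```

With the consumer from the reproduction above, this would be called as `seek_to_latest(consumer)` in place of `consumer.seek(pulsar.MessageId.latest)`.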

Samreay commented 2 months ago

Hi @BewareMyPower - I'm not sure I follow, but are multi-topic consumers entangled in this discussion? In the example above, all I am doing is creating a single partitioned topic and then trying to seek to the latest message on it. Isn't that exactly the behaviour being discussed here: https://github.com/apache/pulsar/issues/3643#issuecomment-465903549

> Currently, you can seek to a very large timestamp to simulate seek(MessageId.latest).

I'll switch to doing this in the interim, cheers :)

BewareMyPower commented 2 months ago

> but are multi-topic consumers entangled in this discussion?

It's always confusing in Pulsar when topics and partitions are mentioned. I'd rather refer to a "topic" as either a "non-partitioned topic" or "a partition of a partitioned topic". In the Java client, a multi-topics consumer can also be created when the subscribed topic is a single partitioned topic.
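To make the "partition of a partitioned topic" distinction concrete: Pulsar names each partition `<topic>-partition-<index>`, so partitions can be addressed as individual topics. The sketch below only builds those names. The helper `partition_topic_names` is hypothetical, and whether subscribing to one partition and seeking on it gives the desired per-partition behaviour in the Python client is an assumption that this thread does not confirm.

```python
def partition_topic_names(topic, num_partitions):
    """Hypothetical helper: list the per-partition topic names of a partitioned topic.

    Relies on Pulsar's `<topic>-partition-<index>` naming convention.
    """
    if num_partitions < 1:
        raise ValueError("num_partitions must be at least 1")
    return [f"{topic}-partition-{i}" for i in range(num_partitions)]


# Names for the single-partition topic from the reproduction above.
names = partition_topic_names("persistent://public/default/example", 1)
print(names)
```

Each returned name could then be passed to `client.subscribe` to obtain a consumer for that one partition.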