apache / pulsar-client-python

Apache Pulsar Python client library
https://pulsar.apache.org/
Apache License 2.0

Segmentation fault for acknowledge_cumulative #114

Closed JeroenJansze1989 closed 1 year ago

JeroenJansze1989 commented 1 year ago

Describe the bug

When I call the consumer's acknowledge_cumulative(msg), the Python shell stops with Segmentation fault (core dumped). The expected behavior is that all messages up to and including the current message in the stream are acknowledged, or at least that an error is raised that the Python interpreter can handle.

To Reproduce

1. Test conditions

2. Test code

from pulsar import Client, ConsumerType, InitialPosition

# pulsar_host, topic, consumer_id and authentication_provider are not defined in this snippet.
# get_authenticator() returns an AuthenticationToken.
pulsar_client = Client(pulsar_host, authentication=authentication_provider.get_authenticator())
consumer = pulsar_client.subscribe(
  topic=topic,
  subscription_name=consumer_id,
  initial_position=InitialPosition.Earliest,
  consumer_type=ConsumerType.KeyShared,
)

# Given some message on the topic
message = consumer.receive()
print(message.data()) # b'{"test": "test6"}'
message = consumer.receive()
print(message.data()) # b'{"test": "test8"}'

consumer.acknowledge_cumulative(message)

Error log

Starting the shell with python3 -q -X faulthandler:

Fatal Python error: Segmentation fault

Current thread 0x00007f22216e0000 (most recent call first):
  File "/home/ubuntu/.local/lib/python3.10/site-packages/pulsar/__init__.py", line 1300 in acknowledge_cumulative
  File "<stdin>", line 1 in <module>

Extension modules: charset_normalizer.md, _cffi_backend (total: 2)
Segmentation fault (core dumped)

BewareMyPower commented 1 year ago

Cumulative acknowledgment cannot be used with a Key_Shared subscription. I think we need to improve the error handling on the Python client side: it should return an error instead of crashing.
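
To illustrate the kind of client-side check meant here, a minimal sketch follows. It is not the client's actual code: `SubscriptionType`, `CumulativeAckNotSupported` and `GuardedConsumer` are hypothetical names that do not exist in the pulsar package, and the wrapped object only needs an `acknowledge_cumulative` method. The idea is to raise a Python exception before the call ever reaches native code.

```python
from enum import Enum


class SubscriptionType(Enum):
    """Hypothetical stand-in for the client's consumer type enum."""
    EXCLUSIVE = "Exclusive"
    SHARED = "Shared"
    FAILOVER = "Failover"
    KEY_SHARED = "KeyShared"


class CumulativeAckNotSupported(Exception):
    """Raised when cumulative ack is requested on an unsupported subscription type."""


class GuardedConsumer:
    """Hypothetical wrapper that validates the subscription type before acking."""

    def __init__(self, consumer, subscription_type):
        self._consumer = consumer
        self._subscription_type = subscription_type

    def acknowledge_cumulative(self, message):
        # Fail fast in Python instead of forwarding the call to the native layer.
        if self._subscription_type is SubscriptionType.KEY_SHARED:
            raise CumulativeAckNotSupported(
                "cumulative acknowledgment cannot be used with a Key_Shared subscription"
            )
        self._consumer.acknowledge_cumulative(message)
```

With a guard like this, the reporter's script would end in a catchable exception at the acknowledge_cumulative call rather than a core dump.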


BTW, I cannot reproduce this issue using the following code with Python client 3.1.0 and Pulsar standalone 2.11.0.

from pulsar import Client, ConsumerType, InitialPosition

pulsar_client = Client('pulsar://localhost:6650')
topic = 'my-topic'

producer = pulsar_client.create_producer(topic=topic)
producer.send('msg-0'.encode())
producer.send('msg-1'.encode())
producer.close()

consumer = pulsar_client.subscribe(
    topic=topic,
    subscription_name='sub',
    initial_position=InitialPosition.Earliest,
    consumer_type=ConsumerType.KeyShared,
)

msg = consumer.receive()
print(f'{msg.data()} | {msg.message_id()}')
msg = consumer.receive()
print(f'{msg.data()} | {msg.message_id()}')

consumer.acknowledge_cumulative(msg)

pulsar_client.close()

Output:

b'msg-0' | (5,0,-1,-1)
b'msg-1' | (5,1,-1,-1)

The cumulative acknowledgment just fails without throwing any exception.

JeroenJansze1989 commented 1 year ago

Thanks for checking.

Improving the error handling on the Python side sounds good, perhaps together with a docs update (https://pulsar.apache.org/api/python/3.1.x/pulsar.Consumer.html#acknowledge_cumulative).

As for reproducibility, perhaps the environment makes a difference: the error was seen on Ubuntu 22.04 with Python 3.10.6.
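
To make environment comparisons like this easier, here is a small sketch that collects the relevant details using only the standard library. `collect_environment` is a hypothetical helper name; it assumes the client was installed from PyPI under the distribution name `pulsar-client`.

```python
import platform
from importlib import metadata


def collect_environment(package="pulsar-client"):
    """Return OS, Python and client versions for inclusion in a bug report."""
    try:
        client_version = metadata.version(package)
    except metadata.PackageNotFoundError:
        # The distribution is not installed in this interpreter's environment.
        client_version = "not installed"
    return {
        "os": platform.platform(),
        "python": platform.python_version(),
        "client": client_version,
    }


if __name__ == "__main__":
    for key, value in collect_environment().items():
        print(f"{key}: {value}")
```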

BewareMyPower commented 1 year ago

I just rechecked the code and found that acknowledge_cumulative uses the asynchronous version of the ACK API.

https://github.com/apache/pulsar-client-python/blob/cf4a9c0572e7232b315179f78c28d4c653f9119c/src/consumer.cc#L53

So no exception should be thrown even if the acknowledgment fails.
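
A toy model of why that is, using only the standard library. It is not the code in consumer.cc; `ack_async`, `ack_fire_and_forget` and `ack_and_wait` are hypothetical names, and the simulated broker rejects every ack so that the failure path is visible. When the result is only delivered to a callback that nobody waits on, the caller never sees the failure; waiting for the callback is what turns it into an exception.

```python
import threading


def ack_async(message_id, callback):
    """Toy asynchronous ack: reports the outcome to `callback` from another thread."""
    def work():
        # Pretend the broker rejects every ack so the failure path is visible.
        callback("error: ack rejected for " + str(message_id))

    thread = threading.Thread(target=work)
    thread.start()
    return thread


def ack_fire_and_forget(message_id):
    """Start the ack and return at once; the failure never reaches the caller."""
    ack_async(message_id, lambda result: None)


def ack_and_wait(message_id, timeout=5.0):
    """Block until the callback fires and turn a failure into an exception."""
    done = threading.Event()
    outcome = []

    def on_result(result):
        outcome.append(result)
        done.set()

    ack_async(message_id, on_result)
    if not done.wait(timeout):
        raise TimeoutError("no ack result within timeout")
    if outcome[0] != "ok":
        raise RuntimeError(outcome[0])
```

In this model `ack_fire_and_forget((5, 1))` returns normally although the ack was rejected, while `ack_and_wait((5, 1))` raises RuntimeError.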

I tested the code above inside an ubuntu:22.04 container, changing only the service URL to pulsar://host.docker.internal:6650, and it worked well. The Python version is also 3.10.6.