apache / pulsar-client-python

Apache Pulsar Python client library
https://pulsar.apache.org/
Apache License 2.0

Producer callback is unable to utilise python raise to propagate errors #184

Closed Samreay closed 3 months ago

Samreay commented 6 months ago

To reproduce:

  1. Start a pulsar standalone instance, via
docker run -it -p 6650:6650 -p 8080:8080 --tmpfs /pulsar/data apachepulsar/pulsar:3.1.0 bin/pulsar standalone
  2. Paste the following code into a file:
import time

import pulsar

def callback(result: pulsar.Result, message_id: pulsar.MessageId):
    if result == pulsar.Result.Timeout:
        raise ValueError()

client = pulsar.Client("pulsar://localhost:6650")
producer = client.create_producer("topic-example", send_timeout_millis=1)
producer.send_async(b"hello", callback)
time.sleep(1)
  3. Run the code. You should see something akin to:
...
2023-12-27 14:23:19.550 INFO  [139722145527360] ProducerImpl:209 | [persistent://public/default/topic-example, ] Created producer on broker [127.0.0.1:56868 -> 127.0.0.1:6650] 
terminate called after throwing an instance of 'pybind11::error_already_set'
  what():  ValueError: <EMPTY MESSAGE>

At:
  /home/sam/arenko/service-utils/tmp.py(8): callback

[1]    53944 IOT instruction  /home/sam/arenko/service-utils/.venv/bin/python 

Something about pybind and the C++ code means that errors cannot be raised in the callback function, which is obviously an issue for propagating errors up the Python call stack. This might be because the callback function exists independently of the async event loop that it should be using, so there's nowhere for the error to go. Wrapping everything in a try/except won't work either, because the terminate is a forceful one: the entire app dies in an instant.

Expected behaviour: The callback function acts as a standard Python function with normal exception handling.

BewareMyPower commented 6 months ago

You should not raise an exception in the callback because the callback is called in an internal thread.

Expected behaviour: The callback function acts as a standard Python function with normal exception handling.

If it's allowed to raise an exception in the callback, where will you expect to handle this exception?
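
A common way to honour that constraint is to make the callback do nothing but hand the outcome to a thread-safe queue, and raise from the application's own thread. Below is a minimal stdlib-only sketch of the pattern; the "internal client thread" is a stand-in for the C++ client's I/O thread, not the real pulsar internals:

```python
import queue
import threading

# Outcomes flow from the client's internal thread to the application
# thread through a thread-safe queue.
results: queue.Queue = queue.Queue()

def callback(result, message_id):
    # Never raise here: an exception escaping into the C++-owned thread
    # becomes pybind11::error_already_set and terminates the process.
    results.put((result, message_id))

def internal_client_thread():
    # Simulates the client invoking our callback with a timeout result.
    callback("Timeout", None)

t = threading.Thread(target=internal_client_thread)
t.start()
t.join()

# Application thread: raise on its own stack, where it can be caught.
result, message_id = results.get(timeout=5)
if result == "Timeout":
    print("send timed out")  # or: raise TimeoutError()
```

The same shape underlies the janus-based workaround later in this thread: only the hand-off mechanism changes.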

Samreay commented 6 months ago

Can this be documented at the very least then? Having a hidden "do this and your app will terminate" behaviour with no warning isn't ideal for an end user :) Additionally, doco on the pulsar.Result types would be great; I struggled to handle them.

If you mean in terms of what you should see in the stack if the callback raises, I'd expect send_async to be the root cause with the callback underneath it. If you mean in which execution context a Python async callback should raise exceptions, that would be the event loop in general.

The goal, IMHO, is to get the async producer send as close to Python's standard library async as possible. Pythonically, this means you'd have something like:

try:
    result = await producer.send_async(b"hello")
except Exception:
    ...  # handle issues here

For a more complete example, this is my current workaround (well, a simplified version of it), which uses janus queues (i.e. queues with both sync and async interfaces) to convert the callback function into actual Python async:

import asyncio
from collections.abc import Awaitable

import janus
import pulsar

async def better_send(producer: pulsar.Producer, data: bytes) -> pulsar.MessageId:
    queue = janus.Queue()

    def callback(result: pulsar.Result, message_id: pulsar.MessageId):
        queue.sync_q.put((result, message_id))

    producer.send_async(data, callback)
    result, message_id = await queue.async_q.get()
    if result == pulsar.Result.Timeout:
        raise TimeoutError()
    elif result != pulsar.Result.Ok:
        raise ValueError(str(result))
    return message_id

async def main():
    client = pulsar.Client("pulsar://localhost:6650")
    producer = client.create_producer("topic-example", send_timeout_millis=1)
    await better_send(producer, b"hello")

if __name__ == "__main__":
    asyncio.run(main())

Running this gives you a Pythonic exception that doesn't crash the application, with proper asyncio support.

Traceback (most recent call last):
  File "/home/sam/arenko/service-utils/tmp.py", line 30, in <module>
    asyncio.run(main())
  File "/home/sam/.pyenv/versions/3.11.4/lib/python3.11/asyncio/runners.py", line 190, in run
    return runner.run(main)
           ^^^^^^^^^^^^^^^^
  File "/home/sam/.pyenv/versions/3.11.4/lib/python3.11/asyncio/runners.py", line 118, in run
    return self._loop.run_until_complete(task)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/sam/.pyenv/versions/3.11.4/lib/python3.11/asyncio/base_events.py", line 653, in run_until_complete
    return future.result()
           ^^^^^^^^^^^^^^^
  File "/home/sam/arenko/service-utils/tmp.py", line 26, in main
    await better_send(producer, b"hello")
  File "/home/sam/arenko/service-utils/tmp.py", line 17, in better_send
    raise TimeoutError()
TimeoutError

BewareMyPower commented 6 months ago

I got your point now. Unfortunately, the Pulsar Python client is not well integrated with Python coroutines. It's a callback-based solution rather than a coroutine-based one. As you can see, the send_async method is not a method that can be await-ed.

Therefore, you need a traditional way to handle it just like your workaround that queues the results.

BTW, I just found you have already checked a similar discussion: https://github.com/apache/pulsar-client-python/issues/55.

Can this be documented at the very least then?

Sure.

Additionally, doco on pulsar.Result types would be great

I agree. I opened another issue for it: https://github.com/apache/pulsar-client-python/issues/185

Samreay commented 6 months ago

Yeah, it's a thorny issue. There is the option to add async support to the official client, similar to how I've done it...

But ultimately, I worry this would just cause even more issues with communication with the underlying C++ client, and this probably brings the discussion back to "Should the Python client be rewritten (by a vague 'someone') in Python to use standard language features?"

BewareMyPower commented 6 months ago

this would just cause even more issues with communication with the underlying C++ client

The root cause is that the C++ client (or the callback-based solution) does not provide a way for users to specify which thread runs the callback. What's worse, which thread calls the callback is a black box to users. Here is a similar issue I found: https://github.com/apache/pulsar-client-cpp/issues/368

The Python wrapper does not do any complicated work. Ideally, it should avoid passing user-provided callbacks directly to the underlying C++ APIs, just like the Node.js wrapper does.
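
That wrapper pattern can be sketched in pure Python (all names below are illustrative, not the real pulsar-client-python or Node.js internals): the only callable handed to the native layer enqueues a completion record, and a Python-owned dispatcher thread runs the user callback, so any exception it raises stays catchable in Python:

```python
import queue
import threading

class CallbackDispatcher:
    """Runs user callbacks on a Python-owned thread instead of the
    native client's I/O thread."""

    def __init__(self):
        self._completions = queue.Queue()
        self._thread = threading.Thread(target=self._run, daemon=True)
        self._thread.start()

    def native_callback(self, user_cb, result, message_id):
        # This is the only callable the C++ layer would ever see:
        # it cannot raise, it only enqueues.
        self._completions.put((user_cb, result, message_id))

    def _run(self):
        while True:
            user_cb, result, message_id = self._completions.get()
            if user_cb is None:  # shutdown sentinel
                return
            try:
                user_cb(result, message_id)
            except Exception as exc:
                # User exceptions stay on a Python stack; log or route
                # them instead of terminating the process.
                print(f"callback raised: {exc!r}")

    def stop(self):
        self._completions.put((None, None, None))
        self._thread.join()

dispatcher = CallbackDispatcher()
seen = []
dispatcher.native_callback(lambda r, m: seen.append((r, m)), "Ok", 42)
dispatcher.stop()
```

The trade-off is an extra thread hop per completion, but user code can no longer crash the process by raising.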

merlimat commented 6 months ago

@Samreay The question is also why you are using send_async() instead of the plain send.

While send is blocking, it does not hold the GIL, so other Python threads can make progress.

Samreay commented 6 months ago

The workaround I have is more advanced than the one I posted above, and has proper concurrency when async-sending multiple messages (unlike my example, which sends one message at a time). By having an awaitable task, I can choose to wait for its completion via await or just throw it into the event loop via asyncio.create_task(better_send(...)).
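
For illustration, one way to build such concurrently-awaitable sends is to bridge each callback to an asyncio Future via loop.call_soon_threadsafe; fake_send_async below is a stand-in for producer.send_async that completes the callback on a foreign thread, the way the C++ client would:

```python
import asyncio
import threading

def fake_send_async(data, callback):
    # Stand-in for producer.send_async: invokes the callback on a
    # different thread shortly after being called.
    threading.Timer(0.01, callback, args=("Ok", f"id-{data!r}")).start()

def send_awaitable(loop, data):
    future = loop.create_future()

    def callback(result, message_id):
        # Called on a foreign thread: marshal back onto the event loop.
        if result == "Ok":
            loop.call_soon_threadsafe(future.set_result, message_id)
        else:
            loop.call_soon_threadsafe(
                future.set_exception, ValueError(str(result)))

    fake_send_async(data, callback)
    return future

async def main():
    loop = asyncio.get_running_loop()
    # All sends are in flight at once; gather awaits them together.
    return await asyncio.gather(
        *(send_awaitable(loop, b"msg-%d" % i) for i in range(3)))

message_ids = asyncio.run(main())
print(message_ids)
```

With real pulsar this avoids the janus dependency, at the cost of tying each send to a specific running event loop.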

Samreay commented 6 months ago

A bit more context:

Right now, if I have 100 messages to send and want actual error handling to exist, I either use my workaround or have to spend most of my time blocked using send, because it seems the Python client doesn't support batch sending by producers outside of send_async. I would be very happy to be wrong on this, though; being able to sync-send a batch of messages would be great!

I have a batch consume on the same topic I produce to in another service, but using the blocking send, with each message sent serially, means that instead of getting a single nice batch consumption at the other end, I get numerous consumptions.

BewareMyPower commented 6 months ago

Did you mean the batch receive? Currently there is no good way in Pulsar to receive a batch as a whole. There is a batchReceive API in the Java and C++ clients, but both of them just receive messages one by one and group them into an iterable batch.

It's because the raw format of a batch in Pulsar is not a trivial concatenation of message buffers. The MessageMetadata and SingleMessageMetadata protobuf structures are involved.

Samreay commented 6 months ago

So one of the goals of getting async working properly for my use case was batching: it seems the Python producer can batch, but only with send_async and not send.

As an aside: it'd be great to know the defaults for the batch-send message and byte limits, and whether they should be set to 0 or -1 to be unlimited and batch purely on elapsed time; the doco for all those kwargs just says "Undocumented": https://pulsar.apache.org/api/python/3.3.x/pulsar.Client.html#create_producer

So every half an hour, we have a clearing action on a market, and we get a sudden and instantaneous flood of messages from a third party. We turn these into pulsar messages and want to batch-send them out. Similarly, we want to batch-read those messages in another service. Right now we don't use any of the batching functionality in the Python consumer, because we need the interface between consumers and readers to be identical, and batching doesn't seem to be part of the reader implementation, just the consumer.

This means our pulsar wrapper just consumes messages one by one, with a little timeout after which all the messages it has consumed get passed back to the function allocated to process the messages.
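
That consume-until-quiet loop can be sketched with the stdlib alone; here incoming.get(timeout=...) stands in for consumer.receive(timeout_millis=...) (the real client raises its own timeout error rather than queue.Empty):

```python
import queue

def consume_batch(incoming: queue.Queue, timeout_s: float = 0.05) -> list:
    """Receive messages one at a time; when no message arrives within
    the timeout, return everything accumulated so far as one batch."""
    batch = []
    while True:
        try:
            batch.append(incoming.get(timeout=timeout_s))
        except queue.Empty:
            return batch  # quiet period: flush what we have

# Simulate a burst of five messages followed by silence.
incoming = queue.Queue()
for i in range(5):
    incoming.put(f"msg-{i}")
batch = consume_batch(incoming)
print(batch)
```

A production version would also cap the batch size and acknowledge messages only after the processing function succeeds.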


This is all just flavour to answer the initial question of "Why not just use send instead of send_async?". Boiled down: send blocks, and if I get 10k messages at once, sending them one by one isn't ideal; sending them via send_async without the ability to handle errors is also not ideal, hence the current workaround. Hope that helps explain the use cases :)

BewareMyPower commented 6 months ago

it'd be great to know the batch send messages and bytes defaults and whether this should be set to 0 or -1 to be unlimited and just batch off elapsed time

Yeah, this documentation is missing; we should add it. For now, you can refer to the C++ client documentation here for all the setBatchXXX methods. In short, 0 means no limit. (I'm not sure about the behavior of setting -1 because of the signed/unsigned issue.)

Here is a related issue: https://github.com/apache/pulsar-client-python/issues/187


BTW, back to the issue title: using a callback just forces users to pass errors to another thread for processing, which is generally implemented via a concurrent queue.

Regarding your concern from before:

But ultimately, I worry this would just cause even more issues with communication with the underlying C++ client

I think it's okay to add such APIs just like your workaround. The underlying C++ client calls the send callback (converted from a Python function via Pybind11) in Boost.Asio's event loop, which just works like asyncio's event loop.

My brainstorm:

producer = client.create_producer(topic)
coros = list()
for i in range(10):
    try:
        coros.append(producer.send_async("msg"))
    except pulsar.QueueIsFull as e:
        break
try:
    msg_ids = await asyncio.gather(*coros) # or wait here
except pulsar.PulsarException as e:
    pass  # handle Pulsar-specific errors
except Exception as e:
    pass  # handle everything else

BewareMyPower commented 6 months ago

FYI, I'm going to support async-await style APIs, see https://github.com/apache/pulsar-client-python/issues/55#issuecomment-1871190583

merlimat commented 6 months ago

@Samreay The way to achieve what you're describing with the current client is like:

producer = client.create_producer(topic, block_if_queue_full=True)

for i in range(100):
    producer.send_async("msg")

producer.flush()

flush will throw an exception if any of the send_async operations failed
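
For illustration, the semantics merlimat describes can be modelled with a small stand-in class: send_async queues work and records any failure, and flush blocks until everything outstanding resolves, raising the first recorded error. FakeProducer is purely illustrative, not the real client:

```python
class FakeProducer:
    """Models 'send_async records failures, flush raises them'."""

    def __init__(self):
        self._pending = []
        self._first_error = None

    def send_async(self, data):
        # Simulate: empty payloads fail, everything else succeeds.
        if not data:
            if self._first_error is None:
                self._first_error = ValueError("empty payload")
        else:
            self._pending.append(data)

    def flush(self):
        self._pending.clear()  # all outstanding sends resolved
        if self._first_error is not None:
            raise self._first_error

producer = FakeProducer()
for payload in (b"a", b"", b"c"):
    producer.send_async(payload)

try:
    producer.flush()
    outcome = "ok"
except ValueError as exc:
    outcome = f"failed: {exc}"
print(outcome)
```

With the real client, pairing this with block_if_queue_full=True (as in merlimat's snippet) gives backpressure plus a single error-checking point per batch.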

Samreay commented 6 months ago

Thanks @merlimat, I didn't realise flush would raise errors like that - very handy, and I'll add this to my code to try to increase its robustness :)