qweeze / rstream

A Python asyncio-based client for RabbitMQ Streams
MIT License
82 stars 13 forks source link

Performances #30

Open Gsantomaggio opened 1 year ago

Gsantomaggio commented 1 year ago

Test the performance and evaluate a way to aggregate the messages before sending the messages.

DanielePalaia commented 1 year ago

I made few tests running also the profiler pyroscope on top of it.

The tests was quite simple:

       MESSAGES = 100
       list_messages = []
        t0 = time.process_time()
        #print ("Sending " + str(MESSAGES) + " simple messages")
        for j in range(10000):
            for i in range(MESSAGES):
                # sending basic messages
                #amqp_message = AMQPMessage(
                #    body='hello:'
                #)
                amqp_message = b'hello'          
                list_messages.append(amqp_message)
            await producer.publish_batch('mixing', list_messages, False)
            list_messages.clear()
        t3 = time.process_time()
        print("time elapsed after publish_batch" + str(t3-t0))

Sending 1 milions of messages with publish_batch. First attempt creating an AMQPMessage. This was very slow like 11 seconds. Just sending in binary format instead amqp_message = b'hello' it seems like we gain at least 5/6 seconds as the computational time go down to 6 seconds.

From peryscope profiler it seems like the function which take most time are the indeed the bytes at amqp.py:21 when we run the first version of the test and the function encode_struct encoding.py:98 which takes 3.5 seconds in total (cumulative test)

qweeze commented 1 year ago

Hi @DanielePalaia, have you tried py-spy profiler? It can output nice flamegraph visualizations of stack traces by the time they take

Also it would be great if your could post a flamegraph report to this thread

DanielePalaia commented 1 year ago

Hi @qweeze This link when I send 1 milion of AMQPMessage (part commented) https://flamegraph.com/share/a44afb24-a3d2-11ed-b6df-aec0fc79c33f This second link when I send 1 milion of binary messages amqp_message = b'hello' https://flamegraph.com/share/6fb388e7-a3d2-11ed-b6df-aec0fc79c33f

This is the output from from pyroscope. I can try also to use [py-spy] if you thinks is better.

Rperry2174 commented 1 year ago

hi @DanielePalaia someone in the Pyroscope slack channel let us know about this issue -- just wanted to say a few things FYI

  1. Excited to see use of Flamegraph.com :)
  2. We use py-spy under the hood for our python integration (with some added functionality)
  3. you can also use flamegraph.com and the api there to get a diff between two flamegraphs:
curl 'https://flamegraph.com/api/render/v1/a44afb24-a3d2-11ed-b6df-aec0fc79c33f' > f1.json
curl 'https://flamegraph.com/api/render/v1/6fb388e7-a3d2-11ed-b6df-aec0fc79c33f' > f2.json

and then if you upload them to flamegraph.com results in either:

Hope this helps! Feel free to let us know if there's any functionality that you want/need that doesn't exist

qweeze commented 1 year ago

Didn't know about https://flamegraph.com, very cool :sweat_smile:

Thanks @DanielePalaia, seems that uamqp and encoding takes quite some time indeed

Also I made a couple of quick and dirty optimizations to the encoding module in this branch. I got ~50% speedup on a test like this

frame = schema.Publish(publisher_id=1, messages=[schema.Message(publishing_id=123, data=b"hello")])
for _ in range(100_000):
    encode_frame(frame)

I'm not an expert on performance, but I doubt that it's possible to radically improve performance using only pure python for low-level stuff like binary protocol encoding/decoding. If speed is a concern maybe we should consider alternative ways, such as using cython or c / rust extension

DanielePalaia commented 1 year ago

@qweeze Thank you for your improvements, I will have a look!

DanielePalaia commented 1 year ago

Using the test-encoding-optimization branch with the previous example is generating an exception:

Exception in callback BaseClient.start_task.<locals>.on_task_done(<Task finishe...sResponse'>")>) at /Users/dpalaia/projects/rabbitmq-stream-mixing/python/venv/lib/python3.10/site-packages/rstream/client.py:93
handle: <Handle BaseClient.start_task.<locals>.on_task_done(<Task finishe...sResponse'>")>) at /Users/dpalaia/projects/rabbitmq-stream-mixing/python/venv/lib/python3.10/site-packages/rstream/client.py:93>
Traceback (most recent call last):
  File "/Users/dpalaia/projects/rabbitmq-stream-mixing/python/venv/lib/python3.10/site-packages/rstream/encoding.py", line 179, in decode_frame
    frame = _decode_struct(buf, cls)
  File "/Users/dpalaia/projects/rabbitmq-stream-mixing/python/venv/lib/python3.10/site-packages/rstream/encoding.py", line 162, in _decode_struct
    data[f.name] = _decode_field(buf, fld_tp)
  File "/Users/dpalaia/projects/rabbitmq-stream-mixing/python/venv/lib/python3.10/site-packages/rstream/encoding.py", line 149, in _decode_field
    raise NotImplementedError(f"Unexpected type {tp}")
NotImplementedError: Unexpected type typing.ClassVar[rstream.constants.Key]

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/usr/local/Cellar/python@3.10/3.10.9/Frameworks/Python.framework/Versions/3.10/lib/python3.10/asyncio/events.py", line 80, in _run
    self._context.run(self._callback, *self._args)
  File "/Users/dpalaia/projects/rabbitmq-stream-mixing/python/venv/lib/python3.10/site-packages/rstream/client.py", line 95, in on_task_done
    task.result()
  File "/Users/dpalaia/projects/rabbitmq-stream-mixing/python/venv/lib/python3.10/site-packages/rstream/client.py", line 164, in _listener
    frame = await self._conn.read_frame()
  File "/Users/dpalaia/projects/rabbitmq-stream-mixing/python/venv/lib/python3.10/site-packages/rstream/connection.py", line 73, in read_frame
    return decode_frame(await self._read_frame_raw())
  File "/Users/dpalaia/projects/rabbitmq-stream-mixing/python/venv/lib/python3.10/site-packages/rstream/encoding.py", line 182, in decode_frame
    raise ValueError(f"Could not decode {cls!r}") from e
ValueError: Could not decode <class 'rstream.schema.PeerPropertiesResponse'>
^CTraceback (most recent call last):
  File "/Users/dpalaia/projects/rabbitmq-stream-mixing/python/python_rstream/producer.py", line 44, in <module>
    asyncio.run(publish())
  File "/usr/local/Cellar/python@3.10/3.10.9/Frameworks/Python.framework/Versions/3.10/lib/python3.10/asyncio/runners.py", line 44, in run
    return loop.run_until_complete(main)
  File "/usr/local/Cellar/python@3.10/3.10.9/Frameworks/Python.framework/Versions/3.10/lib/python3.10/asyncio/base_events.py", line 636, in run_until_complete
    self.run_forever()
  File "/usr/local/Cellar/python@3.10/3.10.9/Frameworks/Python.framework/Versions/3.10/lib/python3.10/asyncio/base_events.py", line 603, in run_forever
    self._run_once()
  File "/usr/local/Cellar/python@3.10/3.10.9/Frameworks/Python.framework/Versions/3.10/lib/python3.10/asyncio/base_events.py", line 1868, in _run_once
    event_list = self._selector.select(timeout)
  File "/usr/local/Cellar/python@3.10/3.10.9/Frameworks/Python.framework/Versions/3.10/lib/python3.10/selectors.py", line 562, in select
    kev_list = self._selector.control(None, max_ev, timeout)

Will try to have a look!

qweeze commented 1 year ago

@DanielePalaia sorry I accidentally broke decoding in this branch, now fixed in 28dd87dcc85fabaf4619b91e5252e970b13a5132

DanielePalaia commented 1 year ago

Hi @qweeze I made a few tests on these new improvements. I was able to gain a few seconds of improvements (like 15%/20% impromevements).

On 3 milions of messages sent in binary: From 15seconds to 12seconds with a publish_batch

And sent in amqp format From 55seconds to 48 seconds.

I think is a nice improment anyway. Would it be possible to make an official PR that we can review and maybe merge?

Gsantomaggio commented 1 year ago

This PR https://github.com/qweeze/rstream/pull/100 increases the performances a bit.

DanielePalaia commented 1 year ago

To check https://github.com/qweeze/rstream/issues/127, https://github.com/qweeze/rstream/issues/132, https://github.com/qweeze/rstream/issues/133 for possibile improvements