gmr / rabbitpy

A pure python, thread-safe, minimalistic and pythonic RabbitMQ client library
http://rabbitpy.readthedocs.org
BSD 3-Clause "New" or "Revised" License
242 stars 58 forks source link

40ms acknowledgement delay for first Basic.Publish package in every channel #116

Closed ghost closed 5 years ago

ghost commented 5 years ago

I observe a strange behavior of the RabbitMQ server. After opening a channel and publishing a message (Basic.Publish), the server sends a acknowledgement packet 40ms after the first received packet of Basic.Publish.

Wireshark dump (image): selection_001

Whireshark dump (csv source): dump.txt

The problem is aggravated by the fact that if a channel is opened/closed on a connection, the next publication (Basic.Publish) of the message (through all open channels) will have a 40ms delay.

Our system uses a large number of channels per connection and this behavior negates performance.

Questions:

  1. What are the reasons (my error in operation)?
  2. Ability to configure timeouts?

Below is the minimum working code example and information of the system.

import threading
import time

import rabbitpy as rbp

def publish(conn: rbp.Connection):
    with conn.channel() as ch:
        rbp.Queue(ch, 'queue').declare()

        rbp.Message(ch, 'Message').publish('', 'test_queue')
        time.sleep(1)
        rbp.Message(ch, 'Message').publish('', 'test_queue')

def main():
    with rbp.Connection(url='amqp://guest:guest@localhost:5672/%2F') as cn:
        threads = [threading.Thread(target=publish, args=(cn,)) for _ in range(2)]

        [t.start() for t in threads]

        [t.join() for t in threads]

if __name__ == '__main__':
    main()

System Information:

Linux Rokker-PC 4.15.0-38-generic #41~16.04.1-Ubuntu SMP Wed Oct 10 20:16:04 UTC 2018 x86_64 x86_64 x86_64 GNU/Linux

rabbitmqctl status:

Status of node 'rabbit@Rokker-PC' ... [{pid,31556}, {running_applications, [{rabbitmq_management,"RabbitMQ Management Console","3.5.7"}, {rabbitmq_web_dispatch,"RabbitMQ Web Dispatcher","3.5.7"}, {webmachine,"webmachine","1.10.3-rmq3.5.7-gite9359c7"}, {mochiweb,"MochiMedia Web Server","2.7.0-rmq3.5.7-git680dba8"}, {rabbitmq_management_agent,"RabbitMQ Management Agent","3.5.7"}, {rabbit,"RabbitMQ","3.5.7"}, {inets,"INETS CXC 138 49","6.4.2"}, {mnesia,"MNESIA CXC 138 12","4.15.1"}, {os_mon,"CPO CXC 138 46","2.4.3"}, {amqp_client,"RabbitMQ AMQP Client","3.5.7"}, {xmerl,"XML parser","1.3.15"}, {sasl,"SASL CXC 138 11","3.1"}, {stdlib,"ERTS CXC 138 10","3.4.2"}, {kernel,"ERTS CXC 138 10","5.4"}]}, {os,{unix,linux}}, {erlang_version, "Erlang/OTP 20 [erts-9.1] [source] [64-bit] [smp:4:4] [ds:4:4:10] [async-threads:64] [hipe] [kernel-poll:true]\n"}, {memory, [{total,42966544}, {connection_readers,0}, {connection_writers,0}, {connection_channels,0}, {connection_other,5680}, {queue_procs,13192}, {queue_slave_procs,0}, {plugins,516864}, {other_proc,13645040}, {mnesia,62016}, {mgmt_db,204520}, {msg_index,36384}, {other_ets,1075672}, {binary,14568}, {code,17874309}, {atom,752561}, {other_system,8765738}]}, {alarms,[]}, {listeners,[{clustering,25672,"::"},{amqp,5672,"::"}]}, {vm_memory_high_watermark,0.4}, {vm_memory_limit,13445326438}, {disk_free_limit,50000000}, {disk_free,141350633472}, {file_descriptors, [{total_limit,65436}, {total_used,3}, {sockets_limit,58890}, {sockets_used,1}]}, {processes,[{limit,1048576},{used,183}]}, {run_queue,0}, {uptime,657}]

Thanks!

P.S In the example I use the rabbitpy library. But the behavior is observed in another library - the pika. But judging by the network traffic is not a problem of libraries.

michaelklishin commented 5 years ago

Please have some decency and post your findings into this issue instead of just dumping a link.

Your code opens N channels before publishing concurrently and this client has a per-connection lock around channel.open operations. So all threads in your code contend on that lock and possibly more (I couldn't quickly find where published message framing happens: clients often use a per-connection lock for serialised outgoing frames as well).

Pre-allocating channels will avoid contention on the channel management lock.

ghost commented 5 years ago

While I do not understand how multithreading affects. If you do not use threads then the behavior is no different.

import rabbitpy as rbp

def main():
    with rbp.Connection(url='amqp://guest:guest@localhost:5672/%2F') as cn:
        with cn.channel() as ch:
            rbp.Queue(ch, 'queue').declare()

            rbp.Message(ch, 'Message').publish('', 'test_queue')  # 40ms delay

            cn.channel().close()  # Any works with channel on that connection and next Publish will with delay

            rbp.Message(ch, 'Message').publish('', 'test_queue')  # 40ms delay too

if __name__ == '__main__':
    main()

Pre-allocation channels will avoid but only in one case, when all channels are pre-allocated if one more opens publications on all (pre-allocated) others will be late.

lukebakken commented 5 years ago

IMHO this is not an issue in rabbitpy nor RabbitMQ itself and #116 should be closed. See this comment.

gmr commented 5 years ago

Closing per rabbitmq-server #1763