php-amqplib / php-amqplib

The most widely used PHP client for RabbitMQ
http://www.rabbitmq.com/getstarted.html
GNU Lesser General Public License v2.1
4.47k stars 1.03k forks source link

Multi-consumer strange behavior for one connection #760

Closed stepanets closed 4 years ago

stepanets commented 4 years ago

Version v2.9.2

Description
I have one connection and array of callbacks (consumers). I iterate callbacks, take channel from connection, consume current callback for given channel and add channel to channels array. After that I iterate channels array and call non blocking wait and only the first callback reacts on message (there are hundreds messages in queue). I tryed to close channel form the first callback, after that another callbacks were started to receive messages.

How to reproduce
Test publisher code

<?php declare(strict_types=1);

use PhpAmqpLib\Connection\AMQPSocketConnection;
use PhpAmqpLib\Message\AMQPMessage;

require_once __DIR__.'/vendor/autoload.php';

$connection = new AMQPSocketConnection('rabbit', 5672, 'guest', 'guest');

$ch = $connection->channel();
$ch->exchange_declare('x-test', 'direct', false, true, false);
$ch->queue_declare('q-test', false, true, false, false);
$ch->queue_bind('q-test', 'x-test', 'key');

for ($i = 0; $i < 100; ++$i) {
    $ch->basic_publish(
        new AMQPMessage(
            "Test msg #$i",
            [
                'delivery_mode' => 2,
                'content_type' => 'plain/text',
                'message_id' => $i,
                'timestamp' => time(),
            ]
        ),
        'x-test',
        'key'
    );
}

Test consumer code

<?php declare(strict_types=1);

use PhpAmqpLib\Connection\AMQPSocketConnection;
use PhpAmqpLib\Message\AMQPMessage;

require_once __DIR__.'/vendor/autoload.php';

$connection = new AMQPSocketConnection('rabbit', 5672, 'guest', 'guest');

$ch = $connection->channel();
$ch->exchange_declare('x-test', 'direct', false, true, false);
$ch->queue_declare('q-test', false, true, false, false);
$ch->queue_bind('q-test', 'x-test', 'key');

$consumers = [
    static function(AMQPMessage $msg) {
        echo 'Consumer #1 received ', $msg->body, "\n";
        $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
    },
    static function(AMQPMessage $msg) {
        echo 'Consumer #2 received ', $msg->body, "\n";
        $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
    },
    static function(AMQPMessage $msg) {
        echo 'Consumer #3 received ', $msg->body, "\n";
        $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
    },
];

$channels = [];
foreach ($consumers as $consumer) {
    $ch = $connection->channel();
    echo "Channel #{$ch->getChannelId()} received\n";
    $tag = $ch->basic_consume('q-test', '', false, false, false, false, $consumer);
    $channels[$tag] = $ch;
}

foreach ($channels as $tag => $channel) {
    echo "Call wait for channel $tag\n";
    $channel->wait(null, true);
    echo "After wait for channel $tag\n";
}

Consumer output (run twice):

/var/www/html # php test.cons.php 
Channel #2 received
Channel #3 received
Channel #4 received
Call wait for channel amq.ctag-YCrS0OPy_iTZmDAj3JIb5g
Consumer #1 received Test msg #0
After wait for channel amq.ctag-YCrS0OPy_iTZmDAj3JIb5g
Call wait for channel amq.ctag-JB55oFdAaaBlpKcyWq_gcA
After wait for channel amq.ctag-JB55oFdAaaBlpKcyWq_gcA
Call wait for channel amq.ctag-MFCbhfUJpAU1dxFMvH2Avg
After wait for channel amq.ctag-MFCbhfUJpAU1dxFMvH2Avg

/var/www/html # php test.cons.php 
Channel #2 received
Channel #3 received
Channel #4 received
Call wait for channel amq.ctag-PuNSRjvLsHaz0395mMVWpw
Consumer #1 received Test msg #1
After wait for channel amq.ctag-PuNSRjvLsHaz0395mMVWpw
Call wait for channel amq.ctag-km07QZSwNEnyrLiFxv7VeQ
After wait for channel amq.ctag-km07QZSwNEnyrLiFxv7VeQ
Call wait for channel amq.ctag-AwwxsqeB2LEEB4D-f3uWsg
After wait for channel amq.ctag-AwwxsqeB2LEEB4D-f3uWsg

Possible Solution
When I add $channel->close() to the first callback, other callbacks begun receive messages.

<?php declare(strict_types=1);

use PhpAmqpLib\Connection\AMQPSocketConnection;
use PhpAmqpLib\Message\AMQPMessage;

require_once __DIR__.'/vendor/autoload.php';

$connection = new AMQPSocketConnection('rabbit', 5672, 'guest', 'guest');

$ch = $connection->channel();
$ch->exchange_declare('x-test', 'direct', false, true, false);
$ch->queue_declare('q-test', false, true, false, false);
$ch->queue_bind('q-test', 'x-test', 'key');

$consumers = [
    static function(AMQPMessage $msg) {
        echo 'Consumer #1 received ', $msg->body, "\n";
        $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
        $msg->delivery_info['channel']->close();
    },
    static function(AMQPMessage $msg) {
        echo 'Consumer #2 received ', $msg->body, "\n";
        $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
    },
    static function(AMQPMessage $msg) {
        echo 'Consumer #3 received ', $msg->body, "\n";
        $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
    },
];

$channels = [];
foreach ($consumers as $consumer) {
    $ch = $connection->channel();
    echo "Channel #{$ch->getChannelId()} received\n";
    $tag = $ch->basic_consume('q-test', '', false, false, false, false, $consumer);
    $channels[$tag] = $ch;
}

foreach ($channels as $tag => $channel) {
    echo "Call wait for channel $tag\n";
    $channel->wait(null, true);
    echo "After wait for channel $tag\n";
}

Modified consumer output (run twice):

/var/www/html # php test.cons.php 
Channel #2 received
Channel #3 received
Channel #4 received
Call wait for channel amq.ctag-taMIh14W2aP6L-nETBAraw
Consumer #1 received Test msg #2
After wait for channel amq.ctag-taMIh14W2aP6L-nETBAraw
Call wait for channel amq.ctag-_DJ-xw1LGN-OqRL0s8SF5A
Consumer #2 received Test msg #3
After wait for channel amq.ctag-_DJ-xw1LGN-OqRL0s8SF5A
Call wait for channel amq.ctag-I75LREQgO6lV47UE3TAMvw
Consumer #3 received Test msg #4
After wait for channel amq.ctag-I75LREQgO6lV47UE3TAMvw

/var/www/html # php test.cons.php 
Channel #2 received
Channel #3 received
Channel #4 received
Call wait for channel amq.ctag-2M5cUnIz3hwEVAwlL3iF0g
Consumer #1 received Test msg #5
After wait for channel amq.ctag-2M5cUnIz3hwEVAwlL3iF0g
Call wait for channel amq.ctag-RQ3-_Qw98a8per59UN5bZQ
Consumer #2 received Test msg #6
After wait for channel amq.ctag-RQ3-_Qw98a8per59UN5bZQ
Call wait for channel amq.ctag-EwKSUv9j0yEfBGZS6hdTag
Consumer #3 received Test msg #7
After wait for channel amq.ctag-EwKSUv9j0yEfBGZS6hdTag

Additional context

PHP info ``` phpinfo() PHP Version => 7.1.31 System => Linux 6955f496400d 5.0.0-36-generic #39~18.04.1-Ubuntu SMP Tue Nov 12 11:09:50 UTC 2019 x86_64 Build Date => Aug 3 2019 00:21:56 Configure Command => './configure' '--build=x86_64-linux-musl' '--with-config-file-path=/usr/local/etc/php' '--with-config-file-scan-dir=/usr/local/etc/php/conf.d' '--enable-option-checking=fatal' '--with-mhash' '--enable-ftp' '--enable-mbstring' '--enable-mysqlnd' '--with-curl' '--with-libedit' '--with-openssl' '--with-zlib' '--enable-fpm' '--with-fpm-user=www-data' '--with-fpm-group=www-data' '--disable-cgi' 'build_alias=x86_64-linux-musl' Server API => Command Line Interface Virtual Directory Support => disabled Configuration File (php.ini) Path => /usr/local/etc/php Loaded Configuration File => (none) Scan this dir for additional .ini files => /usr/local/etc/php/conf.d Additional .ini files parsed => /usr/local/etc/php/conf.d/docker-php-ext-bcmath.ini, /usr/local/etc/php/conf.d/docker-php-ext-gd.ini, /usr/local/etc/php/conf.d/docker-php-ext-intl.ini, /usr/local/etc/php/conf.d/docker-php-ext-pcntl.ini, /usr/local/etc/php/conf.d/docker-php-ext-pdo_mysql.ini, /usr/local/etc/php/conf.d/docker-php-ext-pdo_pgsql.ini, /usr/local/etc/php/conf.d/docker-php-ext-sockets.ini, /usr/local/etc/php/conf.d/docker-php-ext-xmlrpc.ini, /usr/local/etc/php/conf.d/docker-php-ext-zip.ini, /usr/local/etc/php/conf.d/mcrypt.ini PHP API => 20160303 PHP Extension => 20160303 Zend Extension => 320160303 Zend Extension Build => API320160303,NTS PHP Extension Build => API20160303,NTS Debug Build => no Thread Safety => disabled Zend Signal Handling => enabled Zend Memory Manager => enabled Zend Multibyte Support => provided by mbstring IPv6 Support => enabled DTrace Support => disabled Registered PHP Streams => https, ftps, compress.zlib, php, file, glob, data, http, ftp, phar, zip Registered Stream Socket Transports => tcp, udp, unix, udg, ssl, tls, tlsv1.0, tlsv1.1, tlsv1.2 Registered Stream Filters => zlib.*, convert.iconv.*, string.rot13, string.toupper, string.tolower, string.strip_tags, convert.*, consumed, dechunk, mcrypt.*, mdecrypt.* This program makes use of the Zend Scripting Language Engine: Zend Engine v3.1.0, Copyright (c) 1998-2018 Zend Technologies _______________________________________________________________________ Configuration bcmath BCMath support => enabled Directive => Local Value => Master Value bcmath.scale => 0 => 0 Core PHP Version => 7.1.31 Directive => Local Value => Master Value allow_url_fopen => On => On allow_url_include => Off => Off arg_separator.input => & => & arg_separator.output => & => & auto_append_file => no value => no value auto_globals_jit => On => On auto_prepend_file => no value => no value browscap => no value => no value default_charset => UTF-8 => UTF-8 default_mimetype => text/html => text/html disable_classes => no value => no value disable_functions => no value => no value display_errors => STDOUT => STDOUT display_startup_errors => Off => Off doc_root => no value => no value docref_ext => no value => no value docref_root => no value => no value enable_dl => On => On enable_post_data_reading => On => On error_append_string => no value => no value error_log => no value => no value error_prepend_string => no value => no value error_reporting => no value => no value expose_php => On => On extension_dir => /usr/local/lib/php/extensions/no-debug-non-zts-20160303 => /usr/local/lib/php/extensions/no-debug-non-zts-20160303 file_uploads => On => On hard_timeout => 2 => 2 highlight.comment => #FF8000 => #FF8000 highlight.default => #0000BB => #0000BB highlight.html => #000000 => #000000 highlight.keyword => #007700 => #007700 highlight.string => #DD0000 => #DD0000 html_errors => Off => Off ignore_repeated_errors => Off => Off ignore_repeated_source => Off => Off ignore_user_abort => Off => Off implicit_flush => On => On include_path => .:/usr/local/lib/php => .:/usr/local/lib/php input_encoding => no value => no value internal_encoding => no value => no value log_errors => Off => Off log_errors_max_len => 1024 => 1024 mail.add_x_header => Off => Off mail.force_extra_parameters => no value => no value mail.log => no value => no value max_execution_time => 0 => 0 max_file_uploads => 20 => 20 max_input_nesting_level => 64 => 64 max_input_time => -1 => -1 max_input_vars => 1000 => 1000 memory_limit => 128M => 128M open_basedir => no value => no value output_buffering => 0 => 0 output_encoding => no value => no value output_handler => no value => no value post_max_size => 8M => 8M precision => 14 => 14 realpath_cache_size => 4096K => 4096K realpath_cache_ttl => 120 => 120 register_argc_argv => On => On report_memleaks => On => On report_zend_debug => Off => Off request_order => no value => no value sendmail_from => no value => no value sendmail_path => /usr/sbin/sendmail -t -i => /usr/sbin/sendmail -t -i serialize_precision => -1 => -1 short_open_tag => On => On SMTP => localhost => localhost smtp_port => 25 => 25 sql.safe_mode => Off => Off sys_temp_dir => no value => no value track_errors => Off => Off unserialize_callback_func => no value => no value upload_max_filesize => 2M => 2M upload_tmp_dir => no value => no value user_dir => no value => no value user_ini.cache_ttl => 300 => 300 user_ini.filename => .user.ini => .user.ini variables_order => EGPCS => EGPCS xmlrpc_error_number => 0 => 0 xmlrpc_errors => Off => Off zend.assertions => 1 => 1 zend.detect_unicode => On => On zend.enable_gc => On => On zend.multibyte => Off => Off zend.script_encoding => no value => no value zend.signal_check => Off => Off ctype ctype functions => enabled curl cURL support => enabled cURL Information => 7.65.1 Age => 4 Features AsynchDNS => Yes CharConv => No Debug => No GSS-Negotiate => No IDN => No IPv6 => Yes krb4 => No Largefile => Yes libz => Yes NTLM => Yes NTLMWB => Yes SPNEGO => No SSL => Yes SSPI => No TLS-SRP => Yes HTTP2 => Yes GSSAPI => No KERBEROS5 => No UNIX_SOCKETS => Yes PSL => No Protocols => dict, file, ftp, ftps, gopher, http, https, imap, imaps, pop3, pop3s, rtsp, smb, smbs, smtp, smtps, telnet, tftp Host => x86_64-alpine-linux-musl SSL Version => OpenSSL/1.1.1c ZLib Version => 1.2.11 date date/time support => enabled timelib version => 2016.05 "Olson" Timezone Database Version => 2018.7 Timezone Database => internal Default timezone => UTC Directive => Local Value => Master Value date.default_latitude => 31.7667 => 31.7667 date.default_longitude => 35.2333 => 35.2333 date.sunrise_zenith => 90.583333 => 90.583333 date.sunset_zenith => 90.583333 => 90.583333 date.timezone => no value => no value dom DOM/XML => enabled DOM/XML API Version => 20031129 libxml Version => 2.9.9 HTML Support => enabled XPath Support => enabled XPointer Support => enabled Schema Support => enabled RelaxNG Support => enabled fileinfo fileinfo support => enabled version => 1.0.5 libmagic => 522 filter Input Validation and Filtering => enabled Revision => $Id: 5a34caaa246b9df197f4b43af8ac66a07464fe4b $ Directive => Local Value => Master Value filter.default => unsafe_raw => unsafe_raw filter.default_flags => no value => no value ftp FTP support => enabled FTPS support => enabled gd GD Support => enabled GD Version => bundled (2.1.0 compatible) FreeType Support => enabled FreeType Linkage => with freetype FreeType Version => 2.10.0 GIF Read Support => enabled GIF Create Support => enabled JPEG Support => enabled libJPEG Version => 8 PNG Support => enabled libPNG Version => 1.6.37 WBMP Support => enabled XPM Support => enabled libXpm Version => 30411 XBM Support => enabled Directive => Local Value => Master Value gd.jpeg_ignore_warning => 1 => 1 hash hash support => enabled Hashing Engines => md2 md4 md5 sha1 sha224 sha256 sha384 sha512/224 sha512/256 sha512 sha3-224 sha3-256 sha3-384 sha3-512 ripemd128 ripemd160 ripemd256 ripemd320 whirlpool tiger128,3 tiger160,3 tiger192,3 tiger128,4 tiger160,4 tiger192,4 snefru snefru256 gost gost-crypto adler32 crc32 crc32b fnv132 fnv1a32 fnv164 fnv1a64 joaat haval128,3 haval160,3 haval192,3 haval224,3 haval256,3 haval128,4 haval160,4 haval192,4 haval224,4 haval256,4 haval128,5 haval160,5 haval192,5 haval224,5 haval256,5 MHASH support => Enabled MHASH API Version => Emulated Support iconv iconv support => enabled iconv implementation => unknown iconv library version => unknown Directive => Local Value => Master Value iconv.input_encoding => no value => no value iconv.internal_encoding => no value => no value iconv.output_encoding => no value => no value intl Internationalization support => enabled version => 1.1.0 ICU version => 64.2 ICU Data version => 64.2 Directive => Local Value => Master Value intl.default_locale => no value => no value intl.error_level => 0 => 0 intl.use_exceptions => 0 => 0 json json support => enabled json version => 1.5.0 libxml libXML support => active libXML Compiled Version => 2.9.9 libXML Loaded Version => 20909 libXML streams => enabled mbstring Multibyte Support => enabled Multibyte string engine => libmbfl HTTP input encoding translation => disabled libmbfl version => 1.3.2 oniguruma version => 5.9.6 mbstring extension makes use of "streamable kanji code filter and converter", which is distributed under the GNU Lesser General Public License version 2.1. Multibyte (japanese) regex support => enabled Multibyte regex (oniguruma) backtrack check => On Multibyte regex (oniguruma) version => 5.9.6 Directive => Local Value => Master Value mbstring.detect_order => no value => no value mbstring.encoding_translation => Off => Off mbstring.func_overload => 0 => 0 mbstring.http_input => no value => no value mbstring.http_output => no value => no value mbstring.http_output_conv_mimetypes => ^(text/|application/xhtml\+xml) => ^(text/|application/xhtml\+xml) mbstring.internal_encoding => no value => no value mbstring.language => neutral => neutral mbstring.strict_detection => Off => Off mbstring.substitute_character => no value => no value mcrypt mcrypt support => enabled mcrypt_filter support => enabled Version => 2.5.8 Api No => 20021217 Supported ciphers => cast-128 gost rijndael-128 twofish arcfour cast-256 loki97 rijndael-192 saferplus wake blowfish-compat des rijndael-256 serpent xtea blowfish enigma rc2 tripledes Supported modes => cbc cfb ctr ecb ncfb nofb ofb stream Directive => Local Value => Master Value mcrypt.algorithms_dir => no value => no value mcrypt.modes_dir => no value => no value mysqlnd mysqlnd => enabled Version => mysqlnd 5.0.12-dev - 20150407 - $Id: 38fea24f2847fa7519001be390c98ae0acafe387 $ Compression => supported core SSL => supported extended SSL => supported Command buffer size => 4096 Read buffer size => 32768 Read timeout => 31536000 Collecting statistics => Yes Collecting memory statistics => No Tracing => n/a Loaded plugins => mysqlnd,debug_trace,auth_plugin_mysql_native_password,auth_plugin_mysql_clear_password,auth_plugin_sha256_password API Extensions => pdo_mysql mysqlnd statistics => bytes_sent => 0 bytes_received => 0 packets_sent => 0 packets_received => 0 protocol_overhead_in => 0 protocol_overhead_out => 0 bytes_received_ok_packet => 0 bytes_received_eof_packet => 0 bytes_received_rset_header_packet => 0 bytes_received_rset_field_meta_packet => 0 bytes_received_rset_row_packet => 0 bytes_received_prepare_response_packet => 0 bytes_received_change_user_packet => 0 packets_sent_command => 0 packets_received_ok => 0 packets_received_eof => 0 packets_received_rset_header => 0 packets_received_rset_field_meta => 0 packets_received_rset_row => 0 packets_received_prepare_response => 0 packets_received_change_user => 0 result_set_queries => 0 non_result_set_queries => 0 no_index_used => 0 bad_index_used => 0 slow_queries => 0 buffered_sets => 0 unbuffered_sets => 0 ps_buffered_sets => 0 ps_unbuffered_sets => 0 flushed_normal_sets => 0 flushed_ps_sets => 0 ps_prepared_never_executed => 0 ps_prepared_once_executed => 0 rows_fetched_from_server_normal => 0 rows_fetched_from_server_ps => 0 rows_buffered_from_client_normal => 0 rows_buffered_from_client_ps => 0 rows_fetched_from_client_normal_buffered => 0 rows_fetched_from_client_normal_unbuffered => 0 rows_fetched_from_client_ps_buffered => 0 rows_fetched_from_client_ps_unbuffered => 0 rows_fetched_from_client_ps_cursor => 0 rows_affected_normal => 0 rows_affected_ps => 0 rows_skipped_normal => 0 rows_skipped_ps => 0 copy_on_write_saved => 0 copy_on_write_performed => 0 command_buffer_too_small => 0 connect_success => 0 connect_failure => 0 connection_reused => 0 reconnect => 0 pconnect_success => 0 active_connections => 0 active_persistent_connections => 0 explicit_close => 0 implicit_close => 0 disconnect_close => 0 in_middle_of_command_close => 0 explicit_free_result => 0 implicit_free_result => 0 explicit_stmt_close => 0 implicit_stmt_close => 0 mem_emalloc_count => 0 mem_emalloc_amount => 0 mem_ecalloc_count => 0 mem_ecalloc_amount => 0 mem_erealloc_count => 0 mem_erealloc_amount => 0 mem_efree_count => 0 mem_efree_amount => 0 mem_malloc_count => 0 mem_malloc_amount => 0 mem_calloc_count => 0 mem_calloc_amount => 0 mem_realloc_count => 0 mem_realloc_amount => 0 mem_free_count => 0 mem_free_amount => 0 mem_estrndup_count => 0 mem_strndup_count => 0 mem_estrdup_count => 0 mem_strdup_count => 0 mem_edupl_count => 0 mem_dupl_count => 0 proto_text_fetched_null => 0 proto_text_fetched_bit => 0 proto_text_fetched_tinyint => 0 proto_text_fetched_short => 0 proto_text_fetched_int24 => 0 proto_text_fetched_int => 0 proto_text_fetched_bigint => 0 proto_text_fetched_decimal => 0 proto_text_fetched_float => 0 proto_text_fetched_double => 0 proto_text_fetched_date => 0 proto_text_fetched_year => 0 proto_text_fetched_time => 0 proto_text_fetched_datetime => 0 proto_text_fetched_timestamp => 0 proto_text_fetched_string => 0 proto_text_fetched_blob => 0 proto_text_fetched_enum => 0 proto_text_fetched_set => 0 proto_text_fetched_geometry => 0 proto_text_fetched_other => 0 proto_binary_fetched_null => 0 proto_binary_fetched_bit => 0 proto_binary_fetched_tinyint => 0 proto_binary_fetched_short => 0 proto_binary_fetched_int24 => 0 proto_binary_fetched_int => 0 proto_binary_fetched_bigint => 0 proto_binary_fetched_decimal => 0 proto_binary_fetched_float => 0 proto_binary_fetched_double => 0 proto_binary_fetched_date => 0 proto_binary_fetched_year => 0 proto_binary_fetched_time => 0 proto_binary_fetched_datetime => 0 proto_binary_fetched_timestamp => 0 proto_binary_fetched_string => 0 proto_binary_fetched_json => 0 proto_binary_fetched_blob => 0 proto_binary_fetched_enum => 0 proto_binary_fetched_set => 0 proto_binary_fetched_geometry => 0 proto_binary_fetched_other => 0 init_command_executed_count => 0 init_command_failed_count => 0 com_quit => 0 com_init_db => 0 com_query => 0 com_field_list => 0 com_create_db => 0 com_drop_db => 0 com_refresh => 0 com_shutdown => 0 com_statistics => 0 com_process_info => 0 com_connect => 0 com_process_kill => 0 com_debug => 0 com_ping => 0 com_time => 0 com_delayed_insert => 0 com_change_user => 0 com_binlog_dump => 0 com_table_dump => 0 com_connect_out => 0 com_register_slave => 0 com_stmt_prepare => 0 com_stmt_execute => 0 com_stmt_send_long_data => 0 com_stmt_close => 0 com_stmt_reset => 0 com_stmt_set_option => 0 com_stmt_fetch => 0 com_deamon => 0 bytes_received_real_data_normal => 0 bytes_received_real_data_ps => 0 openssl OpenSSL support => enabled OpenSSL Library Version => OpenSSL 1.1.1c 28 May 2019 OpenSSL Header Version => OpenSSL 1.1.1c 28 May 2019 Openssl default config => /etc/ssl/openssl.cnf Directive => Local Value => Master Value openssl.cafile => no value => no value openssl.capath => no value => no value pcntl pcntl support => enabled pcre PCRE (Perl Compatible Regular Expressions) Support => enabled PCRE Library Version => 8.38 2015-11-23 PCRE JIT Support => enabled Directive => Local Value => Master Value pcre.backtrack_limit => 1000000 => 1000000 pcre.jit => 1 => 1 pcre.recursion_limit => 100000 => 100000 PDO PDO support => enabled PDO drivers => sqlite, mysql, pgsql pdo_mysql PDO Driver for MySQL => enabled Client API version => mysqlnd 5.0.12-dev - 20150407 - $Id: 38fea24f2847fa7519001be390c98ae0acafe387 $ Directive => Local Value => Master Value pdo_mysql.default_socket => no value => no value pdo_pgsql PDO Driver for PostgreSQL => enabled PostgreSQL(libpq) Version => 11.5 Module version => 7.1.31 Revision => $Id: 9c5f356c77143981d2e905e276e439501fe0f419 $ pdo_sqlite PDO Driver for SQLite 3.x => enabled SQLite Library => 3.28.0 Phar Phar: PHP Archive support => enabled Phar EXT version => 2.0.2 Phar API version => 1.1.1 SVN revision => $Id: e117ab0dc068703c55b505e78a0d3b3752e9c0b7 $ Phar-based phar archives => enabled Tar-based phar archives => enabled ZIP-based phar archives => enabled gzip compression => enabled bzip2 compression => disabled (install pecl/bz2) Native OpenSSL support => enabled Phar based on pear/PHP_Archive, original concept by Davey Shafik. Phar fully realized by Gregory Beaver and Marcus Boerger. Portions of tar implementation Copyright (c) 2003-2009 Tim Kientzle. Directive => Local Value => Master Value phar.cache_list => no value => no value phar.readonly => On => On phar.require_hash => On => On posix Revision => $Id: e3a2bc739dee8e0d29094e30e1cfbe3e87e2ceb4 $ readline Readline Support => enabled Readline library => EditLine wrapper Directive => Local Value => Master Value cli.pager => no value => no value cli.prompt => \b \> => \b \> Reflection Reflection => enabled Version => $Id: 279be19a9e466fb7cfea9841a630521f99644504 $ session Session Support => enabled Registered save handlers => files user Registered serializer handlers => php_serialize php php_binary Directive => Local Value => Master Value session.auto_start => Off => Off session.cache_expire => 180 => 180 session.cache_limiter => nocache => nocache session.cookie_domain => no value => no value session.cookie_httponly => Off => Off session.cookie_lifetime => 0 => 0 session.cookie_path => / => / session.cookie_secure => Off => Off session.gc_divisor => 100 => 100 session.gc_maxlifetime => 1440 => 1440 session.gc_probability => 1 => 1 session.lazy_write => On => On session.name => PHPSESSID => PHPSESSID session.referer_check => no value => no value session.save_handler => files => files session.save_path => no value => no value session.serialize_handler => php => php session.sid_bits_per_character => 4 => 4 session.sid_length => 32 => 32 session.upload_progress.cleanup => On => On session.upload_progress.enabled => On => On session.upload_progress.freq => 1% => 1% session.upload_progress.min_freq => 1 => 1 session.upload_progress.name => PHP_SESSION_UPLOAD_PROGRESS => PHP_SESSION_UPLOAD_PROGRESS session.upload_progress.prefix => upload_progress_ => upload_progress_ session.use_cookies => On => On session.use_only_cookies => On => On session.use_strict_mode => Off => Off session.use_trans_sid => 0 => 0 SimpleXML Simplexml support => enabled Revision => $Id: ae067cdcddf424d6e762603905b98798bc924a00 $ Schema support => enabled sockets Sockets Support => enabled SPL SPL support => enabled Interfaces => Countable, OuterIterator, RecursiveIterator, SeekableIterator, SplObserver, SplSubject Classes => AppendIterator, ArrayIterator, ArrayObject, BadFunctionCallException, BadMethodCallException, CachingIterator, CallbackFilterIterator, DirectoryIterator, DomainException, EmptyIterator, FilesystemIterator, FilterIterator, GlobIterator, InfiniteIterator, InvalidArgumentException, IteratorIterator, LengthException, LimitIterator, LogicException, MultipleIterator, NoRewindIterator, OutOfBoundsException, OutOfRangeException, OverflowException, ParentIterator, RangeException, RecursiveArrayIterator, RecursiveCachingIterator, RecursiveCallbackFilterIterator, RecursiveDirectoryIterator, RecursiveFilterIterator, RecursiveIteratorIterator, RecursiveRegexIterator, RecursiveTreeIterator, RegexIterator, RuntimeException, SplDoublyLinkedList, SplFileInfo, SplFileObject, SplFixedArray, SplHeap, SplMinHeap, SplMaxHeap, SplObjectStorage, SplPriorityQueue, SplQueue, SplStack, SplTempFileObject, UnderflowException, UnexpectedValueException sqlite3 SQLite3 support => enabled SQLite3 module version => 7.1.31 SQLite Library => 3.28.0 Directive => Local Value => Master Value sqlite3.defensive => 1 => 1 sqlite3.extension_dir => no value => no value standard Dynamic Library Support => enabled Path to sendmail => /usr/sbin/sendmail -t -i Directive => Local Value => Master Value assert.active => 1 => 1 assert.bail => 0 => 0 assert.callback => no value => no value assert.exception => 0 => 0 assert.quiet_eval => 0 => 0 assert.warning => 1 => 1 auto_detect_line_endings => 0 => 0 default_socket_timeout => 60 => 60 from => no value => no value session.trans_sid_hosts => no value => no value session.trans_sid_tags => a=href,area=href,frame=src,form= => a=href,area=href,frame=src,form= url_rewriter.hosts => no value => no value url_rewriter.tags => form= => form= user_agent => no value => no value tokenizer Tokenizer Support => enabled xml XML Support => active XML Namespace Support => active libxml2 Version => 2.9.9 xmlreader XMLReader => enabled xmlrpc core library version => xmlrpc-epi v. 0.51 php extension version => 7.1.31 author => Dan Libby homepage => http://xmlrpc-epi.sourceforge.net open sourced by => Epinions.com xmlwriter XMLWriter => enabled zip Zip => enabled Zip version => 1.13.5 Libzip version => 1.1.2 zlib ZLib Support => enabled Stream Wrapper => compress.zlib:// Stream Filter => zlib.inflate, zlib.deflate Compiled Version => 1.2.11 Linked Version => 1.2.11 Directive => Local Value => Master Value zlib.output_compression => Off => Off zlib.output_compression_level => -1 => -1 zlib.output_handler => no value => no value Additional Modules Module Name Environment Variable => Value PHP_EXTRA_CONFIGURE_ARGS => --enable-fpm --with-fpm-user=www-data --with-fpm-group=www-data --disable-cgi HOSTNAME => 6955f496400d PHP_INI_DIR => /usr/local/etc/php SHLVL => 1 HOME => /root PHP_LDFLAGS => -Wl,-O1 -Wl,--hash-style=both -pie PHP_CFLAGS => -fstack-protector-strong -fpic -fpie -O2 PHP_MD5 => PHP_VERSION => 7.1.31 GPG_KEYS => A917B1ECDA84AEC2B568FED6F50ABC807BD5DCD0 528995BFEDFBA7191D46839EF9BA0ADA31CBD89E 1729F83938DA44E27BA0F4D3DBDB397470D12172 PHP_CPPFLAGS => -fstack-protector-strong -fpic -fpie -O2 PHP_ASC_URL => https://www.php.net/get/php-7.1.31.tar.xz.asc/from/this/mirror PHP_URL => https://www.php.net/get/php-7.1.31.tar.xz/from/this/mirror TERM => xterm PATH => /usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin WORKDIR => /var/www/html PHPIZE_DEPS => autoconf dpkg-dev dpkg file g++ gcc libc-dev make pkgconf re2c PWD => /var/www/html PHP_SHA256 => 5cb53b63592ec4361f0ab12c684b10430344821a024881a387ead4299df78fa5 PHP Variables Variable => Value $_SERVER['PHP_EXTRA_CONFIGURE_ARGS'] => --enable-fpm --with-fpm-user=www-data --with-fpm-group=www-data --disable-cgi $_SERVER['HOSTNAME'] => 6955f496400d $_SERVER['PHP_INI_DIR'] => /usr/local/etc/php $_SERVER['SHLVL'] => 1 $_SERVER['HOME'] => /root $_SERVER['PHP_LDFLAGS'] => -Wl,-O1 -Wl,--hash-style=both -pie $_SERVER['PHP_CFLAGS'] => -fstack-protector-strong -fpic -fpie -O2 $_SERVER['PHP_MD5'] => $_SERVER['PHP_VERSION'] => 7.1.31 $_SERVER['GPG_KEYS'] => A917B1ECDA84AEC2B568FED6F50ABC807BD5DCD0 528995BFEDFBA7191D46839EF9BA0ADA31CBD89E 1729F83938DA44E27BA0F4D3DBDB397470D12172 $_SERVER['PHP_CPPFLAGS'] => -fstack-protector-strong -fpic -fpie -O2 $_SERVER['PHP_ASC_URL'] => https://www.php.net/get/php-7.1.31.tar.xz.asc/from/this/mirror $_SERVER['PHP_URL'] => https://www.php.net/get/php-7.1.31.tar.xz/from/this/mirror $_SERVER['TERM'] => xterm $_SERVER['PATH'] => /usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin $_SERVER['WORKDIR'] => /var/www/html $_SERVER['PHPIZE_DEPS'] => autoconf dpkg-dev dpkg file g++ gcc libc-dev make pkgconf re2c $_SERVER['PWD'] => /var/www/html $_SERVER['PHP_SHA256'] => 5cb53b63592ec4361f0ab12c684b10430344821a024881a387ead4299df78fa5 $_SERVER['PHP_SELF'] => test.cons.php $_SERVER['SCRIPT_NAME'] => test.cons.php $_SERVER['SCRIPT_FILENAME'] => test.cons.php $_SERVER['PATH_TRANSLATED'] => test.cons.php $_SERVER['DOCUMENT_ROOT'] => $_SERVER['REQUEST_TIME_FLOAT'] => 1574402244.907 $_SERVER['REQUEST_TIME'] => 1574402244 $_SERVER['argv'] => Array ( [0] => test.cons.php ) $_SERVER['argc'] => 1 $_ENV['PHP_EXTRA_CONFIGURE_ARGS'] => --enable-fpm --with-fpm-user=www-data --with-fpm-group=www-data --disable-cgi $_ENV['HOSTNAME'] => 6955f496400d $_ENV['PHP_INI_DIR'] => /usr/local/etc/php $_ENV['SHLVL'] => 1 $_ENV['HOME'] => /root $_ENV['PHP_LDFLAGS'] => -Wl,-O1 -Wl,--hash-style=both -pie $_ENV['PHP_CFLAGS'] => -fstack-protector-strong -fpic -fpie -O2 $_ENV['PHP_MD5'] => $_ENV['PHP_VERSION'] => 7.1.31 $_ENV['GPG_KEYS'] => A917B1ECDA84AEC2B568FED6F50ABC807BD5DCD0 528995BFEDFBA7191D46839EF9BA0ADA31CBD89E 1729F83938DA44E27BA0F4D3DBDB397470D12172 $_ENV['PHP_CPPFLAGS'] => -fstack-protector-strong -fpic -fpie -O2 $_ENV['PHP_ASC_URL'] => https://www.php.net/get/php-7.1.31.tar.xz.asc/from/this/mirror $_ENV['PHP_URL'] => https://www.php.net/get/php-7.1.31.tar.xz/from/this/mirror $_ENV['TERM'] => xterm $_ENV['PATH'] => /usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin $_ENV['WORKDIR'] => /var/www/html $_ENV['PHPIZE_DEPS'] => autoconf dpkg-dev dpkg file g++ gcc libc-dev make pkgconf re2c $_ENV['PWD'] => /var/www/html $_ENV['PHP_SHA256'] => 5cb53b63592ec4361f0ab12c684b10430344821a024881a387ead4299df78fa5 PHP License This program is free software; you can redistribute it and/or modify it under the terms of the PHP License as published by the PHP Group and included in the distribution in the file: LICENSE This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. If you did not receive a copy of the PHP license, or have any questions about PHP licensing, please contact license@php.net. ```
ramunasd commented 4 years ago

Lets see :) I left one consumer with 3 channels working and this is what rabbitmqctl list_channels shows:

Listing channels ...
pid user    consumer_count  messages_unacknowledged
<rabbit@f3da104b8fc8.3.3453.0>  guest   0   0
<rabbit@f3da104b8fc8.3.3458.0>  guest   1   78
<rabbit@f3da104b8fc8.3.3462.0>  guest   1   0
<rabbit@f3da104b8fc8.3.3466.0>  guest   1   0

So all messages was assigned to one channel/consumer, perhaps somehow buffered, but marked as unacknowledged. It means those messages cannot be delivered to other consumers while not (n)acked and connection is alive.

Not lets try with $ch->basic_qos(null, 1, false);

Listing channels ...
pid user    consumer_count  messages_unacknowledged
<rabbit@f3da104b8fc8.3.3860.0>  guest   0   0
<rabbit@f3da104b8fc8.3.3865.0>  guest   1   1
<rabbit@f3da104b8fc8.3.3869.0>  guest   1   1
<rabbit@f3da104b8fc8.3.3873.0>  guest   1   1

and test output:

all wait for channel amq.ctag-ZPgU_BHoZm4BZMuzgZYp6g
Consumer #1 received Test msg #32
After wait for channel amq.ctag-ZPgU_BHoZm4BZMuzgZYp6g
Call wait for channel amq.ctag-hlArvtrBmp74Dn3EqKENKw
Consumer #2 received Test msg #33
After wait for channel amq.ctag-hlArvtrBmp74Dn3EqKENKw
Call wait for channel amq.ctag-m9fQcGl-i6c_BDKOdXNZ9A
Consumer #3 received Test msg #34
After wait for channel amq.ctag-m9fQcGl-i6c_BDKOdXNZ9A

Seems everything is fine on client side. So why RabbitMQ assigns all messages to 1st consumer? Well, this part isn't documented well and we can only guess that by default prefetch count is unlimited.

RabbitMQ default prefetch setting gives clients an unlimited buffer, meaning that RabbitMQ by default send as many messages as it can to any consumer that looks ready to accept them.

More at: https://www.cloudamqp.com/blog/2017-12-29-part1-rabbitmq-best-practice.html#prefetch