rabbitmq / rabbitmq-amqp1.0

AMQP 1.0 support for RabbitMQ
https://www.rabbitmq.com/

Incompatibility with Apache Qpid C++ API #90

Closed (szegel closed this issue 5 years ago)

szegel commented 5 years ago

The RabbitMQ AMQP 1.0 plugin raises an error and closes the session when a receive call from an Apache Qpid C++ client (Receiver::fetch() in the source below) times out. The server should not close the session in that case.

I also filed this bug against the Qpid C++ API: https://issues.apache.org/jira/browse/QPID-8347. I believe that part of the problem may lie in the RabbitMQ server.

Source:

#include <qpid/messaging/Connection.h>
#include <qpid/messaging/Message.h>
#include <qpid/messaging/Receiver.h>
#include <qpid/messaging/Sender.h>
#include <qpid/messaging/Session.h>

#include <iostream>
#include <string>

using namespace qpid::messaging;

int main(int argc, char** argv) {
    std::string broker = argc > 1 ? argv[1] : "localhost:5672";
    std::cout << "broker: " << broker << std::endl;
    std::string address = argc > 2 ? argv[2] : "topic.hello.world";
    std::cout << "address: " << address << std::endl;
    std::string connectionOptions = argc > 3 ? argv[3] : "";
    std::cout << "connectionOptions: " << connectionOptions << std::endl;

    try {
        Connection connection(broker, connectionOptions);
        connection.open();
        Session session = connection.createSession();

        Receiver receiver = session.createReceiver(address);
        Message message;
        std::cout << "Pre Receive" << std::endl;
        message = receiver.fetch(Duration::SECOND * 10);
        std::cout << "Post Receive" << std::endl;
        session.acknowledge();

        connection.close();
        return 0;
    } catch(const std::exception& error) {
        std::cerr << error.what() << std::endl;
        return 1;
    }
}

Server error (occurs after the 10-second timeout):

=INFO REPORT==== 23-Jul-2019::17:49:54 ===
accepting AMQP connection <0.657.0> ([::1]:40358 -> [::1]:5672)

=ERROR REPORT==== 23-Jul-2019::17:49:54 ===
closing AMQP connection <0.657.0> ([::1]:40358 -> [::1]:5672):
{bad_version,{1,1,0,10}}

=INFO REPORT==== 23-Jul-2019::17:49:54 ===
accepting AMQP connection <0.660.0> ([::1]:40360 -> [::1]:5672)

=ERROR REPORT==== 23-Jul-2019::17:50:04 ===
** Generic server <0.675.0> terminating 
** Last message in was {send_command,
                           {'basic.credit_drained',
                               <<99,116,97,103,45,0,0,0,0>>,
                               1}}
** When Server state == {state,1,<0.669.0>,<0.674.0>,direct,
                               {[],[]},
                               false,<0.678.0>,none,none,0,true,none,
                               {0,nil},
                               {0,nil},
                               true,false}
** Reason for termination == 
** {{badmatch,{empty,{[],[]}}},
    [{amqp_channel,rpc_bottom_half,2,
                   [{file,"src/amqp_channel.erl"},{line,623}]},
     {amqp_channel,handle_method_from_server1,3,
                   [{file,"src/amqp_channel.erl"},{line,800}]},
     {gen_server,try_dispatch,4,[{file,"gen_server.erl"},{line,616}]},
     {gen_server,handle_msg,6,[{file,"gen_server.erl"},{line,686}]},
     {proc_lib,init_p_do_apply,3,[{file,"proc_lib.erl"},{line,247}]}]}
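
A note on reading the report above: {empty,{[],[]}} is the value Erlang's queue:out/1 returns for an empty queue, and the failing function is named rpc_bottom_half. One plausible reading, not confirmed anywhere in this thread, is that basic.credit_drained arrived while no synchronous request was pending, and the channel tried to match it against a request that did not exist. The C++ sketch below is only a hypothetical model of that situation. ChannelModel, send_request, and match_reply are invented names, not part of the RabbitMQ Erlang client or of Qpid.

```cpp
#include <deque>
#include <string>

// Hypothetical model (an assumption, not RabbitMQ code): a channel that
// remembers outstanding synchronous requests in FIFO order.
struct ChannelModel {
    std::deque<std::string> pending_requests;  // oldest request at the front

    // Record a synchronous request that expects exactly one reply.
    void send_request(const std::string& method) {
        pending_requests.push_back(method);
    }

    // Treat an incoming method as the reply to the oldest pending request.
    // On success, stores that request in matched_request and returns true.
    // Returns false when nothing is pending, which is the case the
    // {empty,{[],[]}} badmatch would correspond to under this reading.
    bool match_reply(std::string& matched_request) {
        if (pending_requests.empty()) {
            return false;
        }
        matched_request = pending_requests.front();
        pending_requests.pop_front();
        return true;
    }
};
```

In this model, calling match_reply on a freshly constructed ChannelModel returns false. An implementation that assumes a request is always pending has no branch for that case, which would be consistent with the crash rather than a graceful error.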

=WARNING REPORT==== 23-Jul-2019::17:50:04 ===
Connection (<0.669.0>) closing: internal error in channel (<0.675.0>): {{badmatch,
                                                                         {empty,
                                                                          {[],
                                                                           []}}},
                                                                        [{amqp_channel,
                                                                          rpc_bottom_half,
                                                                          2,
                                                                          [{file,
                                                                            "src/amqp_channel.erl"},
                                                                           {line,
                                                                            623}]},
                                                                         {amqp_channel,
                                                                          handle_method_from_server1,
                                                                          3,
                                                                          [{file,
                                                                            "src/amqp_channel.erl"},
                                                                           {line,
                                                                            800}]},
                                                                         {gen_server,
                                                                          try_dispatch,
                                                                          4,
                                                                          [{file,
                                                                            "gen_server.erl"},
                                                                           {line,
                                                                            616}]},
                                                                         {gen_server,
                                                                          handle_msg,
                                                                          6,
                                                                          [{file,
                                                                            "gen_server.erl"},
                                                                           {line,
                                                                            686}]},
                                                                         {proc_lib,
                                                                          init_p_do_apply,
                                                                          3,
                                                                          [{file,
                                                                            "proc_lib.erl"},
                                                                           {line,
                                                                            247}]}]}

=ERROR REPORT==== 23-Jul-2019::17:50:04 ===
** Generic server <0.678.0> terminating
** Last message in was {'EXIT',<0.675.0>,
                           {{badmatch,{empty,{[],[]}}},
                            [{amqp_channel,rpc_bottom_half,2,
                                 [{file,"src/amqp_channel.erl"},{line,623}]},
                             {amqp_channel,handle_method_from_server1,3,
                                 [{file,"src/amqp_channel.erl"},{line,800}]},
                             {gen_server,try_dispatch,4,
                                 [{file,"gen_server.erl"},{line,616}]},
                             {gen_server,handle_msg,6,
                                 [{file,"gen_server.erl"},{line,686}]},
                             {proc_lib,init_p_do_apply,3,
                                 [{file,"proc_lib.erl"},{line,247}]}]}}
** When Server state == {ch,running,rabbit_framing_amqp_0_9_1,1,<0.675.0>,
                         <0.675.0>,<0.669.0>,<<"[::1]:40360 -> [::1]:5672">>,
                         {lstate,<0.677.0>,false},
                         none,1,
                         {[],[]},
                         {user,<<"guest">>,
                          [administrator],
                          [{rabbit_auth_backend_internal,none}]},
                         <<"/">>,<<>>,
                         {dict,1,16,16,8,80,48,
                          {[],[],[],[],[],[],[],[],[],[],[],[],[],[],[],[]},
                          {{[],[],[],[],[],[],[],[],[],
                            [[<0.583.0>|
                              {resource,<<"/">>,queue,
                               <<"topic.hello.world">>}]],
                            [],[],[],[],[],[]}}},
                         {state,
                          {dict,1,16,16,8,80,48,
                           {[],[],[],[],[],[],[],[],[],[],[],[],[],[],[],[]},
                           {{[],[],[],[],[],[],[],[],[],
                             [[<0.583.0>|#Ref<0.4039704202.3895984130.68325>]],
                             [],[],[],[],[],[]}}},
                          erlang},
                         {dict,1,16,16,8,80,48,
                          {[],[],[],[],[],[],[],[],[],[],[],[],[],[],[],[]},
                          {{[],[],[],[],[],[],[],[],[],[],[],
                            [[<<99,116,97,103,45,0,0,0,0>>|
                              {{amqqueue,
                                {resource,<<"/">>,queue,
                                 <<"topic.hello.world">>},
                                false,false,none,[],<0.583.0>,[],[],[],
                                undefined,[],[],live,0},
                               {false,65535,false,
                                [{<<"x-credit">>,table,
                                  [{<<"credit">>,long,0},
                                   {<<"drain">>,boolean,false}]}]}}]],
                            [],[],[],[]}}},
                         {dict,1,16,16,8,80,48,
                          {[],[],[],[],[],[],[],[],[],[],[],[],[],[],[],[]},
                          {{[],[],[],[],[],[],[],[],[],
                            [[<0.583.0>|
                              {1,{<<99,116,97,103,45,0,0,0,0>>,nil,nil}}]],
                            [],[],[],[],[],[]}}},
                         {set,1,16,16,8,80,48,
                          {[],[],[],[],[],[],[],[],[],[],[],[],[],[],[],[]},
                          {{[],[],[],[],[],[],[],[],[],
                            [<0.583.0>],
                            [],[],[],[],[],[]}}},
                         <0.672.0>,
                         {state,fine,5000,
                          #Ref<0.4039704202.3895984129.147347>},
                         false,1,
                         {{0,nil},{0,nil}},
                         [],
                         {{0,nil},{0,nil}},
                         [{<<"publisher_confirms">>,bool,true},
                          {<<"exchange_exchange_bindings">>,bool,true},
                          {<<"basic.nack">>,bool,true},
                          {<<"consumer_cancel_notify">>,bool,true},
                          {<<"connection.blocked">>,bool,true},
                          {<<"authentication_failure_close">>,bool,true}],
                         none,65535,none,flow,[]}
** Reason for termination == 
** {{badmatch,{empty,{[],[]}}},
    [{amqp_channel,rpc_bottom_half,2,
                   [{file,"src/amqp_channel.erl"},{line,623}]},
     {amqp_channel,handle_method_from_server1,3,
                   [{file,"src/amqp_channel.erl"},{line,800}]},
     {gen_server,try_dispatch,4,[{file,"gen_server.erl"},{line,616}]},
     {gen_server,handle_msg,6,[{file,"gen_server.erl"},{line,686}]},
     {proc_lib,init_p_do_apply,3,[{file,"proc_lib.erl"},{line,247}]}]}
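
The binary <<99,116,97,103,45,0,0,0,0>> appears both in the basic.credit_drained message and in the channel state above. As a reading aid, the following sketch renders such a byte list as text; render_binary is a hypothetical helper written for this note, not part of any library mentioned here.

```cpp
#include <cstdint>
#include <string>
#include <vector>

// Render an Erlang binary (given as its byte values) as text. Printable
// ASCII bytes are copied as-is; every other byte is escaped as \xNN so that
// trailing counter bytes stay visible.
std::string render_binary(const std::vector<std::uint8_t>& bytes) {
    static const char hex[] = "0123456789abcdef";
    std::string out;
    for (std::uint8_t b : bytes) {
        if (b >= 0x20 && b < 0x7f) {
            out.push_back(static_cast<char>(b));
        } else {
            out += "\\x";
            out.push_back(hex[b >> 4]);
            out.push_back(hex[b & 0x0f]);
        }
    }
    return out;
}
```

Applied to the bytes 99,116,97,103,45,0,0,0,0 this produces ctag-\x00\x00\x00\x00, so the value looks like a consumer tag: the prefix "ctag-" followed by four zero bytes. The same tag is the key of the consumer entry whose x-credit arguments show credit 0 in the state dump.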

Configuration:

OS: Ubuntu Bionic
Qpid C++ version: 1.39.0
Qpid Proton version: 0.28.0
RabbitMQ broker version: 3.6.10
Erlang version: 20.2.2
RabbitMQ AMQP 1.0 plugin version: 3.6.10

michaelklishin commented 5 years ago

RabbitMQ 3.6.x has been out of support for 14 months. Sorry, but we won't be investigating any issues with 3.6.x releases.

michaelklishin commented 5 years ago

According to the exceptions logged, the client reports its version as 1,1,0,10 and this plugin treats it as {bad_version,{1,1,0,10}} but doesn't close the connection. While it should (and I wouldn't be surprised if it does in 3.7.x), the real question is whether the version bump indicates meaningfully different behavior compared to older [protocol] versions.
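
One way to interpret the tuple in {bad_version,{1,1,0,10}}, offered here as an assumption rather than something established in this thread, is as the four bytes that follow the literal "AMQP" in the protocol header a client sends when it connects. The sketch below maps a few well-known header values to protocol names; classify_header and HeaderTail are hypothetical names introduced for illustration.

```cpp
#include <array>
#include <cstdint>
#include <string>

// The four bytes after the literal "AMQP" in a protocol header, in the
// order they appear in the logged tuple, e.g. {1,1,0,10}.
using HeaderTail = std::array<std::uint8_t, 4>;

// Map a header tail to a protocol name. Only a handful of well-known
// headers are listed; anything else is reported as "unknown".
std::string classify_header(const HeaderTail& t) {
    if (t == HeaderTail{{0, 1, 0, 0}})  return "AMQP 1.0";
    if (t == HeaderTail{{2, 1, 0, 0}})  return "AMQP 1.0 (TLS)";
    if (t == HeaderTail{{3, 1, 0, 0}})  return "AMQP 1.0 (SASL)";
    if (t == HeaderTail{{0, 0, 9, 1}})  return "AMQP 0-9-1";
    if (t == HeaderTail{{1, 1, 0, 10}}) return "AMQP 0-10";
    return "unknown";
}
```

Under this reading, the logged tuple {1,1,0,10} classifies as "AMQP 0-10", which would mean the first connection attempt used the older 0-10 header rather than a newer revision of 1.0.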