mbroadst / qamqp

AMQP 0.9.1 implementation for Qt

Reconnection after server restart #29

Closed aletricks closed 8 years ago

aletricks commented 8 years ago

Hi, thanks for your work! We are trying to make our application more robust against RabbitMQ server restarts. There is no problem with the producers: when the connected signal is emitted, the exchanges are redeclared and we can use the publish methods successfully. We do have a problem with the queues: after reconnection, the existing queue object reports this error:

bool QAmqpQueue::consume(int) already consuming with tag: "amq.ctag-KAOuGyz97ePJf9ovxBfSLQ"

What is the correct approach? Must we destroy and recreate the QAmqpClient, QAmqpQueue, and QAmqpExchange objects, or can we reuse them?

mbroadst commented 8 years ago

@aletricks hi, can you please try running your application with QAMQP_DEBUG=1 set in the environment variables? That way we will have a little more information to work with. You should be able to use your existing channels over reconnects to the server.

aletricks commented 8 years ago

@mbroadst hi, the variable was already set to 1, as you can see from the debug message above. I was able to reconnect by simply destroying the QAmqpClient, but that is not my preferred solution. I would prefer to reuse the existing channels, as you suggest! The main problem occurs when the consume method is called after reconnecting to the restarted RabbitMQ server: the server doesn't create a new consumer, because the local queue object never sends the consume request frame. It still finds the old consumer with the tag shown above.

mbroadst commented 8 years ago

@aletricks I can see it was run with the debug statement, what I am asking for is to actually see the full logs :smile: I haven't worked with this library in almost a year, so I'll need to refresh myself. When the client reconnects (which it should do automatically), open should automatically be called on all existing QAmqpQueue and QAmqpExchange objects. If that's not happening that's a bug. If that is happening, then you might have an issue with the way you are responding to such events, in which case I would recommend submitting a small test program to recreate this issue.

aletricks commented 8 years ago

Ok, here is the log of this use case:

void ipc::IpcReceiver::queueDeclared() 
void QAmqpQueuePrivate::bindOk(const QAmqpMethodFrame&) bound to exchange 
void ipc::IpcReceiver::queueBound() 
consume ok:  "amq.gen-PPETjpaTVcRIwF6Vlu7ggA" 
consumer tag = amq.ctag-RcZf8H4gJtzJh2GJFBEMbg
declared queue:  "" 
message count 0
Consumer count: 0
void ipc::IpcReceiver::queueDeclared() 
void QAmqpQueuePrivate::bindOk(const QAmqpMethodFrame&) bound to exchange 
void ipc::IpcReceiver::queueBound() 
consume ok:  "amq.gen-PcUAzY_-C7Tv6qB8rg1ArQ" 
consumer tag = amq.ctag-ivzkvruvDyVMBwxgdMrZhA
Connection: 
>> CLOSE
>> code: 320
>> text: CONNECTION_FORCED - broker forced connection closure with reason 'shutdown'
>> class-id: 0
>> method-id: 0
exchange  "TestSim"  disconnected 
socket error:  "The remote host closed the connection" 
trying to reconnect after:  1000 ms 
connecting to host:  "10.0.0.1" , port:  5672 
socket error:  "Connection refused" 
trying to reconnect after:  5000 ms 
connecting to host:  "10.0.0.1" , port:  5672 
Connection: 
>> Start
>> version_major: 0
>> version_minor: 9
>> mechanisms:  ("AMQPLAIN", "PLAIN") 
>> locales: en_US
Connection: 
>> Tune
>> channel_max: 0
>> frame_max: 131072
>> heartbeat: 60
Connection: 
>> OpenOK
void ipc::IpcBase::clientConnected() 
void ipc::IpcBase::clientConnected() 
Open channel #1
Open channel #2
Open channel #3
Channel#1:
>> OpenOK
Channel#2:
>> OpenOK
Channel#3:
>> OpenOK
declared exchange:  "TestSim" 
virtual void ipc::IpcReceiver::exchangeDeclared() 
Open channel #1
virtual void ipc::IpcReceiver::exchangeDeclared() 
Open channel #2
Connection: 
>> CLOSE
>> code: 503
>> text: COMMAND_INVALID - second 'channel.open' seen
>> class-id: 20
>> method-id: 10
exchange  "TestSim"  disconnected 
socket error:  "The remote host closed the connection" 
trying to reconnect after:  1000 ms 
connecting to host:  "10.0.0.1" , port:  5672 
Connection: 
>> Start
>> version_major: 0
>> version_minor: 9
>> mechanisms:  ("AMQPLAIN", "PLAIN") 
>> locales: en_US
Connection: 
>> Tune
>> channel_max: 0
>> frame_max: 131072
>> heartbeat: 60
Connection: 
>> OpenOK
void ipc::IpcBase::clientConnected() 
void ipc::IpcBase::clientConnected() 
Open channel #1
Open channel #2
Open channel #3
Open channel #1
Open channel #2
Channel#1:
>> OpenOK
Channel#1:
>> OpenOK
Channel#2:
>> OpenOK
Channel#2:
>> OpenOK
Channel#3:
>> OpenOK
Connection: 
>> CLOSE
>> code: 503
>> text: COMMAND_INVALID - second 'channel.open' seen
>> class-id: 20
>> method-id: 10
exchange  "TestSim"  disconnected 
socket error:  "The remote host closed the connection" 
trying to reconnect after:  1000 ms 
connecting to host:  "10.0.0.1" , port:  5672 
Connection: 
>> Start
>> version_major: 0
>> version_minor: 9
>> mechanisms:  ("AMQPLAIN", "PLAIN") 
>> locales: en_US
Connection: 
>> Tune
>> channel_max: 0
>> frame_max: 131072
>> heartbeat: 60
Connection: 
>> OpenOK
void ipc::IpcBase::clientConnected() 
void ipc::IpcBase::clientConnected() 
Open channel #1
Open channel #2
Open channel #3
Open channel #1
Open channel #2
Channel#1:
>> OpenOK
Channel#1:
>> OpenOK
Channel#2:
>> OpenOK
Channel#2:
>> OpenOK
Channel#3:
>> OpenOK
Connection: 
>> CLOSE
>> code: 503
>> text: COMMAND_INVALID - second 'channel.open' seen
>> class-id: 20
>> method-id: 10
exchange  "TestSim"  disconnected 
socket error:  "The remote host closed the connection" 
trying to reconnect after:  1000 ms 
connecting to host:  "10.0.0.1" , port:  5672 
Connection: 
>> Start
>> version_major: 0
>> version_minor: 9
>> mechanisms:  ("AMQPLAIN", "PLAIN") 
>> locales: en_US
Connection: 
>> Tune
>> channel_max: 0
>> frame_max: 131072
>> heartbeat: 60
Connection: 
>> OpenOK
void ipc::IpcBase::clientConnected() 
void ipc::IpcBase::clientConnected() 
Open channel #1
Open channel #2
Open channel #3
Open channel #1
Open channel #2
Channel#1:
>> OpenOK
Channel#1:
>> OpenOK
Channel#2:
>> OpenOK
Channel#2:
>> OpenOK
Channel#3:
>> OpenOK
Connection: 
mbroadst commented 8 years ago

@aletricks hmm that seems to be a different issue than you originally reported, but just as bad. For some reason there are multiple open frames being sent out for the same channel:

Open channel #1
Open channel #2
Open channel #3
Open channel #1
Open channel #2

I'll see if I can find some time later today to reproduce the issue. In the meantime, you might try putting some debug messages in and around the reconnect logic to figure out which signals are causing open frames to be sent out (probably in here: https://github.com/mbroadst/qamqp/blob/master/src/qamqpchannel.cpp#L95; print out that it's called, what channel it is, etc.). Sorry, but you're probably going to have to work with me on this one :smile:
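
As a complement to adding debug prints, the duplicate open frames can also be spotted directly in the existing debug output. The following is a minimal sketch, not part of qamqp: `duplicateChannelOpens` is a hypothetical helper that scans log text for lines starting with "Open channel #" (the format seen in the log above) and returns the channel numbers that occur more than once. It assumes it is given the log of a single connection attempt, since every reconnect legitimately opens each channel again.

```cpp
#include <exception>
#include <map>
#include <set>
#include <sstream>
#include <string>

// Hypothetical helper, not part of qamqp.
// Returns the channel numbers that appear in more than one
// "Open channel #N" line of the given log text.
std::set<int> duplicateChannelOpens(const std::string& log) {
    const std::string marker = "Open channel #";
    std::map<int, int> counts;  // channel number -> number of open lines seen

    std::istringstream in(log);
    std::string line;
    while (std::getline(in, line)) {
        if (line.compare(0, marker.size(), marker) != 0)
            continue;  // not an "Open channel" line
        try {
            ++counts[std::stoi(line.substr(marker.size()))];
        } catch (const std::exception&) {
            // no parsable channel number after the marker; skip the line
        }
    }

    std::set<int> duplicates;
    for (const auto& entry : counts) {
        if (entry.second > 1)
            duplicates.insert(entry.first);
    }
    return duplicates;
}
```

Feeding it the five "Open channel" lines quoted above would flag channels 1 and 2, which matches the channels for which the broker reports a second 'channel.open'.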

mbroadst commented 8 years ago

@aletricks off the top of my head I can see that needOpen should probably be set to false at the end of QAmqpChannelPrivate::open
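
The idea behind that suggestion can be illustrated with a small standalone model. This is a sketch under assumptions, not the qamqp source: `ModelChannel`, its `wire` vector, and `onDisconnected` are hypothetical names, and only the `needOpen` flag is taken from the comment above. The model sends an open frame only while `needOpen` is true, clears the flag once the frame is sent, and sets it again when the connection drops. Note that in this thread the change was reported not to resolve the problem on its own.

```cpp
#include <cassert>
#include <vector>

// Hypothetical model of a channel's open logic; NOT the qamqp implementation.
struct ModelChannel {
    int number;
    bool needOpen = true;    // true until an open frame has been sent
    std::vector<int>* wire;  // records the channel numbers of "sent" open frames

    ModelChannel(int n, std::vector<int>* w) : number(n), wire(w) {
        assert(w != nullptr);
    }

    void open() {
        if (!needOpen)
            return;              // an open frame was already sent on this connection
        wire->push_back(number); // stands in for writing channel.open to the socket
        needOpen = false;        // the change suggested above
    }

    void onDisconnected() {
        needOpen = true;         // a new connection needs a fresh channel.open
    }
};
```

With this guard, calling `open()` twice on the same connection records one frame, while a disconnect followed by `open()` records a second one.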

aletricks commented 8 years ago

@mbroadst Sorry, that doesn't work! I tried setting the following at the end of QAmqpChannelPrivate::open:

needOpen = false

aletricks commented 8 years ago

@mbroadst you can reproduce this issue by restarting the server while messages are being sent, using one of your tutorial examples (e.g. HelloWorld). If you add a timer that triggers sending a message, the client on the other side receives all messages until the server stops. So far so good! When I restart the server, the producer is able to continue sending messages automatically (:D), but the receiver stays blocked!

mbroadst commented 8 years ago

@aletricks hey, I pushed a few fixes just now that should fix your problem. First, and foremost, it's important that you do something like this in your code, because the connection will be made multiple times otherwise and you could run into strange state.

Aside from that, I've tested a few different cases and it seems to recover well. I also fixed some bugs with inconsistent behavior when a forced disconnect was received.

Please give it a shot!

aletricks commented 8 years ago

Yesterday I found a fix: setting the queue attribute

consuming = false

when the server goes down!

It works and the queues resume correctly!
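
To make the effect of that reset concrete, here is a minimal standalone model. It is a sketch, not qamqp code: `ModelQueue`, `sentFrames`, and `onDisconnected` are hypothetical names, and only the `consuming` flag comes from the thread. The model refuses to send a second consume request while `consuming` is set, which mirrors the "already consuming with tag" message, and accepts a new one after the flag is cleared on disconnect.

```cpp
#include <string>
#include <vector>

// Hypothetical stand-in for a queue object; NOT the qamqp implementation.
struct ModelQueue {
    bool consuming = false;
    std::string consumerTag;
    std::vector<std::string> sentFrames;  // frames "sent" to the broker

    // Returns false without sending anything if a consumer is still recorded.
    bool consume() {
        if (consuming)
            return false;  // mirrors "already consuming with tag: ..."
        sentFrames.push_back("basic.consume");
        consuming = true;
        consumerTag = "ctag-" + std::to_string(sentFrames.size());
        return true;
    }

    // The fix described above: forget the old consumer when the server goes away.
    void onDisconnected() {
        consuming = false;
        consumerTag.clear();
    }
};
```

Without the call to `onDisconnected()`, the second `consume()` after a reconnect returns false and no frame reaches the broker, which is the blocked-receiver behaviour described earlier in the thread.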

aletricks commented 8 years ago

I tried your latest version, and it works as well as mine! The changes are more or less the same. It's been nice collaborating with you. Thank you!

mbroadst commented 8 years ago

@aletricks great :smile: I pushed a final change to just move the internal state reset code into the d-pointers, but we should be good at this point. I'm going to close the issue, please feel free to re-open if you experience a similar problem.

aletricks commented 8 years ago

I'd like to share some details about my change. I overrode the _q_disconnected function of QAmqpChannelPrivate in QAmqpQueuePrivate, as is done in the other specialized channel classes:

void QAmqpQueuePrivate::_q_disconnected()
{
    QAmqpChannelPrivate::_q_disconnected();
    qAmqpDebug() << "exchange " << name << " disconnected";
    consuming = false;
}

mbroadst commented 8 years ago

@aletricks yeah, there is some duplicated effort here: all of those _q_disconnected calls should just call resetInternalState(), but the way it is right now won't break anything (it's just not "clean")
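
The cleanup described in that last comment could look roughly like the following. This is a sketch under assumptions, not the code that was pushed: `ModelChannelState` and `ModelQueueState` are hypothetical types, and only the name `resetInternalState()` and the `consuming` flag are taken from the thread. The base class owns the single disconnect handler, and each specialized class adds its own state to a virtual reset function instead of overriding the handler itself.

```cpp
#include <string>

// Hypothetical base/derived pair; the names echo the thread but this is NOT qamqp code.
struct ModelChannelState {
    bool opened = false;

    virtual ~ModelChannelState() = default;

    // Each class resets only the state it owns.
    virtual void resetInternalState() { opened = false; }

    // Single disconnect entry point shared by all channel types.
    void onDisconnected() { resetInternalState(); }
};

struct ModelQueueState : ModelChannelState {
    bool consuming = false;
    std::string consumerTag;

    void resetInternalState() override {
        ModelChannelState::resetInternalState();  // reset the base state first
        consuming = false;
        consumerTag.clear();
    }
};
```

Because `onDisconnected()` dispatches virtually, a queue reached through a base-class reference still clears `consuming`, so no per-class disconnect override is needed.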