alanxz / rabbitmq-c

RabbitMQ C client
MIT License
1.76k stars 669 forks source link

Consuming hangs thread after network error #700

Open PversusNP opened 3 years ago

PversusNP commented 3 years ago

With 0.10.0 version, this is my source code for consuming:

amqp_envelope_t envelope; amqp_rpc_reply_t ret = amqp_consume_message(Connection->State, &envelope, timeout, 0);

if (AMQP_RESPONSE_NORMAL != ret.reply_type) { if (AMQP_RESPONSE_LIBRARY_EXCEPTION == ret.reply_type && AMQP_STATUS_UNEXPECTED_STATE == ret.library_error) { timeval Timeout = {}; // = 0 non blocking Timeout.tv_sec = 0; Timeout.tv_usec = 0; amqp_frame_t frame; if (AMQP_STATUS_OK != amqp_simple_wait_frame_noblock(Connection->State, &frame, &Timeout)) { return false; }

      if (AMQP_FRAME_METHOD == frame.frame_type)
      {
          switch (frame.payload.method.id)
          {
          case AMQP_BASIC_ACK_METHOD:
              /* if we've turned publisher confirms on, and we've published a
              * message here is a message being confirmed.
              */
              break;
          case AMQP_BASIC_RETURN_METHOD:
              /* if a published message couldn't be routed and the mandatory
              * flag was set this is what would be returned. The message then
              * needs to be read.
              */
          {
              amqp_message_t message;
              ret = amqp_read_message(Connection->State, frame.channel, &message, 0);
              if (AMQP_RESPONSE_NORMAL != ret.reply_type) 
              {
                  return false;
              }

              amqp_destroy_message(&message);
          }
          break;

...

I replaced amqp_simple_wait_frame() with amqp_simple_wait_frame_noblock() because my thread must be not blocking.

But sometimes, after some kind of network error, my application closes the connection object and tries to reopen it. When, after some tries, it is able to open a new connection, somewhere in the code above it hangs up.

I suspect that amqp_read_message isn't not blocking. Does anyone experience the same issue? Is there a not blocking version for the functions above? Thanks.

kongxa commented 10 months ago

As a consumer, i would like to know under what circumstances AMQP_BASIC_ACK_METHOD or AMQP_BASIC_RETURN_METHOD will be recved?

kongxa commented 10 months ago

amqp_read_message->amqp_simple_wait_frame_on_channel->wait_frame_inner(_amqp_timeinfinite()). In wait_frame_inner function, if amqp_connection_statet.heartbeat <= 0,then heartbeats are not enabled, and next_recv_heartbeat and next_send_heartbeat are set to infinite. In this case maybe block your program.

   deadline = amqp_time_first(timeout_deadline,
                               amqp_time_first(state->next_recv_heartbeat,
                                               state->next_send_heartbeat));
    /* TODO this needs to wait for a _frame_ and not anything written from the
     * socket */
    res = recv_with_timeout(state, deadline);