rabbitmq / amqp091-go

An AMQP 0-9-1 Go client maintained by the RabbitMQ team. Originally by @streadway: `streadway/amqp`

Method amqp.Connection.Channel() blocks indefinitely when there is no Internet connection #225

Open. dr-begemot opened this issue 9 months ago

dr-begemot commented 9 months ago

Describe the bug

Method amqp.Connection.Channel() blocks indefinitely when the Internet connection is lost between the DialConfig() and Channel() calls:

    config := amqp.Config{Properties: amqp.NewConnectionProperties()}
    config.Properties.SetClientConnectionName(connName)
    config.Heartbeat = 10 * time.Second
    log.Debug().Msgf("dialing %q", amqpURI)
    conn, err := amqp.DialConfig(amqpURI, config)
    if err != nil {
        return nil, fmt.Errorf("Dial: %w", err)
    }

    // Turn off the Internet connection here (e.g. unplug the Ethernet cable).
    log.Debug().Msgf("turn off Internet!")
    time.Sleep(10 * time.Second)

    log.Debug().Msgf("got Connection")
    channel, err := conn.Channel() // blocks indefinitely while there is no Internet connection
    if err != nil {
        return nil, fmt.Errorf("Channel: %w", err)
    }

Reproduction steps

  1. Dial the RabbitMQ server and obtain a Connection.
  2. Turn off the Internet connection.
  3. Call the Connection.Channel() method.

Expected behavior

The Channel() method should return an error after a timeout.

Additional context

No response

lukebakken commented 9 months ago

Hello, thanks for using this library.

Could you be more specific about what "turn off internet" means? What exactly are you doing?

dr-begemot commented 9 months ago

I just unplug the Ethernet cable.

Danlock commented 8 months ago

Yes, this happens because the library doesn't accept a context.Context and use it to time out AMQP calls. You can check out my wrapper (danlock/rmq), which was created to solve this exact problem.

Hopefully https://github.com/rabbitmq/amqp091-go/issues/124 is implemented eventually.

shendongyuxmxm commented 8 months ago

I also ran into this issue. Is there a temporary workaround available now? I can't afford to wait for version 2.0.0. For example, would downgrading help?

lukebakken commented 8 months ago

@shendongyuxmxm please see the comment just above yours - https://github.com/rabbitmq/amqp091-go/issues/225#issuecomment-1795484754

At some point we'll have time to investigate this issue. Personally, I'm surprised there isn't a socket read/write timeout that catches this particular case.

Danlock commented 8 months ago

I wrote an example that demonstrates a similar problem I've had with amqp091-go (a hanging connection), with a similar solution (using contexts throughout amqp091-go). No Ethernet unplugging required.

https://github.com/Danlock/rmq/blob/17f5efed0a2038993f1da091398b0a174812a02a/hang_int_test.go#L29

Unfortunately, just calling SetDeadline on the socket is not enough to prevent this.

Zerpet commented 8 months ago

Thank you @Danlock for providing a repro for this issue, that's very helpful 👍 One challenge I found when I started thinking about contexts is the state machine used internally by this library; see https://github.com/rabbitmq/amqp091-go/issues/124#issuecomment-1578529925 and https://github.com/rabbitmq/amqp091-go/issues/124#issuecomment-1580280717

Supporting contexts in almost all functions will likely come at the cost of "drastic" cleanup after a context cancels a function, i.e. closing the AMQP channel or connection. I guess that's more desirable than a blocked function, but I'm not sure, since #124 hasn't gotten much traction since June.

Danlock commented 8 months ago

If you look at the implementation in my library danlock/rmq, it closes the AMQP channel when an operation on it times out, and closes the connection when a Channel() call times out. I personally think that behavior is fine.

It may also be worth checking what the other client libraries (Java, C#) do in a similar situation.

Ja7ad commented 8 months ago

@Danlock, does your wrapper library automatically reconnect the connection and recreate the channel object? I have a problem where some messages sit unacked in the queue but are never received on the consumer's message channel!

Danlock commented 8 months ago

danlock/rmq.NewConsumer() creates an auto-reconnecting consumer that recreates AMQP channels as necessary.

I'm not exactly sure what you are asking about, but if it's related to your usage of Danlock/rmq, you should open a new issue in Danlock/rmq with reproduction steps. If it's related to amqp091, you should probably open a new issue here with reproduction steps.

Zerpet commented 7 months ago

I discussed this issue with some colleagues, and the general consensus is that closing a channel or connection after a timeout is a bit too drastic. The Java and .NET AMQP clients just reset the state machine and continue as if nothing happened. We will probably do the same in this client to keep our client libraries consistent. There are probably some edge cases we need to consider (e.g. if a subscription frame times out, then arrives late and gets discarded, do we still receive messages?).

Danlock commented 7 months ago

That does seem better to me, and maintaining parity with other language implementations is important.

In regards to that specific edge case, if a subscription frame timed out, presumably the Consume() call also timed out, correct? Therefore Consume would have already returned a nil channel and timeout error to the end user.

In that case I don't think messages should be received at all. If messages are received without a successful accompanying Consume() channel, they should probably be rejected so they can be placed back on the queue for any other healthy consumers.

Zerpet commented 7 months ago

In regards to that specific edge case, if a subscription frame timed out, presumably the Consume() call also timed out, correct?

Yes, that's correct. Handling the return value of Consume() works as you described. The challenge in the scenario I described is that the RabbitMQ server registers the basic.consume request (from Consume()) and starts sending messages to the client. Our state machine then receives a basic.deliver (a message) for a subscription that should not exist, since Consume() timed out. The state machine can't simply reject the message, because it doesn't have a channel at hand to send a basic.cancel or basic.nack. In that specific case (a delivery for a non-existent subscription), it will probably be fine to treat it as a channel exception and close the channel.