Azure / go-amqp

AMQP 1.0 client library for Go.
https://github.com/Azure/go-amqp
MIT License

Notify the socket close to the user using a channel #333

Closed Gsantomaggio closed 6 days ago

Gsantomaggio commented 2 months ago

Add a channel to the options to notify when the socket is closed.

This is a proposal, so I'd like your opinion before proceeding with the tests, etc...

The goal is to notify the user when the connection is closed, so that they can trace/log the event and implement a custom reconnection policy in case of an error or based on certain conditions.

Here is an example:

    closeCh := make(chan *amqp.CloseEvent, 1)

    // Log every close event delivered on the channel.
    go func(ch chan *amqp.CloseEvent) {
        for closeEvent := range ch {
            fmt.Printf("Close event received: %v\n", closeEvent)
        }
    }(closeCh)

    conn, err := amqp.Dial(context.TODO(), "amqp://localhost", &amqp.ConnOptions{
        SASLType:     amqp.SASLTypeAnonymous(),
        ChCloseEvent: closeCh, // option proposed in this PR
    })
    if err != nil {
        fmt.Printf("Failed to dial: %v\n", err)
        return
    }

    // ReadString('\n') returns only after Enter is pressed.
    fmt.Println("Press Enter to clean up and exit")
    reader := bufio.NewReader(os.Stdin)
    _, _ = reader.ReadString('\n')

    err = conn.Close()

In case the socket is closed correctly, the user will receive:

Close event received: &{read tcp [::1]:50493->[::1]:5672: use of closed network connection 0x140001281a0}

If the connection is killed (in this case by RabbitMQ), the user will receive:

Close event received: &{*Error{Condition: amqp:internal-error, Description: Connection forced: "Closed via management plugin", Info: map[]} 0x140001161a0}

This may also help with issue https://github.com/Azure/go-amqp/issues/295

Thank you

Gsantomaggio commented 2 months ago

@microsoft-github-policy-service agree

Gsantomaggio commented 2 months ago

cc @jhendrixMSFT

Gsantomaggio commented 2 months ago

@jhendrixMSFT something like this https://github.com/Azure/go-amqp/pull/333/commits/44a4fdbb34ac74d67acb60d2222305e307c59160#diff-4f427d2b022907c552328e63f137561f6de92396d7a6e8f6c2ea1bcf0db52654R422-R430 ?

It would look like this:

    conn, err := amqp.Dial(context.TODO(), "amqp://localhost", &amqp.ConnOptions{
        SASLType: amqp.SASLTypeAnonymous(),
    })
    if err != nil {
        fmt.Printf("Failed to dial: %v\n", err)
        return
    }

    // A single receive is enough: Done() delivers the close error once.
    go func() {
        x := <-conn.Done()
        fmt.Printf("Connection closed, err: %v\n", x)
    }()

    // ReadString('\n') returns only after Enter is pressed.
    fmt.Println("Press Enter to close the connection")
    reader := bufio.NewReader(os.Stdin)
    _, _ = reader.ReadString('\n')
    err = conn.Close()
    if err != nil {
        return
    }
    // Give the goroutine time to print before the program exits.
    time.Sleep(900 * time.Millisecond)

Closed normally:

Connection closed, err: amqp: connection closed

Forced:

Connection closed, err: *Error{Condition: amqp:internal-error, Description: Connection forced: "Closed via management plugin", Info: map[]}

Does it make sense to you?

Thank you

Gsantomaggio commented 2 weeks ago

@jhendrixMSFT Hi, is there anything we can do to proceed with this PR? Thank you

jhendrixMSFT commented 1 week ago

Sorry for the delay.

I was discussing this with @richardpark-msft a while back, and one wrinkle with this pattern is that once the error is read from Done() it essentially evaporates (technically, you could call Close() to retrieve it, but that looks a little weird). So, we were debating whether it would be better to have Done() and Err() methods like context.Context, where Done() returns a <-chan struct{} that's closed when Conn closes, and you then call Err() to see whether Conn closed due to an error condition or not. The downside of this is that it requires more machinery to implement properly and ensure there are no races.

Any thoughts on this and how it would impact your use case?

Gsantomaggio commented 1 week ago

> it essentially evaporates

Oh, thank you! I hadn't considered that.

> Any thoughts on this and how it would impact your use case?

It is a reasonable solution.

My use case is simple:

I need to understand when a connection is closed:

  1. In a normal way, by calling the Close() method
  2. In an unexpected way, such as a killed connection, network problems, etc.

So, in case number 2, I can implement the logic to reconnect the client.

jhendrixMSFT commented 1 week ago

This has been added in https://github.com/Azure/go-amqp/pull/348

Gsantomaggio commented 6 days ago

Closed in favor of https://github.com/Azure/go-amqp/pull/348

Thank you!