rabbitmq / amqp091-go

An AMQP 0-9-1 Go client maintained by the RabbitMQ team. Originally by @streadway: `streadway/amqp`
1.47k stars 135 forks source link

Add mutex guard to Channel methods #242

Open gnuletik opened 7 months ago

gnuletik commented 7 months ago

Is your feature request related to a problem? Please describe.

I used Channel.QueueDeclare concurrently and got the following error:

Exception (503) Reason: "unexpected command received"

Describe the solution you'd like

I'd like to add the following guard:

defer ch.m.Unlock()

to some methods of the Channel struct:

This is already implemented on the following methods (publish / ack related):

So I think that it would make sense to have it on other methods too.

Describe alternatives you've considered

Implementing the mutex in the business code is doable but it makes less sense considering that this is done by the library for some methods.

Additional context

No response

lukebakken commented 7 months ago

All of the RabbitMQ client libraries specifically do not allow sharing channels across threads and assume that applications that use the libraries have a connection per-thread, with associated channels. I doubt we will add this feature here, but I'd like @Zerpet's feedback.

I'm not sure why certain methods do use a mutex. I'll investigate when I have time.

gnuletik commented 7 months ago

Thanks for the fast feedback!

I can see the point of letting users manually handle the channels.

It seems that Ack() and Nack() are both concurrent-safe since 2018:

Regarding Publish(), this seems to be handled since 2012.


I think that it should be clarified in the docs that Publish*, Ack, Nack, Reject are thread-safe and others are not.

The docs could also suggest to create one channel per goroutine.

The only mention of thread-safe that I see is that SetLogger is not thread-safe.

Connection Pool

Would a concurrent-safe connection pool package (in the same fashion as pgxpool) would fit into this repo?

However, it seems that having multiple connections may be less useful with RabbitMQ in comparison to PostgreSQL because calls seems to be way faster. But I don't have huge experience with Rabbit. If that's true, then having a mutex in the library would be more efficient.

Zerpet commented 7 months ago

We could do a better job at documenting the thread-safety of channels. The document that Luke mention is this one: https://rabbitmq.com/channels.html

I'm not against this change. My concern is whether we may lose performance by making every operation on the channel synchronous. If we can prove with a benchmark that performance penalty is reasonable, I'll be happy to see this change in the library.

Regarding a connection pool, it will definitely benefit this repo. However, a connection pool is a concept of "smart clients", and this library has, intentionally, been kept as simple bindings to the AMQP protocol + RabbitMQ extension. If you may notice that this library does not have auto-reconnection, whilst the Java and .NET rabbitmq libraries do. This is an inherited non-goal. What I'm trying to say is that we can consider a connection pool type for this library, as long as it's not too clever 🙂

Something you may also consider is whether the existing CloudAMQP AMQProxy already covers what you had in mind for the connection pool.

gnuletik commented 7 months ago

Thanks for the fast feedback!

I'm glad the connection pool idea can be considered. However, I'm afraid having this may be too high level to respect the non-goal of the project. Especially as a connection pool implementation should need to drop failed connections / channels.

Thanks also for suggesting the AMQProxy! However, it feels a bit heavy to setup for a simple use-case.

Regarding the benchmark, is there a way to run the test with a mocked RabbitMQ server in the CI? I don't see a channel_test.go file.

However, I've setup this simple benchmark with a real rabbitMQ :

docker run -d --hostname my-rabbit --name some-rabbit -e RABBITMQ_DEFAULT_USER=user -e RABBITMQ_DEFAULT_PASS=password rabbitmq:3-management
package main

import (

    amqp "github.com/rabbitmq/amqp091-go"

func BenchmarkQueueDeclare(b *testing.B) {
    config := amqp.Config{
        Vhost:      "/",
        Properties: amqp.NewConnectionProperties(),

    conn, err := amqp.DialConfig("amqp://user:password@", config)
    require.NoError(b, err)
    defer conn.Close()

    channel, err := conn.Channel()
    require.NoError(b, err)
    defer channel.Close()

    for i := 0; i < b.N; i++ {
        name := fmt.Sprintf("queue-%d", i)
        _, err := channel.QueueDeclare(
            name,  // name of the queue
            false, // durable
            false, // delete when unused
            false, // exclusive
            false, // noWait
            nil,   // arguments
        require.NoError(b, err)

        _, err = channel.QueueDelete(
            false, // ifUnused
            false, // ifEmpty
            false, // noWait
        require.NoError(b, err)

Here are the results:

Without mutex lock in QueueDeclare

$ go test -bench=. -count 5
goos: darwin
goarch: arm64
pkg: test/producer
BenchmarkQueueDeclare-10             459       2188841 ns/op
BenchmarkQueueDeclare-10             588       1948132 ns/op
BenchmarkQueueDeclare-10             674       1859471 ns/op
BenchmarkQueueDeclare-10             740       1598775 ns/op
BenchmarkQueueDeclare-10             742       1626909 ns/op
ok      test/producer   7.859s

$ go test -bench=. -count 5
goos: darwin
goarch: arm64
pkg: test/producer
BenchmarkQueueDeclare-10             758       1502975 ns/op
BenchmarkQueueDeclare-10             730       1562227 ns/op
BenchmarkQueueDeclare-10             762       1502248 ns/op
BenchmarkQueueDeclare-10             703       1577003 ns/op
BenchmarkQueueDeclare-10             724       1687284 ns/op
ok      test/producer   7.402s

With mutex lock in QueueDeclare

$ go test -bench=. -count 5
goos: darwin
goarch: arm64
pkg: test/producer
BenchmarkQueueDeclare-10             739       1571499 ns/op
BenchmarkQueueDeclare-10             777       1627958 ns/op
BenchmarkQueueDeclare-10             801       1543984 ns/op
BenchmarkQueueDeclare-10             738       1519895 ns/op
BenchmarkQueueDeclare-10             699       1888766 ns/op
ok      test/producer   8.040s

$ go test -bench=. -count 5
goos: darwin
goarch: arm64
pkg: test/producer
BenchmarkQueueDeclare-10             754       1557935 ns/op
BenchmarkQueueDeclare-10             740       1559442 ns/op
BenchmarkQueueDeclare-10             669       1547426 ns/op
BenchmarkQueueDeclare-10             771       1459199 ns/op
BenchmarkQueueDeclare-10             772       1658854 ns/op
ok      test/producer   7.552s

It can be reduced to measuring the performance of

func BenchmarkMutex(b *testing.B) {
    var m sync.Mutex

    for i := 0; i < b.N; i++ {

which is:

goos: darwin
goarch: arm64
pkg: test/producer
BenchmarkMutex-10       88810612            13.50 ns/op
BenchmarkMutex-10       87675774            13.46 ns/op
BenchmarkMutex-10       87437000            13.39 ns/op
BenchmarkMutex-10       89020771            13.48 ns/op
BenchmarkMutex-10       88700265            13.58 ns/op
ok      test/producer   6.960s

So, the mutex lock / unlock seems to represent 0.00084% of ns/op value, which seems really light IMO but it depends on you.


gnuletik commented 7 months ago

Any news on this?

lukebakken commented 7 months ago

@gnuletik there is no need to bump this issue. This issue is not urgent, and, as we've said, this library works as documented.

If you'd like to submit a PR with tests and benchmarks, it would be appreciated, but there is no guarantee of when we can review it.