streadway / amqp

Go client for AMQP 0.9.1
http://godoc.org/github.com/streadway/amqp
BSD 2-Clause "Simplified" License

Creating a pool of channels? #403

Open xamenyap opened 5 years ago

xamenyap commented 5 years ago

Hi,

I am trying to find a way to create a pool of channels on one connection. As I could not find any examples, I tried to come up with my own implementation. Here it is:

type Publisher struct {
    url           string
    pollDuration  time.Duration
    channelsCount int
    conn          *amqp.Connection
    sessions      []session
    errorCh       chan error
    amqpErrorCh   chan *amqp.Error
}

type session struct {
    channel *amqp.Channel
    confirm chan *amqp.Confirmation
}

func NewPublisher(
    url string,
    pollDuration time.Duration,
    channelsCount int,
) *Publisher {
    p := &Publisher{
        url:           url,
        pollDuration:  pollDuration,
        channelsCount: channelsCount,
        errorCh:       make(chan error),
        amqpErrorCh:   make(chan *amqp.Error),
    }

    go func() {
        for {
            select {
            case <-p.amqpErrorCh:
                p.reconnect()
            case <-p.errorCh:
                p.reconnect()
            }
        }
    }()

    go p.connect()

    return p
}

func (p *Publisher) connect() {
    conn, err := amqp.Dial(p.url)

    if err != nil {
        p.errorCh <- err
        return
    }

    conn.NotifyClose(p.amqpErrorCh)

    p.conn = conn
    p.sessions = make([]session, 0)

    for len(p.sessions) < p.channelsCount {
        ch, err := conn.Channel()
        if err != nil {
            p.errorCh <- err
            return
        }

        p.sessions = append(p.sessions, session{ch, make(chan *amqp.Confirmation)})
    }
}

func (p *Publisher) reconnect() {
    time.Sleep(p.pollDuration)
    p.connect()
}

The basic idea here is to create a number of sessions, each holding an *amqp.Channel and a chan *amqp.Confirmation, as I need to use confirm mode in my project. A goroutine is spawned in NewPublisher() to listen for errors and re-create the connection and its batch of channels. I would really appreciate a review of this implementation. I can contribute it to the examples if this is an appropriate approach. Thanks!
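
For comparison, here is a minimal sketch of the same reconnect idea written as a single loop. In the code above, connect() reports a failed dial by sending on errorCh; once reconnect() runs on the goroutine that reads errorCh, a second failed dial would block on that send, because nothing else receives from the channel. The sketch avoids that by handling the dial error inside the loop. The `connection` interface, `supervise`, `fakeConn`, and `demo` are hypothetical names for illustration and are not part of streadway/amqp; a real version would wrap amqp.Dial and the connection's close notification.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// connection is a hypothetical stand-in for the part of a broker connection
// this sketch needs: a way to learn that the connection has gone away.
type connection interface {
	Closed() <-chan error
}

// supervise dials, waits for the connection to close, sleeps, and dials again.
// A failed dial is handled inside the loop instead of being sent on a channel
// that this same goroutine would have to receive from.
func supervise(dial func() (connection, error), wait time.Duration,
	stop <-chan struct{}, onConnect func(connection)) {
	for {
		if conn, err := dial(); err == nil {
			onConnect(conn) // e.g. open the batch of channels here
			select {
			case <-conn.Closed(): // connection lost: fall through and retry
			case <-stop:
				return
			}
		}
		select {
		case <-time.After(wait):
		case <-stop:
			return
		}
	}
}

// fakeConn never closes; it only exists to drive the demo.
type fakeConn struct{ closed chan error }

func (f fakeConn) Closed() <-chan error { return f.closed }

// demo fails the first two dials and reports how many attempts were needed.
func demo() int {
	attempts := 0
	stop := make(chan struct{})
	connected := make(chan struct{})
	dial := func() (connection, error) {
		attempts++
		if attempts < 3 {
			return nil, errors.New("broker unavailable")
		}
		return fakeConn{closed: make(chan error)}, nil
	}
	go supervise(dial, time.Millisecond, stop, func(connection) { close(connected) })
	<-connected
	close(stop)
	return attempts
}

func main() {
	fmt.Println("connected after attempts:", demo())
}
```

The loop uses a fresh notification channel for every connection and a stop channel for shutdown; both are design choices of this sketch rather than requirements of the library.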

hsyed commented 4 years ago

I need something similar and this is one way of solving the problem. I'm not that familiar with AMQP. @streadway should it not be possible to multiplex multiple publish confirms over a single channel?

streadway commented 4 years ago

In AMQP, Channel messages are multiplexed on a single TCP connection. RabbitMQ will close the TCP window size as a method of flow control. Differentiating at the protocol level between a soft exception vs hard exception, channel vs connection exception would complicate this Channel pool concept.

What are you trying to accomplish with the pool of channels?

hsyed commented 4 years ago

> What are you trying to accomplish with the pool of channels?

Reliable publishes, this is the abstraction I created for this:

type SynchronousPublisher struct {
    mu               sync.Mutex
    name             string
    channel          *amqp.Channel
    confirmationChan chan amqp.Confirmation
    deadline         time.Duration
}

func NewSynchronousPublisher(name string, conn *amqp.Connection) (*SynchronousPublisher, error) {
    var err error
    sp := &SynchronousPublisher{
        name:     name,
        deadline: 100 * time.Millisecond,
    }
    if sp.channel, err = conn.Channel(); err != nil {
        return nil, err
    } else if err = sp.channel.Confirm(false); err != nil {
        return nil, err
    } else {
        sp.confirmationChan = sp.channel.NotifyPublish(make(chan amqp.Confirmation, 1))
    }

    return sp, nil
}

func (sp *SynchronousPublisher) Publish(msg *amqp.Publishing) error {
    sp.mu.Lock()
    if err := sp.channel.Publish("", sp.name, true, false, *msg); err != nil {
        sp.mu.Unlock()
        return err
    } else {
        select {
        case conf := <-sp.confirmationChan:
            sp.mu.Unlock()
            if !conf.Ack {
                return errors.New("negative ack")
            }
            return nil
        case <-time.After(sp.deadline):
            sp.mu.Unlock()
            return errors.New("deadline elapsed")
        }
    }
}

In this case the channel is locked until the publish is confirmed. So a pool would be needed to do publishes in parallel.

streadway commented 4 years ago

The way that publish confirms work on a channel is that a single protocol message can contain a range of delivery tags to confirm, so that the server can batch a sequence of acks in one response. The client library turns that range of acks into one confirm per publish.
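
As an illustration of that translation, here is a minimal sketch, not the library's actual code. It assumes the ack carries a delivery tag plus a `multiple` flag, where `multiple` covers every outstanding tag up to and including that tag. The `tracker` type and its methods are hypothetical names.

```go
package main

import (
	"fmt"
	"sort"
)

// confirmation is one per-publish result: a delivery tag and ack/nack.
type confirmation struct {
	DeliveryTag uint64
	Ack         bool
}

// tracker remembers which delivery tags are still waiting for a confirm.
// Hypothetical helper for illustration; not part of streadway/amqp.
type tracker struct {
	pending map[uint64]bool
}

func newTracker() *tracker { return &tracker{pending: make(map[uint64]bool)} }

// published records that a message with this delivery tag was sent.
func (t *tracker) published(tag uint64) { t.pending[tag] = true }

// expand turns one protocol-level ack or nack into one confirmation per
// publish. With multiple set, every pending tag <= tag is covered.
func (t *tracker) expand(tag uint64, multiple, ack bool) []confirmation {
	var out []confirmation
	for p := range t.pending {
		if p == tag || (multiple && p < tag) {
			out = append(out, confirmation{DeliveryTag: p, Ack: ack})
			delete(t.pending, p) // deleting while ranging over a map is safe in Go
		}
	}
	// Map iteration order is random, so sort to report confirms in tag order.
	sort.Slice(out, func(i, j int) bool { return out[i].DeliveryTag < out[j].DeliveryTag })
	return out
}

func main() {
	tr := newTracker()
	for tag := uint64(1); tag <= 4; tag++ {
		tr.published(tag)
	}
	fmt.Println(tr.expand(3, true, true))   // [{1 true} {2 true} {3 true}]
	fmt.Println(tr.expand(4, false, false)) // [{4 false}]
}
```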

In this example, the timeout would not consume the ack/nack, so there wouldn't be one receive on the confirmation channel for every publish, and the channel would need to be closed and opened again. I'd recommend reopening the channel when you reach the deadline.
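
A rough sketch of that recommendation follows, under stated assumptions: `confirmChannel` is a hypothetical interface standing in for the parts of *amqp.Channel the publisher uses, `open` is a hypothetical factory that would wrap conn.Channel, Confirm, and NotifyPublish, and `fakeChannel` exists only to drive the demo. On a timeout the publisher drops its channel, so a late confirm from an earlier publish cannot be read as the result of a later one.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
	"time"
)

// confirmChannel is a hypothetical stand-in for the parts of *amqp.Channel
// this sketch needs; it is not an interface the library defines.
type confirmChannel interface {
	Publish(body []byte) error
	Confirms() <-chan bool // one value per publish: true for ack, false for nack
	Close() error
}

var errDeadline = errors.New("deadline elapsed")

// reopeningPublisher replaces its channel whenever a confirm times out.
type reopeningPublisher struct {
	mu       sync.Mutex
	open     func() (confirmChannel, error) // assumed factory for a confirm-mode channel
	ch       confirmChannel
	deadline time.Duration
}

func (p *reopeningPublisher) Publish(body []byte) error {
	p.mu.Lock()
	defer p.mu.Unlock()

	if p.ch == nil { // first use, or the previous channel was dropped
		ch, err := p.open()
		if err != nil {
			return err
		}
		p.ch = ch
	}
	if err := p.ch.Publish(body); err != nil {
		return err
	}

	timer := time.NewTimer(p.deadline)
	defer timer.Stop()
	select {
	case ack, ok := <-p.ch.Confirms():
		if !ok {
			p.ch = nil
			return errors.New("channel closed")
		}
		if !ack {
			return errors.New("negative ack")
		}
		return nil
	case <-timer.C:
		// The confirm is still outstanding, so this channel is out of step.
		p.ch.Close()
		p.ch = nil
		return errDeadline
	}
}

// fakeChannel confirms immediately unless drop is set.
type fakeChannel struct {
	confirms chan bool
	drop     bool
}

func (f *fakeChannel) Publish([]byte) error {
	if !f.drop {
		f.confirms <- true
	}
	return nil
}
func (f *fakeChannel) Confirms() <-chan bool { return f.confirms }
func (f *fakeChannel) Close() error          { return nil }

func main() {
	opened := 0
	p := &reopeningPublisher{
		deadline: 20 * time.Millisecond,
		open: func() (confirmChannel, error) {
			opened++
			// Only the first channel loses its confirm.
			return &fakeChannel{confirms: make(chan bool, 1), drop: opened == 1}, nil
		},
	}
	fmt.Println(p.Publish([]byte("a")))     // deadline elapsed
	fmt.Println(p.Publish([]byte("b")))     // <nil>
	fmt.Println("channels opened:", opened) // channels opened: 2
}
```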

It makes sense to handle a pool of these publishers independently from AMQP. A basic algorithm to limit the number of in-flight publishings would be something like:

// Pool hands out publishers through a buffered channel, which caps the
// number of in-flight publishings at the channel's capacity.
type Pool struct {
    publishers chan *SynchronousPublisher
}

func NewPublisherPool(name string, conn *amqp.Connection) (*Pool, error) {
    const inflight = 10

    pool := &Pool{
        publishers: make(chan *SynchronousPublisher, inflight),
    }

    for i := 0; i < cap(pool.publishers); i++ {
        pub, err := NewSynchronousPublisher(name, conn)
        if err != nil {
            return nil, err
        }
        pool.publishers <- pub
    }
    return pool, nil
}

// Publish blocks until a publisher is free, publishes, and then returns
// the publisher to the pool.
func (p *Pool) Publish(msg *amqp.Publishing) error {
    pub := <-p.publishers
    err := pub.Publish(msg)
    p.publishers <- pub
    return err
}
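
To show the limiting behavior without a broker, here is a self-contained sketch of the same buffered-channel pattern. The `publisher` interface, `slowPublisher`, and `run` are hypothetical names standing in for the SynchronousPublisher above; the sleep simulates waiting for a confirm.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// publisher is a hypothetical interface standing in for *SynchronousPublisher.
type publisher interface {
	Publish(body []byte) error
}

// pool hands out publishers through a buffered channel, which doubles as a
// semaphore: at most cap(publishers) publishes can be in flight at once.
type pool struct {
	publishers chan publisher
}

func newPool(pubs ...publisher) *pool {
	p := &pool{publishers: make(chan publisher, len(pubs))}
	for _, pub := range pubs {
		p.publishers <- pub
	}
	return p
}

func (p *pool) Publish(body []byte) error {
	pub := <-p.publishers                  // blocks while every publisher is busy
	defer func() { p.publishers <- pub }() // always hand it back, even on error
	return pub.Publish(body)
}

// slowPublisher simulates waiting for a confirm and records peak concurrency.
type slowPublisher struct {
	inflight, peak *int32
}

func (s slowPublisher) Publish([]byte) error {
	n := atomic.AddInt32(s.inflight, 1)
	for { // raise the recorded peak if this call set a new maximum
		old := atomic.LoadInt32(s.peak)
		if n <= old || atomic.CompareAndSwapInt32(s.peak, old, n) {
			break
		}
	}
	time.Sleep(5 * time.Millisecond)
	atomic.AddInt32(s.inflight, -1)
	return nil
}

// run publishes n messages concurrently through a pool of the given size and
// returns the highest number of publishes that were in flight at once.
func run(size, n int) int32 {
	var inflight, peak int32
	pubs := make([]publisher, size)
	for i := range pubs {
		pubs[i] = slowPublisher{inflight: &inflight, peak: &peak}
	}
	p := newPool(pubs...)

	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			_ = p.Publish([]byte("msg"))
		}()
	}
	wg.Wait()
	return atomic.LoadInt32(&peak)
}

func main() {
	// With 20 goroutines and a pool of 3, the peak never exceeds 3.
	fmt.Println("peak in-flight publishes:", run(3, 20))
}
```

Returning the publisher with defer keeps the pool at full size even if Publish returns early; that is a choice made in this sketch.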

sirius1024 commented 4 years ago

Try this extension, which supports reconnecting and clusters: https://github.com/sirius1024/go-amqp-reconnect

houseofcat commented 4 years ago

I created channel/connection pools, and they have been running in production: https://github.com/houseofcat/turbocookedrabbit

There is always room for improvement (submit PRs!), and it's based on streadway/amqp.

epsilon-akshay commented 9 months ago

@streadway do you suggest creating and closing a channel for every goroutine? Pooling channels is necessary when we don't want to keep recreating a channel for every request (say, for an API exposed to publish RabbitMQ messages). Recreating would hurt throughput, given that at least 4-6 extra TCP packets have to be transmitted to open and close a channel (the connection can be kept single).

streadway commented 9 months ago

@epsilon-akshay channels are logical streams that share the same TCP session. They were intended to isolate some error modes, but in my uses I find it simpler to have 1:1 channel/connection, and even to separate the publisher connection from the subscriber.

The library is goroutine safe so you should be able to share the connection and channel between goroutines.

Other than that, your usage will depend on your application.

epsilon-akshay commented 9 months ago

I was mostly thinking from a usage perspective. As a user, when I write APIs that do some processing and simply publish to RabbitMQ, I would not want to create and close channels per request. I would rather have a wrapper dialer that creates a set of channels over a single TCP connection, which can be reused and maybe auto-reconnected (going a little ahead here 😄). This would cover a lot of the use cases people have (similar to how http.Client reuses its connections). I do understand that the initial thought going into the library was to have a 1:1 mapping; I was just suggesting that this kind of interface be provided as well.