eandersson / amqpstorm

Thread-safe Python RabbitMQ Client & Management library
MIT License

reached the maximum number of channels raised with closed channels #55

Closed mikemrm closed 5 years ago

mikemrm commented 6 years ago

Hello,

I've run into an issue when creating new channels: I receive the error "reached the maximum number of channels 65535" when attempting to create a new channel.

After some digging, I noticed that Connection._get_next_available_channel_id() counts all channels, both open and closed. I believe counting only the open channels should resolve this issue.

I tested with a quick fix:

    def _get_next_available_channel_id(self):
        channel_id = len(self._channels) + 1
        active_channels = [
            ch for ch in list(self._channels.values()) if ch and ch.is_open
        ]
        if len(active_channels) >= self.max_allowed_channels:
            raise AMQPConnectionError(
                'reached the maximum number of channels %d' %
                self.max_allowed_channels)
        return channel_id

However, it may be better to just keep an active count.

eandersson commented 6 years ago

Thanks for reporting this @mikemrm.

I'll look into this tonight.

eandersson commented 6 years ago

The error message is a little misleading, as the real limitation here is that RabbitMQ cannot accept a channel id value higher than 65535.
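As a rough illustration of where the 65535 ceiling comes from: in AMQP 0-9-1 the frame header carries the channel id in an unsigned 16-bit field. The sketch below uses only Python's struct module; the names pack_frame_header and MAX_CHANNEL_ID are made up for this example and are not part of amqpstorm.

```python
import struct

# AMQP 0-9-1 frame header: type (1 byte), channel (2 bytes, unsigned),
# payload size (4 bytes), all big-endian.
FRAME_HEADER = struct.Struct('>BHL')
MAX_CHANNEL_ID = 0xFFFF  # 65535, the largest value a 16-bit field can hold


def pack_frame_header(frame_type, channel_id, payload_size):
    """Hypothetical helper: pack a frame header for the given channel id."""
    return FRAME_HEADER.pack(frame_type, channel_id, payload_size)


# The highest channel id still fits in the header.
header = pack_frame_header(1, MAX_CHANNEL_ID, 0)
print(len(header))

# One more than that cannot be encoded at all.
try:
    pack_frame_header(1, MAX_CHANNEL_ID + 1, 0)
except struct.error as why:
    print('cannot encode channel id %d: %s' % (MAX_CHANNEL_ID + 1, why))
```

So once the ids handed out on a connection pass that value, the only way forward is to re-use ids that belonged to closed channels.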

I'll look into implementing a basic way of re-using closed channels.

eandersson commented 6 years ago

If you have time, please try the above ^ patch.

mikemrm commented 6 years ago

Thanks for taking the time on this. My current environment is a web endpoint that posts updates to a queue. I'm still a bit new to RabbitMQ, but my understanding is that each thread should have its own channel. So in my setup, I share the connection and create a new channel each time I add to the queue. This is where the large number of channels is coming from.

I did some testing, and noticed I would get amqpstorm.exception.AMQPConnectionError: socket/connection closed after a few seconds.

In my test script, I am launching 100 threads per second; each thread creates a channel, waits 1 second, and then closes it. My guess is that a channel may be getting recreated too quickly.

A cool-down period could be added for each channel id as it gets closed. Alternatively, an incrementing counter that wraps around and skips ids still in use could be a more efficient route, although some edge cases could result in a similar issue.

mikemrm commented 6 years ago

I took a look at the updated _get_next_available_channel_id, and I believe the problem is that it returns the count of channels: if one channel id was created earlier and a different id was then removed, the count can point at an id that is still in use.

I've modified it to just loop through and return the first free channel id; otherwise it raises the max channels error. mikemrm/amqpstorm@3530608

I've tested this a few times with the same script, and it hasn't thrown an error.
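To make the collision described above concrete, here is a minimal sketch. It is not the code from the linked commit: a plain dict stands in for Connection._channels, both helper functions are hypothetical, and treating max_allowed_channels as an inclusive upper bound is an assumption.

```python
def next_id_by_count(channels):
    """Count-based id: can collide once a channel has been removed."""
    return len(channels) + 1


def next_id_first_free(channels, max_allowed_channels=65535):
    """Scan upwards from 1 and return the first id that is not in use."""
    for channel_id in range(1, max_allowed_channels + 1):
        if channel_id not in channels:
            return channel_id
    raise RuntimeError(
        'reached the maximum number of channels %d' % max_allowed_channels)


# Stand-in for Connection._channels: channel id -> channel object.
channels = {1: 'ch1', 2: 'ch2', 3: 'ch3'}
del channels[2]  # channel 2 is closed and removed

print(next_id_by_count(channels))    # 3, which is still in use
print(next_id_first_free(channels))  # 2, the id that was just freed
```

The scan never returns an id that is in use, at the cost of walking past every occupied id on each call.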

eandersson commented 6 years ago

If you are only publishing messages, I would just share one channel across the threads. The main reason to use individual channels is consuming. That said, it should still work.

Feel free to open a pull request against my branch with mikemrm/amqpstorm@3530608.

eandersson commented 6 years ago

What do you think about this? I was trying to come up with something that would be fast, but still offer the features you need.

https://github.com/eandersson/amqpstorm/commit/4bbe32ab268d75609a719e49840d90c40e91ef1e

    def _get_next_available_channel_id(self):
        if not self._channels:
            return 1

        last_channel_id = int(next(reversed(self._channels)))
        next_channel_id = last_channel_id + 1
        if next_channel_id < self.max_allowed_channels:
            return next_channel_id

        for index in compatibility.RANGE(1, self.max_allowed_channels):
            if index in self._channels:
                continue
            return index

        raise AMQPConnectionError(
            'reached the maximum number of channels %d' %
            self.max_allowed_channels)

mikemrm commented 6 years ago

Thanks for that information, I'll switch my stuff over to using a single channel, and see how it goes.

That was much faster, but the range scan is still pretty slow, so I took another stab at it using one of the suggestions I made earlier, and it seems to be the quickest.

mikemrm/amqpstorm@d13fb28

    def _get_next_available_channel_id(self):
        if self._last_channel_id == self.max_allowed_channels:
            self._last_channel_id = 0

        for index in range(self._last_channel_id + 1, self.max_allowed_channels):
            if index in self._channels:
                continue
            self._last_channel_id = index
            return index

        if self._last_channel_id != 0:
            self._last_channel_id = 0
            return self._get_next_available_channel_id()

        raise AMQPConnectionError(
            'reached the maximum number of channels %d' % self.max_allowed_channels)

I ran some tests, and this one takes 0.24 seconds to create 100k fake channels.

[single range] Runtime: 328.03 sec
[reversed nxt] Runtime: 106.00 sec
[ channel id ] Runtime: 0.24 sec

Let me know what you think. #56
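For anyone who wants to try the rotating-cursor idea outside of amqpstorm, here is a self-contained sketch. The ChannelIdAllocator class, its open/close methods, and the benchmark loop are all assumptions made for illustration; the setup behind the timings above is not shown in this thread. This variant scans in two passes instead of recursing, and it treats max_allowed_channels as inclusive, which differs from the snippet above.

```python
import time


class ChannelIdAllocator:
    """Hypothetical stand-in for the channel id bookkeeping on Connection."""

    def __init__(self, max_allowed_channels=65535):
        self.max_allowed_channels = max_allowed_channels
        self._channels = {}
        self._last_channel_id = 0

    def next_id(self):
        # At most two passes: from just above the cursor to the top,
        # then from 1 again to pick up ids that were freed earlier.
        for start in (self._last_channel_id + 1, 1):
            for index in range(start, self.max_allowed_channels + 1):
                if index not in self._channels:
                    self._last_channel_id = index
                    return index
        raise RuntimeError(
            'reached the maximum number of channels %d' %
            self.max_allowed_channels)

    def open(self):
        """Register a fake channel and return its id."""
        channel_id = self.next_id()
        self._channels[channel_id] = object()
        return channel_id

    def close(self, channel_id):
        """Forget a channel so its id can be handed out again."""
        del self._channels[channel_id]


# Open and immediately close 100k fake channels, timing the whole run.
allocator = ChannelIdAllocator()
started = time.perf_counter()
for _ in range(100000):
    allocator.close(allocator.open())
elapsed = time.perf_counter() - started
print('Runtime: %.2f sec' % elapsed)
```

Because the cursor keeps moving forward, a freshly closed id is not handed out again until the cursor wraps around, which also spaces out re-use of recently closed channels.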

eandersson commented 6 years ago

Looks good. I made some minor modifications. Not sure if my assumptions are correct. Tried to write some tests to make sure. I will also need to do another pass on the channel removal part.

eandersson commented 6 years ago

I still want to fix this, but haven't had the time to finalize a solution. Is this still critical for you @mikemrm?

mikemrm commented 6 years ago

My local modifications are handling it fine, so I'm good.

eandersson commented 5 years ago

The code is merged, and I am ready to release 2.6.0, but if you (or anyone else) have some time to run some basic tests, that would be great!