mosquito / aio-pika

AMQP 0.9 client designed for asyncio and humans.
https://aio-pika.readthedocs.org/
Apache License 2.0

Feature/wait reopen channel state #533

Closed: mosquito closed this 1 year ago

mosquito commented 1 year ago

Fixes #505

coveralls commented 1 year ago

Coverage: 88.26% (-0.5%) from 88.755% when pulling 87617bf33a65b541f18b8afef398866aacd4a60e on featire/wait-reopen-channel-state into 6d4c8a90f9b88e372c5dac7e093916fbf6145e1a on master.

phillipuniverse commented 1 year ago

@mosquito, a question about the connection attribute that was removed here.

The aio_pika OpenTelemetry instrumentation has a few fallbacks for getting the active URL from the channel. The connection URL is retrieved from the channel because that is all that is really available at publish time, which is the call otel decorates.

Do you have any advice on the correct way to handle the changes in 9.1 while still supporting 7.x and 8.x?

I would hate to drop down to using channel._connection (although that will work most of the time). Do you think it would be better to change the method to async and use get_underlay_channel()? Like this:

    async def set_channel(self, channel: AbstractChannel):
        # aio_rmq 9.1 removed the connection attribute from the abstract listings
        if hasattr(channel, "get_underlay_channel"):
            connection = (await channel.get_underlay_channel()).connection
        else:
            connection = channel.connection
        if getattr(connection, "connection", None):
            # aio_rmq 7
            url = connection.connection.url
        else:
            # aio_rmq 8
            url = connection.url
        self._attributes.update(
            {
                SpanAttributes.NET_PEER_NAME: url.host,
                SpanAttributes.NET_PEER_PORT: url.port,
            }
        )

Or would it be better in your opinion to keep this sync and check for channel._connection like this:

    def set_channel(self, channel: AbstractChannel):
        # aio_rmq 9.1 removed the connection attribute from the abstract listings
        if hasattr(channel, "_connection"):
            connection = channel._connection
        else:
            connection = channel.connection
        if getattr(connection, "connection", None):
            # aio_rmq 7
            url = connection.connection.url
        else:
            # aio_rmq 8
            url = connection.url
        self._attributes.update(
            {
                SpanAttributes.NET_PEER_NAME: url.host,
                SpanAttributes.NET_PEER_PORT: url.port,
            }
        )
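
To make the sync option easier to reason about, here is a rough sketch of the lookup pulled out into a standalone helper. The name `resolve_connection_url` is hypothetical, and the `SimpleNamespace` objects are stand-ins for the three channel layouts described above, not real aio_pika classes. It assumes that `channel._connection` exposes `url` in 9.1+, which I have not verified against every release. Unlike the snippet above, it tries the public attribute first and only falls back to the private one.

```python
from types import SimpleNamespace
from typing import Any, Optional


def resolve_connection_url(channel: Any) -> Optional[Any]:
    """Best-effort lookup of the connection URL from a channel-like object."""
    # Prefer the public attribute (7.x / 8.x layouts); fall back to the
    # private one (assumed 9.1+ layout) only when the public one is missing.
    connection = getattr(channel, "connection", None)
    if connection is None:
        connection = getattr(channel, "_connection", None)
    if connection is None:
        return None

    # 7.x-style layout: the connection wraps an inner object carrying the URL.
    inner = getattr(connection, "connection", None)
    if inner is not None:
        return getattr(inner, "url", None)

    # 8.x-style layout (and the assumed 9.1+ layout): URL sits on the connection.
    return getattr(connection, "url", None)


# Stand-in objects mimicking the three layouts (hypothetical, not aio_pika types).
url = SimpleNamespace(host="localhost", port=5672)
v7_channel = SimpleNamespace(
    connection=SimpleNamespace(connection=SimpleNamespace(url=url))
)
v8_channel = SimpleNamespace(connection=SimpleNamespace(url=url))
v9_channel = SimpleNamespace(_connection=SimpleNamespace(url=url))

for ch in (v7_channel, v8_channel, v9_channel):
    found = resolve_connection_url(ch)
    print(found.host, found.port)
```

Returning `None` when nothing matches would let `set_channel` skip the span attributes instead of raising `AttributeError` on an unexpected layout.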