Open hchandola opened 1 year ago
I have been working with @hchandola via chat to resolve another issue when this one popped up.
We still need to reproduce it locally ourselves, but this error originates from the aiohttp websocket library. My initial hunch is that the heartbeat sent by the library fails and the underlying transport is then set to closing.
Looking at the initial logs sent in, I didn't see any detaches come in from the service.
Just stumbled upon this as well. This is quite easy to replicate:
import asyncio
import logging
import logging.config
import os

from azure.servicebus import TransportType
from azure.servicebus.aio import AutoLockRenewer, ServiceBusClient

logging.config.dictConfig(
    {
        "version": 1,
        "disable_existing_loggers": False,
        "root": {
            "level": "NOTSET",
            "handlers": ["console", "file"],
        },
        "handlers": {
            "console": {
                "formatter": "simple",
                "level": "INFO",
                "class": "logging.StreamHandler",
                "stream": "ext://sys.stdout",
            },
            "file": {
                "formatter": "detailed",
                "level": "DEBUG",
                "class": "logging.FileHandler",
                "filename": "sb.log",
                "mode": "w",
            },
        },
        "formatters": {
            "simple": {"format": "%(levelname)-8s | %(name)s | %(message)s"},
            "detailed": {
                "format": "[%(asctime)s][%(levelname)-8s][%(name)s] %(message)s"
            },
        },
    }
)
logging.captureWarnings(True)
logger = logging.getLogger("main")

connstr = os.environ["SERVICE_BUS_CONNECTION_STR"]
queue_name = os.environ["SERVICE_BUS_QUEUE_NAME"]


async def main() -> None:
    # CHANGE ME
    transport_type = TransportType.AmqpOverWebsocket
    async with ServiceBusClient.from_connection_string(
        conn_str=connstr,
        transport_type=transport_type,
    ) as sb_client:
        receiver = sb_client.get_queue_receiver(queue_name=queue_name)
        renewer = AutoLockRenewer()
        async with receiver, renewer:
            # CHANGE ME
            counter = 14
            while True:
                messages = await receiver.receive_messages(max_message_count=1)
                logger.info(f"Received {len(messages)} messages")
                if not messages:
                    break
                for msg in messages:
                    renewer.register(receiver, msg)
                    sleep_duration = float(counter)
                    logger.info(
                        f"Handling message {counter}: sleeping for {sleep_duration} ..."
                    )
                    await asyncio.sleep(sleep_duration)
                    logger.info(f"Handling message {counter} DONE")
                    await receiver.complete_message(msg)
                    counter += 1


asyncio.run(main())
Errors start happening right after the 15 second mark:
INFO | main | Received 1 messages
INFO | main | Handling message 14: sleeping for 14.0 ...
INFO | main | Handling message 14 DONE
INFO | main | Received 1 messages
INFO | main | Handling message 15: sleeping for 15.0 ...
INFO | main | Handling message 15 DONE
INFO | main | Received 1 messages
INFO | main | Handling message 16: sleeping for 16.0 ...
INFO | main | Handling message 16 DONE
INFO | azure.servicebus.aio._base_handler_async | AMQP error occurred: (AMQPConnectionError('Error condition: ErrorCondition.SocketError\n Error Description: Can not send frame out due to exception: Cannot write to closing transport')), condition: (<ErrorCondition.SocketError: b'amqp:socket-error'>), description: ('Can not send frame out due to exception: Cannot write to closing transport').
The problem is that aiohttp sends a heartbeat, but calling .receive() is the only way that the heartbeat can be "ACK'd".
Where does the magic "15 seconds" come from? azure sets heartbeat=10, and aiohttp:
- sends a ping every heartbeat seconds (10) -- this timer is cancelled as soon as .receive() is called, but this doesn't occur
- waits heartbeat / 2 seconds (5) for the pong -- this timer is also cancelled as soon as .receive() is called, but again this doesn't occur
10 + 5 = 15
QED.
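The arithmetic above can be written out as a tiny sketch. It assumes that the pong timeout is half the heartbeat interval; the function name `seconds_until_close` is made up for illustration and is not part of aiohttp or azure-servicebus.

```python
def seconds_until_close(heartbeat: float) -> float:
    """Worst-case idle time before the websocket is closed.

    Assumption: a ping is sent after `heartbeat` seconds, and the pong
    must be seen within another `heartbeat / 2` seconds.
    """
    ping_delay = heartbeat
    pong_timeout = heartbeat / 2
    return ping_delay + pong_timeout


print(seconds_until_close(10))  # 15.0
```

With heartbeat=10 this gives the 15 second mark after which the errors in the log start.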
I was able to work around the issue by renewing message locks more frequently (every 10 seconds), which forces communication and thus forces a .receive() call.
import asyncio
import contextlib
import logging
from typing import Any

from azure.servicebus import ServiceBusReceivedMessage
from azure.servicebus.aio import ServiceBusReceiver
from azure.servicebus.exceptions import MessageAlreadySettled
from typing_extensions import Self  # or `typing.Self` on Python 3.11+

logger = logging.getLogger("main")


class LockRenewer:
    def __init__(self) -> None:
        self._stop_evt = asyncio.Event()
        # This value should be lower than `DEFAULT_WEBSOCKET_HEARTBEAT_SECONDS * 1.5`.
        self._renew_every = 10
        self._tasks: list[asyncio.Task] = []

    async def __aenter__(self) -> Self:
        self._stop_evt.clear()
        return self

    async def __aexit__(self, *_args: Any) -> None:
        await self._close()

    async def _close(self) -> None:
        # Signal every renewal task to stop, then wait for them to finish.
        self._stop_evt.set()
        if len(self._tasks) != 0:
            await asyncio.wait(self._tasks)
        self._tasks.clear()

    def register(
        self, receiver: ServiceBusReceiver, msg: ServiceBusReceivedMessage
    ) -> None:
        task = asyncio.create_task(self._auto_renew(receiver, msg))
        self._tasks.append(task)

    async def _auto_renew(
        self, receiver: ServiceBusReceiver, msg: ServiceBusReceivedMessage
    ) -> None:
        while True:
            # Sleep for `_renew_every` seconds, waking early if asked to stop.
            with contextlib.suppress(asyncio.TimeoutError):
                await asyncio.wait_for(self._stop_evt.wait(), self._renew_every)
                # If the above succeeds, then `_stop_evt` was set.
                break
            logger.debug("Renewing...")
            try:
                await receiver.renew_message_lock(msg)
            except MessageAlreadySettled:
                break
            except Exception:
                logger.error("Failed to auto-renew message", exc_info=True)
                break
            else:
                logger.debug("Renewing DONE")
It may be possible to do this with azure's provided AutoLockRenewer instead, but I think it requires patching private attributes.
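To illustrate why renewing every 10 seconds helps, here is a toy model of the timing only. It is not aiohttp's or azure's code: `survives` is a hypothetical helper, and it assumes the connection is dropped once more than 1.5 x heartbeat seconds pass without a read, and that every lock renewal forces a read.

```python
from typing import Optional


def survives(
    heartbeat: float, work: float, renew_every: Optional[float] = None
) -> bool:
    """Return True if the modelled connection is still open after `work` seconds.

    Assumptions (toy model): the transport closes once more than
    `heartbeat * 1.5` seconds pass without a read, and each lock renewal
    triggers a read that resets that timer.
    """
    timeout = heartbeat * 1.5
    now = 0.0
    last_read = 0.0
    while now < work:
        # Jump to the next renewal, or straight to the end of the work.
        if renew_every is None:
            now = work
        else:
            now = min(now + renew_every, work)
        if now - last_read > timeout:
            return False  # heartbeat was never acknowledged in time
        if renew_every is not None:
            last_read = now  # the renewal forced a read
    return True


print(survives(10, 15))  # True
print(survives(10, 16))  # False
print(survives(10, 60, renew_every=10))  # True
```

Under these assumptions the model agrees with the log above: 14 and 15 seconds of work pass, 16 seconds fails, and a 10 second renewal interval keeps even a one-minute job alive. In the reproduction script, that corresponds to using the LockRenewer class above in place of AutoLockRenewer.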
It may also be possible to work around the issue by monkeypatching DEFAULT_WEBSOCKET_HEARTBEAT_SECONDS, either by increasing it or setting it to None, but this code comment suggests that this would not be wise.
I have the following code that gives errors on complete_message whenever the function one_minute_work takes a minute or so. The error I see in the logs is: Cannot write to closing transport