Closed andrei-korshikov closed 1 month ago
Do you by chance have a client to replicate the problem? I've never touched websockets so this is a little new to me. I'm well versed in wireshark/tcpdump/tcp and becoming more so in asyncio, so I should be able to run with it if you can provide a little details on a simple setup to replicate the issue. Don't assume I know what GNS3 does with websockets so if there is something I need to do on that side let me know.
Good hunting!
a client to replicate the problem
- "After a reasonable amount of time" Client decides to be an active closer. In case of WebSockets timeout is determined by close_timeout and corresponding debug messages are timed out waiting for TCP close and half-closing TCP connection. By default, close_timeout in WebSockets is 10 seconds, so such delay on application closure is very visible.
So, WebSockets is such a client. As far as I remember, they try to be as close to the standard (RFC), as possible (they call it focus on correctness
on the main page).
simple setup
# GNS3 WebSocket server violates RFC 6455 so we have to be active closer
WS_CLOSE_TIMEOUT = 0.05
SILENCE_ACTIONS = [
'compute.updated',
'ping',
]
RECONNECT_TIMEOUT = 1.618
async def main() -> None:
async with asyncio.TaskGroup() as tasks:
tasks.create_task(websocket_logger(CONTROLLER_WS))
tasks.create_task(websocket_logger(COMPUTE_WS))
async def websocket_logger(endpoint: str) -> None:
while True:
try:
async with websockets.connect(endpoint, close_timeout=WS_CLOSE_TIMEOUT) as websocket:
async for message_text in websocket:
message = json.loads(message_text)
if message['action'] not in SILENCE_ACTIONS:
log.debug(f'{endpoint}: {message}')
except ConnectionRefusedError:
log.info(f'Connection to {endpoint!r} refused.')
await asyncio.sleep(RECONNECT_TIMEOUT)
Something like this. To replicate the issue just remove (set to default) close_timeout
.
I was really hoping for a fully working client replication. I'll play around with this but it would be helpful if you fleshed out the entire thing.
I'm doing a crash course in websockets and the ws endpoints in gns3. I'll reply back when i think I have your code tied into a working replication.
I think I have a replication now. Just to verify, the issue is the client calls close() and then the server shuts down the connection exactly 10 seconds later?
The server does NOT shut down the TCP connection (does not become an active closer, does not initiate TCP Close). The client does. That is the issue.
10 seconds is just client default timeout.
hmm pretty sure the issue is you didn't submit a working replication and I'm attempting to put it together (and failing at it). Feel free to remove my guess work at anytime.
Is this the correct way to replicate this? Your code seemed to never come out of the loop so I wasn't sure what was triggering the close call from the client. I removed the loop and just called close().
import asyncio
import base64
import json
import websockets
import logging
logger = logging.getLogger('websockets')
logger.setLevel(logging.DEBUG)
handler = logging.StreamHandler()
formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
handler.setFormatter(formatter)
logger.addHandler(handler)
log = logging.getLogger(__name__)
# GNS3 WebSocket server violates RFC 6455 so we have to be active closer
# Lets give Websockets a chance to get data.
WS_CLOSE_TIMEOUT = 5
RECONNECT_TIMEOUT = 1.618
CONTROLLER_WS_API = '/v2/notifications/ws'
COMPUTE_WS = '/v2/compute/notifications/ws'
SERVER = 'x.x.x.x:3080'
USER = 'USER'
PASS = 'PASS'
CREDS = f'{USER}:{PASS}'
ENCODED_CREDS = base64.b64encode(CREDS.encode()).decode()
CONTROLLER_URI = f'ws://{SERVER}{CONTROLLER_WS_API}'
COMPUTE_URI = f'ws://{SERVER}{COMPUTE_WS}'
async def main() -> None:
async with asyncio.TaskGroup() as tasks:
tasks.create_task(websocket_logger(CONTROLLER_URI))
async def websocket_logger(endpoint: str) -> None:
headers = {
'Authorization': f'Basic {ENCODED_CREDS}'
}
try:
#async with websockets.connect(endpoint, close_timeout=WS_CLOSE_TIMEOUT, extra_headers=headers) as websocket:
async with websockets.connect(endpoint, extra_headers=headers) as websocket:
async for message_text in websocket:
print(f'Slow: We got a message, tell the server to we\'re done')
print(f'Slow: Is the socket closed? :{websocket.closed}:')
reply = await websocket.close()
print(f'Slow: We got a reply of :{reply}:. Did that take a while?')
print(f'Slow: Is the socket closed? :{websocket.closed}:')
break
except ConnectionRefusedError:
log.info(f'Connection to {endpoint!r} refused.')
await asyncio.sleep(RECONNECT_TIMEOUT)
print("\n\nSwitching to lower timeout.\n\n")
try:
async with websockets.connect(endpoint, close_timeout=WS_CLOSE_TIMEOUT, extra_headers=headers) as websocket:
async for message_text in websocket:
print(f'Fast: We got a message, tell the serve to we\'re done')
print(f'Fast: Is the socket closed? :{websocket.closed}:')
reply = await websocket.close()
print(f'Fast: We got a reply of :{reply}:. Did that take a while?')
print(f'Fast: Is the socket closed? :{websocket.closed}:')
break
except ConnectionRefusedError:
log.info(f'Connection to {endpoint!r} refused.')
await asyncio.sleep(RECONNECT_TIMEOUT)
if __name__ == '__main__':
asyncio.run(main())
It does seem like the server is ignoring the close call. Just thinking out loud. If the server ignored the close message shouldn't it return instantly? I poked around in GNS3 trying to wrap my head around how its handling the close call and didn't make much progress.
2024-02-04 19:55:45,389 - DEBUG - = connection is CONNECTING
2024-02-04 19:55:45,390 - DEBUG - > GET /v2/notifications/ws HTTP/1.1
2024-02-04 19:55:45,390 - DEBUG - > Host: 192.168.1.100:3080
2024-02-04 19:55:45,390 - DEBUG - > Upgrade: websocket
2024-02-04 19:55:45,390 - DEBUG - > Connection: Upgrade
2024-02-04 19:55:45,390 - DEBUG - > Sec-WebSocket-Key: krI98/AQ4ncZF2mWbcPXjg==
2024-02-04 19:55:45,390 - DEBUG - > Sec-WebSocket-Version: 13
2024-02-04 19:55:45,390 - DEBUG - > Sec-WebSocket-Extensions: permessage-deflate; client_max_window_bits
2024-02-04 19:55:45,390 - DEBUG - > Authorization: Basic Z25zM2FwaTpKQDlkMDlod2hkamhza2hmQCMk
2024-02-04 19:55:45,390 - DEBUG - > User-Agent: Python/3.11 websockets/12.0
2024-02-04 19:55:45,395 - DEBUG - < HTTP/1.1 101 Switching Protocols
2024-02-04 19:55:45,395 - DEBUG - < Upgrade: websocket
2024-02-04 19:55:45,395 - DEBUG - < Connection: upgrade
2024-02-04 19:55:45,395 - DEBUG - < Sec-WebSocket-Accept: oz4Bk2GvFMM4MWocNwiljVQIT40=
2024-02-04 19:55:45,395 - DEBUG - < Sec-WebSocket-Extensions: permessage-deflate
2024-02-04 19:55:45,395 - DEBUG - < Date: Mon, 05 Feb 2024 00:55:45 GMT
2024-02-04 19:55:45,395 - DEBUG - < Server: Python/3.10 aiohttp/3.9.1
2024-02-04 19:55:45,395 - DEBUG - = connection is OPEN
2024-02-04 19:55:45,396 - DEBUG - < TEXT '{"action": "ping", "event": {"cpu_usage_percent...y_usage_percent": 1.6}}' [84 bytes]
Slow: We got a message, tell the server to we're done
Slow: Is the socket closed? :False:
2024-02-04 19:55:45,397 - DEBUG - = connection is CLOSING
2024-02-04 19:55:45,397 - DEBUG - > CLOSE 1000 (OK) [2 bytes]
2024-02-04 19:55:45,399 - DEBUG - < CLOSE 1000 (OK) [2 bytes]
2024-02-04 19:55:55,409 - DEBUG - ! timed out waiting for TCP close
2024-02-04 19:55:55,409 - DEBUG - x half-closing TCP connection
2024-02-04 19:55:55,410 - DEBUG - = connection is CLOSED
Slow: We got a reply of :None:. Did that take a while?
Slow: Is the socket closed? :True:
Switching to lower timeout.
2024-02-04 19:55:55,412 - DEBUG - = connection is CONNECTING
2024-02-04 19:55:55,412 - DEBUG - > GET /v2/notifications/ws HTTP/1.1
2024-02-04 19:55:55,412 - DEBUG - > Host: 192.168.1.100:3080
2024-02-04 19:55:55,412 - DEBUG - > Upgrade: websocket
2024-02-04 19:55:55,412 - DEBUG - > Connection: Upgrade
2024-02-04 19:55:55,412 - DEBUG - > Sec-WebSocket-Key: fc0+d7bvdWwdvk2+LhYU+Q==
2024-02-04 19:55:55,412 - DEBUG - > Sec-WebSocket-Version: 13
2024-02-04 19:55:55,412 - DEBUG - > Sec-WebSocket-Extensions: permessage-deflate; client_max_window_bits
2024-02-04 19:55:55,412 - DEBUG - > Authorization: Basic Z25zM2FwaTpKQDlkMDlod2hkamhza2hmQCMk
2024-02-04 19:55:55,412 - DEBUG - > User-Agent: Python/3.11 websockets/12.0
2024-02-04 19:55:55,417 - DEBUG - < HTTP/1.1 101 Switching Protocols
2024-02-04 19:55:55,417 - DEBUG - < Upgrade: websocket
2024-02-04 19:55:55,417 - DEBUG - < Connection: upgrade
2024-02-04 19:55:55,417 - DEBUG - < Sec-WebSocket-Accept: gRtFvtO5+oqnWrPtd3VLRIELdWo=
2024-02-04 19:55:55,417 - DEBUG - < Sec-WebSocket-Extensions: permessage-deflate
2024-02-04 19:55:55,417 - DEBUG - < Date: Mon, 05 Feb 2024 00:55:55 GMT
2024-02-04 19:55:55,417 - DEBUG - < Server: Python/3.10 aiohttp/3.9.1
2024-02-04 19:55:55,417 - DEBUG - = connection is OPEN
2024-02-04 19:55:55,418 - DEBUG - < TEXT '{"action": "ping", "event": {"cpu_usage_percent...y_usage_percent": 1.6}}' [84 bytes]
Fast: We got a message, tell the serve to we're done
Fast: Is the socket closed? :False:
2024-02-04 19:55:55,418 - DEBUG - = connection is CLOSING
2024-02-04 19:55:55,418 - DEBUG - > CLOSE 1000 (OK) [2 bytes]
2024-02-04 19:55:55,420 - DEBUG - < CLOSE 1000 (OK) [2 bytes]
2024-02-04 19:56:00,426 - DEBUG - ! timed out waiting for TCP close
2024-02-04 19:56:00,426 - DEBUG - x half-closing TCP connection
2024-02-04 19:56:00,427 - DEBUG - = connection is CLOSED
Fast: We got a reply of :None:. Did that take a while?
Fast: Is the socket closed? :True:
Slow
19:55:45.389366 IP 192.168.1.100.33346 > 192.168.1.100.3080: Flags [S], seq 2297948997, win 65495, options [mss 65495,sackOK,TS val 1884165694 ecr 0,nop,wscale 7], length 0
19:55:45.389396 IP 192.168.1.100.3080 > 192.168.1.100.33346: Flags [S.], seq 2718374491, ack 2297948998, win 65483, options [mss 65495,sackOK,TS val 1884165694 ecr 1884165694,nop,wscale 7], length 0
19:55:45.389418 IP 192.168.1.100.33346 > 192.168.1.100.3080: Flags [.], ack 1, win 512, options [nop,nop,TS val 1884165694 ecr 1884165694], length 0
19:55:45.390556 IP 192.168.1.100.33346 > 192.168.1.100.3080: Flags [P.], seq 1:344, ack 1, win 512, options [nop,nop,TS val 1884165695 ecr 1884165694], length 343
19:55:45.390570 IP 192.168.1.100.3080 > 192.168.1.100.33346: Flags [.], ack 344, win 509, options [nop,nop,TS val 1884165695 ecr 1884165695], length 0
19:55:45.394895 IP 192.168.1.100.3080 > 192.168.1.100.33346: Flags [P.], seq 1:248, ack 344, win 512, options [nop,nop,TS val 1884165700 ecr 1884165695], length 247
19:55:45.394929 IP 192.168.1.100.33346 > 192.168.1.100.3080: Flags [.], ack 248, win 511, options [nop,nop,TS val 1884165700 ecr 1884165700], length 0
19:55:45.396604 IP 192.168.1.100.3080 > 192.168.1.100.33346: Flags [P.], seq 248:315, ack 344, win 512, options [nop,nop,TS val 1884165701 ecr 1884165700], length 67
19:55:45.396624 IP 192.168.1.100.33346 > 192.168.1.100.3080: Flags [.], ack 315, win 511, options [nop,nop,TS val 1884165701 ecr 1884165701], length 0
19:55:45.397197 IP 192.168.1.100.33346 > 192.168.1.100.3080: Flags [P.], seq 344:352, ack 315, win 512, options [nop,nop,TS val 1884165702 ecr 1884165701], length 8
19:55:45.398809 IP 192.168.1.100.3080 > 192.168.1.100.33346: Flags [P.], seq 315:319, ack 352, win 512, options [nop,nop,TS val 1884165703 ecr 1884165702], length 4
19:55:45.442850 IP 192.168.1.100.33346 > 192.168.1.100.3080: Flags [.], ack 319, win 512, options [nop,nop,TS val 1884165748 ecr 1884165703], length 0
19:55:45.930678 IP 192.168.1.100.3080 > 192.168.1.100.33346: Flags [P.], seq 319:517, ack 352, win 512, options [nop,nop,TS val 1884166235 ecr 1884165748], length 198
19:55:45.930709 IP 192.168.1.100.33346 > 192.168.1.100.3080: Flags [.], ack 517, win 511, options [nop,nop,TS val 1884166235 ecr 1884166235], length 0
19:55:55.409768 IP 192.168.1.100.33346 > 192.168.1.100.3080: Flags [F.], seq 352, ack 517, win 512, options [nop,nop,TS val 1884175714 ecr 1884166235], length 0
19:55:55.410649 IP 192.168.1.100.3080 > 192.168.1.100.33346: Flags [F.], seq 517, ack 353, win 512, options [nop,nop,TS val 1884175715 ecr 1884175714], length 0
19:55:55.410687 IP 192.168.1.100.33346 > 192.168.1.100.3080: Flags [.], ack 518, win 512, options [nop,nop,TS val 1884175715 ecr 1884175715], length 0
Fast
19:55:55.411642 IP 192.168.1.100.51924 > 192.168.1.100.3080: Flags [S], seq 2814044814, win 65495, options [mss 65495,sackOK,TS val 1884175716 ecr 0,nop,wscale 7], length 0
19:55:55.411666 IP 192.168.1.100.3080 > 192.168.1.100.51924: Flags [S.], seq 4193058584, ack 2814044815, win 65483, options [mss 65495,sackOK,TS val 1884175716 ecr 1884175716,nop,wscale 7], length 0
19:55:55.411689 IP 192.168.1.100.51924 > 192.168.1.100.3080: Flags [.], ack 1, win 512, options [nop,nop,TS val 1884175716 ecr 1884175716], length 0
19:55:55.413089 IP 192.168.1.100.51924 > 192.168.1.100.3080: Flags [P.], seq 1:344, ack 1, win 512, options [nop,nop,TS val 1884175718 ecr 1884175716], length 343
19:55:55.413105 IP 192.168.1.100.3080 > 192.168.1.100.51924: Flags [.], ack 344, win 509, options [nop,nop,TS val 1884175718 ecr 1884175718], length 0
19:55:55.416805 IP 192.168.1.100.3080 > 192.168.1.100.51924: Flags [P.], seq 1:248, ack 344, win 512, options [nop,nop,TS val 1884175722 ecr 1884175718], length 247
19:55:55.416838 IP 192.168.1.100.51924 > 192.168.1.100.3080: Flags [.], ack 248, win 511, options [nop,nop,TS val 1884175722 ecr 1884175722], length 0
19:55:55.418403 IP 192.168.1.100.3080 > 192.168.1.100.51924: Flags [P.], seq 248:315, ack 344, win 512, options [nop,nop,TS val 1884175723 ecr 1884175722], length 67
19:55:55.418425 IP 192.168.1.100.51924 > 192.168.1.100.3080: Flags [.], ack 315, win 511, options [nop,nop,TS val 1884175723 ecr 1884175723], length 0
19:55:55.419110 IP 192.168.1.100.51924 > 192.168.1.100.3080: Flags [P.], seq 344:352, ack 315, win 512, options [nop,nop,TS val 1884175724 ecr 1884175723], length 8
19:55:55.420594 IP 192.168.1.100.3080 > 192.168.1.100.51924: Flags [P.], seq 315:319, ack 352, win 512, options [nop,nop,TS val 1884175725 ecr 1884175724], length 4
19:55:55.462833 IP 192.168.1.100.51924 > 192.168.1.100.3080: Flags [.], ack 319, win 512, options [nop,nop,TS val 1884175768 ecr 1884175725], length 0
19:55:55.963491 IP 192.168.1.100.3080 > 192.168.1.100.51924: Flags [P.], seq 319:517, ack 352, win 512, options [nop,nop,TS val 1884176268 ecr 1884175768], length 198
19:55:55.963523 IP 192.168.1.100.51924 > 192.168.1.100.3080: Flags [.], ack 517, win 511, options [nop,nop,TS val 1884176268 ecr 1884176268], length 0
19:56:00.426698 IP 192.168.1.100.51924 > 192.168.1.100.3080: Flags [F.], seq 352, ack 517, win 512, options [nop,nop,TS val 1884180731 ecr 1884176268], length 0
19:56:00.427567 IP 192.168.1.100.3080 > 192.168.1.100.51924: Flags [F.], seq 517, ack 353, win 512, options [nop,nop,TS val 1884180732 ecr 1884180731], length 0
19:56:00.427605 IP 192.168.1.100.51924 > 192.168.1.100.3080: Flags [.], ack 518, win 512, options [nop,nop,TS val 1884180732 ecr 1884180732], length 0
This isn't from the run above but these are the logs that fire while in debug mode.
Slow
2024-02-04 21:42:23 INFO notification_handler.py:73 New client has connected to controller WebSocket
2024-02-04 21:42:24 INFO notification_handler.py:82 Client has disconnected from controller WebSocket
2024-02-04 21:42:24 INFO response.py:58 GET /v2/notifications/ws
2024-02-04 21:42:24 DEBUG response.py:59 {'Host': '192.168.1.100:3080', 'Upgrade': 'websocket', 'Connection': 'Upgrade', 'Sec-WebSocket-Key': 'XXXX', 'Sec-WebSocket-Version': '13', 'Sec-WebSocket-Extensions': 'permessage-deflate; client_max_window_bits', 'Authorization': 'Basic XXX', 'User-Agent': 'Python/3.11 websockets/12.0'}
2024-02-04 21:42:24 DEBUG response.py:61 {}
2024-02-04 21:42:24 INFO response.py:62 Response: 200 OK
2024-02-04 21:42:24 DEBUG response.py:63 {'Connection': 'close', 'X-Route': '/v2/notifications/ws', 'Server': 'Python/3.10 GNS3/2.2.45'}
2024-02-04 21:42:24 INFO web_log.py:211 192.168.1.100 [04/Feb/2024:21:42:23 -0500] "GET /v2/notifications/ws HTTP/1.1" 200 445 "-" "Python/3.11 websockets/12.0"
Fast
2024-02-04 21:42:33 INFO notification_handler.py:73 New client has connected to controller WebSocket
2024-02-04 21:42:34 INFO notification_handler.py:82 Client has disconnected from controller WebSocket
2024-02-04 21:42:34 INFO response.py:58 GET /v2/notifications/ws
2024-02-04 21:42:34 DEBUG response.py:59 {'Host': '192.168.1.100:3080', 'Upgrade': 'websocket', 'Connection': 'Upgrade', 'Sec-WebSocket-Key': 'XXX==', 'Sec-WebSocket-Version': '13', 'Sec-WebSocket-Extensions': 'permessage-deflate; client_max_window_bits', 'Authorization': 'Basic XXX', 'User-Agent': 'Python/3.11 websockets/12.0'}
2024-02-04 21:42:34 DEBUG response.py:61 {}
2024-02-04 21:42:34 INFO response.py:62 Response: 200 OK
2024-02-04 21:42:34 DEBUG response.py:63 {'Connection': 'close', 'X-Route': '/v2/notifications/ws', 'Server': 'Python/3.10 GNS3/2.2.45'}
2024-02-04 21:42:34 INFO web_log.py:211 192.168.1.100 [04/Feb/2024:21:42:33 -0500] "GET /v2/notifications/ws HTTP/1.1" 200 445 "-" "Python/3.11 websockets/12.0"
Well, there are a lot of aiohttp bugs that talk about race conditions while closing. One of the fixes is throwing a asyncio.sleep(0), but i'm not %100 where that would go. I'm thinking in the shutdown of the server section.
To make sure things work in an ideal world I (cough cough) made a server that traces out what files are called on a shutdown.
import aiohttp
from aiohttp import web
import logging
import sys
# Configure logging
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)
def trace_calls(frame, event, arg):
if event == "call":
filename = frame.f_code.co_filename
lineno = frame.f_lineno
func_name = frame.f_code.co_name
logger.debug(f"Call to {func_name} on line {lineno} of {filename}")
return trace_calls
async def websocket_handler(request):
ws = web.WebSocketResponse()
await ws.prepare(request)
logger.debug('WebSocket connection opened')
async for msg in ws:
if msg.type == aiohttp.WSMsgType.TEXT:
logger.debug('Message received: %s', msg.data)
if msg.data == 'close':
await ws.close()
logger.debug('Close command received. Closing connection.')
else:
await ws.send_str("Message received: " + msg.data)
logger.debug('Sent message back to client: %s', msg.data)
elif msg.type == aiohttp.WSMsgType.ERROR:
logger.error('WebSocket connection closed with exception %s', ws.exception())
logger.debug('WebSocket connection closed')
return ws
def main():
# Set the trace function
sys.settrace(trace_calls)
app = web.Application()
app.add_routes([web.get('/v2/notifications/ws', websocket_handler)])
web.run_app(app, port=8000)
if __name__ == '__main__':
main()
Here is the modified client.
import asyncio
import base64
import json
import websockets
import logging
logger = logging.getLogger('websockets')
logger.setLevel(logging.DEBUG)
handler = logging.StreamHandler()
formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
handler.setFormatter(formatter)
logger.addHandler(handler)
log = logging.getLogger(__name__)
# GNS3 WebSocket server violates RFC 6455 so we have to be active closer
# Lets give Websockets a chance to get data.
WS_CLOSE_TIMEOUT = 5
# RFC compliant timeout value per 6455.
RECONNECT_TIMEOUT = 1.618
CONTROLLER_WS_API = '/v2/notifications/ws'
COMPUTE_WS = '/v2/compute/notifications/ws'
SERVER = '127.0.0.1:8000'
USER = 'XX'
PASS = 'XX'
CREDS = f'{USER}:{PASS}'
ENCODED_CREDS = base64.b64encode(CREDS.encode()).decode()
CONTROLLER_URI = f'ws://{SERVER}{CONTROLLER_WS_API}'
COMPUTE_URI = f'ws://{SERVER}{COMPUTE_WS}'
async def main() -> None:
async with asyncio.TaskGroup() as tasks:
tasks.create_task(websocket_logger(CONTROLLER_URI))
async def websocket_logger(endpoint: str) -> None:
headers = {
'Authorization': f'Basic {ENCODED_CREDS}'
}
try:
async with websockets.connect(endpoint, close_timeout=WS_CLOSE_TIMEOUT, extra_headers=headers) as websocket:
#async with websockets.connect(endpoint, extra_headers=headers, close_timeout=WS_CLOSE_TIMEOUT) as websocket:
#async with websockets.connect(endpoint, close_timeout=WS_CLOSE_TIMEOUT) as websocket:
#reply = await websocket.close()
print("Call close")
await websocket.close()
print("close complete")
except ConnectionRefusedError:
log.info(f'Connection to {endpoint!r} refused.')
await asyncio.sleep(RECONNECT_TIMEOUT)
if __name__ == '__main__':
asyncio.run(main())
output
2024-02-05 13:18:41,726 - DEBUG - = connection is CONNECTING
2024-02-05 13:18:41,726 - DEBUG - > GET /v2/notifications/ws HTTP/1.1
2024-02-05 13:18:41,726 - DEBUG - > Host: 127.0.0.1:8000
2024-02-05 13:18:41,726 - DEBUG - > Upgrade: websocket
2024-02-05 13:18:41,726 - DEBUG - > Connection: Upgrade
2024-02-05 13:18:41,726 - DEBUG - > Sec-WebSocket-Key: 6JqiL6ADHYi3qSXeXtGW0g==
2024-02-05 13:18:41,726 - DEBUG - > Sec-WebSocket-Version: 13
2024-02-05 13:18:41,726 - DEBUG - > Sec-WebSocket-Extensions: permessage-deflate; client_max_window_bits
2024-02-05 13:18:41,727 - DEBUG - > Authorization: Basic WFg6WFg=
2024-02-05 13:18:41,727 - DEBUG - > User-Agent: Python/3.11 websockets/12.0
2024-02-05 13:18:41,737 - DEBUG - < HTTP/1.1 101 Switching Protocols
2024-02-05 13:18:41,737 - DEBUG - < Upgrade: websocket
2024-02-05 13:18:41,738 - DEBUG - < Connection: upgrade
2024-02-05 13:18:41,738 - DEBUG - < Sec-WebSocket-Accept: K6w+W4PxFSFzpLpd4hvWV2mb4+8=
2024-02-05 13:18:41,738 - DEBUG - < Sec-WebSocket-Extensions: permessage-deflate
2024-02-05 13:18:41,738 - DEBUG - < Date: Mon, 05 Feb 2024 18:18:41 GMT
2024-02-05 13:18:41,738 - DEBUG - < Server: Python/3.11 aiohttp/3.9.3
2024-02-05 13:18:41,738 - DEBUG - = connection is OPEN
Call close
2024-02-05 13:18:41,738 - DEBUG - = connection is CLOSING
2024-02-05 13:18:41,738 - DEBUG - > CLOSE 1000 (OK) [2 bytes]
2024-02-05 13:18:41,741 - DEBUG - < CLOSE 1000 (OK) [2 bytes]
2024-02-05 13:18:41,746 - DEBUG - = connection is CLOSED
close complete
I've verified the client sends the close and the server starts socket shutdown. I'll check out the server debugs and compare to what is happening in GNS3. That should shed some light on whats happening. I'll also try importing the aiohttp from the gns3 install and see if it still works.
@grossmj
I think I've narrowed down what is going on, but i'm not getting what the fix is. I believe something with the context manager of the queue is preventing ws from closing correctly.
gns3server/handlers/api/controller/notification_handler.py ( showing so the debugs at the end hopefully make sense.)
In the while true loop on the first pass I can add ws.close anywhere and the client will get shutdown correctly.
On the 2nd pass we now fall into the break statement and close no longer works correctly. Queue doesn't seem to know about ws so i'm thinking its a clean up from the "with controller.notification.controller_queue() as queue:" that is doing something to the client that is breaking the ability to close.
Any idea? Here is all the debugs I have for kicking around this issue.
# -*- coding: utf-8 -*-
#
# Copyright (C) 2015 GNS3 Technologies Inc.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import asyncio
import aiohttp
from aiohttp.web import WebSocketResponse
from gns3server.web.route import Route
from gns3server.controller import Controller
import logging
log = logging.getLogger(__name__)
async def process_websocket(ws):
"""
Process ping / pong and close message
"""
try:
await ws.receive()
except aiohttp.WSServerHandshakeError:
pass
class NotificationHandler:
@Route.get(
r"/notifications",
description="Receive notifications about the controller",
status_codes={
200: "End of stream"
})
async def notification(request, response):
controller = Controller.instance()
response.content_type = "application/json"
response.set_status(200)
response.enable_chunked_encoding()
await response.prepare(request)
with controller.notification.controller_queue() as queue:
while True:
msg = await queue.get_json(5)
await response.write(("{}\n".format(msg)).encode("utf-8"))
@Route.get(
r"/notifications/ws",
description="Receive notifications about controller from a Websocket",
status_codes={
200: "End of stream"
})
async def notification_ws(request, response):
controller = Controller.instance()
ws = aiohttp.web.WebSocketResponse()
await ws.prepare(request)
#request.app['websockets'].add(ws)
asyncio.ensure_future(process_websocket(ws))
log.info("New client has connected to controller WebSocket")
try:
print("while true")
with controller.notification.controller_queue() as queue:
while True:
print("await get_json")
notification = await queue.get_json(5)
print("if closed")
if ws.closed:
print(f"Excpt: {ws.exception()}")
print("break")
break
print(f"notification: {notification}")
await ws.send_str(notification)
#await ws.close()
finally:
log.info("Client has disconnected from controller WebSocket")
print("if not ws.closed")
if not ws.closed:
print("ws.close")
await ws.close()
#request.app['websockets'].discard(ws)
print("return ws")
return ws
ws-client.py
import asyncio
import base64
import json
import websockets
import logging
logger = logging.getLogger('websockets')
logger.setLevel(logging.DEBUG)
handler = logging.StreamHandler()
formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
handler.setFormatter(formatter)
logger.addHandler(handler)
log = logging.getLogger(__name__)
# GNS3 WebSocket server violates RFC 6455 so we have to be active closer
# Lets give Websockets a chance to get data.
WS_CLOSE_TIMEOUT = 10
# RFC compliant timeout value per 6455.
RECONNECT_TIMEOUT = 1.618
CONTROLLER_WS_API = '/notifications/ws'
COMPUTE_WS = '/notifications/ws'
SERVER = '127.0.0.1:8080'
USER = 'XXX'
PASS = 'XXX'
CREDS = f'{USER}:{PASS}'
ENCODED_CREDS = base64.b64encode(CREDS.encode()).decode()
CONTROLLER_URI = f'ws://{SERVER}{CONTROLLER_WS_API}'
COMPUTE_URI = f'ws://{SERVER}{COMPUTE_WS}'
async def main() -> None:
async with asyncio.TaskGroup() as tasks:
tasks.create_task(websocket_logger(CONTROLLER_URI))
async def websocket_logger(endpoint: str) -> None:
headers = {
'Authorization': f'Basic {ENCODED_CREDS}'
}
try:
async with websockets.connect(endpoint, close_timeout=WS_CLOSE_TIMEOUT, extra_headers=headers) as websocket:
#async with websockets.connect(endpoint, extra_headers=headers, close_timeout=WS_CLOSE_TIMEOUT) as websocket:
#async with websockets.connect(endpoint, close_timeout=WS_CLOSE_TIMEOUT) as websocket:
#reply = await websocket.close()
print("Call close")
await websocket.close()
print("close complete")
except ConnectionRefusedError:
log.info(f'Connection to {endpoint!r} refused.')
await asyncio.sleep(RECONNECT_TIMEOUT)
if __name__ == '__main__':
asyncio.run(main())
and main.py (i'm using this for the replication).
import os
import sys
import aiohttp.web
import logging
# Assuming you're running this as admin2 and your home directory is /home/admin2
home_dir = os.path.expanduser('~') # This will correctly resolve to /home/admin2
gns3server_path = os.path.join(home_dir, '.local/lib/python3.11/site-packages/gns3server/')
controller_path = os.path.join(home_dir, '.local/lib/python3.11/site-packages/gns3server/handlers/api/controller/')
sys.path.append(gns3server_path)
sys.path.append(controller_path)
# Configure logging
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)
def trace_calls(frame, event, arg):
if event == "call":
filename = frame.f_code.co_filename
lineno = frame.f_lineno
func_name = frame.f_code.co_name
logger.debug(f"Call to {func_name} on line {lineno} of {filename}")
return trace_calls
from notification_handler import NotificationHandler
async def init_app():
app = aiohttp.web.Application()
app.router.add_get('/notifications', NotificationHandler.notification)
app.router.add_get('/notifications/ws', NotificationHandler.notification_ws)
return app
def main():
#sys.settrace(trace_calls)
app = init_app()
aiohttp.web.run_app(app, port=8080)
if __name__ == '__main__':
main()
When i run the client I get this output on the server.
INFO:notification_handler:New client has connected to controller WebSocket
while true
await get_json
if closed
notification: {"action": "ping", "event": {"cpu_usage_percent": 0.2, "memory_usage_percent": 41.2}}
await get_json
<This gap represents a 5 second pause of output>
Timed out returning ping
if closed
Excpt: None
break
INFO:notification_handler:Client has disconnected from controller WebSocket
if not ws.closed
return ws
INFO:gns3server.web.response:GET /notifications/ws
DEBUG:gns3server.web.response:{'Host': '127.0.0.1:8080', 'Upgrade': 'websocket', 'Connection': 'Upgrade', 'Sec-WebSocket-Key': 'XX==', 'Sec-WebSocket-Version': '13', 'Sec-WebSocket-Extensions': 'permessage-deflate; client_max_window_bits', 'Authorization': 'Basic XXX==', 'User-Agent': 'Python/3.11 websockets/12.0'}
DEBUG:gns3server.web.response:{}
INFO:gns3server.web.response:Response: 200 OK
DEBUG:gns3server.web.response:{'Connection': 'close', 'X-Route': '/notifications/ws', 'Server': 'Python/3.11 GNS3/2.2.45'}
INFO:aiohttp.access:127.0.0.1 [14/Feb/2024:06:41:28 +0000] "GET /notifications/ws HTTP/1.1" 200 442 "-" "Python/3.11 websockets/12.0"
gns3server/notification_queue.py
async def get(self, timeout):
"""
When timeout is expire we send a ping notification with server information
"""
# At first get we return a ping so the client immediately receives data
if self._first:
self._first = False
return ("ping", self._getPing(), {})
#pdb.set_trace()
try:
(action, msg, kwargs) = await asyncio.wait_for(super().get(), timeout)
print(f"acction: {action} msg: {msg} kwargs: {kwargs}")
except asyncio.TimeoutError:
print("Timed out returning ping")
return ("ping", self._getPing(), {})
except Exception as e:
print(f"Failed: {e}")
print("Last return")
return (action, msg, kwargs)
I don't know how to do this just yet, but my goal is to asyncio.shield() ws's exit function inside the with statement to see if that can prove its something to do with running inside a context manager that is causing this.
As a side note it seems a little strange the "with" statement from gns3server/handlers/api/controller/notification_handler.py doesn't use a "async with" as gns3server/notification_queue.py inherits asyncio.queue. Maybe nothing as this is the most I've ever done with a context manager.
This is a band-aid for the issue. Everything I see says the server has shutdown the websocket except the tcp socket. This will force the server to close the websocket's tcp socket, which it should have done already anyway.
@andrei-korshikov Can you give it a try and see?
My latest theory is its really the ws.receive() that is the blame in aiohttp. I tried a aiohttp patch but it didn't help. That being said it was very late when I tried it. Maybe you want to give it a spin as well if you don't like the PR I've submitted?
BTW it also seems strange that we're using a none async context manager to manage queue() but async calls to fetch json data.
@grossmj
After a little more digging it finally dawned on me the issue has been the receive call the entire time. The patch I listed above seems like it was intended to fix this type of issue, but I noticed receive() set closing = True on a close message. Then the close() method returns too early when closing is true which prevents the patch from closing the socket.
Seem more details here.
Looks like i'll be opening a new issue for aiohttp per request.
@andrei-korshikov Would be helpful if you test this. The aiohttp folks are looking for feedback.
I will test it, of course, but in a week or so, I'm very sorry for delay. Thank you for your investigations!
aiohttp team is asking for an update. I think if you want this fix in the next version of aiohttp you may want to test asap. I believe the goal is for this to be included in 3.9.4.
I have no access to my home laptop for indefinite duration. Anyway, if I'm the only one who cares about the issue, there is no sense to fix it.
ok no problem. I'll pass that along. Thats for the reply.
Version 2.2.44. Issue is the same as https://github.com/python-trio/trio-websocket/issues/115
Here is what RFC 6455 Section 7.1.1 "Close the WebSocket Connection" says:
Observable behavior is contrary: active closer is the client and so it holds TIME_WAIT state.
That is our case. Can easily be viewed with Wireshark and
ss
.timed out waiting for TCP close
andhalf-closing TCP connection
. By default,close_timeout
in WebSockets is 10 seconds, so such delay on application closure is very visible.I'm not familiar with aiohttp.web, so I can't say if it is upstream issue, or it can be tweaked with some parameters.