Could you please suggest how you would implement a timeout when connecting to a webscoket endpoint?
Unfortunately, I cannot find anything related to the docs.
import socket
import json
import uuid
from wsproto import WSConnection
from wsproto.connection import ConnectionType
from wsproto.events import (
AcceptConnection,
RejectConnection,
CloseConnection,
Message,
Request,
TextMessage,
)
API_IPADDR = "127.0.0.1"
API_PORT = 5048
def net_send(out_data, sock):
sock.send(out_data)
def net_recv(ws_conn, sock):
in_data = sock.recv(8 * 1024)
if not in_data:
ws_conn.receive_data(None)
else:
ws_conn.receive_data(in_data)
def handle_ws_event(ws_conn, ws_event, sock):
if isinstance(ws_event, AcceptConnection):
result = True
message = None
elif isinstance(ws_event, RejectConnection):
result = False
message = {"error": ws_event.status_code, "reason": "rejecting connection"}
elif isinstance(ws_event, CloseConnection):
result = False
message = {"error": ws_event.code, "reason": ws_event.reason}
sock.send(ws_conn.send(ws_event.response()))
elif isinstance(ws_event, TextMessage):
result = True
message = ws_event.data
else:
result = False
message = {"error": -1, "reason": str(ws_event)}
return (result, message)
def main():
client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
client.connect((API_IPADDR, API_PORT))
ws_conn = WSConnection(ConnectionType.CLIENT)
net_send(
ws_conn.send(
Request(host=API_IPADDR, target="/ws/{}".format(str(uuid.uuid4())))
),
client,
)
net_recv(ws_conn, client)
for ws_event in ws_conn.events():
(result, message) = handle_ws_event(ws_conn, ws_event, client)
if not result:
print(json.dumps(message)) # note: error
client.close()
# return
Hi,
Could you please suggest how you would implement a timeout when connecting to a webscoket endpoint? Unfortunately, I cannot find anything related to the docs.