async def send_ping():
    """Send a WebSocket ping, wait one minute, and repeat without end.

    ``websocket`` is a free name resolved from the enclosing scope.
    """
    seconds_between_pings = 60  # One ping per minute.
    while True:
        await websocket.ping()
        await asyncio.sleep(seconds_between_pings)
async def receive_messages():
    """Read messages from ``websocket`` and record every position report.

    Each incoming frame is decoded as JSON. A message whose top-level
    ``MessageType`` is ``"PositionReport"`` and which carries both a
    latitude and a longitude is added as one row to ``ships``.

    ``websocket``, ``ships`` and ``Point`` are free names resolved from the
    enclosing scope. ``ships`` is only mutated, never rebound: assigning to
    it here would make it a local name (UnboundLocalError on first use) and
    would leave every other holder of the frame looking at stale data.

    Raises:
        KeyError: if a position report lacks ``Message``/``PositionReport``,
            ``MetaData`` or ``UserID``.
    """
    async for message_json in websocket:
        message = json.loads(message_json)
        # MessageType lives at the top level of the message.
        if message.get("MessageType") != "PositionReport":
            continue

        ais_message = message['Message']['PositionReport']
        latitude = ais_message.get('Latitude')
        longitude = ais_message.get('Longitude')
        if latitude is None or longitude is None:
            # A geometry cannot be built without both coordinates.
            continue

        new_row = {
            'Ship_ID': ais_message['UserID'],
            # The ship name is carried in MetaData, not in the report itself.
            'Ship_Name': message['MetaData'].get('ShipName', 'Unknown'),
            'Heading': ais_message.get('TrueHeading'),
            'Speed': ais_message.get('Sog'),  # Speed Over Ground
            'Destination': None,  # Not provided in this message type
            'geometry': Point(longitude, latitude),
        }
        # Setting with enlargement adds the row to the existing frame in
        # place, so the object shared with the map updater sees it.
        ships.loc[len(ships)] = new_row
        # You may want to limit the size of the ships GeoDataFrame to avoid
        # memory issues. Trim it in place (for example ``ships.drop(...,
        # inplace=True)`` followed by ``ships.reset_index(drop=True,
        # inplace=True)``) so that ``len(ships)`` stays a fresh row label.
await asyncio.gather(receive_messages(), send_ping())
async def update_map(map, ships):
    """Redraw the ship layer on ``map`` once a minute, without end.

    Args:
        map: Map widget exposing ``add_layer`` and ``remove_layer``. The
            name shadows the builtin but is kept for existing callers.
        ships: GeoDataFrame holding the rows to draw; it is re-read on
            every pass, so rows added in place by other tasks are shown.
    """
    ship_layer = None
    while True:
        # Replace only the layer this coroutine owns. ``map.clear_layers()``
        # would also remove the base tile layer and any other layers.
        if ship_layer is not None:
            map.remove_layer(ship_layer)
            ship_layer = None
        # NOTE(review): an empty frame is skipped because there is nothing
        # to draw and building a layer from it may fail; confirm against the
        # ipyleaflet/geopandas versions in use.
        if len(ships):
            ship_layer = GeoData(geo_dataframe=ships, name='Ships')
            map.add_layer(ship_layer)
        await asyncio.sleep(60)  # Update every minute
if name == "main":
task_connect = asyncio.create_task(connect_ais_stream(ships))
task_update = asyncio.create_task(update_map(map, ships))
display(map)
await task_connect
await task_update
ERROR:
IncompleteReadError Traceback (most recent call last)
File ~/miniconda3/envs/geo/lib/python3.11/site-packages/websockets/legacy/protocol.py:963, in WebSocketCommonProtocol.transfer_data(self)
962 while True:
--> 963 message = await self.read_message()
965 # Exit the loop when receiving a close frame.
File ~/miniconda3/envs/geo/lib/python3.11/site-packages/websockets/legacy/protocol.py:1033, in WebSocketCommonProtocol.read_message(self)
1025 """
1026 Read a single message from the connection.
1027
(...)
1031
1032 """
-> 1033 frame = await self.read_data_frame(max_size=self.max_size)
1035 # A close frame was received.
File ~/miniconda3/envs/geo/lib/python3.11/site-packages/websockets/legacy/protocol.py:1108, in WebSocketCommonProtocol.read_data_frame(self, max_size)
1107 while True:
-> 1108 frame = await self.read_frame(max_size)
1110 # 5.5. Control Frames
File ~/miniconda3/envs/geo/lib/python3.11/site-packages/websockets/legacy/protocol.py:1165, in WebSocketCommonProtocol.read_frame(self, max_size)
1161 """
1162 Read a single frame from the connection.
...
--> 934 raise self.connection_closed_exc()
935 else:
936 return
ConnectionClosedError: no close frame received or sent
I'm getting an IncompleteReadError, which leads to a ConnectionClosedError ("no close frame received or sent").
Here's the code, followed by the error. FWIW, I'm running this in a Jupyter Notebook:
CODE:
async def connect_ais_stream(ships):
    """Open the AIS stream WebSocket and send the subscription request.

    Args:
        ships: GeoDataFrame meant to collect position reports. The
            statements visible here do not touch it.
    """
    async with websockets.connect("wss://stream.aisstream.io/v0/stream") as websocket:
        subscribe_message = {
            "APIKey": "ABCDEFG",
            # A list of boxes, each given as two [latitude, longitude]
            # corners. A flat pair of [longitude, latitude] points is not a
            # valid subscription.
            "BoundingBoxes": [[[47.419725, -123.398438], [48.267655, -121.379700]]],
            "FilterMessageTypes": ["PositionReport"],
        }
        await websocket.send(json.dumps(subscribe_message))
        # NOTE(review): leaving this ``async with`` block closes the
        # connection, so any coroutine that reads from or pings
        # ``websocket`` has to be awaited inside it.
async def update_map(map, ships):
    """Redraw the ship layer on ``map`` once a minute, without end.

    Args:
        map: Map widget exposing ``add_layer`` and ``remove_layer``. The
            name shadows the builtin but is kept for existing callers.
        ships: GeoDataFrame holding the rows to draw; it is re-read on
            every pass, so rows added in place by other tasks are shown.
    """
    ship_layer = None
    while True:
        # Replace only the layer this coroutine owns. ``map.clear_layers()``
        # would also remove the base tile layer and any other layers.
        if ship_layer is not None:
            map.remove_layer(ship_layer)
            ship_layer = None
        # NOTE(review): an empty frame is skipped because there is nothing
        # to draw and building a layer from it may fail; confirm against the
        # ipyleaflet/geopandas versions in use.
        if len(ships):
            ship_layer = GeoData(geo_dataframe=ships, name='Ships')
            map.add_layer(ship_layer)
        await asyncio.sleep(60)  # Update every minute
# Create an empty GeoDataFrame. Naming the geometry column and CRS up front
# makes it a proper geo frame; EPSG:4326 is longitude/latitude in degrees.
# NOTE(review): confirm the feed's coordinates are WGS84.
ships = gpd.GeoDataFrame(
    columns=['Ship_ID', 'Ship_Name', 'Heading', 'Speed', 'Destination', 'geometry'],
    geometry='geometry',
    crs="EPSG:4326",
)

# Build the map widget and place a draggable marker at its center.
center = (47.9120, -122.5260)
map = Map(center=center, zoom=12)
marker1 = Marker(location=center, draggable=True)
# A Marker is a map layer, not a control, so it is added as a layer.
map.add_layer(marker1)
if name == "main": task_connect = asyncio.create_task(connect_ais_stream(ships)) task_update = asyncio.create_task(update_map(map, ships)) display(map) await task_connect await task_update
ERROR:
IncompleteReadError Traceback (most recent call last) File ~/miniconda3/envs/geo/lib/python3.11/site-packages/websockets/legacy/protocol.py:963, in WebSocketCommonProtocol.transfer_data(self) 962 while True: --> 963 message = await self.read_message() 965 # Exit the loop when receiving a close frame.
File ~/miniconda3/envs/geo/lib/python3.11/site-packages/websockets/legacy/protocol.py:1033, in WebSocketCommonProtocol.read_message(self) 1025 """ 1026 Read a single message from the connection. 1027 (...) 1031 1032 """ -> 1033 frame = await self.read_data_frame(max_size=self.max_size) 1035 # A close frame was received.
File ~/miniconda3/envs/geo/lib/python3.11/site-packages/websockets/legacy/protocol.py:1108, in WebSocketCommonProtocol.read_data_frame(self, max_size) 1107 while True: -> 1108 frame = await self.read_frame(max_size) 1110 # 5.5. Control Frames
File ~/miniconda3/envs/geo/lib/python3.11/site-packages/websockets/legacy/protocol.py:1165, in WebSocketCommonProtocol.read_frame(self, max_size) 1161 """ 1162 Read a single frame from the connection. ... --> 934 raise self.connection_closed_exc() 935 else: 936 return
ConnectionClosedError: no close frame received or sent