aisstream / issues


The websocket times out after a while for no apparent reason, and the duration for which it stays active is not consistent. #35

Closed Zia- closed 1 year ago

Zia- commented 1 year ago

Hello,

I'm a geospatial developer at CGG (UK), and lately we have been experimenting with integrating AIS Stream into one of our pipelines. However, so far I have not found it consistent and would like some clarification.

When I run the following script on two different machines, using two different API keys and two different internet connections, it times out after a while, and the interval after which it times out is not consistent. Is there a way to avoid this timeout? (Kindly note that in the script below I'm not storing or dumping the messages anywhere, so I/O operations shouldn't be the cause.) We want to make sure we get all messages without losing the connection. Note: I have also tried the example code from https://aisstream.io/documentation and got the same behaviour.

Thanks a lot!

import _thread
import time
import json

import websocket  # third-party: pip install websocket-client

api_key = "API Key"  # placeholder: your aisstream.io API key
sleep = 30 * 60  # close the connection after 30 minutes

def on_message(ws, message):
    time_now = time.time()
    print(time_now)

def on_error(ws, error):
    print("ERROR")

def on_close(ws, close_status_code, close_msg):
    print('CLOSED CONNECTION', close_status_code, close_msg)

def on_open(ws):
    def run(*args):
        subscribe_message = {"APIKey": api_key, "BoundingBoxes": [[[-180, -90], [180, 90]]]}
        ws.send(json.dumps(subscribe_message))
        time.sleep(sleep)
        ws.close()
    _thread.start_new_thread(run, ())

if __name__ == "__main__":
    ws = websocket.WebSocketApp("wss://stream.aisstream.io/v0/stream",
                                on_message = on_message,
                                on_error = on_error,
                                on_close = on_close)
    try:
        ws.on_open = on_open
        ws.run_forever()
    except KeyboardInterrupt:
        ws.close()
        print('Closed connection')
aisstream commented 1 year ago

Could you provide the GitHub account name that owns the API keys? We will look on our end at why the connection is being closed.

Zia- commented 1 year ago

The GitHub account is Zia- and the API key I used was 0aa8017c291bbebc2fbc4523760cf787ff6fa6bb. Your help would be very valuable for our planning.

aisstream commented 1 year ago

Have you used the service at all over the last few days and experienced this issue?

We only keep logs in our hot log storage platform for 3 days for cost purposes, and my queries for your usage show nothing. There is a chance our query is incorrect, but we have checked both your API key and user ID and do not see anything.

Looking over the last few weeks, we do have clients who manage to keep connections open for multiple days. So I am beginning to believe we may be closing your connection because its write buffer is full due to your client not reading fast enough. This is the typical reason we close connections, but without any logs I cannot confirm, and it could be another issue as well.
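If slow reading were the cause, one common mitigation (a sketch, not part of aisstream or the reporter's code) is to decouple the socket's read loop from message processing, so the handler returns immediately:

```python
import json
import queue
import threading

# Incoming raw frames go into a bounded queue; a worker thread does the
# JSON parsing and any storage, keeping the websocket read loop fast.
inbox = queue.Queue(maxsize=10000)

def on_message(ws, raw):
    # Never block or do heavy work here; a slow reader can cause the
    # server to close the connection when its write buffer fills.
    try:
        inbox.put_nowait(raw)
    except queue.Full:
        pass  # drop (or count) rather than stall the socket

def worker():
    while True:
        raw = inbox.get()
        message = json.loads(raw)  # the expensive work happens off the read loop
        # ... store/forward `message` here ...
        inbox.task_done()

threading.Thread(target=worker, daemon=True).start()
```

The bounded queue makes the trade-off explicit: under sustained overload, messages are dropped client-side instead of the server closing the connection.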

If you could attempt to replicate again, we can likely get to the bottom of the issue.

Zia- commented 1 year ago

Hi, I just reran the script using the 0aa8017c291bbebc2fbc4523760cf787ff6fa6bb API key generated under the Zia- user ID, and it got closed after a while with the CLOSED CONNECTION message. You can check the exact code I ran below. I hope it helps you get to the bottom of the issue. I am assuming here that I'm reading the buffer fast enough.

import _thread
import time
import json

import websocket  # third-party: pip install websocket-client
from kafka import KafkaProducer  # third-party: pip install kafka-python

api_key = "0aa8017c291bbebc2fbc4523760cf787ff6fa6bb"
sleep = 30 * 60
types = ['ShipStaticData', 'StaticDataReport']
dump_size = 1000 

hostname = 'localhost'
port = 9092

messages = []

producer = KafkaProducer(
    bootstrap_servers=hostname+":"+str(port),
    # bootstrap_servers = hostname,
    #  security_protocol="SSL",
    #  ssl_cafile=cert_folder+"/ca.pem",
    #  ssl_certfile=cert_folder+"/service.cert",
    #  ssl_keyfile=cert_folder+"/service.key",
    value_serializer=lambda v: json.dumps(v).encode('ascii'),
    key_serializer=lambda v: json.dumps(v).encode('ascii')
)

def on_message(ws, message):

    global messages
    message = json.loads(message)
    messages.append(message)

    if len(messages) > dump_size:
        result_string = "|".join(json.dumps(d) for d in messages)
        producer.send(
            'test_ais',
            key={"id":time.time()},
            value={'message': result_string}
            )

        producer.flush()
        messages = []

def on_error(ws, error):
    print(error)
    print("ERROR")

def on_close(ws, close_status_code, close_msg):
    print('CLOSED CONNECTION', close_status_code, close_msg)

def on_open(ws):
    def run(*args):
        # subscribe_message = {"APIKey": api_key, "BoundingBoxes": [[[-5, 50], [5, 60]]]}
        subscribe_message = {"APIKey": api_key, "BoundingBoxes": [[[-180, -90], [180, 90]]]}
        ws.send(json.dumps(subscribe_message))
        time.sleep(sleep)
        ws.close()
    _thread.start_new_thread(run, ())

if __name__ == "__main__":
    #websocket.enableTrace(True)
    ws = websocket.WebSocketApp("wss://stream.aisstream.io/v0/stream",
                                on_message = on_message,
                                on_error = on_error,
                                on_close = on_close)
    try:
        ws.on_open = on_open
        ws.run_forever()
    except KeyboardInterrupt:
        ws.close()
        print('Closed connection')
aisstream commented 1 year ago

Looking at the logs, it appears that the connection is being closed by either our load balancer or your client; it is not being closed by our websocket service. The hypothesis that you were not reading messages fast enough was incorrect.

Are you confident the client is not closing the connection for one reason or another? We have other clients that have stayed connected for multiple days consistently without our load balancer dropping the connection, so I am hesitant to think it is due to some misconfiguration on our end.

Some Python websocket clients drop the connection if they do not receive a ping-pong response; this is a possibility.
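For reference, websocket-client only sends keepalive pings if asked to via `run_forever()`. A hedged sketch (the interval values are illustrative, not an aisstream recommendation) of enabling them:

```python
import json

PING_INTERVAL = 30  # seconds between client-initiated pings (illustrative)
PING_TIMEOUT = 10   # fail the connection if no pong arrives in this window

def connect(api_key):
    # Same WebSocketApp setup as in the scripts above, but with keepalive
    # pings so idle proxies are less likely to silently cut the socket.
    import websocket  # third-party: pip install websocket-client

    def on_open(ws):
        ws.send(json.dumps({"APIKey": api_key,
                            "BoundingBoxes": [[[-180, -90], [180, 90]]]}))

    ws = websocket.WebSocketApp("wss://stream.aisstream.io/v0/stream",
                                on_open=on_open,
                                on_message=lambda ws, msg: None)
    # ping_timeout must be strictly smaller than ping_interval.
    ws.run_forever(ping_interval=PING_INTERVAL, ping_timeout=PING_TIMEOUT)
```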

aisstream commented 1 year ago

A second note: the inconsistency in the timeout period is strange. Do you have a stack trace from when the websocket disconnects?

Zia- commented 1 year ago

Kindly excuse me for this delay.

At the minute, I'm experimenting with both websockets and websocket-client to handle this behaviour. The reason, which I didn't mention earlier to keep things simple, is that I'm stuck behind the company's proxy, and websocket-client supports proxy arguments, unlike websockets. Overall, websockets is more consistent and doesn't drop at all when operated from outside the company's network. All these combinations point towards the proxy/firewall set by the IT department.

The idea is to make websockets work by bypassing the proxy. The inconsistent connection timeout was observed when using websocket-client, so I'm inclined not to use it.

At the minute, I'm trying to get the AIS stream URL wss://stream.aisstream.io/v0/stream added as an exception. One question IT is asking me is: "if the connection gets dropped for any reason, how long is the buffer in the service?". You mentioned that you maintain some level of buffering on your end. What are the specifications?
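As a side note on the proxy issue, websocket-client's `run_forever()` does accept HTTP proxy arguments directly. A sketch (the proxy hostname, port, and credentials are placeholders):

```python
def proxy_kwargs(host, port, auth=None):
    # Build the proxy arguments accepted by websocket-client's
    # run_forever(); the `websockets` library has no equivalent.
    kwargs = {"http_proxy_host": host,
              "http_proxy_port": port,
              "proxy_type": "http"}
    if auth is not None:
        kwargs["http_proxy_auth"] = auth  # (user, password) tuple
    return kwargs

def run_behind_proxy(api_key, proxy_host, proxy_port):
    import websocket  # third-party: pip install websocket-client
    ws = websocket.WebSocketApp("wss://stream.aisstream.io/v0/stream")
    ws.run_forever(**proxy_kwargs(proxy_host, proxy_port))
```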

Thanks a lot for supporting us.

aisstream commented 1 year ago

Could you confirm that you now believe the root cause of the issue is the IT department's firewall/proxy? I believe this is what you are saying, but I am not 100% confident.

Zia- commented 1 year ago

We are not sure either until they add an exception for the AIS stream and we give it another test. Meanwhile, if you could let us know how much buffering you maintain on your end in case of a connection drop, that would be helpful for IT.

aisstream commented 1 year ago

There is buffering if a connection is slow to read websocket messages, but there is no buffering in the case of a dropped connection. That would be a good feature to implement at some point, as it would give clients more confidence that they are not missing messages.

A slight topic diversion, but what is your use case such that you do not want to miss any messages? The desire for a 100% message-received rate is common among users, but in practice it is rarely necessary. Are you aiming to receive 100% of all messages, or simply to minimize the time during which you are not receiving data?

Zia- commented 1 year ago

Thanks for the clarification.

At this point, as we are planning to start capturing the data, we are simply aiming to minimize any avoidable reasons for missing a subset of it. Down the line we will definitely do some data cleaning and simplification (i.e. dropping unnecessary data points). However, with confidence that all data is available via the API stream, it will be easier to decide how to handle it.

aisstream commented 1 year ago

Hello, due to inactivity I will be closing this issue for now. Feel free to re-open under this or a separate issue if the problem persists.

Zia- commented 1 year ago

Hello there,

Kindly excuse me for not closing this earlier. All questions were answered, and your assistance is highly appreciated.