RIPE-NCC / ripe-atlas-cousteau

Python client for RIPE ATLAS API
GNU General Public License v3.0

Cousteau does not receive data after running for some time #43

Closed JonasGroeger closed 7 years ago

JonasGroeger commented 7 years ago

Hey!

Under normal operation, cousteau works fine. After some hours of running, however, it stops receiving data. All it receives is what looks like heartbeats:

Faulty Operation

Process 122648 attached
restart_syscall(<... resuming interrupted call ...>) = 1
recvfrom(8, "\201\1", 2, 0, NULL, NULL) = 2
poll([{fd=8, events=POLLIN}], 1, 60000) = 1 ([{fd=8, revents=POLLIN}])
recvfrom(8, "3", 1, 0, NULL, NULL)      = 1
poll([{fd=8, events=POLLIN}], 1, 60000) = 1 ([{fd=8, revents=POLLIN}])
recvfrom(8, "\201\1", 2, 0, NULL, NULL) = 2
poll([{fd=8, events=POLLIN}], 1, 60000) = 1 ([{fd=8, revents=POLLIN}])
recvfrom(8, "3", 1, 0, NULL, NULL)      = 1
poll([{fd=8, events=POLLIN}], 1, 60000      

I get similar system calls when it's under normal operation, but with data in between:

Normal Operation

Process 85326 attached
restart_syscall(<... resuming interrupted call ...>) = 0
poll([{fd=9, events=POLLIN}], 1, 1000)  = 1 ([{fd=9, revents=POLLIN}])
recvfrom(9, "\201~", 2, 0, NULL, NULL)  = 2
poll([{fd=9, events=POLLIN}], 1, 1000)  = 1 ([{fd=9, revents=POLLIN}])
recvfrom(9, "\1\273", 2, 0, NULL, NULL) = 2
poll([{fd=9, events=POLLIN}], 1, 1000)  = 1 ([{fd=9, revents=POLLIN}])
recvfrom(9, "42[\"atlas_result\",[{\"af\":4,\"prb_"..., 443, 0, NULL, NULL) = 443
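A note for reading the two traces: the short reads are WebSocket frame headers followed by Engine.IO packets. Per RFC 6455, `\201` (0x81) marks a final text frame, and the next byte carries the payload length, where `~` (126) means the real length follows in two more bytes (`\1\273` is 443, matching the 443-byte read). In Engine.IO, a payload of `3` is a pong and a leading `4` is a message; the `2` after it is a Socket.IO event. The following is a minimal Python 3 sketch for decoding those bytes. The function and dictionary names are made up for illustration and are not part of cousteau or socketIO-client.

```python
import struct

# Engine.IO packet types, identified by the first character of the payload.
ENGINE_IO_TYPES = {
    "0": "open", "1": "close", "2": "ping", "3": "pong",
    "4": "message", "5": "upgrade", "6": "noop",
}


def parse_ws_header(header, extended=b""):
    """Decode the first two bytes of a WebSocket frame (RFC 6455).

    `extended` holds the bytes read right after the header; it is only
    needed when the 7-bit length field is 126 or 127.
    Returns (fin, opcode, masked, payload_length).
    """
    fin = bool(header[0] & 0x80)
    opcode = header[0] & 0x0F
    masked = bool(header[1] & 0x80)
    length = header[1] & 0x7F
    if length == 126:
        (length,) = struct.unpack("!H", extended[:2])
    elif length == 127:
        (length,) = struct.unpack("!Q", extended[:8])
    return fin, opcode, masked, length


def engine_io_type(payload):
    """Name the Engine.IO packet type of a decoded text payload."""
    return ENGINE_IO_TYPES.get(payload[:1], "unknown")


# Faulty trace: a 1-byte text frame carrying "3", i.e. only pongs.
print(parse_ws_header(b"\x81\x01"), engine_io_type("3"))
# Normal trace: the length sits in the next two bytes, then a message packet.
print(parse_ws_header(b"\x81\x7e", b"\x01\xbb"), engine_io_type('42["atlas_result",[]]'))
```

Read this way, the faulty process is still connected and still exchanging keep-alives; it just never receives a `4` (message) packet.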

Restarting the script resolves the issue immediately.

The code (nothing fancy) goes something like this:

measurement = config['measurement']

atlas_stream = cousteau.AtlasStream()
atlas_stream.connect()

write_probe_measurement = functools.partial(write_measurement_to_kafka, kafka_producer)
atlas_stream.bind_channel("atlas_result", write_probe_measurement)
atlas_stream.start_stream(
    stream_type="result",
    msm=measurement,
)

try:
    atlas_stream.timeout()
except KeyboardInterrupt:
    atlas_stream.disconnect()

This is clearly not a problem on the Atlas side, since restarting fixes the issue. If Atlas were broken, restarting would not help.

astrikos commented 7 years ago

Hi @JonasGroeger, I have seen weird behaviour like this before, but I was never able to reproduce it systematically and debug it. cousteau uses a third-party library, socketIO-client, for Socket.IO communication, so I am not sure what could be going wrong. If you have some steps for me to reproduce it, though, I will be happy to debug it for you.

JonasGroeger commented 7 years ago

@astrikos I'll try to provide a minimal example:

#!/usr/bin/env python3
import json

from ripe.atlas import cousteau

def print_it(result):
    print(json.dumps(result))

def main():
    atlas_stream = cousteau.AtlasStream()
    atlas_stream.connect()

    atlas_stream.bind_channel("atlas_result", print_it)
    atlas_stream.start_stream(
        stream_type="result",
        msm=4412911,
    )

    try:
        atlas_stream.timeout()
    except KeyboardInterrupt:
        atlas_stream.disconnect()

if __name__ == '__main__':
    main()

Unfortunately, it can take up to 3 days until the faulty behaviour appears. Meanwhile, if you can think of a workaround for this, I'm happy to help.

JonasGroeger commented 7 years ago

@astrikos Is there at least a way to detect this? For example, a callback for all data received, where I can see how much data was sent to me in a given period of time. If it drops below some threshold, I'll restart the script myself.

astrikos commented 7 years ago

@JonasGroeger I have to recheck the library's code to see if there is a way. In the meantime, you can enable more verbose logging. Try the following at the beginning of the script:

import logging
logging.getLogger('socketIO-client').setLevel(logging.DEBUG)
logging.basicConfig()

Let me know if you see any debug output that would be helpful.

JonasGroeger commented 7 years ago

I'm currently using another workaround: I kill the script whenever I don't get updates within a certain time period.

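import sys
import time


class StallCounter(object):
    def __init__(self, timeframe, min_count):
        """
        Counter that checks whether at least min_count increments were received in the last timeframe.
        :param timeframe: length of the counting window, in seconds
        :param min_count: minimum number of increments expected per window
        """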
        self.counter = 0
        self.counter_start = int(time.time())
        self.timeframe = timeframe
        self.min_count = min_count

    def increment(self, count=1):
        self.counter += count

    def _reset_window(self):
        self.counter_start = int(time.time())

    def is_stalling(self):
        now = int(time.time())

        time_elapsed = now - self.counter_start
        if time_elapsed < self.timeframe:
            # Still waiting
            return False

        # Reset window
        self._reset_window()

        if self.counter >= self.min_count:
            # All OK, we received enough
            self.counter = 0
            return False

        return True

    def reset(self):
        self.counter = 0
        self.counter_start = int(time.time())

stall_counter = StallCounter(timeframe=180, min_count=1)

def exit_if_stream_is_stalling(data):
    from socketIO_client import parse_socketIO_packet_data
    data = parse_socketIO_packet_data(data)

    # Kill ourselves if we are stalled
    if stall_counter.is_stalling():
        print("Exit due to stalling: {} packets in the last {} seconds.".format(
            stall_counter.counter, stall_counter.timeframe))
        sys.exit(1)

    try:
        if data.args[0] in ['atlas_result']:
            stall_counter.reset()
        else:
            stall_counter.increment()

    except (IndexError, KeyError, TypeError):
        # Packets without usable args (args may be empty) still count as traffic
        stall_counter.increment()

# Gets called on EVERY message; bound only after the callback is defined
atlas_stream.bind_channel("message", exit_if_stream_is_stalling)
Yes, this drops the last message. I'm relying on systemd to restart the script.
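One limitation of a check that runs inside a message callback is that it only fires when some message arrives. A variant, sketched below under the assumption that some periodic caller (a timer thread, a main-loop tick) can poll it, tracks the time of the last result with a monotonic clock. `IdleWatchdog`, `touch`, and `is_stalled` are hypothetical names, not part of cousteau, and the sketch targets Python 3.

```python
import time


class IdleWatchdog(object):
    """Tracks how long it has been since the last result arrived."""

    def __init__(self, max_idle_seconds, clock=time.monotonic):
        self.max_idle_seconds = max_idle_seconds
        self._clock = clock  # injectable so the logic can be tested
        self._last_seen = clock()

    def touch(self):
        """Record that a result was just received."""
        self._last_seen = self._clock()

    def idle_seconds(self):
        """Seconds elapsed since the last touch (or since creation)."""
        return self._clock() - self._last_seen

    def is_stalled(self):
        """True once no result has been seen for more than max_idle_seconds."""
        return self.idle_seconds() > self.max_idle_seconds


# Demonstration with a fake clock instead of real waiting.
fake_now = [0.0]
watchdog = IdleWatchdog(max_idle_seconds=180, clock=lambda: fake_now[0])
fake_now[0] = 100.0
print(watchdog.is_stalled())  # 100 s idle, below the limit
watchdog.touch()
fake_now[0] = 300.0
print(watchdog.is_stalled())  # 200 s since the last touch, above the limit
```

In a script like the one above, `touch()` would be called from the `atlas_result` callback and `is_stalled()` from whatever loop or timer decides when to exit.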

astrikos commented 7 years ago

Can you maybe give me your stream parameters so I can try it myself and leave it running with verbose logging? Maybe I'll find something useful.

astrikos commented 7 years ago

Hi @JonasGroeger, I think we were able to debug it yesterday. It seems that when you get disconnected (network hiccups, etc.), the third-party library tries to reconnect, but it does not resubscribe to the stream it was subscribed to before the disconnection. So it's no surprise you don't get anything back. Fortunately, from what I see in the library's docs, there is a way to handle that by doing something like:

from socketIO_client import SocketIO, BaseNamespace

class Namespace(BaseNamespace):

    def on_connect(self):
        subscribe_to_my_atlas_stream()
        print('[Connected]')

    def on_disconnect(self):
        print('[DisConnected]')

    def on_reconnect(self):
        print('[ReConnected]')
        re_subscribe_to_my_atlas_stream()

io = SocketIO(
    host="atlas-stream.ripe.net",
    port=80,
    resource="/stream/socket.io",
    transports=["websocket"],
    Namespace=Namespace
)
io.wait()

I will try to integrate that logic into cousteau today, and then I hope you can give it a try and see if it fixes the problem for you.
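The idea described above (remember what was subscribed, replay it after a reconnect) can be sketched independently of any library. In this sketch, `emit` stands in for whatever function sends an event over the socket, and `SubscriptionReplayer` and its methods are hypothetical names. The event name `atlas_subscribe` is an assumption about the streaming API, and this is not necessarily how cousteau ended up implementing the fix.

```python
class SubscriptionReplayer(object):
    """Remembers stream subscriptions so they can be sent again."""

    def __init__(self, emit, event_name="atlas_subscribe"):
        self._emit = emit              # callable(event_name, params)
        self._event_name = event_name  # assumed event name, not verified here
        self._subscriptions = []

    def subscribe(self, **params):
        """Send a subscription and remember its parameters."""
        self._subscriptions.append(dict(params))
        self._emit(self._event_name, dict(params))

    def on_reconnect(self):
        """Replay every remembered subscription on the new connection."""
        for params in self._subscriptions:
            self._emit(self._event_name, dict(params))


# Demonstration with a list standing in for the socket.
sent = []
replayer = SubscriptionReplayer(emit=lambda event, params: sent.append((event, params)))
replayer.subscribe(stream_type="result", msm=4412911)
replayer.on_reconnect()  # e.g. called from a namespace's on_reconnect hook
print(sent)
```

Keeping the parameters in one place means the reconnect path sends exactly what the initial subscription sent, so the two cannot drift apart.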

JonasGroeger commented 7 years ago

Pull request #44 solves this issue.