jeffbass / imagezmq

A set of Python classes that transport OpenCV images from one computer to another using PyZMQ messaging.
MIT License

PUB/SUB subscriber stops working after a while #74

Open johnhh2 opened 2 years ago

johnhh2 commented 2 years ago

As the title says, my subscriber stops receiving images after a while. It stops at a random point, anywhere from 6k to 30k+ frames in, and implementing the pub/sub 'dedicated IO' strategy did not fix the issue. If I restart the subscriber but not the publisher, it resumes receiving frames, but only for about 100 to 1k frames before it stops again (much sooner than if I restart both the publisher and the subscriber).

Additional info:

Subscriber code:

import time
import os
from datetime import datetime

import threading
import json
import zlib
import select
import cv2
import imagezmq
import redis

import settings

class VideoStreamSubscriber:

    def __init__(self, port, *args, **kwargs):
        self.port = port

        # Update camera hub in a new thread
        self._stop = False
        self._data_ready = threading.Event()
        self._thread = threading.Thread(target=self._run, args=())
        self._thread.daemon = True
        self._thread.start()

    def add_clients(self, receiver):
        for publisher_ip in settings.PUBLISHER_IPS:
            receiver.connect(f'tcp://{publisher_ip}:{self.port}')

    def receive(self, timeout=15.0):
        flag = self._data_ready.wait(timeout=timeout)
        if not flag:
            raise TimeoutError("Subscriber timed out when receiving frame")

        self._data_ready.clear()
        return self._data

    def _run(self):
        receiver = imagezmq.ImageHub('tcp://0.0.0.0:{}'.format(self.port), REQ_REP=False)
        self.add_clients(receiver)
        print("Camera Hub initialized", flush=True)

        while not self._stop:
            self._data = receiver.recv_jpg() # Permanently blocks on this line after a random amount of frames
            self._data_ready.set()

    def close(self):
        self._stop = True

class CameraHub:
    cameras = {}

    def __init__(self, port, *args, **kwargs):
        self.subscriber = VideoStreamSubscriber(port)
        self._thread = threading.Thread(target=self.camera_hub_thread, args=())
        self._thread.daemon = True
        self._thread.start()

    def camera_hub_thread(self):
        i = 0
        start_t = time.time()
        while True:
            try:
                message, frame = self.subscriber.receive()
            except TimeoutError:
                continue

            # Decode message
            sep = message.find(':')
            camera_id = message[:sep]
            frame_number = int(message[sep+2:])

            now = datetime.now()
            timestamp = now.strftime(settings.DATABASE_DATE_FORMAT)

            # Add default camera data
            if not self.cameras.get(camera_id):
                camera = self.add_camera(camera_id, timestamp)
            elif frame_number > self.cameras[camera_id]['count']:
                camera = self.cameras[camera_id]
                camera['last_active'] = timestamp
                camera['count'] = frame_number
            else:
                continue # Duplicate frame

            # Locally Store image
            camera['frame'] = frame

            # Print subscriber FPS
            if settings.DEBUG:
                i += 1
                if i % 60 == 0:
                    total_t = time.time() - start_t
                    print("Est. Subscriber FPS: {:.2f}".format(i/total_t), flush=True)
                    i = 0
                    start_t = time.time()

    def add_camera(self, camera_id, timestamp):
        self.cameras[camera_id] = {
            'last_active': timestamp,
            'last_save': None,
            'count': -1,
        }

        print("Camera {} added".format(camera_id), flush=True)
        return self.cameras[camera_id]

if __name__ == "__main__":
    port = settings.CAMERA_HUB_PORT
    app = CameraHub(port=port)
    app._thread.join()  # CameraHub stores its worker thread as _thread

jeffbass commented 2 years ago

Using threading in your CameraHub class may be part of the problem. Quoting from the Fast Pub Sub Subscriber Helper Class docs:

Please note that this class is not thread-safe, as there is only a single event for new data.

You may want to try running your CameraHub loop unthreaded in the main program and see if that changes the behavior.
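To illustrate the thread-safety note quoted above: with one Event and one shared `_data` attribute, the receive thread can overwrite a frame before the consumer has read it, and `clear()` can race with the next `set()`. Below is a minimal sketch of one alternative, assuming a bounded `queue.Queue` is an acceptable handoff. It is not the helper class from the imageZMQ docs; `FrameMailbox` and its `recv_fn` parameter are hypothetical names, with `recv_fn` standing in for something like a bound `recv_jpg`.

```python
import queue
import threading


class FrameMailbox:
    """Hand frames from one receiver thread to consumers via a queue.

    Hypothetical helper, not part of imageZMQ. Only the newest frames
    are kept: when the queue is full, the oldest entry is dropped.
    """

    def __init__(self, recv_fn, maxsize=1):
        self._recv_fn = recv_fn  # assumed to block until a frame arrives
        self._frames = queue.Queue(maxsize=maxsize)
        self._stop = threading.Event()
        self._thread = threading.Thread(target=self._run, daemon=True)
        self._thread.start()

    def _run(self):
        while not self._stop.is_set():
            item = self._recv_fn()
            try:
                self._frames.put_nowait(item)
            except queue.Full:
                # Drop the stale frame, then store the new one. This
                # thread is the only producer, so the second put fits.
                try:
                    self._frames.get_nowait()
                except queue.Empty:
                    pass
                self._frames.put_nowait(item)

    def receive(self, timeout=15.0):
        """Return the next frame or raise TimeoutError."""
        try:
            return self._frames.get(timeout=timeout)
        except queue.Empty:
            raise TimeoutError("No frame received within timeout") from None

    def close(self):
        self._stop.set()
```

Because `get()` removes the item atomically, there is no separate flag to clear, which is the step that makes the single-event version fragile. This does not by itself explain a `recv_jpg()` call that blocks forever.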

I will leave this issue open to see if others, including @philipp-schmidt (who wrote the Fast Pub Sub Subscriber Helper Class), have any additional comments.

johnhh2 commented 2 years ago

I was also having the problem before adding the threaded event. Since I'm only using one thread to receive, I believe the threaded event should be safe, but I will look into it.

jeffbass commented 2 years ago

What caused me to suggest it is that your CameraHub __init__ starts a VideoStreamSubscriber, which itself starts a thread. I have had problems with Python threads starting other Python threads when the ZMQ calls are in the innermost thread.

johnhh2 commented 2 years ago

Before I implemented the VideoStreamSubscriber class, the ZMQ calls were in CameraHub. With that setup, I created only one thread, in which both the image receiving and the processing were done, but I still had the blocking issue described here. Is it a problem to use ZMQ inside any thread besides the main thread? If ZMQ is supposed to work inside a thread, then perhaps a separate issue is causing ZMQ to block. In case it matters, I'm using docker to host a container in which an HTTP server and the CameraHub both run (the HTTP server uses the main thread and CameraHub uses its own thread). Could using ZMQ inside docker cause any issues?

jeffbass commented 2 years ago

@johnhh2, I apologize for being slow to respond. I will try to respond more quickly going forward.

I have found intermittent and sporadic blocking with imageZMQ in my own projects. I think it may be memory leaks in ZMQ, PyZMQ, Python threading, camera reading or some combination of these. While it seems to be thread related, I haven't successfully tracked these errors down yet. They may even be caused by very brief power or wifi glitches. ZMQ is a very reliable and heavily used library with great bug tracking. imageZMQ is a thin wrapper around PyZMQ. I haven't found anything specific or repeatable enough to report to ZMQ bug tracking. Like you, I use a thread to read the camera and I use another thread to send the images to an imagehub using imageZMQ. As I have been tracking down blocking and other send failures, they seem to occur more often (and are tougher to debug) when multiple threads are involved. But I don't have any repeatable examples that show threads to be the culprit.

Some projects are using docker and imageZMQ without problems. One user / contributor to my own projects uses docker and has posted his own image hub-librarian here. It is a great solution and may provide you with some ideas.

It is important to put every imageZMQ send in a try / except block. This is a best practice recommended in the ZMQ docs, which say: "In most of the C examples we've seen so far there's been no error handling. Real code should do error handling on every single ZeroMQ call." See the ZMQ docs on Handling Errors and ETERM.

I have found that using timers to test for "stalls in sending or receiving" is helpful. There is an example program in this repo that shows a try / except block using ZMQ timeout options.
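A minimal sketch of the timeout idea, written against plain PyZMQ rather than copied from the example program in the repo. It assumes a SUB socket and sets `zmq.RCVTIMEO` so that `recv()` raises `zmq.Again` instead of blocking forever, then rebuilds the socket after a stall. The function names, the address argument and the 2-second default are placeholder choices. With imageZMQ, the same options would presumably be set on the hub's underlying ZMQ socket.

```python
import zmq


def make_subscriber(context, address, timeout_ms=2000):
    """Create a SUB socket whose recv() gives up after timeout_ms."""
    sock = context.socket(zmq.SUB)
    sock.setsockopt(zmq.RCVTIMEO, timeout_ms)  # recv() raises zmq.Again on timeout
    sock.setsockopt(zmq.LINGER, 0)             # do not hang on close()
    sock.setsockopt(zmq.SUBSCRIBE, b"")        # subscribe to every topic
    sock.connect(address)
    return sock


def receive_or_reconnect(context, sock, address, timeout_ms=2000):
    """Return (sock, message); message is None if the receive stalled.

    On a timeout or any other ZMQ error the old socket is closed and a
    fresh one is returned, so the caller can simply keep looping.
    """
    try:
        return sock, sock.recv()
    except zmq.ZMQError as exc:  # zmq.Again (timeout) is a subclass
        print("receive failed ({}); reconnecting".format(exc), flush=True)
        sock.close()
        return make_subscriber(context, address, timeout_ms), None
```

A receive loop would call `sock, msg = receive_or_reconnect(ctx, sock, address)` on each pass and skip processing when `msg` is `None`. Whether reconnecting actually clears the stall described in this issue is untested here.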

I use imageZMQ in my yin-yang-ranch farm management project. My imagenode camera capture and sender uses imageZMQ with a separate optional "stall watcher" thread for watching for "stalls" caused by blocking on sends. I also have code that watches for stalls in my imagehub program. An overview of my workflow design for my Yin Yang Ranch system is here, in case that might be helpful to you.
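The "stall watcher" idea can be sketched with the standard library alone. This is not the imagenode code. It is a hypothetical `StallWatcher` that assumes the watched loop calls `beat()` after every frame, and that a caller-supplied `on_stall` callback decides what to do (log, rebuild the socket, restart the process).

```python
import threading
import time


class StallWatcher:
    """Call on_stall(seconds) when beat() has been silent too long."""

    def __init__(self, max_silence, on_stall, check_interval=0.5):
        self._max_silence = max_silence
        self._on_stall = on_stall
        self._check_interval = check_interval
        self._last_beat = time.monotonic()
        self._lock = threading.Lock()
        self._stop = threading.Event()
        self._thread = threading.Thread(target=self._watch, daemon=True)
        self._thread.start()

    def beat(self):
        """Record that the watched loop made progress."""
        with self._lock:
            self._last_beat = time.monotonic()

    def _watch(self):
        # Event.wait() doubles as an interruptible sleep.
        while not self._stop.wait(self._check_interval):
            with self._lock:
                silent_for = time.monotonic() - self._last_beat
            if silent_for > self._max_silence:
                self._on_stall(silent_for)
                self.beat()  # restart the clock so it does not fire every tick

    def stop(self):
        self._stop.set()
        self._thread.join()
```

In the subscriber from the first post, `beat()` would go right after `receiver.recv_jpg()` returns. A watcher thread cannot unblock a stuck `recv` by itself, so the callback has to take some external action.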

I have been slow to push my ongoing experiments and improvements to my GitHub repositories. No excuses for that...other than I am retired and have had a lot of life complications lately (as have we all!)

Keep me posted on your progress and any solutions you come up with. My project and those of others will benefit from your learnings.

Thanks, Jeff