elsampsa / valkka-core

Valkka - Create video surveillance, management and analysis programs with PyQt
GNU Lesser General Public License v3.0

Random crashes when using the RTSP server #24

Open pslarionov opened 3 years ago

pslarionov commented 3 years ago

Hello.

I'm making two apps. The first one lets somebody add cameras by typing their IP addresses and retransmits each stream to N ports (say, 10). The second one connects to the server and displays these retransmitted streams.

Server (the app that opens an RTSP stream and retransmits it to ports) works fine, and I can connect to any retransmitted stream using ffplay or Client (the app that connects to the server and opens retransmitted streams) on the local machine.

Filtergraph for every camera looks like:

(LiveThread:livethread) -----------> {ForkFrameFilter: fork_filterN}----+
                                                                        |
                                                                        +---> (LiveThread:livethread)      retransmitting to 1st port
                                                                        |
                                                                        +---> (LiveThread:livethread)      retransmitting to 2nd port
                                                                        |
                                                                        ...
                                                                        |
                                                                        +---> (LiveThread:livethread)      retransmitting to Nth port

The problem: sometimes, when I use Client or try to watch with ffplay or VLC on another machine, Server starts to spam messages like this:

FrameFifo: livethread writeCopy: OVERFLOW! No more frames in stack. Frame=<BasicFrame: timestamp=1611578970515 subsession_index=0 slot=1 / payload size=10804 / H264: slice_type=1>

and then crashes.

Is there any way to solve this?

Thank you in advance.

elsampsa commented 3 years ago

Some questions before looking deeper into this:

Could you please provide a minimal code example demonstrating this crash? That way I don't have to guess what's going on, and it saves me time when debugging.

pslarionov commented 3 years ago

My goal is to get around the limit on the number of connections to an RTSP stream, so I need to retransmit one camera to several ports (or substreams; which one is not critical). One input livethread feeding one output livethread is not an option, because I need to watch the stream from at least 2 computers. That's why I have to fork incoming streams into N livethreads for output.

Code example: code_example.txt

Just try to connect with VLC from another machine to port 60000, then 60001, and so on. On some random port the server will start to spam messages and then crash after some time.

If using one input livethread and one output livethread is critical, I can try to fork into N SlotFrameFilters and connect them to the output livethread. But that seems weird to me.

elsampsa commented 3 years ago

Here you have two files: (1) rtsp_fw.py and (2) connect_some.bash:

(1) Does what you wanted to do, but in the proper way
(2) Helps you to test it

Before trying them, let's discuss the possible origins of the problem:

OVERFLOW! No more frames in stack means that incoming (or outgoing) data arrives so fast that the cache runs out of free frames. Please remember that in libValkka all cache memory (frames) is pre-reserved. This way we avoid constant memory (de)allocations. However, this places some design burden on the API user: you must know what you're doing (see also here).

That said, getting "no more frames in stack" should not crash the program, and that makes me worried.
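The pre-reserved frame stack described above can be pictured with a small toy model. This is only a sketch and not libValkka code: the class name ToyFrameFifo, its methods, and the exact behaviour of the flush_when_full flag are assumptions made for illustration. It shows why a write fails once every pre-reserved frame is sitting in the queue waiting for a consumer.

```python
from collections import deque


class ToyFrameFifo:
    """Toy model of a FIFO backed by a fixed stack of pre-reserved frames.

    Hypothetical illustration only; it does not reproduce libValkka internals.
    """

    def __init__(self, n_frames, flush_when_full=False):
        # All frame buffers are allocated once, up front.
        self.stack = [bytearray() for _ in range(n_frames)]
        self.fifo = deque()
        self.flush_when_full = flush_when_full
        self.overflows = 0

    def write_copy(self, payload):
        """Copy payload into a free frame; return False if none is left."""
        if not self.stack:
            self.overflows += 1
            if self.flush_when_full:
                # Assumed behaviour: recycle every queued frame, losing its data.
                while self.fifo:
                    self.stack.append(self.fifo.popleft())
            return False  # the incoming frame is dropped in this toy model
        frame = self.stack.pop()
        frame[:] = payload
        self.fifo.append(frame)
        return True

    def read(self):
        """Hand the oldest payload to the consumer and recycle its frame."""
        frame = self.fifo.popleft()
        data = bytes(frame)
        self.stack.append(frame)
        return data
```

In this model, a consumer that calls read() at least as fast as the producer calls write_copy() never sees an overflow; a stalled consumer exhausts the stack after n_frames writes.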

Which version of libValkka are you using? In the main repo there is a new version where I reverted to an older version of Live555, since the latest version was giving lots of warnings when running the code under valgrind, so maybe the problem is there. The latest version will be pushed to the PPA pre-built repo once I have time to do that.

All that said, please try the code and the bash script and report back systematically under which conditions the code crashes (does it report "segmentation fault"?):

Here we go:

rtsp_fw.py:

from valkka.core import *
import time

class OneToMany:
    """Implements the following filterchain:

    Stream slot number is indicated with S=N

    (LiveThread) ---> {ForkFrameFilter: fork_n} ---+                                             + ---> (LiveThread)
                  S=1                              |                                     S=1     |
                                                   +-----> {SetSlotFrameFilter: slot_1} ------>--+
                                                   |                                     S=2     |
                                                   +-----> {SetSlotFrameFilter: slot_2} ------>--+
                                                   |                                             |                                           
                                                  ...                                           ...

    Both LiveThreads are given as an input to this class
    """

    def __init__(self, live_in_thread, live_out_thread, address, n_replicate, slot=1):
        """Input:

        - Two LiveThreads that have been started
        - RTSP address
        - How many times the stream is replicated
        - Stream slot number
        """
        self.closed = False
        self.ports = []                            
        self.address = address
        self.slot = slot                   

        self.live_in_thread = live_in_thread
        self.live_out_thread = live_out_thread

        # a fork: use it to feed the outgoing rtsp connection and additionally you can hook into
        # the stream for some other purposes (for visualizing / analyzing)
        self.fork_framefilter =ForkFrameFilterN("fork_"+str(slot))
        # connect to outgoing rtsp connection:

        # define incoming rtsp connection
        self.in_ctx = LiveConnectionContext(LiveConnectionType_rtsp, self.address, self.slot, self.fork_framefilter)
        self.in_ctx.msreconnect = 20000

        # define outgoing rtsp connections
        self.slot_ffs = []
        self.out_ctxs = []
        for i in range(1, n_replicate+1):
            slot_frame_filter = SetSlotFrameFilter("slot_"+str(i),self.live_out_thread.getFrameFilter())
            slot_frame_filter.setSlot(i)
            self.fork_framefilter.connect(
                'fork'+str(i),
                slot_frame_filter
                )

            self.slot_ffs.append(slot_frame_filter)
            out_ctx = LiveOutboundContext(LiveConnectionType_rtsp, str(i), i, 5000)
            self.out_ctxs.append(out_ctx)
            # register outgoing rtsp connection
            self.live_out_thread.registerOutboundCall(out_ctx)

        # register incoming rtsp connection
        self.live_in_thread.registerStreamCall(self.in_ctx)
        self.live_in_thread.playStreamCall(self.in_ctx)

    def close(self):
        self.live_in_thread.stopStreamCall(self.in_ctx)
        # the incoming stream was registered with live_in_thread, so deregister it there
        self.live_in_thread.deregisterStreamCall(self.in_ctx)
        for out_ctx in self.out_ctxs:
            self.live_out_thread.deregisterOutboundCall(out_ctx)
        self.closed = True

    def __del__(self):
        if not self.closed:
            self.close()

def get_ff_ctx():
    ff_ctx = FrameFifoContext()
    ff_ctx.n_basic = 500
    ff_ctx.n_avpkt = 0
    ff_ctx.n_avframe = 0
    ff_ctx.n_yuvpbo = 0
    ff_ctx.n_setup = 100
    ff_ctx.n_signal = 10
    ff_ctx.n_marker = 0
    ff_ctx.flush_when_full=True
    return ff_ctx

def main():
    """
    streams are served at

    ::

        rtsp://localhost:port/N

    where N is a running number
    """
    rtsp_address = "rtsp://root:silopassword@10.0.0.2/axis-media/media.amp"
    setLiveOutPacketBuffermaxSize(1000000)
    port=1024

    in_ff_ctx = get_ff_ctx()
    out_ff_ctx = get_ff_ctx()
    live_in_thread = LiveThread("live_incoming", in_ff_ctx)
    live_out_thread = LiveThread("live_outgoing", out_ff_ctx)

    live_in_thread.startCall()
    live_out_thread.startCall()

    live_out_thread.setRTSPServer(port)

    otm = OneToMany(
        live_in_thread,
        live_out_thread,
        rtsp_address,
        10 # replicate 10 times
    )

    print("\nSERVING FOR THREE MINUTES AT: rtsp://localhost:%i/N" % (port))
    time.sleep(180)

    print("cleaning up")
    otm.close()
    live_in_thread.stopCall()
    live_out_thread.stopCall()
    print("bye")

if __name__ == "__main__":
    main()

connect_some.bash:

#!/bin/bash
nums="1 2 3 4 5 6 7 8 9 10"
for i in $nums
do
    echo $i
    ffplay rtsp://localhost:1024/$i &
done
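For testing from a different machine, the same loop can be written in Python with the host as a parameter. This is a minimal sketch; the helper name ffplay_commands and its parameters are hypothetical, and the defaults simply mirror the port and stream count used above.

```python
def ffplay_commands(host="localhost", port=1024, n_streams=10):
    """Build one ffplay command per endpoint /1 .. /n_streams."""
    commands = []
    for i in range(1, n_streams + 1):
        commands.append(["ffplay", f"rtsp://{host}:{port}/{i}"])
    return commands
```

Each returned list can be passed to subprocess.Popen to start a player, which connects once to each endpoint just like the bash script does.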

EDIT

I just realized that there is some madness involved in all this.. :joy:

You can connect several times to exactly the same RTSP address.

Go ahead and try (note that you don't need to change the last number):

ffplay rtsp://localhost:1024/1 &
ffplay rtsp://localhost:1024/1 &
ffplay rtsp://localhost:1024/1

The underlying Live555 library is perfectly capable of replicating & serving multiple streams from the same endpoint.

So in my (and your) example we are doing that forking & replicating for nothing.. :joy: :joy:

pslarionov commented 3 years ago

I made some tests with provided code:

  1. Run the code, then connect immediately with

    ffplay rtsp://localhost:1024/1

    Result: works fine, but if I connect a second time to the same endpoint, the first connection is dropped. (The second one still works fine.)

  2. Run the code, then connect immediately with

    ffplay rtsp://localhost:1024/1 &
    ffplay rtsp://localhost:1024/1 &
    ffplay rtsp://localhost:1024/1

    Result: nothing happens; after about 20 seconds the console is spammed with FrameFifo: live_outgoing writeCopy: OVERFLOW! No more frames in stack. Frame=<BasicFrame: timestamp=1611668549830 subsession_index=0 slot=7 / payload size=10981 / H264: slice_type=1>

    No crash for 20 minutes.

  3. Run the code, wait ~30 seconds, then connect as in the 2nd case. Result: same as the 2nd case.

  4. Like the 3rd case, but wait 5 minutes before connecting. Result: same as the 2nd and 3rd cases.

I noticed that when the console is being spammed, the cleanup (after the sleep call) gets stuck: nothing happens (I waited about a minute). These messages also indicated slot number 7, but that slot number is the stream at /7, correct me if I'm wrong. After some time the slot number changes, but it is never 1 (I waited about 20 minutes).

A crash is very rare and did not happen in these tests. The last messages I got when a crash did happen were:

BasicTaskScheduler::SingleStep(): select() fails: wrong file descriptor
socket numbers used in the select() call: 18(r) 253(re) 258(r) *** buffer overflow detected ***: terminated

I'm going to reinstall all the libraries following the installation section and run the tests again.

The camera is an ORIENT IP-940-IH2C, 1920x1080@25fps, and it provides video only. I'm not sure about the bitrate; I will check it later.

elsampsa commented 3 years ago

Thanks for the report.

I have reproduced the issue that occurs when you connect several times to the same end-point (i.e. "/1"). Now that you have refreshed my memory, it was actually never intended to work that way: you can't make N connections to the same end-point, only one. Btw, even RTSP cameras typically limit the number of connections to the same end-point to a maximum of 4.

Please connect only once to each of the different end-points (say, using that provided bash script), i.e. once to "/1", once to "/2", etc. Any problems there? (seems to work at first sight). I did a test with two clients that ran 30 minutes and had no problems whatsoever.

Maybe this will solve your problem for the moment, until I get to the bottom of this (**).

These messages also indicated slot number 7, but that slot number is the stream at /7, correct me if I'm wrong

If you look at the filtergraph, you see that the stream is copied N times and all the copies end up in the (outgoing) LiveThread. However, from there they are not going anywhere (unless the user initiates an RTSP client).
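A rough back-of-the-envelope estimate shows how quickly N copies can exhaust a pre-reserved frame stack. The sketch below assumes, purely for illustration, that each picture arrives as a single frame and that the consumer side is completely stalled; the function name seconds_until_overflow is hypothetical. The numbers come from this thread: 500 basic frames, 10 replicas, and a 25 fps camera.

```python
def seconds_until_overflow(n_frames, n_copies, fps, frames_per_picture=1):
    """Time for a stalled FIFO to run out of pre-reserved frames."""
    # Frames written into the outgoing FIFO per second.
    arrival_rate = n_copies * fps * frames_per_picture
    return n_frames / arrival_rate


print(seconds_until_overflow(500, 10, 25))  # prints 2.0
```

Under these assumptions the stack would be empty after about two seconds, which is consistent with the overflow message repeating continuously once it starts.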

(**) The correct way to do this would be to activate a new terminal in the ForkFrameFilter when a new RTSP connection is requested, i.e. to start copying the stream only once it is requested (and not all the time). For this one would need a callback from the live555 C++ code into Python, where you then do the new "plumbing" to connect a new ForkFrameFilter terminal to the outgoing LiveThread. Let's see if I get motivated to go into that..
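The on-demand idea can be sketched with plain Python objects. Nothing here is libValkka API: ToyFork, OnDemandReplicator, and the two callback names are hypothetical stand-ins, and the callback from live555 into Python described above does not exist yet as far as this thread shows. The point is only that a terminal is connected when a client arrives and removed when it leaves, so no copies are made for endpoints nobody is watching.

```python
class ToyFork:
    """Toy fork filter: passes each frame to every connected terminal."""

    def __init__(self):
        self.terminals = {}

    def connect(self, name, callback):
        self.terminals[name] = callback

    def disconnect(self, name):
        self.terminals.pop(name, None)

    def go(self, frame):
        for callback in list(self.terminals.values()):
            callback(frame)


class OnDemandReplicator:
    """Connects a fork terminal only while a client is attached."""

    def __init__(self, fork):
        self.fork = fork
        self.received = {}  # endpoint -> frames delivered to that endpoint

    def on_client_connected(self, endpoint):
        # Hypothetical callback, fired when an RTSP client requests endpoint.
        sink = self.received.setdefault(endpoint, [])
        self.fork.connect(endpoint, sink.append)

    def on_client_disconnected(self, endpoint):
        # Hypothetical callback, fired when the client goes away.
        self.fork.disconnect(endpoint)
```

With this arrangement, frames that arrive before any client connects are simply not copied anywhere.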

Another bug I found is this:

ffplay -rtsp_transport tcp rtsp://localhost:1024/1

i.e. if you request tcp transport from the libValkka rtsp server, it is not handled properly.

pslarionov commented 3 years ago

Thank you for the quick and detailed answers.

I got inspired by your idea of activating a new terminal in the ForkFrameFilter on demand. I am not sure if I did it (*) the way it should be done, but it does not work.

Another bug I found is this:

ffplay -rtsp_transport tcp rtsp://localhost:1024/1

i.e. if you request tcp transport from the libValkka rtsp server, it is not handled properly.

Well, I did not manage to reproduce this. What is supposed to happen?

Please connect only once to each of the different end-points (say, using that provided bash script), i.e. once to "/1", once to "/2", etc. Any problems there? (seems to work at first sight). I did a test with two clients that ran 30 minutes and had no problems whatsoever.

Yes, it works on the local machine. My next step is to test it with two or more machines.

(*) If you run it, there will be messages:

RTSPOutbound:100 : handleFrame : no stream registered for 0

I checked the code twice and found no errors (such as a wrongly built filtergraph).

example code:


import time
from valkka.core import *

def get_ff_ctx():
    ff_ctx = FrameFifoContext()
    ff_ctx.n_basic = 500
    ff_ctx.n_avpkt = 0
    ff_ctx.n_avframe = 0
    ff_ctx.n_yuvpbo = 0
    ff_ctx.n_setup = 100
    ff_ctx.n_signal = 10
    ff_ctx.n_marker = 0
    ff_ctx.flush_when_full=True
    return ff_ctx

class Stream:

    """
    (LiveThread) ---> {ForkFrameFilter: fork_n} ---+                                           + ---> (LiveThread)
                                                   |                                     S=1   |
                                                   +-----> {SlotFrameFilter: slot_1} ------>--+
                                                   |                                     S=2   |
                                                   +-----> {SlotFrameFilter: slot_2} ------>--+
                                                   |                                           |                                           
                                                  ...                                         ...
    """
    def __init__(self, address):
        self.slotFrameFilters = []
        self.out_ctxs = []

        in_ff_ctx = get_ff_ctx()
        self.livethread_input = LiveThread('livethread_input', in_ff_ctx)
        self.livethread_input_filter = self.livethread_input.getFrameFilter()
        self.livethread_input.startCall()

        out_ff_ctx = get_ff_ctx()
        self.livethread_output = LiveThread('livethread_output', out_ff_ctx)
        self.livethread_output_filter = self.livethread_output.getFrameFilter()
        self.livethread_output.startCall()

        self.livethread_output.setRTSPServer(60000)

        self.fork_framefilter =ForkFrameFilterN("fork_framefilter")

        self.ctx =LiveConnectionContext(LiveConnectionType_rtsp, address, 1, self.fork_framefilter)
        self.ctx.msreconnect = 10000          

        self.livethread_input.registerStreamCall(self.ctx)        
        self.livethread_input.playStreamCall(self.ctx)

    def AddOutbound(self, slot):

        '''
            stream should be available at rtsp://localhost:60000/slot
        '''

        out_ctx = LiveOutboundContext(LiveConnectionType_rtsp, str(slot), slot, 5000)
        self.out_ctxs.append(out_ctx)

        slot_filter = SlotFrameFilter("SlotFrameFilter_"+str(slot), slot, self.livethread_output_filter)
        self.slotFrameFilters.append(slot_filter)

        self.livethread_output.registerOutboundCall(out_ctx)

        self.fork_framefilter.connect('fork_'+str(slot), slot_filter)

url = 'YourUrlHere'
stream = Stream(url)
time.sleep(1)
stream.AddOutbound(100)
time.sleep(2)