deepch / RTSPtoWeb

RTSP Stream to WebBrowser
MIT License

Memory leak in case of high load (caused by unused HLS Muxer) #415

Open · tillschaefer opened this issue 6 months ago

tillschaefer commented 6 months ago

Running RTSPtoWeb with many camera streams (more than 25 in our setup) makes memory usage grow until the process runs out of memory (OOM).

I have diagnosed the problem and traced the trigger of the memory leak to the function func (client *RTSPClient) startStream() in vdk. If the RTSPClient's OutgoingProxyQueue is full, the subsequent return ends the stream. Because SignalStreamRTPStop is emitted, the stream is then restarted. However, it seems that not all resources are freed correctly.

To confirm the analysis, I have replaced the return in the case of a full channel with a continue. In this case no leak occurs and the memory consumption of the server is stable.

Relevant part of the code (vdk, format/rtspv2/client.go):

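if client.options.OutgoingProxy {
  // Forward the raw RTP payload to the outgoing proxy queue while it holds fewer than 2000 entries.
  if len(client.OutgoingProxyQueue) < 2000 {
    client.OutgoingProxyQueue <- &content
  } else {
    client.Println("RTSP Client OutgoingProxy Chanel Full")
    // Returning here leaves startStream, which emits SignalStreamRTPStop and restarts the stream.
    // In the experiment described above, replacing this return with continue (skipping the packet) kept memory stable.
    return
  }
}
pkt, got := client.RTPDemuxer(&content)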
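For comparison, the usual Go idiom for "enqueue if there is room, otherwise drop" is a select with a default branch. It also avoids the separate len() check, because that check and the following send are not atomic when several goroutines write to the same channel. The sketch below is hypothetical: packet, enqueueOrDrop and the tiny capacity are illustrative names, not vdk's API. It mirrors the experiment above of dropping the packet and continuing instead of returning.

```go
package main

import "fmt"

// packet is a hypothetical stand-in for one RTP payload (vdk queues a pointer to the raw bytes).
type packet []byte

// enqueueOrDrop tries a non-blocking send. It returns false when the queue is
// full, so the caller can drop the packet and keep reading ("continue")
// instead of tearing the stream down ("return").
func enqueueOrDrop(queue chan<- *packet, p *packet) bool {
	select {
	case queue <- p:
		return true
	default:
		return false
	}
}

func main() {
	queue := make(chan *packet, 2) // tiny capacity so the demo overflows
	dropped, demuxed := 0, 0
	for i := 0; i < 5; i++ {
		p := packet{byte(i)}
		if !enqueueOrDrop(queue, &p) {
			dropped++
			continue // drop this packet but keep the read loop running
		}
		demuxed++ // the RTP demuxer would run here
	}
	fmt.Println("queued:", len(queue), "dropped:", dropped, "demuxed:", demuxed)
}
```

Dropping only hides backpressure. If the consumer is permanently slower than the cameras, the drop counter is the signal to watch.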
tillschaefer commented 6 months ago

Digging deeper with pprof shows that the majority (about 90%) of the memory is allocated in vdk's format/rtspv2/client.go:

top:

(pprof) top
Showing nodes accounting for 1366.27MB, 96.89% of 1410.06MB total
Dropped 126 nodes (cum <= 7.05MB)
Showing top 10 nodes out of 19
      flat  flat%   sum%        cum   cum%
 1255.25MB 89.02% 89.02%  1262.08MB 89.51%  github.com/deepch/vdk/format/rtspv2.(*RTSPClient).RTPDemuxer
   80.62MB  5.72% 94.74%    80.62MB  5.72%  github.com/pion/interceptor/pkg/nack.(*ResponderInterceptorFactory).NewInterceptor.func2
   22.41MB  1.59% 96.33%    22.41MB  1.59%  main.(*MuxerHLS).UpdateIndexM3u8
       7MB   0.5% 96.82%    90.12MB  6.39%  github.com/pion/interceptor/pkg/nack.(*packetManager).NewPacket
    0.50MB 0.036% 96.86%  1262.58MB 89.54%  github.com/deepch/vdk/format/rtspv2.(*RTSPClient).startStream
    0.50MB 0.035% 96.89%    28.97MB  2.05%  main.StreamServerRunStreamDo
         0     0% 96.89%    90.12MB  6.39%  github.com/deepch/vdk/format/webrtcv3.(*Muxer).WritePacket
         0     0% 96.89%    90.12MB  6.39%  github.com/pion/interceptor.RTPWriterFunc.Write
         0     0% 96.89%    90.12MB  6.39%  github.com/pion/interceptor/pkg/nack.(*ResponderInterceptor).BindLocalStream.func1
         0     0% 96.89%    90.12MB  6.39%  github.com/pion/interceptor/pkg/report.(*SenderInterceptor).BindLocalStream.func1

listing:

(pprof) list RTPDemuxer
Total: 1.38GB
ROUTINE ======================== github.com/deepch/vdk/format/rtspv2.(*RTSPClient).RTPDemuxer in /home/tschaefer/go/pkg/mod/bitbucket.dt.log.psi.de/!t!p!f/vdk@v0.0.2/format/rtspv2/client.go
    1.23GB     1.23GB (flat, cum) 89.51% of Total
         .          .    658:                   } else if client.videoCodec == av.H264 {
         .          .    659:                           naluType := nal[0] & 0x1f
         .          .    660:                           switch {
         .          .    661:                           case naluType >= 1 && naluType <= 5:
         .          .    662:                                   retmap = append(retmap, &av.Packet{
  512.69kB   512.69kB    663:                                           Data:            append(binSize(len(nal)), nal...),
         .          .    664:                                           CompositionTime: time.Duration(1) * time.Millisecond,
         .          .    665:                                           Idx:             client.videoIDX,
         .          .    666:                                           IsKeyFrame:      naluType == 5,
         .          .    667:                                           Duration:        time.Duration(float32(timestamp-client.PreVideoTS)/90) * time.Millisecond,
         .          .    668:                                           Time:            time.Duration(timestamp/90) * time.Millisecond,
         .          .    669:                                   })
         .          .    670:                           case naluType == 7:
         .          .    671:                                   client.CodecUpdateSPS(nal)
         .          .    672:                           case naluType == 8:
         .          .    673:                                   client.CodecUpdatePPS(nal)
         .          .    674:                           case naluType == 24:
         .          .    675:                                   packet := nal[1:]
         .          .    676:                                   for len(packet) >= 2 {
         .          .    677:                                           size := int(packet[0])<<8 | int(packet[1])
         .          .    678:                                           if size+2 > len(packet) {
         .          .    679:                                                   break
         .          .    680:                                           }
         .          .    681:                                           naluTypefs := packet[2] & 0x1f
         .          .    682:                                           switch {
         .          .    683:                                           case naluTypefs >= 1 && naluTypefs <= 5:
         .          .    684:                                                   retmap = append(retmap, &av.Packet{
         .          .    685:                                                           Data:            append(binSize(len(packet[2:size+2])), packet[2:size+2]...),
         .          .    686:                                                           CompositionTime: time.Duration(1) * time.Millisecond,
         .          .    687:                                                           Idx:             client.videoIDX,
         .          .    688:                                                           IsKeyFrame:      naluType == 5,
         .          .    689:                                                           Duration:        time.Duration(float32(timestamp-client.PreVideoTS)/90) * time.Millisecond,
         .          .    690:                                                           Time:            time.Duration(timestamp/90) * time.Millisecond,
         .          .    691:                                                   })
         .          .    692:                                           case naluTypefs == 7:
         .          .    693:                                                   client.CodecUpdateSPS(packet[2 : size+2])
         .          .    694:                                           case naluTypefs == 8:
         .          .    695:                                                   client.CodecUpdatePPS(packet[2 : size+2])
         .          .    696:                                           }
         .          .    697:                                           packet = packet[size+2:]
         .          .    698:                                   }
         .          .    699:                           case naluType == 28:
         .          .    700:                                   fuIndicator := content[offset]
         .          .    701:                                   fuHeader := content[offset+1]
         .          .    702:                                   isStart := fuHeader&0x80 != 0
         .          .    703:                                   isEnd := fuHeader&0x40 != 0
         .          .    704:                                   if isStart {
         .          .    705:                                           client.fuStarted = true
         .          .    706:                                           client.BufferRtpPacket.Truncate(0)
         .          .    707:                                           client.BufferRtpPacket.Reset()
         .          .    708:                                           client.BufferRtpPacket.Write([]byte{fuIndicator&0xe0 | fuHeader&0x1f})
         .          .    709:                                   }
         .          .    710:                                   if client.fuStarted {
         .     6.84MB    711:                                           client.BufferRtpPacket.Write(content[offset+2 : end])
         .          .    712:                                           if isEnd {
         .          .    713:                                                   client.fuStarted = false
         .          .    714:                                                   naluTypef := client.BufferRtpPacket.Bytes()[0] & 0x1f
         .          .    715:                                                   if naluTypef == 7 || naluTypef == 9 {
         .          .    716:                                                           bufered, _ := h264parser.SplitNALUs(append([]byte{0, 0, 0, 1}, client.BufferRtpPacket.Bytes()...))
         .          .    717:                                                           for _, v := range bufered {
         .          .    718:                                                                   naluTypefs := v[0] & 0x1f
         .          .    719:                                                                   switch {
         .          .    720:                                                                   case naluTypefs == 5:
         .          .    721:                                                                           client.BufferRtpPacket.Reset()
         .          .    722:                                                                           client.BufferRtpPacket.Write(v)
         .          .    723:                                                                           naluTypef = 5
         .          .    724:                                                                   case naluTypefs == 7:
         .          .    725:                                                                           client.CodecUpdateSPS(v)
         .          .    726:                                                                   case naluTypefs == 8:
         .          .    727:                                                                           client.CodecUpdatePPS(v)
         .          .    728:                                                                   }
         .          .    729:                                                           }
         .          .    730:                                                   }
       2MB        2MB    731:                                                   retmap = append(retmap, &av.Packet{
    1.22GB     1.22GB    732:                                                           Data:            append(binSize(client.BufferRtpPacket.Len()), client.BufferRtpPacket.Bytes()...),
         .          .    733:                                                           CompositionTime: time.Duration(1) * time.Millisecond,
         .          .    734:                                                           Duration:        time.Duration(float32(timestamp-client.PreVideoTS)/90) * time.Millisecond,
         .          .    735:                                                           Idx:             client.videoIDX,
         .          .    736:                                                           IsKeyFrame:      naluTypef == 5,
         .          .    737:                                                           Time:            time.Duration(timestamp/90) * time.Millisecond,
deepch commented 6 months ago

Share your streams and config and I'll fix it.

deepch commented 6 months ago

Check for a lock in the loop: add a continue after https://github.com/deepch/RTSPtoWeb/blob/c15b3b04ee9fc2e4cd43aec4031a5296cfb73cf7/streamCore.go#L140C3-L140C53

deepch commented 6 months ago

Please capture a report if usage reaches 8 GB or more.

tillschaefer commented 6 months ago

> check lock loop add continue after https://github.com/deepch/RTSPtoWeb/blob/c15b3b04ee9fc2e4cd43aec4031a5296cfb73cf7/streamCore.go#L140C3-L140C53

If I skip the packetAV processing (i.e., discard the packet, which produces many ErrorStreamNoVideo logs), memory stays well below 100 MiB. Does this mean it might only be buffer bloat caused by a performance bottleneck while sending the packets?

Interestingly, I can avoid the issue by running several smaller instances of the server in parallel (i.e., partitioning the streams). So we might be seeing lock contention here. For example, the function StreamChannelCast takes the lock on the StorageST struct, so there is effectively a global lock at this point.
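If every packet from every camera takes the same mutex, casting is serialized across all streams. That would fit the observation that partitioning streams across processes helps, although the profiles here do not prove it.

A common alternative is to hold the global lock only while looking up a stream, and to guard each stream's clients with their own lock. The sketch below is hypothetical: storage, stream and cast are not RTSPtoWeb's StorageST API. It also uses a non-blocking send, so a slow client loses packets instead of stalling the caster.

```go
package main

import (
	"fmt"
	"sync"
)

// stream holds the clients of one camera and has its own lock.
type stream struct {
	mu      sync.RWMutex
	clients map[string]chan []byte
}

// storage guards only the stream map; it is not held while packets are cast.
type storage struct {
	mu      sync.RWMutex
	streams map[string]*stream
}

func (s *storage) get(id string) (*stream, bool) {
	s.mu.RLock()
	defer s.mu.RUnlock()
	st, ok := s.streams[id]
	return st, ok
}

// cast delivers pkt to every client of one stream without blocking.
func (s *storage) cast(id string, pkt []byte) (sent, dropped int) {
	st, ok := s.get(id)
	if !ok {
		return 0, 0
	}
	st.mu.RLock() // only this stream's lock is held during delivery
	defer st.mu.RUnlock()
	for _, ch := range st.clients {
		select {
		case ch <- pkt:
			sent++
		default:
			dropped++ // client buffer full
		}
	}
	return sent, dropped
}

func main() {
	s := &storage{streams: map[string]*stream{
		"cam1": {clients: map[string]chan []byte{"a": make(chan []byte, 1)}},
	}}
	fmt.Println(s.cast("cam1", []byte{1})) // 1 0
	fmt.Println(s.cast("cam1", []byte{2})) // 0 1
}
```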

tillschaefer commented 6 months ago

So I used a CPU profiler to see where the time is spent. A large part of it goes into muxing the HLS stream's M3U8 index (main.(*MuxerHLS).UpdateIndexM3u8, 39.90% cumulative in the profile below). This happens even though no HLS stream is being served; only WebRTC clients are connected. As far as I can see, the streams don't even have an audio channel. It would therefore help to run the HLS / audio muxing only when it is actually needed.

(pprof) top20
Showing nodes accounting for 31.30s, 62.26% of 50.27s total
Dropped 515 nodes (cum <= 0.25s)
Showing top 20 nodes out of 162
      flat  flat%   sum%        cum   cum%
     9.51s 18.92% 18.92%      9.51s 18.92%  runtime.memmove
     4.71s  9.37% 28.29%      4.77s  9.49%  runtime.cgocall
     2.56s  5.09% 33.38%      2.93s  5.83%  runtime.findObject
     2.03s  4.04% 37.42%      8.43s 16.77%  runtime.scanobject
     1.50s  2.98% 40.40%      1.50s  2.98%  runtime.stdcall2
     1.14s  2.27% 42.67%      2.27s  4.52%  runtime.greyobject
     1.13s  2.25% 44.92%     14.93s 29.70%  runtime.concatstrings
     0.99s  1.97% 46.89%      0.99s  1.97%  github.com/pion/srtp/v2.xorBytes
     0.95s  1.89% 48.78%     20.06s 39.90%  main.(*MuxerHLS).UpdateIndexM3u8
     0.89s  1.77% 50.55%      0.89s  1.77%  strconv.rightShift
     0.71s  1.41% 51.96%      6.50s 12.93%  runtime.mallocgc
     0.68s  1.35% 53.31%      0.68s  1.35%  runtime.markBits.isMarked (inline)
     0.65s  1.29% 54.61%      0.65s  1.29%  runtime._ExternalCode
     0.64s  1.27% 55.88%      2.30s  4.58%  runtime.(*mheap).allocSpan
     0.64s  1.27% 57.15%      0.64s  1.27%  runtime.heapBits.bits (inline)
     0.55s  1.09% 58.25%      0.55s  1.09%  crypto/sha1.blockAVX2
     0.55s  1.09% 59.34%      0.58s  1.15%  strconv.(*decimal).Assign
     0.50s  0.99% 60.33%      0.50s  0.99%  runtime.stdcall6
     0.49s  0.97% 61.31%      2.20s  4.38%  runtime.sweepone
     0.48s  0.95% 62.26%      0.48s  0.95%  crypto/aes.encryptBlockAsm


deepch commented 6 months ago

Try commenting out the line (//Storage.StreamChannelCast(streamID, channelID, packetAV)) and test again.

deepch commented 6 months ago

Yes, it can be done better, but HLS needs at least 4 segments of 4 seconds each before the video can be rendered.

tillschaefer commented 6 months ago

> try comment line //Storage.StreamChannelCast(streamID, channelID, packetAV) and test again

I think the issue here is quite the opposite. If I keep only the line above (and the keyTest timer), everything runs smoothly. Otherwise the HLS muxer always runs (even with no client connected), and that causes the Buffer / OutgoingPacketQueue channel to blow up. Is there a reason HLS clients are not registered the same way as MSE and WebRTC clients, via Storage.ClientAdd? That would allow the HLS muxer to run only when the HLS stream is actually requested.

tillschaefer commented 6 months ago

OK, I investigated the problem further, and there is indeed a memory leak, which becomes more likely under high load (clients disconnecting):

The gin context in func HTTPAPIServerStreamWebRTC(c *gin.Context) is only valid until the handler returns. However, the goroutine started there is still alive afterwards, so the line defer Storage.ClientDelete(c.Param("uuid"), cid, c.Param("channel")) does not reliably delete the client: in some cases the streamID and channelID have been overwritten with different values by then.