sipcapture / heplify

Portable and Lightweight HEP Capture Agent for HOMER
https://sipcapture.org
GNU Affero General Public License v3.0

better rtcp correlation #167

Closed xhantu closed 4 years ago

xhantu commented 4 years ago

Hello,

I have run into some issues with the current rtcp correlation:

To address these issues I have

Additionally, I added some helper functions for extracting values from RFC 2822-like header data, and added a lot of comments and more debug output.

negbie commented 4 years ago

Hi @xhantu, thank you very much for this PR! I love the additional comments, and it's good that you removed the global buffer. Global state in Go is quite a No-Go ;)

One thing I've noticed is that I should have documented that my freecache fork uses an expire time in units of 1/10 second. So if you want to expire keys after 3 seconds you should call it like this:

rtcpCache.Set(rtcpKey, corrID, 30)
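
To make the unit explicit at the call site, the conversion could be wrapped in a small helper. This is only a sketch: the name expireTenths is hypothetical and is not part of heplify or the freecache fork; the only thing taken from the thread is that the expire argument counts tenths of a second.

```go
package main

import (
	"fmt"
	"time"
)

// expireTenths converts a duration into tenths of a second, the unit the
// thread says the forked freecache expects for its expire argument.
// Hypothetical helper, not part of heplify or freecache.
func expireTenths(d time.Duration) int {
	return int(d / (100 * time.Millisecond))
}

func main() {
	// 3 seconds are 30 tenths of a second.
	fmt.Println(expireTenths(3 * time.Second))
	// 20 minutes are 12000 tenths of a second.
	fmt.Println(expireTenths(20 * time.Minute))
}
```

With such a helper the call above would read rtcpCache.Set(rtcpKey, corrID, expireTenths(3*time.Second)), assuming Set takes the expire value as an int.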

Another thing I've noticed is that we should check if we could get some parts of your code more performant by reducing some allocations. Here is a benchmark with the old code:

pkg: github.com/sipcapture/heplify/decoder
BenchmarkProcess-4 609254 1858 ns/op 136 B/op 2 allocs/op

With new code:

pkg: github.com/sipcapture/heplify/decoder
BenchmarkProcess-4 378421 3010 ns/op 384 B/op 14 allocs/op

Looking into the profiles I think we should check the bytes.Join and debug call:

      10ms      150ms (flat, cum)  7.54% of Total
         .          .    248:// E.g. "Call-ID", "Call-Id", "call-id", "i".
         .          .    249:func getHeaderValue(headerNames [][]byte, data []byte) ([]byte, error) {
         .          .    250:   var startPos int = -1
         .          .    251:   var headerName []byte
         .          .    252:   for hederNameIdx := range headerNames {
      10ms       10ms    253:       headerName = headerNames[hederNameIdx]
         .          .    254:       // Check if first header.
         .       20ms    255:       if bytes.HasPrefix(data, headerName) {
         .          .    256:           if len(data) > len(headerName) && data[len(headerName)] == ':' {
         .          .    257:               startPos = 0
         .          .    258:               break
         .          .    259:            }
         .          .    260:       }
         .          .    261:       // Check if other header.
         .       60ms    262:       search := bytes.Join([][]byte{[]byte("\r\n"), headerName, []byte(":")}, []byte(""))
         .       50ms    263:       startPos = bytes.Index(data, search)
         .          .    264:       if startPos >= 0 {
         .          .    265:           // Skip new line
         .          .    266:           startPos += 2
         .          .    267:           break
         .          .    268:       }
         .          .    269:   }
         .          .    270:   if startPos < 0 {
         .          .    271:       return nil, errors.New("no such header")
         .          .    272:   }
         .       10ms    273:   endPos := bytes.Index(data[startPos:], []byte("\r\n"))
         .          .    274:   if endPos < 0 {
         .          .    275:       return nil, errors.New("no such header")
         .          .    276:   }
         .          .    277:   return bytes.TrimSpace(data[startPos + len(headerName) + 1 : startPos + endPos]), nil
         .          .    278:}
(pprof) 
(pprof) 
(pprof) list decoder.addSDPCacheEntry
         0      180ms (flat, cum)  9.05% of Total
         .          .     29:)
         .          .     30:
         .          .     31:// addSDPCacheEntry will add an entry to SDPCache with rtcpIP+rtcpPort as key and callID as value.
         .          .     32:// Key parts will be separated by a single space.
         .          .     33:func addSDPCacheEntry(rtcpIP []byte, rtcpPort []byte, callID []byte) {
         .       10ms     34:   key := bytes.Join([][]byte{rtcpIP, rtcpPort}, []byte(" "))
         .       90ms     35:   logp.Debug("sdp", "Add to sdpCache key=%q, value=%q", key, callID)
         .       80ms     36:   sdpCache.Set(key, callID, sdpCacheTime)
         .          .     37:}
         .          .     38:
         .          .     39:// cacheSDPIPPort will extract the Call-ID and all RTCP ip and port combinations will add them to the sdpCache,
         .          .     40:// with ip+port as key and Call-ID as value.
         .          .     41://
(pprof) 
negbie commented 4 years ago

There is already a getSIPHeaderValInt function. Maybe we can move this to util.go? https://github.com/sipcapture/heplify/blob/master/decoder/tcpassembly.go#L146

negbie commented 4 years ago

Some comments like "Found it" or "Return it" can be removed, I guess, as it's already obvious what's going on at that point.

xhantu commented 4 years ago

One thing I've noticed is that I should have documented that my freecache fork uses an expire time in units of 1/10 second. So if you want to expire keys after 3 seconds you should call it like this:

rtcpCache.Set(rtcpKey, corrID, 30)

That explains the 20 minutes and 60 hours I thought were configured. Any reason why you switched to 1/10 s? I don't think this resolution is needed for this case. Such a change without documentation is certainly problematic.

Another thing I've noticed is that we should check if we could get some parts of your code more performant by reducing some allocations. Here is a benchmark with the old code:

Looking into the profiles I think we should check the bytes.Join and debug call:

How did you profile this? If I know how, I can profile the code myself.

I have done some small scale testing. Looks like appends are faster than join, even more if the same buffer can be reused for each loop.

I expected the debug line to be much faster, as debugging should normally be disabled. The call should therefore do little more than check whether debugging is enabled and return immediately because it is not. That the debug call takes longer than the cache call is really strange. Did you enable debugging during your profiling? Are the profiling results repeatable?

negbie commented 4 years ago

That explains the 20 minutes and 60 hours I thought were configured. Any reason why you switched to 1/10 s? I don't think this resolution is needed for this case. Such a change without documentation is certainly problematic.

Because I use it for sender side deduplication here https://github.com/sipcapture/heplify/blob/master/decoder/decoder.go#L166

Everything > 0.5 seconds could be a retransmission, so I need sub-second resolution.

How did you profile this? If I know how, I can profile the code myself.

You can use pprof. Just go into heplify/decoder. For simple benchmarks you can run:

go test -bench=. -benchmem

Run it multiple times so that you can see the results are repeatable. If you want to profile the code you can run this:

go test -bench=. -benchmem -memprofile memprofile.out -cpuprofile profile.out

Now you have a CPU and a memory profile. For CPU you can run:

go tool pprof profile.out

For memory please run:

go tool pprof --alloc_objects memprofile.out
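
For a quick number on a single function without going through go test, the standard library's testing.Benchmark can also be called from an ordinary program. The following is a rough sketch only: joinKey is a hypothetical stand-in that builds a key with bytes.Join the way the profiled addSDPCacheEntry does, and the IP and port values are made up.

```go
package main

import (
	"bytes"
	"fmt"
	"testing"
)

// joinKey is a hypothetical stand-in for the key construction seen in the
// profile: IP and port separated by a single space.
func joinKey(ip, port []byte) []byte {
	return bytes.Join([][]byte{ip, port}, []byte(" "))
}

func main() {
	ip, port := []byte("192.0.2.10"), []byte("5005")

	// testing.Benchmark picks b.N on its own and returns the measurements.
	res := testing.Benchmark(func(b *testing.B) {
		b.ReportAllocs()
		for i := 0; i < b.N; i++ {
			joinKey(ip, port)
		}
	})

	// Prints iterations and ns/op, followed by B/op and allocs/op.
	fmt.Println(res.String(), res.MemString())
}
```

The numbers vary from run to run, so the advice about repeating the measurement applies here as well.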

negbie commented 4 years ago

Most debug log calls aren't for free and include call overhead depending on your arguments. This is why you often see in performance critical applications something like this:

if isDebug { log.Debug(....) }
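
A slightly fuller sketch of that guard pattern follows. The names debugEnabled and debugf are hypothetical stand-ins and are not heplify's logp API; the counter exists only to make the behaviour observable. The assumption being illustrated is that arguments to a variadic ...interface{} function may be converted and packed by the caller before the callee gets a chance to return early.

```go
package main

import "fmt"

// debugEnabled and debugf are hypothetical stand-ins for a logger's level
// check and its debug call.
var debugEnabled = false

// formatted counts how often a message was actually formatted.
var formatted int

func debugf(format string, args ...interface{}) {
	if !debugEnabled {
		return
	}
	formatted++
	fmt.Printf(format+"\n", args...)
}

func addEntry(key, value []byte) {
	// Unguarded: key and value may be boxed into interface values and
	// packed into a slice at the call site, even though debugf returns
	// immediately when debugging is off.
	debugf("add key=%q value=%q", key, value)

	// Guarded: with debugging off, neither the packing nor the call happens.
	if debugEnabled {
		debugf("add key=%q value=%q", key, value)
	}
}

func main() {
	addEntry([]byte("192.0.2.10 5005"), []byte("abc123"))
	fmt.Println("messages formatted with debugging off:", formatted)
}
```

Whether the unguarded call really costs anything depends on inlining and escape analysis, so it is worth measuring rather than assuming.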

xhantu commented 4 years ago

Most debug log calls aren't for free and include call overhead depending on your arguments. This is why you often see in performance critical applications something like this:

if isDebug { log.Debug(....) }

Yes. But all arguments are passed as is, with no conversions during the call. logp.Debug calls logp.debugMessage, again without conversion, which first checks _log.level >= LOG_DEBUG && IsDebug(selector). With debugging disabled I would expect two function calls that effectively do nothing to be faster.

negbie commented 4 years ago

We can look into debug later as it's only a minor thing.

xhantu commented 4 years ago

Maybe I pushed too early. I just tested benchmarking with appends and with join inside addSDPCacheEntry. The difference in execution time is lost in the noise, but it's one alloc more with append. By chance the test code is a case where srcIP != rtcpIP.

negbie commented 4 years ago

If you know the length already you should always preallocate your slice and set the capacity like:

key := make([]byte, 0, len(rtcpIP)+len(rtcpPort)+1)
key = append(key, rtcpIP...)
key = append(key, ' ')
key = append(key, rtcpPort...)

xhantu commented 4 years ago

The key to fast appends without allocs seems to be using a sufficiently long predefined buffer as the starting point:

var buffer [100]byte
var search []byte
...
search = append(append(append(buffer[:0], '\r', '\n'), headerName...), ':')

This brings down execution time and allocs, as the buffer will be created on the stack.
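
Put together as a runnable sketch, the idea could look like the following. The function name headerStart, the 64-byte scratch size, and the sample SIP message are illustrative assumptions and are not the code from the PR. Whether the array really stays on the stack is up to the compiler's escape analysis, and a pattern longer than the scratch array simply makes append fall back to a heap allocation.

```go
package main

import (
	"bytes"
	"fmt"
)

// headerStart returns the offset of the first byte of the header name for a
// header line matching "\r\n<name>:" in data, or -1 if there is none.
// Hypothetical helper; it does not handle a header on the very first line.
func headerStart(data, name []byte) int {
	// Fixed-size scratch array used as backing storage for the search
	// pattern, so no slice has to be allocated per call in the common case.
	var scratch [64]byte
	search := append(scratch[:0], '\r', '\n')
	search = append(search, name...)
	search = append(search, ':')

	pos := bytes.Index(data, search)
	if pos < 0 {
		return -1
	}
	return pos + 2 // skip the leading CRLF
}

func main() {
	msg := []byte("INVITE sip:bob@example.com SIP/2.0\r\nCall-ID: abc123\r\nCSeq: 1 INVITE\r\n\r\n")

	if p := headerStart(msg, []byte("Call-ID")); p >= 0 {
		fmt.Printf("found at %d: %q\n", p, msg[p:p+7])
	}
	fmt.Println(headerStart(msg, []byte("Via")))
}
```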

negbie commented 4 years ago

Did something like what I suggested in my previous comment not help?

negbie commented 4 years ago

@xhantu before merging this, could you please consider changing correlateRTCP? You say "rtcp keys uses only ssrc, which is only unique for a session, not for different calls. E.g. with multiple call-legs or when using some sip tools."

That's true, so I would suggest just deleting the keyRTCP (ssrc) from the cache on the first match. Otherwise you could lose some RTCP packets. Take a look at example/pcap/rtp_rtcp_sip_ipv4_udp.pcap:

With your current implementation you would miss half of the RTCP packets.

negbie commented 4 years ago

On the other hand I think that pcap isn't really that common so let's merge it and address it later or if users report some issues.

negbie commented 4 years ago

@xhantu thank you again for this contribution!