datarhei / gosrt

Implementation of the SRT protocol in pure Go
https://datarhei.com
MIT License

Fix race condition when disposing late packets #36

Closed · aler9 closed 7 months ago

aler9 commented 8 months ago

In liveSend, packets are gathered and sent periodically with deliver():

https://github.com/datarhei/gosrt/blob/e3ca3449f90e2c8e46d50fac493bd752490bb18e/internal/congestion/live.go#L174-L179

Shortly after deliver(), liveSend may dispose of the packet's contents through Packet.Decommission():

https://github.com/datarhei/gosrt/blob/e3ca3449f90e2c8e46d50fac493bd752490bb18e/internal/congestion/live.go#L205-L215

This causes a race condition: deliver() (that is, Listener.Send()) is asynchronous, so a packet may be decommissioned before it has been marshaled and sent:

https://github.com/datarhei/gosrt/blob/e3ca3449f90e2c8e46d50fac493bd752490bb18e/listen.go#L492-L499
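
To make the hazard concrete, here is a minimal, self-contained sketch of the pattern (hypothetical types and names, not the library's actual code): a producer enqueues a packet for an asynchronous writer and then immediately disposes of its buffer, racing with the writer's Marshal(). Running it with go run -race reports the race.

package main

import (
	"sync"
	"time"
)

// bufPool stands in for the library's packet-data pool.
var bufPool = sync.Pool{New: func() any { return make([]byte, 1500) }}

type pkt struct{ data []byte }

// Marshal reads p.data to serialize the packet.
func (p *pkt) Marshal(wire *[]byte) { *wire = append(*wire, p.data...) }

// Decommission returns the buffer to the pool and clears the field.
func (p *pkt) Decommission() {
	bufPool.Put(p.data)
	p.data = nil
}

func main() {
	sndQueue := make(chan *pkt, 128)

	// Writer goroutine, analogous to listener.writer(): it marshals
	// queued packets at some later, unsynchronized point in time.
	go func() {
		for p := range sndQueue {
			var wire []byte
			p.Marshal(&wire) // read of p.data: races with Decommission
		}
	}()

	p := &pkt{data: bufPool.Get().([]byte)}
	sndQueue <- p    // deliver(): asynchronous, returns immediately
	p.Decommission() // write to p.data: the race

	time.Sleep(100 * time.Millisecond) // let the writer run
}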

This patch fixes the issue by making Listener.Send() synchronous.

This may be less efficient (and only maybe: a queue has a performance cost of its own, and this patch removes one), but it doesn't impact the stability of the server. I've checked all calls to this method: they all happen in dedicated routines, one for handling handshakes and one for each client. Reading and sending therefore still run in two separate routines, and a delay in a specific client routine won't impact the reading routine or other clients.
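
As a sketch of the resulting shape, with illustrative names (the real Listener.Send() in listen.go differs in detail): marshaling and the UDP write complete before the call returns, so a later Decommission() cannot race with them.

package sketch

import "net"

type pkt struct{ data []byte }

func (p *pkt) Marshal(wire *[]byte) { *wire = append(*wire, p.data...) }

type listener struct {
	pc net.PacketConn
}

// Send marshals and writes the packet before returning. By the time
// the caller regains control, the contents are on the wire, so the
// packet can safely be decommissioned afterwards.
func (ln *listener) Send(p *pkt, addr net.Addr) error {
	var wire []byte
	p.Marshal(&wire)
	_, err := ln.pc.WriteTo(wire, addr)
	return err
}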

aler9 commented 8 months ago

The data race reported by Go's race detector:

==================
WARNING: DATA RACE
Read at 0x00c0001d7090 by goroutine 4588:
  github.com/datarhei/gosrt/internal/packet.(*pkt).Marshal()
      /go/pkg/mod/github.com/datarhei/gosrt@v0.5.4/internal/packet/packet.go:429 +0x99
  github.com/datarhei/gosrt.(*listener).writer()
      /go/pkg/mod/github.com/datarhei/gosrt@v0.5.4/listen.go:517 +0x2e1
  github.com/datarhei/gosrt.Listen.func5()
      /go/pkg/mod/github.com/datarhei/gosrt@v0.5.4/listen.go:229 +0x4f

Previous write at 0x00c0001d7090 by goroutine 4615:
  github.com/datarhei/gosrt/internal/packet.(*pkt).Decommission()
      /go/pkg/mod/github.com/datarhei/gosrt@v0.5.4/internal/packet/packet.go:343 +0x97
  github.com/datarhei/gosrt/internal/congestion.(*liveSend).Tick()
      /go/pkg/mod/github.com/datarhei/gosrt@v0.5.4/internal/congestion/live.go:215 +0x1204
  github.com/datarhei/gosrt.(*srtConn).ticker()
      /go/pkg/mod/github.com/datarhei/gosrt@v0.5.4/connection.go:356 +0x252
  github.com/datarhei/gosrt.newSRTConn.func6()
      /go/pkg/mod/github.com/datarhei/gosrt@v0.5.4/connection.go:286 +0x33

Goroutine 4588 (running) created at:
  github.com/datarhei/gosrt.Listen()
      /go/pkg/mod/github.com/datarhei/gosrt@v0.5.4/listen.go:229 +0xd26
  github.com/bluenviron/mediamtx/internal/core.newSRTServer()
      /s/internal/core/srt_server.go:106 +0x213
  github.com/bluenviron/mediamtx/internal/core.(*Core).createResources()
      /s/internal/core/core.go:501 +0x3f84
  github.com/bluenviron/mediamtx/internal/core.New()
      /s/internal/core/core.go:144 +0x796
  github.com/bluenviron/mediamtx/internal/core.newInstance()
      /s/internal/core/core_test.go:92 +0x12d
  github.com/bluenviron/mediamtx/internal/core.TestSRTServer.func1()
      /s/internal/core/srt_server_test.go:31 +0x117
  testing.tRunner()
      /usr/local/go/src/testing/testing.go:1595 +0x238
  testing.(*T).Run.func1()
      /usr/local/go/src/testing/testing.go:1648 +0x44

Goroutine 4615 (running) created at:
  github.com/datarhei/gosrt.newSRTConn()
      /go/pkg/mod/github.com/datarhei/gosrt@v0.5.4/connection.go:286 +0x1673
  github.com/datarhei/gosrt.(*listener).Accept()
      /go/pkg/mod/github.com/datarhei/gosrt@v0.5.4/listen.go:320 +0xcd7
  github.com/bluenviron/mediamtx/internal/core.(*srtListener).runInner()
      /s/internal/core/srt_listener.go:43 +0x1c1
  github.com/bluenviron/mediamtx/internal/core.(*srtListener).run()
      /s/internal/core/srt_listener.go:35 +0xfc
  github.com/bluenviron/mediamtx/internal/core.newSRTListener.func1()
      /s/internal/core/srt_listener.go:27 +0x33
==================

ioppermann commented 8 months ago

Alternatively, the Tick function in liveSend could provide a clone to s.deliver:

diff --git a/internal/congestion/live.go b/internal/congestion/live.go
index 6012fac..f6d1987 100644
--- a/internal/congestion/live.go
+++ b/internal/congestion/live.go
@@ -173,7 +173,7 @@ func (s *liveSend) Tick(now uint64) {

                        s.rate.bytesSent += pktLen

-                       s.deliver(p)
+                       s.deliver(p.Clone())
                        removeList = append(removeList, e)
                } else {
                        break

Then listener.writer() can decommission all packets (not only control packets):

diff --git a/listen.go b/listen.go
index c2ad21f..61aaf13 100644
--- a/listen.go
+++ b/listen.go
@@ -527,10 +527,7 @@ func (ln *listener) writer(ctx context.Context) {
                        // Write the packet's contents to the wire
                        ln.pc.WriteTo(buffer, p.Header().Addr)

-                       if p.Header().IsControlPacket {
-                               // Control packets can be decommissioned because they will not be sent again (data packets might be retransferred)
-                               p.Decommission()
-                       }
+                       p.Decommission()
                }
        }
 }
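
For this alternative to be safe, Clone() must deep-copy the payload so that the writer goroutine owns its own buffer. A sketch of the required semantics (illustrative code, not the library's implementation):

package sketch

type pkt struct{ data []byte }

// Clone returns a packet with its own copy of the payload, so
// decommissioning the original cannot affect the clone that the
// writer goroutine marshals later.
func (p *pkt) Clone() *pkt {
	c := &pkt{data: make([]byte, len(p.data))}
	copy(c.data, p.data) // copies at most one MTU (~1500 bytes)
	return c
}
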
aler9 commented 8 months ago

The fix implemented in place of this patch is extremely inefficient, as it duplicates each packet. The sndQueue was evidently left untouched in order to preserve performance, but the resulting effect is the opposite.

ioppermann commented 8 months ago

This is true, there is a performance impact from cloning the packet. The memory for a packet's data is taken from a sync.Pool in order to avoid allocations as much as possible, but there's still the copy of at most 1500 bytes.

I kept the sndQueue untouched for the sake of decoupling, not for performance. As you mentioned, a channel has an overhead as well.

I will run benchmarks to find out how large the performance impact is, also compared to your solution.
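
A sketch of what such a benchmark could look like (hypothetical, in a _test.go file, isolating only the copy cost with a toy packet type rather than the library's):

package sketch

import "testing"

type pkt struct{ data []byte }

func (p *pkt) Clone() *pkt {
	c := &pkt{data: make([]byte, len(p.data))}
	copy(c.data, p.data)
	return c
}

// BenchmarkClone measures the per-packet cost of the extra copy that
// the Clone()-based fix performs for every delivered packet.
func BenchmarkClone(b *testing.B) {
	p := &pkt{data: make([]byte, 1500)}
	b.ReportAllocs()
	for i := 0; i < b.N; i++ {
		_ = p.Clone()
	}
}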

aler9 commented 8 months ago

From my experience, using separate routines for reading from and writing to sockets definitely improves performance, especially with UDP. But using multiple routines for the same client is not worth the effort: queues have a performance cost that outweighs the benefit of processing data from the same client in parallel, unless there's some computationally intensive operation somewhere (and in a protocol implementation, there isn't any).

When I say performance, I mean maximizing throughput in the presence of a large number of clients. My ideal architecture is:

UDP reader routine --> client specific routine --> UDP write function

And nothing more.

Furthermore, using non-blocking queues with networking has another disadvantage: the risk of unnecessary packet drops in case of congestion.

During congestion, conn.WriteTo() may block for a short period of time. If there's a non-blocking queue in front of it, that queue starts dropping packets. Without a queue, by contrast, the writer routine can send out everything as soon as there's enough bandwidth, and packet drops can be controlled through SetWriteDeadline().
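
A sketch of that approach (illustrative, not this library's code): the writer blocks on WriteTo() during congestion, and the stall is bounded by a deadline rather than cut short by an eagerly dropping queue.

package sketch

import (
	"net"
	"time"
)

// writePacket lets the kernel apply backpressure during congestion
// instead of dropping packets from a full user-space queue: the write
// blocks until the datagram is sent or the deadline expires, and only
// a deadline expiry results in a (controlled) drop.
func writePacket(pc net.PacketConn, addr net.Addr, wire []byte) error {
	if err := pc.SetWriteDeadline(time.Now().Add(100 * time.Millisecond)); err != nil {
		return err
	}
	_, err := pc.WriteTo(wire, addr)
	return err
}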

Finally, in the case of SRT there's already a routine that collects packets and sends them out periodically, since this is dictated by the specification and correctly implemented in this library. Another one is redundant.

ioppermann commented 8 months ago

OK, I see your point. Thanks for clarifying.

I guess the same applies to the (*dialer).send/write combo as well?

aler9 commented 8 months ago

I guess the same applies to the (*dialer).send/write combo as well?

Since *dialer handles a single connection only, using queues there has the same disadvantages, plus another one: there is no way to detect send errors, since the send() function doesn't and can't return them:

// send adds a packet to the send queue
func (dl *dialer) send(p packet.Packet) {
    // non-blocking
    select {
    case dl.sndQueue <- p:
    default:
        dl.log("dial", func() string { return "send queue is full" })
    }
}
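
For contrast, a queue-less send can surface errors to the caller. A minimal sketch with illustrative names (not the actual dialer code):

package sketch

import "net"

type pkt struct{ data []byte }

func (p *pkt) Marshal(wire *[]byte) { *wire = append(*wire, p.data...) }

type dialer struct {
	pc         net.PacketConn
	remoteAddr net.Addr
}

// send marshals and writes inline, so a failed or timed-out write is
// reported immediately instead of being silently dropped by a full
// non-blocking channel.
func (dl *dialer) send(p *pkt) error {
	var wire []byte
	p.Marshal(&wire)
	_, err := dl.pc.WriteTo(wire, dl.remoteAddr)
	return err
}
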
ioppermann commented 7 months ago

I removed the sndQueue in both dial.go and listen.go according to your pull request. Thanks a lot for pointing this out.

aler9 commented 7 months ago

Thanks for improving the performance aspect too! In my opinion it's very important to have a performant, pure Go implementation of the SRT protocol that is peer-reviewed and available under a permissive license. It's critical now and will become even more critical in the coming years, given the large number of SRT-compatible cameras that will be released onto the market.