gizahNL closed this 2 years ago
@jeoliva @iSchluff mind taking a look and taking it for a test drive? In all my tests it's now working as intended, though perhaps there is more room for improvement.
Hi, thanks for the implementation.
I tried to make some sense of the new benchmarks, and I think the parallel ones still need some work. The clients probably shouldn't run in an infinite loop there. I am also wondering how many sockets we can realistically have at all, as libsrt is pretty busy with internal synchronization and spawns multiple threads for each one.
goarch: amd64
pkg: github.com/haivision/srtgo
cpu: AMD Ryzen 9 5900X 12-Core Processor
BenchmarkAcceptNonBlocking-24 2070 5219646 ns/op
BenchmarkAcceptBlocking-24 3664 3788227 ns/op
BenchmarkAcceptNonBlockingParallel-24 100 68788401 ns/op
BenchmarkAcceptBlockingParallel-24 100 64876192 ns/op
PASS
ok github.com/haivision/srtgo 38.355s
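One way to keep the parallel clients from looping forever is to drive them from `b.RunParallel`, so each client goroutine does one round trip per `pb.Next()` and stops once `b.N` operations have been done in total. This is a hypothetical sketch, not the srtgo benchmarks: an in-memory, channel-based "server" stands in for SRT sockets (the `request` type and `benchmarkParallelClients` name are made up), and `testing.Benchmark` is used only so it runs outside `go test`.

```go
package main

import (
	"fmt"
	"testing"
)

// request is a hypothetical stand-in for one client round trip; the real
// srtgo benchmarks would connect/accept SRT sockets instead.
type request struct{ reply chan struct{} }

// benchmarkParallelClients drives the clients from b.RunParallel. Each
// pb.Next() iteration does exactly one round trip, so the client goroutines
// stop once b.N operations have been performed in total.
func benchmarkParallelClients(b *testing.B) {
	reqs := make(chan request)
	done := make(chan struct{})

	// "Server": answers requests until reqs is closed.
	go func() {
		defer close(done)
		for r := range reqs {
			r.reply <- struct{}{}
		}
	}()

	b.ResetTimer()
	b.RunParallel(func(pb *testing.PB) {
		reply := make(chan struct{})
		for pb.Next() { // bounded by b.N, not an infinite loop
			reqs <- request{reply: reply}
			<-reply
		}
	})
	b.StopTimer()

	// Shut the server down so nothing outlives this benchmark run.
	close(reqs)
	<-done
}

func main() {
	// testing.Benchmark runs a benchmark function outside of "go test".
	res := testing.Benchmark(benchmarkParallelClients)
	fmt.Println(res.N > 0, res)
}
```

With real sockets, the same shape would also make it easier to tear down every client and listener at the end of each run, which matters given how many threads libsrt spawns per socket.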
When using the pollserver I very often get EAsyncRCV on the receiving side even though I do not have a deadline set. Any ideas?
The pollserver code looks fine to me; however, I think the polldesc implementation can be improved a bit. If you want, I can add some annotations.
Getting EASYNCRCV is likely a bug, it shouldn't return that. I might have an idea as to the cause.
Annotations are definitely welcome. Today I also changed the timer code: the timer is now reused, so we no longer need to allocate a new one every time SetDeadline is called.
@iSchluff did you manage to test with the last commits?
ah not yet, but will do
So now my listening code works better and doesn't throw errors, which is good. However, the poll tests now get stuck on the ulimit (open file limit) for me, probably because the sockets are not cleaned up correctly.
with some output:
...
2021/08/04 00:53:04 accepted
2021/08/04 00:53:04 accept
2021/08/04 00:53:04 connect err The socket has been closed
2021/08/04 00:53:04 connect
2021/08/04 00:53:04 connect err General setup error resulting from internal system state
2021/08/04 00:53:04 connect
2021/08/04 00:53:04 connect err General setup error resulting from internal system state
2021/08/04 00:53:04 connect
2021/08/04 00:53:04 connect err General setup error resulting from internal system state
2021/08/04 00:53:04 connect
2021/08/04 00:53:05 timeout
Hmm, the tests I wrote do call SrtSocket.Close(), which in turn calls srt_close(). Perhaps the SRT library is not cleaning up the sockets fast enough?
I am not sure; I would kind of expect close to be synchronous.
Edit: Hm, apparently the library has a GC thread that has to clean up the sockets, so this test doesn't really make sense in its current form.
I think the implementation is generally fine; the polldesc code could probably use a bit more documentation. Could you explain what the timeout counters are for? My guess is that you use them to ensure a new deadline for each operation. AFAIK deadlines in the stdlib don't reset after an operation, so we should probably mimic that behaviour. Greetings, Anton
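For reference, the stdlib behaviour in question can be checked directly: once a read deadline on a `net.Conn` has passed, it stays in effect, so every later read fails with `os.ErrDeadlineExceeded` until the deadline is changed. The sketch below uses `net.Pipe` only as a convenient in-memory `net.Conn`; the helper name `deadlineIsSticky` is made up for the example.

```go
package main

import (
	"errors"
	"fmt"
	"net"
	"os"
	"time"
)

// deadlineIsSticky reports whether two consecutive reads fail after a read
// deadline passes, and what a read returns once the deadline is cleared.
func deadlineIsSticky() (first, second bool, after string) {
	a, b := net.Pipe() // in-memory net.Conn pair that supports deadlines
	defer a.Close()
	defer b.Close()

	buf := make([]byte, 16)
	a.SetReadDeadline(time.Now().Add(10 * time.Millisecond))

	// Nothing is written, so the first read times out.
	_, err := a.Read(buf)
	first = errors.Is(err, os.ErrDeadlineExceeded)

	// The failed read did not reset the deadline: this fails immediately.
	_, err = a.Read(buf)
	second = errors.Is(err, os.ErrDeadlineExceeded)

	// Only an explicit call with the zero time removes the deadline.
	a.SetReadDeadline(time.Time{})
	go b.Write([]byte("hello"))
	n, _ := a.Read(buf)
	return first, second, string(buf[:n])
}

func main() {
	fmt.Println(deadlineIsSticky()) // true true hello
}
```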
I've been focused on other projects lately, so I didn't get time to clean this up any sooner.
Could you take another look?
Once we're good I'll squash all the commits, so that it becomes ready for merging.
BTW, the timer doesn't reset: we set the deadline to -1 upon timeout, and from then on each operation fails with a timed-out error (poll.go line 180).
@iSchluff @jeoliva
jup, will check it out again
Currently the enable_empty option is set on an intermediate epoll id that is then discarded. Fix as follows:
index 61c7dd7..b8a2be7 100644
--- a/pollserver.go
+++ b/pollserver.go
@@ -22,12 +22,11 @@ func pollServerCtx() *pollServer {
}
func pollServerCtxInit() {
- eid := C.srt_epoll_create()
- C.srt_epoll_set(eid, C.SRT_EPOLL_ENABLE_EMPTY)
phctx = &pollServer{
srtEpollDescr: C.srt_epoll_create(),
pollDescs: make(map[C.SRTSOCKET]*pollDesc),
}
+ C.srt_epoll_set(phctx.srtEpollDescr, C.SRT_EPOLL_ENABLE_EMPTY)
go phctx.run()
}
I have also noticed that under heavy read/write load a socket might return EAsyncRCV even after epoll has reported it as ready. I think it's probably a problem with the library, as I also get it without the pollserver. But it might be necessary to handle that case by wrapping read/write in a loop.
So an idea would be:
index 059583f..863e66e 100644
--- a/read.go
+++ b/read.go
@@ -43,11 +43,11 @@ func (s SrtSocket) Read(b []byte) (n int, err error) {
}
n, err = srtRecvMsg2Impl(s.socket, b, nil)
- if err != nil {
- if errors.Is(err, error(EAsyncRCV)) && !s.blocking {
- s.pd.wait(ModeRead)
- n, err = srtRecvMsg2Impl(s.socket, b, nil)
+ for {
+ if !errors.Is(err, error(EAsyncRCV)) || s.blocking {
+ return
}
+ s.pd.wait(ModeRead)
+ n, err = srtRecvMsg2Impl(s.socket, b, nil)
}
- return
}
diff --git a/write.go b/write.go
index 8fc34ce..0d2309f 100644
--- a/write.go
+++ b/write.go
@@ -44,11 +44,11 @@ func (s SrtSocket) Write(b []byte) (n int, err error) {
}
n, err = srtSendMsg2Impl(s.socket, b, nil)
- if err != nil {
- if errors.Is(err, error(EAsyncSND)) && !s.blocking {
- s.pd.wait(ModeWrite)
- n, err = srtSendMsg2Impl(s.socket, b, nil)
+ for {
+ if !errors.Is(err, error(EAsyncSND)) || s.blocking {
+ return
}
+ s.pd.wait(ModeWrite)
+ n, err = srtSendMsg2Impl(s.socket, b, nil)
}
- return
}
I also haven't been able to measure any tangible performance difference between the pollserver and blocking mode. In my testing, plain blocking mode is at least on par, but I also haven't found a good way to test SRT performance anyway. I have implemented some read/write code in #44 if you want to test it yourself; just note that the results are all over the place.
But I would say the pollserver is most likely faster than the current non-blocking implementation, so thumbs up for merging. It just needs the enable_empty fix I mentioned above.
Re: the enable_empty fix:
Thanks, I figured the same yesterday :/ Fixing soon.
Re: wrapping Read/Write in a retry loop on EAsyncRCV/EAsyncSND:
Seems like a good idea.
Re: the performance comparison with blocking mode:
The advantage should show up mostly when many sockets are open; at a minimum it reduces the number of fds created (I've already seen one of my processes starve due to the open file limit ;) ).
@jeoliva did you already look at this?
@gizahNL , @iSchluff , amazing work. Hats off.
This is a pollserver implementation based on the pollserver in Golang's netpoll. Using a pollserver should dramatically reduce blocking cgo calls when many sockets are active, which reduces load on the Golang scheduler and increases performance.
I've still marked this as WIP because I'd like it to get more testing and feedback from others.
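The core idea can be sketched in plain Go: one poller goroutine receives readiness events (in srtgo these would come from `srt_epoll_wait` on a single epoll id) and wakes whichever goroutine is waiting on that socket, so callers park on a Go channel instead of each blocking in its own cgo call. This is a simplified, hypothetical model, not the PR's code: `socketID`, `register` and the `events` channel are invented, and deadlines and read/write modes are omitted.

```go
package main

import (
	"fmt"
	"sync"
)

// socketID is a hypothetical stand-in for C.SRTSOCKET.
type socketID int

// pollServer maps each waiting socket to the channel its caller parks on.
type pollServer struct {
	mu      sync.Mutex
	waiters map[socketID]chan struct{}
}

func newPollServer() *pollServer {
	return &pollServer{waiters: make(map[socketID]chan struct{})}
}

// register returns the channel a caller blocks on until sock is ready.
func (ps *pollServer) register(sock socketID) <-chan struct{} {
	ps.mu.Lock()
	defer ps.mu.Unlock()
	ch := make(chan struct{}, 1)
	ps.waiters[sock] = ch
	return ch
}

// run dispatches readiness events until the events channel is closed.
func (ps *pollServer) run(events <-chan socketID) {
	for sock := range events {
		ps.mu.Lock()
		ch, ok := ps.waiters[sock]
		delete(ps.waiters, sock)
		ps.mu.Unlock()
		if ok {
			ch <- struct{}{} // buffered, so the poller never blocks here
		}
	}
}

func main() {
	ps := newPollServer()
	events := make(chan socketID)
	go ps.run(events)

	var wg sync.WaitGroup
	woken := make([]bool, 3)
	for i := 0; i < 3; i++ {
		ready := ps.register(socketID(i))
		wg.Add(1)
		go func(i int, ready <-chan struct{}) {
			defer wg.Done()
			<-ready // parks on a channel, not in a cgo call
			woken[i] = true
		}(i, ready)
	}
	for i := 2; i >= 0; i-- {
		events <- socketID(i) // simulated epoll readiness, in any order
	}
	wg.Wait()
	close(events)
	fmt.Println(woken) // [true true true]
}
```

Because all sockets share one epoll id and one poller goroutine, this shape also keeps the number of epoll fds constant regardless of how many sockets are open.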
Linking issue #36