iamkinetic / NEventSocket

A reactive FreeSwitch eventsocket library for .Net/.Net Framework (.net standard 2)
Mozilla Public License 2.0
26 stars 11 forks source link

Fix race condition with the reader task in ObservableSocket.cs #15

Closed pragmatrix closed 2 years ago

pragmatrix commented 2 years ago

Hi,

using NEventSocket under Ubuntu Linux .NET 6 on the same machine together with FreeSWITCH, we encountered unexpected timeouts in relatively low load situations that looked like the following:

System.AggregateException: One or more errors occurred. (Timeout when trying to connect to 172.17.0.1:8021.No Auth Request received within the specified timeout of 00:00:05.)
 ---> NEventSocket.InboundSocketConnectionFailedException: Timeout when trying to connect to 172.17.0.1:8021.No Auth Request received within the specified timeout of 00:00:05.
 ---> System.TimeoutException: No Auth Request received within the specified timeout of 00:00:05.
   at NEventSocket.InboundSocket.Connect(String host, Int32 port, String password, Nullable`1 timeout)

Then we run another client we develop in a loop alongside while the load test was running, which was happily authenticating with over 300 connections / seconds to the mod_event_socket endpoint (port 8021, sub millisecond authentication times to FreeSWITCH), which let us strongly believe that FreeSWITCH and neither the load wasn't the problem here.

Then looking closer at the NEventSocket source code we found that - if the task that reads the TcpClient stream - receives packets faster than the first subscriber is actually subscribed at the Rx Subject, the auth/request packet may get lost.

The actual problem here is that the Defer() operator in Rx is invoked before the subscriber is connected to the subject. So if the TcpClientconnects, and FreeSWITCH responds immediately, and the Task starts running before the subject.Subscribe() has returned, subject.HasSubscribers is false, and the invocation of subject.OnNext() dismisses the packet, causing a subsequent timeout for the first auth/request.

This PR fixes that from the perspective of someone that has no idea how the idiomatic solution in Rx would look like. My solution is to intercept the Subscribe invocation and start the reader task after that (once only that is, of course). That worked for us so far.

@iamkinetic @ajgolledge

iamkinetic commented 2 years ago

We will be taking a look at this pull request within the next two weeks.

@JosBleuet

danbarua commented 1 year ago

👏 great work!