webrtc-rs / webrtc

A pure Rust implementation of WebRTC
https://webrtc.rs
Apache License 2.0
4.05k stars 360 forks source link

Udp connection not close (reopen #174) #195

Closed shiqifeng2000 closed 2 years ago

shiqifeng2000 commented 2 years ago

hi

This is an issue reopen from 174, as state before the code below will leave some socket unreleased in the task/thread/process

let pc = peer_connection.clone();
peer_connection
        .on_peer_connection_state_change(Box::new(move |s: RTCPeerConnectionState| {
            info!("Peer Connection State has changed: {}", s);
            let is_failed = s == RTCPeerConnectionState::Failed;
            let pc1 = pc.clone();
            Box::pin(async move {
                if is_failed {
                    pc1.close().await.ok();
                }
            })
        }))
        .await;

So I try to make it async way, as it did get fixed, for some time

    let (box_sdr, mut box_rcvr) = tokio::sync::mpsc::channel::<VCError>(1);
    peer_connection
        .on_peer_connection_state_change(Box::new(move |s: RTCPeerConnectionState| {
            info!("Peer Connection State has changed: {}", s);
            let is_failed = s == RTCPeerConnectionState::Failed;
            if is_failed {
                info!("Peer Connection has gone to failed");
            }
            let box_sdr = box_sdr.clone();
            Box::pin(async move {
                if is_failed {
                    box_sdr.send(VCError::biz("Connection failed", 10)).await;
                }
            })
        }))
        .await;

    let peer_connection = peer_connection.clone();
    rt::spawn(async move {
        loop {
            let box_result = box_rcvr.try_recv();
            if box_result.is_ok() {
                tokio::time::sleep(Duration::from_secs(8)).await;
                peer_connection.close().await;
                break;
            } 
            tokio::time::sleep(Duration::from_secs(4)).await;
        }
    });

But when the task went heavy and cpu get big burden, say, 50 rtc jobs delivering large bitrate stream, I found the issue reappear from time to time. Leaving udp sockets unreleased.

I have one audio tranceiver channel, one video tranceiver channel and a data channel initialized from client side, the task get notified(started) after data channel is established. Every event handler of the peer_connection are independent, which means they are not chained. After some log working, I am pretty sure all the data I created are dropped. So once more I need some help here, just like the error before, maybe there's some cons I need to watch out?

Maybe there's other part I neglected? Hope you could help.

k0nserv commented 2 years ago

Hey, thanks for reporting this issue. Can you provide some more details?

I have two theories about this:

  1. There's a reference cycles or something else that causes the socket to be kept open and even after closing the peer connection it never gets freed because the required Drop implementation never runs
  2. The load causes tokio task starvation and the task that is supposed to close the peer connection never runs.

Of the two 1 seems much more likely as a cause. I'm not super familiar with the ICE agent stuff but I'd start debugging there(i.e. ensure that the struct that holds the socket is actually dropped properly).

shiqifeng2000 commented 2 years ago

Hey, thanks for reporting this issue. Can you provide some more details?

  • Does on_peer_connection_state_change get called correctly?
  • Does the line that actually calls close on the peer connection run?
  • Does close return a non-Err response?
  • If you run with memory leak detection are there any leaks detected?

I have two theories about this:

  1. There's a reference cycles or something else that causes the socket to be kept open and even after closing the peer connection it never gets freed because the required Drop implementation never runs
  2. The load causes tokio task starvation and the task that is supposed to close the peer connection never runs.

Of the two 1 seems much more likely as a cause. I'm not super familiar with the ICE agent stuff but I'd start debugging there(i.e. ensure that the struct that holds the socket is actually dropped properly).

Hi, thank you for the theories, I am working on the debugging

Here's the short answer for your question: Does on_peer_connection_state_change get called correctly? Since I am working on the debugging, I am using every means possible to try to adjust code, for now, may not trigger when timeout to establish connection(10 secs), and I still found socket unreleased.

Does the line that actually calls close on the peer connection run? yes, for that, I am sure, here's my "not so pretty" code list below, which aim to start all the streamer after all candidates are collected and data channel is established. When timeout happens, all the streaming work below will not trigger and function shall return. So at this point, I need to close the peer connection( is coded below).

rt::spawn(async move {
        let timeout = tokio::time::sleep(Duration::from_secs(10));
        tokio::pin!(timeout);
        tokio::select! {
            _ = timeout.as_mut() =>{
                return error_log!(target:&token.to_string(),"Timeout awaiting candidate gathering finish, now quitting");
            }
            _ = notify.notified() => {
                info!(target: &token.to_string(),"Got all the candidate");
            }
        };
            let timeout = tokio::time::sleep(Duration::from_secs(10));
            tokio::pin!(timeout);
            tokio::select! {
                _ = timeout.as_mut() =>{
                    match peer_connection.connection_state(){
                        RTCPeerConnectionState::Failed =>{
                            info!(target: &token.to_string(),"Forcing peer connection close when timeout fetching datachannel, current state {}", peer_connection.connection_state());
                            info!(target: &token.to_string(),"Forcing peer connection close status {:#?}", peer_connection.close().await);
                            info!(target: &token.to_string(),"Forcing peer connection close when timeout fetching datachannel, current state {}", peer_connection.connection_state());
                            return error_log!(target:&token.to_string(),"Timeout awaiting start command for start up green thread");
                        },
                        _=> continue
                    };
                }
                _ = dc_rcv.recv() => {
                    info!(target: &token.to_string(),"Got the data channel");
                }
            };

[Rolling] 2022-05-27T17:29:50.543278+08:00 - INFO - 195989740 - Forcing peer connection close when timeout fetching datachannel, current state failed
[Rolling] 2022-05-27T17:29:50.545733+08:00 - INFO - 195989740 - Forcing peer connection close status Ok(
    (),
)
[Rolling] 2022-05-27T17:29:50.545772+08:00 - INFO - 195989740 - Forcing peer connection close when timeout fetching datachannel, current state closed

Does close return a non-Err response? as log shows above, it returns ok;

If you run with memory leak detection are there any leaks detected? Not yet, but I did moniter memory for every feature added, I will try to run the test these days. Althrough I use ffmpeg a tool, I am pretty sure all the unsafe instance are freed.

Btw, let me know if I could share my code with your so that we could inspect.

k0nserv commented 2 years ago

I did some digging. Here's what I think happens:

Ultimately the socket is held within CandidateBase which implements Candidate. Instances of Candidate are stored within an Arc e.g. in local_candidates in AgentInternal and probably elsewhere too. Thus, for the socket to be freed correctly CandidateBase must be freed, chances are that with a lot of Arcs this isn't happening because something is holding on to it somehwere.

To investigate you can add an OnDrop to CandidateBase that logs. We expect that you should see this log message when the peer connection is closed

shiqifeng2000 commented 2 years ago

leak detection

hi kOnserv, thanks again for this hint. I am digging now, here's some progress I found. Firstly, as you just print above, the CandidateBase did not drop. But AgentInternal did get dropped. So I guess it should not be the agent issue. And the log seems prove my guessing, when agent_internal invoke close, which lead to delete_all_candidates, all returns normal with no errors, even for the unreleased ports.

And according to my log, it seems there's CandidatePair dropped event prior to every normally dropped socket

Closing candidate udp6 host :::49300
Candidate udp6 host :::49300 closed
Closing candidate udp4 host 127.0.0.1:49509
Candidate udp4 host 127.0.0.1:49509 closed
Closing candidate udp4 srflx 127.0.0.1:49910
Candidate udp4 srflx 127.0.0.1:49910 closed
Dropping udp6 host :::49300
token 986931319, is_stop false
Closing candidate udp4 host 127.0.0.1:49501
[AVIOContext @ 0x10b61c640] Statistics: 4216101 bytes read, 2 seeks
[libopus @ 0x10c1c9a00] Ignoring attempt to flush encoder that doesn't support it
Candidate udp4 host 127.0.0.1:49501 closed
Closing candidate udp4 srflx 127.0.0.1:49625
Candidate udp4 srflx 127.0.0.1:49625 closed
Closing candidate udp6 host :::49990
Candidate udp6 host :::49990 closed
Dropping udp6 host :::49990
Dropping internal agent Some(3236548177)
Dropping internal agent Some(1376896282)
Dropping internal agent Some(573185719)
Dropping candidate pair 127.0.0.1:49792
Dropping udp4 host 127.0.0.1:49792
Dropping candidate pair 127.0.0.1:50061
Dropping udp4 prflx 192.168.0.100:56764
Dropping udp4 srflx 127.0.0.1:50061
Dropping internal agent Some(3474514891)
Dropping internal agent Some(961005883)
Dropping candidate pair 127.0.0.1:50078
Dropping udp4 host 127.0.0.1:50078
Dropping candidate pair 127.0.0.1:49883
Dropping udp4 prflx 192.168.0.100:57244
Dropping udp4 srflx 127.0.0.1:49883
Dropping candidate pair 127.0.0.1:49509
Dropping udp4 host 127.0.0.1:49509
Dropping candidate pair 127.0.0.1:49910
Dropping udp4 prflx 192.168.0.100:61850
Dropping udp4 srflx 127.0.0.1:49910

Additionally, here's the port not released

vccplayer 12698 robin   54u     IPv4 0xd75407aecfb384bd        0t0                 UDP 192.168.0.100:49501
vccplayer 12698 robin   55u     IPv4 0xd75407aecfb387cd        0t0                 UDP *:49625

I just got this far, I will update you as soon as I got news

UPDATE: The candidate pair did not free because AgentConn did not drop, and sinceAgentConn impl Conn trait, it could be used in many place. Like you said, it's likely to be some reference cycles issue.

Here's how I get this error, I use a GET http request to exchange the SDP info, putting the streaming work behind the notifier from none candidate event(which means no more candidates) and data channel established event in another thread/task. And I frequently started 30(or more) http request and close, over and over. After a few rounds, I would get the error above. Normally like that, occasionally more ports unreleased.

I made up token to each request to track the port, and I found all the streaming work are not even started from timeout, the problematic peer connection state usually come to a RTCPeerConnectionState::Failed, after which I call the peer_connection.close().await method, try several times, I may get the error.

I am not able to target the issue yet but I guess it's quite likely this repo got some bug itself. Hope you could resolve it soon.

UPDATE It seems the AgentConn did not drop because many referer such as SctpTransport, DtlsTransport, IceTransport IceTransport are all not freed. And I found that's all because the PeerConnnectionInternal is not freed. After setting up a drop event, it seems promising. When sending signal such as ctrl+c, program stopped with PeerConnnectionInternal destruction method logs. So I guess it is quite possible the program is stuck somehwhere related to PeerConnnectionInternal blocking, looping or retrying. After a long time digging, I found many code with tokio channel or socket sender/receiver are written with no timeout cancelling safe guard. I dont know if it is intentional. Now to the digging part, there's 4 tokio threads and hundred of *.await written in peer_connection_internal.rs. For tokio threads, I put some timeout code to try, but sockets are still leaking.

UPDATE I spent several days logging just to target this issue and it seems I have target the issue. In PeerConnectionInternal, after local sdp and remote sdp are both set and start to negotiate, the function PeerConnection::start_rtp will be invoked, in this function, at the end of the code, self.start_sctp().await; is the place where code got stuck. With further digging, your may find PeerConnectionInternal::.sctp_transport.start -> sctp::association::Association::clientis the place all got stuck. It seems when negotiating, when the sctp process begin to create a client for association, it got no response in time(In my scenaria) before my code invoke peer_connection.close().await from timeout. The association actually got stuck because handshaking is not complete, the handshaking is achieved from a tokio::mpsc::channel, so it is cancel free with a timeout way. So I guess if it release the association and return some error before timeout, it should release the PeerConnectionInternal and get rid of the issue.

So with the theory above, I try to add this code below in /webrtc/src/sctp_transport/mod.rs in fn start

          let timeout = tokio::time::sleep(std::time::Duration::from_secs(8));
            tokio::pin!(timeout);
            let sctp_association = Arc::new(
                tokio::select! {
                    _ = timeout.as_mut() =>{
                        return Err(Error::ErrSCTPNotEstablished);
                    }
                    sctp_association_rst =
                    sctp::association::Association::client(sctp::association::Config {
                        net_conn: Arc::clone(net_conn) as Arc<dyn Conn + Send + Sync>,
                        max_receive_buffer_size: 0,
                        max_message_size: 0,
                        name: String::new(),
                    }) => {
                        sctp_association_rst?
                    }
                }
            );

I have tested dozens of rounds with the codes above, and it seems there's no unreleased port any more, for now. Please let me know if the code above is a right way fixing and if it would impect anything.

Thanks.

k0nserv commented 2 years ago

Amazing work on all this digging @shiqifeng2000, thank you so much for this!

I do think your fix should work and alleviate the issue as you have described, however I feel like a better fix might be to resolve the retain cycles by ensuring we don't use Arc everywhere instead relying on Weak for non-owning pointers. I'm not sure exactly which objects should own the relevant pieces of state, but the ideal would be that simply dropping the peer connection works as a side effect of these Weak pointers no longer being upgradable.

Since you've been digging in this code do you have thoughts on where ownership should lie?

I think we could be okay with your fix for now if you wanna send a PR.

shiqifeng2000 commented 2 years ago

Amazing work on all this digging @shiqifeng2000, thank you so much for this!

I do think your fix should work and alleviate the issue as you have described, however I feel like a better fix might be to resolve the retain cycles by ensuring we don't use Arc everywhere instead relying on Weak for non-owning pointers. I'm not sure exactly which objects should own the relevant pieces of state, but the ideal would be that simply dropping the peer connection works as a side effect of these Weak pointers no longer being upgradable.

Since you've been digging in this code do you have thoughts on where ownership should lie?

I think we could be okay with your fix for now if you wanna send a PR.

Well I am just a consumer and maybe my fix is not the best way for this repo. Arc and Weak pointer are better and should be align better to your coding pattern, so maybe it should be best to left to you to fix them.

As for ownership, please just make sure when peer_connection close, all the under-going async process are cancelled, all spawned threads released and all fields or substances related are dropped. That's all.

k0nserv commented 2 years ago

To be honest I'm also mostly a consumer although I've made several contributions to the project, but I'm not super familiar with this code path. That said, I had a look and there's RTCSctpTransport::notify_tx which is notified when RTCSctpTransport::stop is called. I think using tokio::select! with self.notify_tx.notified() as one of the branches should work to detect the stop condition.

Could you try this patch?

diff --git a/src/sctp_transport/mod.rs b/src/sctp_transport/mod.rs
index fb27b26..a1b8342 100644
--- a/src/sctp_transport/mod.rs
+++ b/src/sctp_transport/mod.rs
@@ -64,7 +64,7 @@ pub struct RTCSctpTransport {
     pub(crate) dtls_transport: Arc<RTCDtlsTransport>,

     // State represents the current state of the SCTP transport.
-    state: AtomicU8, //SCTPTransportState,
+    state: AtomicU8, // RTCSctpTransportState

     // SCTPTransportState doesn't have an enum to distinguish between New/Connecting
     // so we need a dedicated field
@@ -145,15 +145,26 @@ impl RTCSctpTransport {

         let dtls_transport = self.transport();
         if let Some(net_conn) = &dtls_transport.conn().await {
-            let sctp_association = Arc::new(
-                sctp::association::Association::client(sctp::association::Config {
-                    net_conn: Arc::clone(net_conn) as Arc<dyn Conn + Send + Sync>,
-                    max_receive_buffer_size: 0,
-                    max_message_size: 0,
-                    name: String::new(),
-                })
-                .await?,
-            );
+            let sctp_association = loop {
+                tokio::select! {
+                    _ = self.notify_tx.notified() => {
+                        // It seems like notify_tx is only notified on Stop so perhaps this check
+                        // is redundant.
+                        // TODO: Consider renaming notify_tx to shutdown_tx.
+                        if self.state.load(Ordering::SeqCst) == RTCSctpTransportState::Closed as u8 {
+                            return Err(Error::ErrSCTPTransportDTLS);
+                        }
+                    },
+                    association = sctp::association::Association::client(sctp::association::Config {
+                        net_conn: Arc::clone(net_conn) as Arc<dyn Conn + Send + Sync>,
+                        max_receive_buffer_size: 0,
+                        max_message_size: 0,
+                        name: String::new(),
+                    }) => {
+                        break Arc::new(association?);
+                    }
+                };
+            };

             {
                 let mut sa = self.sctp_association.lock().await;
diff --git a/src/sctp_transport/sctp_transport_state.rs b/src/sctp_transport/sctp_transport_state.rs
index f27f29e..67378ea 100644
--- a/src/sctp_transport/sctp_transport_state.rs
+++ b/src/sctp_transport/sctp_transport_state.rs
@@ -2,6 +2,7 @@ use std::fmt;

 /// SCTPTransportState indicates the state of the SCTP transport.
 #[derive(Debug, Copy, Clone, PartialEq)]
+#[repr(u8)]
 pub enum RTCSctpTransportState {
     Unspecified,
k0nserv commented 2 years ago

Hey @shiqifeng2000 did you get a chance to test this, if not do you have an example that replicates the problem and I can try

shiqifeng2000 commented 2 years ago

Hey @shiqifeng2000 did you get a chance to test this, if not do you have an example that replicates the problem and I can try

Hi, @k0nserv, so you are using a notifier instead of timeout, that's clever, I tried with your code above and test dozens of rounds and it seems works as well.

k0nserv commented 2 years ago

That's great, could you send a PR with this change please?

shiqifeng2000 commented 2 years ago

That's great, could you send a PR with this change please?

gladly, working on it

k0nserv commented 2 years ago

Closing this as I believe it was resolved