ZLMediaKit / ZLToolKit

一个基于C++11的轻量级网络框架,基于线程池技术可以实现大并发网络IO
MIT License
1.94k stars 581 forks source link

修复Udp数据已关闭流,由于最后几帧数据传入又重新创建session对象 #191

Closed mc373906408 closed 10 months ago

mc373906408 commented 10 months ago

目前是通过0.5s自动删除的map,来判断这几帧数据是不是当前关闭的流。

xia-chu commented 10 months ago

这个针对什么协议?国标吗?

mc373906408 commented 10 months ago

我是SIP协议,应该是UDP传入的都会发生这个问题

xia-chu commented 10 months ago

其实 可不可以换种思路呢 就是udp socket触发onEr后 延时移除对象呢?我觉得这样可能更好。 因为onError是emitErr时触发的 而emitErr时会delEvent,这样udp socket就不会再触发网络接收事件了。 但是由于延时移除对象,所以这段时间内 这个socket收到的数据都会被直接丢弃

xia-chu commented 10 months ago

不过这个可能对某些老内核不适用 因为有些老内核 触发数据接收的udp fd可能并不是这个对应的fd

xia-chu commented 10 months ago

不过这个可能对某些老内核不适用 因为有些老内核 触发数据接收的udp fd可能并不是这个对应的fd

这个补丁能修复这个问题:

Index: src/Network/Server.h
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
diff --git a/src/Network/Server.h b/src/Network/Server.h
--- a/src/Network/Server.h  (revision ad44a16c99834540b397774ad6c7f3f8ed619d56)
+++ b/src/Network/Server.h  (date 1700549107505)
@@ -49,6 +49,8 @@

 class SessionHelper {
 public:
+    bool enable = true;
+
     using Ptr = std::shared_ptr<SessionHelper>;

     SessionHelper(const std::weak_ptr<Server> &server, Session::Ptr session, std::string cls);
Index: src/Network/UdpServer.h
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
diff --git a/src/Network/UdpServer.h b/src/Network/UdpServer.h
--- a/src/Network/UdpServer.h   (revision ad44a16c99834540b397774ad6c7f3f8ed619d56)
+++ b/src/Network/UdpServer.h   (date 1700548667692)
@@ -89,12 +89,12 @@
     /**
      * @brief 根据对端信息获取或创建一个会话
      */
-    Session::Ptr getOrCreateSession(const PeerIdType &id, const Buffer::Ptr &buf, struct sockaddr *addr, int addr_len, bool &is_new);
+    SessionHelper::Ptr getOrCreateSession(const PeerIdType &id, const Buffer::Ptr &buf, struct sockaddr *addr, int addr_len, bool &is_new);

     /**
      * @brief 创建一个会话, 同时进行必要的设置
      */
-    Session::Ptr createSession(const PeerIdType &id, const Buffer::Ptr &buf, struct sockaddr *addr, int addr_len);
+    SessionHelper::Ptr createSession(const PeerIdType &id, const Buffer::Ptr &buf, struct sockaddr *addr, int addr_len);

     /**
      * @brief 创建socket
Index: src/Network/UdpServer.cpp
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
diff --git a/src/Network/UdpServer.cpp b/src/Network/UdpServer.cpp
--- a/src/Network/UdpServer.cpp (revision ad44a16c99834540b397774ad6c7f3f8ed619d56)
+++ b/src/Network/UdpServer.cpp (date 1700549173025)
@@ -19,6 +19,8 @@
 static const uint8_t s_in6_addr_maped[]
     = { 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0xff, 0xff, 0x00, 0x00, 0x00, 0x00 };

+static constexpr auto kUdpDelayCloseMS = 3 * 1000;
+
 static UdpServer::PeerIdType makeSockId(sockaddr *addr, int) {
     UdpServer::PeerIdType ret;
     switch (addr->sa_family) {
@@ -142,32 +144,36 @@
     onRead_l(true, id, buf, addr, addr_len);
 }

-static void emitSessionRecv(const Session::Ptr &session, const Buffer::Ptr &buf) {
+static void emitSessionRecv(const SessionHelper::Ptr &helper, const Buffer::Ptr &buf) {
+    if (!helper->enable) {
+        // 延时销毁中
+        return;
+    }
     try {
-        session->onRecv(buf);
+        helper->session()->onRecv(buf);
     } catch (SockException &ex) {
-        session->shutdown(ex);
+        helper->session()->shutdown(ex);
     } catch (exception &ex) {
-        session->shutdown(SockException(Err_shutdown, ex.what()));
+        helper->session()->shutdown(SockException(Err_shutdown, ex.what()));
     }
 }

 void UdpServer::onRead_l(bool is_server_fd, const UdpServer::PeerIdType &id, const Buffer::Ptr &buf, sockaddr *addr, int addr_len) {
     // udp server fd收到数据时触发此函数;大部分情况下数据应该在peer fd触发,此函数应该不是热点函数
     bool is_new = false;
-    if (auto session = getOrCreateSession(id, buf, addr, addr_len, is_new)) {
-        if (session->getPoller()->isCurrentThread()) {
+    if (auto helper = getOrCreateSession(id, buf, addr, addr_len, is_new)) {
+        if (helper->session()->getPoller()->isCurrentThread()) {
             //当前线程收到数据,直接处理数据
-            emitSessionRecv(session, buf);
+            emitSessionRecv(helper, buf);
         } else {
             //数据漂移到其他线程,需要先切换线程
             WarnL << "UDP packet incoming from other thread";
-            std::weak_ptr<Session> weak_session = session;
+            std::weak_ptr<SessionHelper> weak_helper = helper;
             //由于socket读buffer是该线程上所有socket共享复用的,所以不能跨线程使用,必须先拷贝一下
             auto cacheable_buf = std::make_shared<BufferString>(buf->toString());
-            session->async([weak_session, cacheable_buf]() {
-                if (auto strong_session = weak_session.lock()) {
-                    emitSessionRecv(strong_session, cacheable_buf);
+            helper->session()->async([weak_helper, cacheable_buf]() {
+                if (auto strong_helper = weak_helper.lock()) {
+                    emitSessionRecv(strong_helper, cacheable_buf);
                 }
             });
         }
@@ -207,42 +213,40 @@
     });
 }

-Session::Ptr UdpServer::getOrCreateSession(const UdpServer::PeerIdType &id, const Buffer::Ptr &buf, sockaddr *addr, int addr_len, bool &is_new) {
+SessionHelper::Ptr UdpServer::getOrCreateSession(const UdpServer::PeerIdType &id, const Buffer::Ptr &buf, sockaddr *addr, int addr_len, bool &is_new) {
     {
         //减小临界区
         std::lock_guard<std::recursive_mutex> lock(*_session_mutex);
         auto it = _session_map->find(id);
         if (it != _session_map->end()) {
-            return it->second->session();
+            return it->second;
         }
     }
     is_new = true;
     return createSession(id, buf, addr, addr_len);
 }

-static Session::Ptr s_null_session;
-
-Session::Ptr UdpServer::createSession(const PeerIdType &id, const Buffer::Ptr &buf, struct sockaddr *addr, int addr_len) {
+SessionHelper::Ptr UdpServer::createSession(const PeerIdType &id, const Buffer::Ptr &buf, struct sockaddr *addr, int addr_len) {
     // 此处改成自定义获取poller对象,防止负载不均衡
     auto socket = createSocket(EventPollerPool::Instance().getPoller(false), buf, addr, addr_len);
     if (!socket) {
         //创建socket失败,本次onRead事件收到的数据直接丢弃
-        return s_null_session;
+        return nullptr;
     }

     auto addr_str = string((char *) addr, addr_len);
     std::weak_ptr<UdpServer> weak_self = std::static_pointer_cast<UdpServer>(shared_from_this());
-    auto session_creator = [this, weak_self, socket, addr_str, id]() -> Session::Ptr {
+    auto helper_creator = [this, weak_self, socket, addr_str, id]() -> SessionHelper::Ptr {
         auto server = weak_self.lock();
         if (!server) {
-            return s_null_session;
+            return nullptr;
         }

         //如果已经创建该客户端对应的UdpSession类,那么直接返回
         lock_guard<std::recursive_mutex> lck(*_session_mutex);
         auto it = _session_map->find(id);
         if (it != _session_map->end()) {
-            return it->second->session();
+            return it->second;
         }

         assert(_socket);
@@ -250,13 +254,11 @@
         socket->bindPeerAddr((struct sockaddr *) addr_str.data(), addr_str.size());

         auto helper = _session_alloc(server, socket);
-        auto session = helper->session();
         // 把本服务器的配置传递给 Session
-        session->attachServer(*this);
+        helper->session()->attachServer(*this);

-        std::weak_ptr<Session> weak_session = session;
-        auto cls = helper->className();
-        socket->setOnRead([weak_self, weak_session, id](const Buffer::Ptr &buf, struct sockaddr *addr, int addr_len) {
+        std::weak_ptr<SessionHelper> weak_helper = helper;
+        socket->setOnRead([weak_self, weak_helper, id](const Buffer::Ptr &buf, struct sockaddr *addr, int addr_len) {
             auto strong_self = weak_self.lock();
             if (!strong_self) {
                 return;
@@ -264,8 +266,8 @@

             //快速判断是否为本会话的的数据, 通常应该成立
             if (id == makeSockId(addr, addr_len)) {
-                if (auto strong_session = weak_session.lock()) {
-                    emitSessionRecv(strong_session, buf);
+                if (auto strong_helper = weak_helper.lock()) {
+                    emitSessionRecv(strong_helper, buf);
                 }
                 return;
             }
@@ -273,7 +275,7 @@
             //收到非本peer fd的数据,让server去派发此数据到合适的session对象
             strong_self->onRead_l(false, id, buf, addr, addr_len);
         });
-        socket->setOnErr([weak_self, weak_session, id, cls](const SockException &err) {
+        socket->setOnErr([weak_self, weak_helper, id](const SockException &err) {
             // 在本函数作用域结束时移除会话对象
             // 目的是确保移除会话前执行其 onError 函数
             // 同时避免其 onError 函数抛异常时没有移除会话对象
@@ -283,40 +285,47 @@
                 if (!strong_self) {
                     return;
                 }
-                //从共享map中移除本session对象
-                lock_guard<std::recursive_mutex> lck(*strong_self->_session_mutex);
-                strong_self->_session_map->erase(id);
+                // 延时移除udp session, 防止频繁快速重复对象
+                strong_self->_poller->doDelayTask(kUdpDelayCloseMS, [weak_self, id]() {
+                    if (auto strong_self = weak_self.lock()) {
+                        // 从共享map中移除本session对象
+                        lock_guard<std::recursive_mutex> lck(*strong_self->_session_mutex);
+                        strong_self->_session_map->erase(id);
+                    }
+                    return 0;
+                });
             });

             // 获取会话强应用
-            if (auto strong_session = weak_session.lock()) {
+            if (auto strong_helper = weak_helper.lock()) {
                 // 触发 onError 事件回调
-                TraceP(strong_session) << cls << " on err: " << err;
-                strong_session->onError(err);
+                TraceP(strong_helper->session()) << strong_helper->className() << " on err: " << err;
+                strong_helper->enable = false;
+                strong_helper->session()->onError(err);
             }
         });

         auto pr = _session_map->emplace(id, std::move(helper));
         assert(pr.second);
-        return pr.first->second->session();
+        return pr.first->second;
     };

     if (socket->getPoller()->isCurrentThread()) {
         //该socket分配在本线程,直接创建session对象,并处理数据
-        return session_creator();
+        return helper_creator();
     }

     //该socket分配在其他线程,需要先拷贝buffer,然后在其所在线程创建session对象并处理数据
     auto cacheable_buf = std::make_shared<BufferString>(buf->toString());
-    socket->getPoller()->async([session_creator, cacheable_buf]() {
+    socket->getPoller()->async([helper_creator, cacheable_buf]() {
         //在该socket所在线程创建session对象
-        auto session = session_creator();
-        if (session) {
+        auto helper = helper_creator();
+        if (helper) {
             //该数据不能丢弃,给session对象消费
-            emitSessionRecv(session, cacheable_buf);
+            emitSessionRecv(helper, cacheable_buf);
         }
     });
-    return s_null_session;
+    return nullptr;
 }

 void UdpServer::setOnCreateSocket(onCreateSocket cb) {
xia-chu commented 10 months ago

你可以测试下这个分支:https://github.com/ZLMediaKit/ZLToolKit/tree/feature/udp_delay_close 看看是否满足你的需求

mc373906408 commented 10 months ago

你是对的,这个问题修复了。可以关闭这个pr。