ZLMediaKit / ZLToolKit

A lightweight network framework based on C++11 that uses thread-pool techniques to achieve highly concurrent network I/O
MIT License

Fix the race where, for the same SockNum object, an EventPoller may run delEvent before addEvent #200

Closed: baigao-X closed this PR 9 months ago

baigao-X commented 9 months ago

Typical scenario:

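openRtpServer and closeRtpServer are called frequently. When the open and close operations on the same TcpServer are handled by different EventPollers, the EventPoller handling the close request may run delEvent on the SockNum first, while that same EventPoller's addEvent runs afterwards (at that point, this EventPoller's addEvent for the SockNum is necessarily asynchronous). As a result, the SockNum and its port are never released.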
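The ordering in the scenario above can be replayed without real threads. This is a minimal, single-threaded sketch, not ZLToolKit code: `MiniPoller`, `runOrQueue`, `fdLeaksAfterOpenClose` and `fdLeaksWhenSameThread` are hypothetical names. It only assumes what the report describes: a cross-thread addEvent is deferred onto the poller's task queue, while a delEvent issued on the poller's own thread takes effect immediately.

```cpp
#include <functional>
#include <queue>
#include <set>
#include <utility>

// Hypothetical model of an event poller (NOT the ZLToolKit EventPoller API).
// A call made "from another thread" is only queued, like EventPoller::async;
// a call made on the poller's own thread takes effect immediately.
class MiniPoller {
public:
    void addEvent(int fd, bool from_other_thread) {
        runOrQueue([this, fd] { _registered.insert(fd); }, from_other_thread);
    }

    void delEvent(int fd, bool from_other_thread) {
        runOrQueue([this, fd] { _registered.erase(fd); }, from_other_thread);
    }

    // Drain queued tasks in FIFO order, as the poller's loop thread would.
    void runPending() {
        while (!_tasks.empty()) {
            auto task = std::move(_tasks.front());
            _tasks.pop();
            task();
        }
    }

    bool isRegistered(int fd) const { return _registered.count(fd) > 0; }

private:
    void runOrQueue(std::function<void()> task, bool from_other_thread) {
        if (from_other_thread) {
            _tasks.push(std::move(task));  // deferred: runs later on the loop
        } else {
            task();                        // same thread: runs right now
        }
    }

    std::set<int> _registered;
    std::queue<std::function<void()>> _tasks;
};

// Open (addEvent) arrives from another thread, close (delEvent) runs on the
// poller's own thread. Returns true if the fd is left registered forever.
bool fdLeaksAfterOpenClose(int fd) {
    MiniPoller poller;
    poller.addEvent(fd, /*from_other_thread=*/true);   // only queued
    poller.delEvent(fd, /*from_other_thread=*/false);  // runs first, finds nothing
    poller.runPending();                               // queued addEvent runs last
    return poller.isRegistered(fd);
}

// The same two calls made on the poller's own thread do not leak.
bool fdLeaksWhenSameThread(int fd) {
    MiniPoller poller;
    poller.addEvent(fd, /*from_other_thread=*/false);
    poller.delEvent(fd, /*from_other_thread=*/false);
    poller.runPending();
    return poller.isRegistered(fd);
}
```

In this model the leak comes entirely from mixing a deferred add with an immediate delete on one poller. That is why the discussion below ends up pinning each object's operations to a single thread instead of reordering individual calls.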

xia-chu commented 9 months ago

Thanks for the report. The problem you describe does exist, but I think there is a simpler fix:

Index: src/Network/Socket.h
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
diff --git a/src/Network/Socket.h b/src/Network/Socket.h
--- a/src/Network/Socket.h  (revision 4220eb985b7d435928e2ccb57d188f5f53ae06e9)
+++ b/src/Network/Socket.h  (date 1703758830855)
@@ -195,12 +195,9 @@
      ~SockFD() { delEvent(); }

     void delEvent() {
-        if (_poller) {
-            auto num = _num;
-            // close the fd only after the io event has been removed
-            _poller->delEvent(num->rawFd(), [num](bool) {});
-            _poller = nullptr;
-        }
+        auto num = _num;
+        // close the fd only after the io event has been removed
+        _poller->delEvent(num->rawFd(), [num](bool) {});
     }

     void setConnected() {

Could you give it a try?

xia-chu commented 9 months ago

Having thought about it some more: the root cause is that TcpServer is multithreaded.

baigao-X commented 9 months ago

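I tried to understand your change, but it doesn't seem to address the problem I'm hitting, and actual testing confirms that. Below is the patch with your change applied plus extra debug logging, along with a log screenshot, which may help me explain the symptoms better.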

Watch port 32360 and its fd 63.

[Log screenshot]

diff --git a/3rdpart/ZLToolKit/src/Network/Socket.cpp b/3rdpart/ZLToolKit/src/Network/Socket.cpp
index 72acc6b..8259c24 100644
--- a/3rdpart/ZLToolKit/src/Network/Socket.cpp
+++ b/3rdpart/ZLToolKit/src/Network/Socket.cpp
@@ -503,6 +503,7 @@ bool Socket::listen(uint16_t port, const string &local_ip, int backlog) {
     if (fd == -1) {
         return false;
     }
+    InfoL << "debug:  listen port: " << port << " to fd: " << fd;
     return fromSock_l(std::make_shared<SockNum>(fd, SockNum::Sock_TCP_Server));
 }

diff --git a/3rdpart/ZLToolKit/src/Network/Socket.h b/3rdpart/ZLToolKit/src/Network/Socket.h
index 2ca8d03..7f69418 100644
--- a/3rdpart/ZLToolKit/src/Network/Socket.h
+++ b/3rdpart/ZLToolKit/src/Network/Socket.h
@@ -18,6 +18,7 @@
 #include <sstream>
 #include <functional>
 #include "Util/SpeedStatistic.h"
+#include "Util/logger.h"
 #include "sockutil.h"
 #include "Poller/Timer.h"
 #include "Poller/EventPoller.h"
@@ -132,6 +133,7 @@ public:
         ::shutdown(_fd, SHUT_RDWR);
         #endif
         close(_fd);
+        InfoL << "debug: close fd:" << _fd;
     }

     int rawFd() const {
@@ -142,6 +144,7 @@ public:
         return _type;
     }

+
     void setConnected() {
 #if defined (OS_IPHONE)
         setSocketOfIOS(_fd);
@@ -194,12 +197,9 @@ public:
      ~SockFD() { delEvent(); }

     void delEvent() {
-        if (_poller) {
-            auto num = _num;
-            // close the fd only after the io event has been removed
-            _poller->delEvent(num->rawFd(), [num](bool) {});
-            _poller = nullptr;
-        }
+        auto num = _num;
+        // close the fd only after the io event has been removed
+        _poller->delEvent(num->rawFd(), [num](bool) {});
     }

     void setConnected() {
diff --git a/3rdpart/ZLToolKit/src/Network/sockutil.cpp b/3rdpart/ZLToolKit/src/Network/sockutil.cpp
index 0c713f0..254b31b 100644
--- a/3rdpart/ZLToolKit/src/Network/sockutil.cpp
+++ b/3rdpart/ZLToolKit/src/Network/sockutil.cpp
@@ -419,7 +419,7 @@ static int bind_sock6(int fd, const char *ifr_ip, uint16_t port) {
         addr.sin6_addr = IN6ADDR_ANY_INIT;
     }
     if (::bind(fd, (struct sockaddr *) &addr, sizeof(addr)) == -1) {
-        WarnL << "Bind socket failed: " << get_uv_errmsg(true);
+        WarnL << "Bind socket " << fd << " to port: " << port << " failed: " << get_uv_errmsg(true);
         return -1;
     }
     return 0;
diff --git a/3rdpart/ZLToolKit/src/Poller/EventPoller.cpp b/3rdpart/ZLToolKit/src/Poller/EventPoller.cpp
index 3394009..fd119cd 100644
--- a/3rdpart/ZLToolKit/src/Poller/EventPoller.cpp
+++ b/3rdpart/ZLToolKit/src/Poller/EventPoller.cpp
@@ -113,7 +113,9 @@ int EventPoller::addEvent(int fd, int event, PollEventCB cb) {
         int ret = epoll_ctl(_epoll_fd, EPOLL_CTL_ADD, fd, &ev);
         if (ret == 0) {
             _event_map.emplace(fd, std::make_shared<PollEventCB>(std::move(cb)));
         }
+        InfoL << "debug: " << getThreadName() << "addEvent fd: " << fd;
         return ret;
 #else
 #ifndef _WIN32
@@ -127,13 +129,18 @@ int EventPoller::addEvent(int fd, int event, PollEventCB cb) {
         record->event = event;
         record->call_back = std::move(cb);
         _event_map.emplace(fd, record);
         return 0;
 #endif //HAS_EPOLL
     }

+    InfoL << "debug: " << getThreadName() << "-async addEvent fd: " << fd;
+
     async([this, fd, event, cb]() {
         addEvent(fd, event, std::move(const_cast<PollEventCB &>(cb)));
     });
     return 0;
 }

@@ -147,6 +154,8 @@ int EventPoller::delEvent(int fd, PollCompleteCB cb) {
 #if defined(HAS_EPOLL)
         bool success = epoll_ctl(_epoll_fd, EPOLL_CTL_DEL, fd, nullptr) == 0 && _event_map.erase(fd) > 0;
         cb(success);
+        InfoL << "debug: " << getThreadName() << "delEvent fd: " << fd;
         return success ? 0 : -1;
 #else
         cb(_event_map.erase(fd));
@@ -156,6 +165,7 @@ int EventPoller::delEvent(int fd, PollCompleteCB cb) {
     }

     // cross-thread operation
+    InfoL << "debug: " << getThreadName() << "-async delEvent fd: " << fd;
     async([this, fd, cb]() {
         delEvent(fd, std::move(const_cast<PollCompleteCB &>(cb)));
     });
@@ -173,7 +183,9 @@ int EventPoller::modifyEvent(int fd, int event, PollCompleteCB cb) {
         ev.events = toEpoll(event);
         ev.data.fd = fd;
         auto ret = epoll_ctl(_epoll_fd, EPOLL_CTL_MOD, fd, &ev);
         cb(ret == 0);
+        InfoL << "debug: " << getThreadName() << "-modifyEvent fd: " << fd << " event: " << event << (!ret? " success" : " fail") << " ret: " << ret << "errno: " << errno;
         return ret;
 #else
         auto it = _event_map.find(fd);
@@ -184,6 +196,7 @@ int EventPoller::modifyEvent(int fd, int event, PollCompleteCB cb) {
         return 0;
 #endif // HAS_EPOLL
     }
+    InfoL << "debug: " << getThreadName() << "-async modifyEvent fd: " << fd;
     async([this, fd, event, cb]() {
         modifyEvent(fd, event, std::move(const_cast<PollCompleteCB &>(cb)));
     });

xia-chu commented 9 months ago

Hi, with this patch applied I think the problem should be gone:

Index: src/Network/TcpServer.cpp
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
diff --git a/src/Network/TcpServer.cpp b/src/Network/TcpServer.cpp
--- a/src/Network/TcpServer.cpp (revision 30a148a2d1b5877d28463b4cdc5c8c502ef8ed15)
+++ b/src/Network/TcpServer.cpp (date 1703843283941)
@@ -76,7 +76,7 @@
 }

 TcpServer::Ptr TcpServer::onCreatServer(const EventPoller::Ptr &poller) {
-    return std::make_shared<TcpServer>(poller);
+    return Ptr(new TcpServer(poller), [poller](TcpServer *ptr) { poller->async([ptr]() { delete ptr; }); });
 }

 Socket::Ptr TcpServer::onBeforeAcceptConnection(const EventPoller::Ptr &poller) {

xia-chu commented 9 months ago

The root cause of this problem is that TcpServer's destruction is not thread-safe.

xia-chu commented 9 months ago

Your change is too complex; I don't think it's suitable to merge.

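The main issue is that TcpServer violates the "one object, one thread" design principle, and that is what causes this bug. We should fix the root cause rather than patch around the bug.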
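The "one object, one thread" rule can be enforced at destruction time with a `std::shared_ptr` custom deleter that never deletes in place but posts the `delete` to the owning thread. The sketch below isolates that idea in standard C++; it is not ZLToolKit code. `LoopThread` is a hypothetical single-thread executor standing in for an EventPoller, `TrackedServer` stands in for TcpServer, and `makeServerOn` mirrors the deleter shape of the TcpServer patch, except that it captures the loop by reference for brevity where the patch captures the `poller` shared pointer by value.

```cpp
#include <condition_variable>
#include <functional>
#include <memory>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>

// Hypothetical single-thread executor (NOT a ZLToolKit class).
class LoopThread {
public:
    LoopThread() : _thread([this] { run(); }) {}
    LoopThread(const LoopThread &) = delete;
    LoopThread &operator=(const LoopThread &) = delete;

    ~LoopThread() {
        {
            std::lock_guard<std::mutex> lock(_mtx);
            _stop = true;
        }
        _cv.notify_one();
        _thread.join();  // queued tasks are drained before run() returns
    }

    void post(std::function<void()> task) {
        {
            std::lock_guard<std::mutex> lock(_mtx);
            _tasks.push(std::move(task));
        }
        _cv.notify_one();
    }

    std::thread::id id() const { return _thread.get_id(); }

private:
    void run() {
        for (;;) {
            std::function<void()> task;
            {
                std::unique_lock<std::mutex> lock(_mtx);
                _cv.wait(lock, [this] { return _stop || !_tasks.empty(); });
                if (_tasks.empty()) return;  // stop requested and queue drained
                task = std::move(_tasks.front());
                _tasks.pop();
            }
            task();
        }
    }

    std::mutex _mtx;
    std::condition_variable _cv;
    std::queue<std::function<void()>> _tasks;
    bool _stop = false;
    std::thread _thread;  // declared last: starts after the members above exist
};

// Hypothetical stand-in for TcpServer: records which thread destroyed it.
struct TrackedServer {
    explicit TrackedServer(std::thread::id *dtor_thread) : _dtor_thread(dtor_thread) {}
    ~TrackedServer() { *_dtor_thread = std::this_thread::get_id(); }
    std::thread::id *_dtor_thread;
};

// The deleter only posts the delete, so ~TrackedServer() runs on the loop.
std::shared_ptr<TrackedServer> makeServerOn(LoopThread &loop, std::thread::id *dtor_thread) {
    return std::shared_ptr<TrackedServer>(
        new TrackedServer(dtor_thread),
        [&loop](TrackedServer *ptr) { loop.post([ptr] { delete ptr; }); });
}

// Drops the last reference on the calling thread; returns true if the
// destructor nevertheless ran on the loop thread.
bool destructorRunsOnLoopThread() {
    std::thread::id dtor_thread;
    std::thread::id loop_id;
    {
        LoopThread loop;
        loop_id = loop.id();
        auto server = makeServerOn(loop, &dtor_thread);
        server.reset();  // last reference dropped here, on this thread
    }  // ~LoopThread runs the queued delete, then joins
    return dtor_thread == loop_id && dtor_thread != std::this_thread::get_id();
}
```

Reading `dtor_thread` after the join is safe because `std::thread::join` synchronizes with the end of the loop thread.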

baigao-X commented 9 months ago

Yes, what you describe is indeed the root cause. Judging by the logic, this should solve the problem. I'll close this PR once I've verified it.

xia-chu commented 9 months ago

Could you push my patch as a PR?

baigao-X commented 9 months ago

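Sorry the verification took so long. After applying your patch, the test still failed. I think the problem is here: the corresponding call site in ZLMediaKit also needs to be changed. [screenshot]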

baigao-X commented 9 months ago

After also applying a similar change where RtpServer in ZLMediaKit creates its TcpServer, the defect no longer reproduces, so I'm closing this PR.

xia-chu commented 9 months ago

@baigao-X Is this how you changed it?

Index: src/Rtp/RtpServer.cpp
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
diff --git a/src/Rtp/RtpServer.cpp b/src/Rtp/RtpServer.cpp
--- a/src/Rtp/RtpServer.cpp (revision f056c3037bba78f3d352bc5c3ff0495c4c485e3e)
+++ b/src/Rtp/RtpServer.cpp (date 1702807764252)
@@ -180,7 +180,8 @@
     _tcp_mode = tcp_mode;
     if (tcp_mode == PASSIVE || tcp_mode == ACTIVE) {
         //create the tcp server
-        tcp_server = std::make_shared<TcpServer>(rtp_socket->getPoller());
+        auto poller = rtp_socket->getPoller();
+        tcp_server = std::shared_ptr<TcpServer>(new TcpServer(poller), [poller](TcpServer *ptr) { poller->async([ptr]() { delete ptr; }); });
         (*tcp_server)[RtpSession::kStreamID] = stream_id;
         (*tcp_server)[RtpSession::kSSRC] = ssrc;
         (*tcp_server)[RtpSession::kOnlyAudio] = only_audio;

xia-chu commented 9 months ago

I think changing it this way fixes it once and for all:

Index: src/Network/Socket.h
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
diff --git a/src/Network/Socket.h b/src/Network/Socket.h
--- a/src/Network/Socket.h  (revision 0dbf6c8d727c9f905458c81043f8fbad07ca5c58)
+++ b/src/Network/Socket.h  (date 1704285899355)
@@ -300,7 +300,6 @@
      * @param enable_mutex whether to enable the mutex (i.e. whether the interface is thread-safe)
     */
     static Ptr createSocket(const EventPoller::Ptr &poller = nullptr, bool enable_mutex = true);
-    Socket(const EventPoller::Ptr &poller = nullptr, bool enable_mutex = true);
     ~Socket() override;

     /**
@@ -512,6 +511,8 @@
     std::string getIdentifier() const override;

 private:
+    Socket(EventPoller::Ptr poller, bool enable_mutex = true);
+
     void setSock(SockNum::Ptr sock);
     int onAccept(const SockNum::Ptr &sock, int event) noexcept;
     ssize_t onRead(const SockNum::Ptr &sock, const BufferRaw::Ptr &buffer) noexcept;
Index: src/Network/Socket.cpp
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
diff --git a/src/Network/Socket.cpp b/src/Network/Socket.cpp
--- a/src/Network/Socket.cpp    (revision 0dbf6c8d727c9f905458c81043f8fbad07ca5c58)
+++ b/src/Network/Socket.cpp    (date 1704286231483)
@@ -49,20 +49,17 @@
     return toSockException(error);
 }

-Socket::Ptr Socket::createSocket(const EventPoller::Ptr &poller, bool enable_mutex) {
-    return std::make_shared<Socket>(poller, enable_mutex);
+Socket::Ptr Socket::createSocket(const EventPoller::Ptr &poller_in, bool enable_mutex) {
+    auto poller = poller_in ? poller_in : EventPollerPool::Instance().getPoller();
+    return Socket::Ptr(new Socket(poller, enable_mutex), [poller](Socket *ptr) { poller->async([ptr]() { delete ptr; }); });
 }

-Socket::Socket(const EventPoller::Ptr &poller, bool enable_mutex)
-    : _mtx_sock_fd(enable_mutex)
+Socket::Socket(EventPoller::Ptr poller, bool enable_mutex)
+    : _poller(std::move(poller))
+    , _mtx_sock_fd(enable_mutex)
     , _mtx_event(enable_mutex)
     , _mtx_send_buf_waiting(enable_mutex)
     , _mtx_send_buf_sending(enable_mutex) {
-
-    _poller = poller;
-    if (!_poller) {
-        _poller = EventPollerPool::Instance().getPoller();
-    }
     setOnRead(nullptr);
     setOnErr(nullptr);
     setOnAccept(nullptr);

baigao-X commented 9 months ago

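Yes. However, I chose to follow the pattern of your TcpServer::onCreatServer: I added a static TcpServer::createServer interface and hid TcpServer's constructor from outside callers. Since a TcpServer created directly with new carries this risk, I don't think external code should be able to call the constructor directly.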
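Hiding the constructor behind a static factory, as in the `Socket::createSocket` patch, means a stray `std::make_shared<TcpServer>(...)` (like the original RtpServer call site) can no longer bypass the custom deleter, because `std::make_shared` cannot reach a private constructor. A minimal sketch of that shape, with hypothetical names: `TaskQueue` stands in for an EventPoller and is drained by hand instead of by a poller thread, `Server` stands in for TcpServer, and `createServer` and `deleteIsDeferredUntilDrain` are illustrative only.

```cpp
#include <functional>
#include <memory>
#include <queue>
#include <type_traits>
#include <utility>

// Hypothetical stand-in for an EventPoller: tasks run only when drained.
class TaskQueue {
public:
    void async(std::function<void()> task) { _tasks.push(std::move(task)); }
    void drain() {
        while (!_tasks.empty()) {
            auto task = std::move(_tasks.front());
            _tasks.pop();
            task();
        }
    }

private:
    std::queue<std::function<void()>> _tasks;
};

// Hypothetical server: createServer() is the only way to construct one,
// so every instance carries the deleter that defers destruction to its queue.
class Server {
public:
    using Ptr = std::shared_ptr<Server>;

    static Ptr createServer(const std::shared_ptr<TaskQueue> &queue, int *alive) {
        return Ptr(new Server(alive),
                   [queue](Server *ptr) { queue->async([ptr] { delete ptr; }); });
    }

    ~Server() { --*_alive; }  // public, like ~Socket() in the patch

private:
    // Private: `new Server(...)` and std::make_shared<Server>(...) outside
    // the class no longer compile, so the deleter cannot be skipped.
    explicit Server(int *alive) : _alive(alive) { ++*_alive; }

    int *_alive;
};

static_assert(!std::is_constructible<Server, int *>::value,
              "Server must only be created through createServer()");

// Dropping the last reference only schedules the delete; the object is
// destroyed when the owning queue runs the task.
bool deleteIsDeferredUntilDrain() {
    auto queue = std::make_shared<TaskQueue>();
    int alive = 0;
    auto server = Server::createServer(queue, &alive);
    bool created = (alive == 1);
    server.reset();            // deleter posts the delete
    bool still_alive = (alive == 1);
    queue->drain();            // the "poller thread" runs the delete
    return created && still_alive && alive == 0;
}
```

Doing this once in the class (or, as proposed above, once in `Socket`) is what removes the need to patch every creation site separately.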

baigao-X commented 9 months ago

Hmm, encapsulating it at the Socket layer does look better. I'll verify it tomorrow.

xia-chu commented 9 months ago

Right, hiding the constructor is better.

xia-chu commented 9 months ago

@baigao-X Would you like to resubmit the PR using my patch?