ZLMediaKit / ZLToolKit

一个基于C++11的轻量级网络框架,基于线程池技术可以实现大并发网络IO
MIT License
1.96k stars 585 forks source link

It is recommended that socket creation and connection be performed in two steps. #83

Open wujianGit123 opened 2 years ago

wujianGit123 commented 2 years ago

Currently, SockUtil::connect creates the socket, binds it to a local port, and then immediately connects to the remote destination IP and port. Sometimes, however, these steps need to be performed separately. For example, during GB28181 playback negotiation, the subordinate platform receives an INVITE request from the superior platform and replies with a 200 OK whose SDP carries the source IP and port it will stream from. Some superior platforms have added security boundary devices that must verify the subordinate's source IP and port before allowing the subordinate to connect and send stream data. In this case, three steps are required: 1. Create the socket and bind the local port; 2. Send the streaming IP and port to the superior and wait for the superior's ACK; 3. Connect to the remote destination IP and port. The specific implementation code is:

bool Socket::createAndBindSock(bool udpOrTcp, uint16_t port, const string &local_ip) {
    closeSock();
    int fd = SockUtil::createAndBindSock(udpOrTcp, port, local_ip.data());
    if (fd == -1)
        return false;

    SockFD::Ptr sock;
    if (udpOrTcp) {
        sock = makeSock(fd, SockNum::Sock_UDP);
        if (!attachEvent(sock, true))
            return false;
    }
    else
    {
        // 注意tcp在连接目标成功后才会监听事件
        sock = makeSock(fd, SockNum::Sock_TCP);
    }
    LOCK_GUARD(_mtx_sock_fd);
    _sock_fd = sock;
    return true;
}
/**
 * Starts an asynchronous TCP connection to a remote host.
 *
 * The call to SockUtil::connect (which performs the DNS lookup) is run on a
 * WorkThreadPool executor; its result is then handed back to this socket's
 * poller, where the fd is registered for the write event.
 *
 * @param url         remote host, passed to SockUtil::connect as the host.
 * @param port        remote port.
 * @param con_cb_in   callback invoked with a SockException describing the outcome.
 * @param timeout_sec delay passed to the connect-timeout Timer.
 * @param local_ip    local IP forwarded to SockUtil::connect.
 * @param local_port  local port forwarded to SockUtil::connect.
 * @param newSock     when true the current socket is closed first and a new fd
 *                    is wrapped; when false the raw fd of the existing socket
 *                    (if any) is passed to SockUtil::connect as sockfd_in.
 */
void Socket::connect(const string &url, uint16_t port, onErrCB con_cb_in,
    float timeout_sec, const string &local_ip, uint16_t local_port, bool newSock) {
    if (newSock) {
        // Reset the current socket
        closeSock();
    }
    weak_ptr<Socket> weak_self = shared_from_this();
    // Completion wrapper: clears the pending task and the timeout timer, drops
    // the stored socket on error, then forwards the result to the user callback.
    auto con_cb = [con_cb_in, weak_self](const SockException &err) {
        auto strong_self = weak_self.lock();
        if (!strong_self) {
            return;
        }
        strong_self->_async_con_cb = nullptr;
        strong_self->_con_timer = nullptr;
        if (err) {
            LOCK_GUARD(strong_self->_mtx_sock_fd);
            strong_self->_sock_fd = nullptr;
        }
        con_cb_in(err);
    };

    // Task that receives the fd returned by SockUtil::connect (-1 on failure).
    auto async_con_cb = std::make_shared<function<void(int)> >([weak_self, con_cb, newSock](int sock) {
        auto strong_self = weak_self.lock();
        if (sock == -1 || !strong_self) {
            if (!strong_self) {
                // NOTE(review): sock may be -1 here, and when newSock is false it may
                // be a pre-bound fd that a SockFD also holds -- confirm closing is safe.
                CLOSE_SOCK(sock);
            } else {
                // Every -1 result is reported as Err_dns, whatever step failed.
                con_cb(SockException(Err_dns, get_uv_errmsg(true)));
            }
            return;
        }
        // `test` records whether a new SockFD wrapper was created below and
        // therefore still has to be stored on the Socket.
        bool test = false;
        SockFD::Ptr sock_fd = strong_self->getSock();
        if (newSock || !sock_fd) {
            test = true;
            sock_fd = strong_self->makeSock(sock, SockNum::Sock_TCP);
        }
        weak_ptr<SockFD> weak_sock_fd = sock_fd;
        // Watch the socket for writability; becoming writable means the connection to the server succeeded
        int result = strong_self->_poller->addEvent(sock, Event_Write, [weak_self, weak_sock_fd, con_cb](int event) {
            auto strong_sock_fd = weak_sock_fd.lock();
            auto strong_self = weak_self.lock();
            if (strong_sock_fd && strong_self) {
                // Socket writable event: the connection to the server has succeeded
                strong_self->onConnected(strong_sock_fd, con_cb);
            }
        });

        if (result == -1) {
            con_cb(SockException(Err_other, "add event to poller failed when start connect"));
            return;
        }
        if (test) {
            // Save the fd
            strong_self->setSock(sock_fd);
        }
    });

    auto poller = _poller;
    // The background job only holds a weak reference to the task; the strong
    // reference is stored in _async_con_cb at the end of this function.
    weak_ptr<function<void(int)> > weak_task = async_con_cb;
    int sockfd_in = -1;
    if (!newSock) {
        // Reuse the existing socket's raw fd, if there is one.
        SockFD::Ptr sock_fd = getSock();
        if (sock_fd)
            sockfd_in = sock_fd->rawFd();
    }
    WorkThreadPool::Instance().getExecutor()->async([url, port, local_ip, local_port, sockfd_in, weak_task, poller]() {
        // Blocking DNS resolution runs on a background thread
        int sock = SockUtil::connect(url.data(), port, true, local_ip.data(), local_port, sockfd_in);
        poller->async([sock, weak_task]() {
            auto strong_task = weak_task.lock();
            if (strong_task) {
                (*strong_task)(sock);
            } else {
                // The task is gone, so the result is discarded.
                // NOTE(review): sock can be -1 or a reused pre-bound fd here -- confirm.
                CLOSE_SOCK(sock);
            }
        });
    });

    // Connection timeout timer
    _con_timer = std::make_shared<Timer>(timeout_sec, [weak_self, con_cb]() {
        con_cb(SockException(Err_timeout, uv_strerror(UV_ETIMEDOUT)));
        // presumably returning false stops the timer from repeating -- TODO confirm
        return false;
    }, _poller);

    _async_con_cb = async_con_cb;
}
int SockUtil::createAndBindSock(bool udpOrTcp, const uint16_t port, const char* localIp, int af, bool bAsync, bool reusePort) {
    int sockfd = -1;
    if (udpOrTcp)
        sockfd = (int)socket(af, SOCK_DGRAM, IPPROTO_UDP);
    else
        sockfd = (int)socket(af, SOCK_STREAM, IPPROTO_TCP);
    if (sockfd == -1) {
        WarnL << "创建套接字失败:" << get_uv_errmsg(true);
        return -1;
    }
    setReuseable(sockfd, reusePort);
    setNoSigpipe(sockfd);
    setNoBlocked(sockfd, bAsync);
    if (!udpOrTcp)
        setNoDelay(sockfd);
    setSendBuf(sockfd);
    setRecvBuf(sockfd);
    setCloseWait(sockfd);
    setCloExec(sockfd);
    if (bindSock(sockfd, localIp, port) == -1) {
        close(sockfd);
        return -1;
    }
    return sockfd;
}
/**
 * Resolves a host and connects a TCP socket to it.
 *
 * @param host      remote host name or IP, resolved through DnsCache.
 * @param port      remote port.
 * @param bAsync    when true, a connect() still in progress (reported by
 *                  get_uv_error as UV_EAGAIN) is treated as success.
 * @param localIp   local IP used when this function creates the socket.
 * @param localPort local port used when this function creates the socket.
 * @param sockfd_in existing descriptor to connect, or a negative value to have
 *                  a new socket created and bound here.
 * @return the connected (or connecting) descriptor, or -1 on failure.
 *
 * Ownership: a descriptor passed in through sockfd_in remains owned by the
 * caller. It is never closed here, not even on failure, so that the caller's
 * own cleanup does not close the same descriptor number a second time. Only a
 * socket created inside this function is closed when connecting fails.
 */
int SockUtil::connect(const char *host, uint16_t port, bool bAsync, const char* localIp, uint16_t localPort, int sockfd_in) {
    sockaddr addr;
    if (!DnsCache::Instance().getDomainIP(host, addr)) {
        // DNS resolution failed
        return -1;
    }
    // Set the port number
    ((sockaddr_in *)&addr)->sin_port = htons(port);

    // Remember whether the descriptor is ours to close on failure.
    const bool owns_fd = sockfd_in < 0;
    int sockfd = sockfd_in;
    if (owns_fd) {
        sockfd = createAndBindSock(false, localPort, localIp, addr.sa_family, bAsync);
    }
    if (sockfd < 0) {
        WarnL << "创建套接字失败:" << host;
        return -1;
    }
    if (::connect(sockfd, &addr, sizeof(struct sockaddr)) == 0) {
        // Synchronous connect succeeded
        return sockfd;
    }
    if (bAsync && get_uv_error(true) == UV_EAGAIN) {
        // Asynchronous connect started successfully
        return sockfd;
    }
    WarnL << "连接主机失败:" << host << " " << port << " " << get_uv_errmsg(true);
    if (owns_fd) {
        // Close only a descriptor created in this call; a caller-supplied one
        // is left for its owner to release.
        close(sockfd);
    }
    return -1;
}

目前SockUtil::connect函数内完成套接字创建和端口绑定,然后立即连接到远程目的ip和端口,但有时候这两件事需要分开进行,比如在gb28181播放协商的时候,下级在收到上级invite请求,返回200ok的sdp报文中附带发流源ip和端口,有些上级加了安全边界设备,需要核对下级的源ip端口才允许下级连接并发送码流数据,这种情况就需要做三个步骤:1、创建并绑定本地端口;2、向上级发送发流ip和端口并等待上级ack;3、连接远程目的ip和端口。具体实现代码:

bool Socket::createAndBindSock(bool udpOrTcp, uint16_t port, const string &local_ip) {
    closeSock();
    int fd = SockUtil::createAndBindSock(udpOrTcp, port, local_ip.data());
    if (fd == -1)
        return false;

    SockFD::Ptr sock;
    if (udpOrTcp) {
        sock = makeSock(fd, SockNum::Sock_UDP);
        if (!attachEvent(sock, true))
            return false;
    }
    else
    {
        // 注意tcp在连接目标成功后才会监听事件
        sock = makeSock(fd, SockNum::Sock_TCP);
    }
    LOCK_GUARD(_mtx_sock_fd);
    _sock_fd = sock;
    return true;
}
/**
 * Starts an asynchronous TCP connection to a remote host.
 *
 * The call to SockUtil::connect (which performs the DNS lookup) is run on a
 * WorkThreadPool executor; its result is then handed back to this socket's
 * poller, where the fd is registered for the write event.
 *
 * @param url         remote host, passed to SockUtil::connect as the host.
 * @param port        remote port.
 * @param con_cb_in   callback invoked with a SockException describing the outcome.
 * @param timeout_sec delay passed to the connect-timeout Timer.
 * @param local_ip    local IP forwarded to SockUtil::connect.
 * @param local_port  local port forwarded to SockUtil::connect.
 * @param newSock     when true the current socket is closed first and a new fd
 *                    is wrapped; when false the raw fd of the existing socket
 *                    (if any) is passed to SockUtil::connect as sockfd_in.
 */
void Socket::connect(const string &url, uint16_t port, onErrCB con_cb_in,
    float timeout_sec, const string &local_ip, uint16_t local_port, bool newSock) {
    if (newSock) {
        // Reset the current socket
        closeSock();
    }
    weak_ptr<Socket> weak_self = shared_from_this();
    // Completion wrapper: clears the pending task and the timeout timer, drops
    // the stored socket on error, then forwards the result to the user callback.
    auto con_cb = [con_cb_in, weak_self](const SockException &err) {
        auto strong_self = weak_self.lock();
        if (!strong_self) {
            return;
        }
        strong_self->_async_con_cb = nullptr;
        strong_self->_con_timer = nullptr;
        if (err) {
            LOCK_GUARD(strong_self->_mtx_sock_fd);
            strong_self->_sock_fd = nullptr;
        }
        con_cb_in(err);
    };

    // Task that receives the fd returned by SockUtil::connect (-1 on failure).
    auto async_con_cb = std::make_shared<function<void(int)> >([weak_self, con_cb, newSock](int sock) {
        auto strong_self = weak_self.lock();
        if (sock == -1 || !strong_self) {
            if (!strong_self) {
                // NOTE(review): sock may be -1 here, and when newSock is false it may
                // be a pre-bound fd that a SockFD also holds -- confirm closing is safe.
                CLOSE_SOCK(sock);
            } else {
                // Every -1 result is reported as Err_dns, whatever step failed.
                con_cb(SockException(Err_dns, get_uv_errmsg(true)));
            }
            return;
        }
        // `test` records whether a new SockFD wrapper was created below and
        // therefore still has to be stored on the Socket.
        bool test = false;
        SockFD::Ptr sock_fd = strong_self->getSock();
        if (newSock || !sock_fd) {
            test = true;
            sock_fd = strong_self->makeSock(sock, SockNum::Sock_TCP);
        }
        weak_ptr<SockFD> weak_sock_fd = sock_fd;
        // Watch the socket for writability; becoming writable means the connection to the server succeeded
        int result = strong_self->_poller->addEvent(sock, Event_Write, [weak_self, weak_sock_fd, con_cb](int event) {
            auto strong_sock_fd = weak_sock_fd.lock();
            auto strong_self = weak_self.lock();
            if (strong_sock_fd && strong_self) {
                // Socket writable event: the connection to the server has succeeded
                strong_self->onConnected(strong_sock_fd, con_cb);
            }
        });

        if (result == -1) {
            con_cb(SockException(Err_other, "add event to poller failed when start connect"));
            return;
        }
        if (test) {
            // Save the fd
            strong_self->setSock(sock_fd);
        }
    });

    auto poller = _poller;
    // The background job only holds a weak reference to the task; the strong
    // reference is stored in _async_con_cb at the end of this function.
    weak_ptr<function<void(int)> > weak_task = async_con_cb;
    int sockfd_in = -1;
    if (!newSock) {
        // Reuse the existing socket's raw fd, if there is one.
        SockFD::Ptr sock_fd = getSock();
        if (sock_fd)
            sockfd_in = sock_fd->rawFd();
    }
    WorkThreadPool::Instance().getExecutor()->async([url, port, local_ip, local_port, sockfd_in, weak_task, poller]() {
        // Blocking DNS resolution runs on a background thread
        int sock = SockUtil::connect(url.data(), port, true, local_ip.data(), local_port, sockfd_in);
        poller->async([sock, weak_task]() {
            auto strong_task = weak_task.lock();
            if (strong_task) {
                (*strong_task)(sock);
            } else {
                // The task is gone, so the result is discarded.
                // NOTE(review): sock can be -1 or a reused pre-bound fd here -- confirm.
                CLOSE_SOCK(sock);
            }
        });
    });

    // Connection timeout timer
    _con_timer = std::make_shared<Timer>(timeout_sec, [weak_self, con_cb]() {
        con_cb(SockException(Err_timeout, uv_strerror(UV_ETIMEDOUT)));
        // presumably returning false stops the timer from repeating -- TODO confirm
        return false;
    }, _poller);

    _async_con_cb = async_con_cb;
}
int SockUtil::createAndBindSock(bool udpOrTcp, const uint16_t port, const char* localIp, int af, bool bAsync, bool reusePort) {
    int sockfd = -1;
    if (udpOrTcp)
        sockfd = (int)socket(af, SOCK_DGRAM, IPPROTO_UDP);
    else
        sockfd = (int)socket(af, SOCK_STREAM, IPPROTO_TCP);
    if (sockfd == -1) {
        WarnL << "创建套接字失败:" << get_uv_errmsg(true);
        return -1;
    }
    setReuseable(sockfd, reusePort);
    setNoSigpipe(sockfd);
    setNoBlocked(sockfd, bAsync);
    if (!udpOrTcp)
        setNoDelay(sockfd);
    setSendBuf(sockfd);
    setRecvBuf(sockfd);
    setCloseWait(sockfd);
    setCloExec(sockfd);
    if (bindSock(sockfd, localIp, port) == -1) {
        close(sockfd);
        return -1;
    }
    return sockfd;
}
/**
 * Resolves a host and connects a TCP socket to it.
 *
 * @param host      remote host name or IP, resolved through DnsCache.
 * @param port      remote port.
 * @param bAsync    when true, a connect() still in progress (reported by
 *                  get_uv_error as UV_EAGAIN) is treated as success.
 * @param localIp   local IP used when this function creates the socket.
 * @param localPort local port used when this function creates the socket.
 * @param sockfd_in existing descriptor to connect, or a negative value to have
 *                  a new socket created and bound here.
 * @return the connected (or connecting) descriptor, or -1 on failure.
 *
 * Ownership: a descriptor passed in through sockfd_in remains owned by the
 * caller. It is never closed here, not even on failure, so that the caller's
 * own cleanup does not close the same descriptor number a second time. Only a
 * socket created inside this function is closed when connecting fails.
 */
int SockUtil::connect(const char *host, uint16_t port, bool bAsync, const char* localIp, uint16_t localPort, int sockfd_in) {
    sockaddr addr;
    if (!DnsCache::Instance().getDomainIP(host, addr)) {
        // DNS resolution failed
        return -1;
    }
    // Set the port number
    ((sockaddr_in *)&addr)->sin_port = htons(port);

    // Remember whether the descriptor is ours to close on failure.
    const bool owns_fd = sockfd_in < 0;
    int sockfd = sockfd_in;
    if (owns_fd) {
        sockfd = createAndBindSock(false, localPort, localIp, addr.sa_family, bAsync);
    }
    if (sockfd < 0) {
        WarnL << "创建套接字失败:" << host;
        return -1;
    }
    if (::connect(sockfd, &addr, sizeof(struct sockaddr)) == 0) {
        // Synchronous connect succeeded
        return sockfd;
    }
    if (bAsync && get_uv_error(true) == UV_EAGAIN) {
        // Asynchronous connect started successfully
        return sockfd;
    }
    WarnL << "连接主机失败:" << host << " " << port << " " << get_uv_errmsg(true);
    if (owns_fd) {
        // Close only a descriptor created in this call; a caller-supplied one
        // is left for its owner to release.
        close(sockfd);
    }
    return -1;
}

TRANS_BY_GITHUB_AI_ASSISTANT

xia-chu commented 2 years ago

This feature has been partially implemented. I will organize it and open source it when I have time.

这个特性已经部分实现 有空我整理下开源出来。

TRANS_BY_GITHUB_AI_ASSISTANT