Haivision / srt

Secure, Reliable, Transport
https://www.srtalliance.org
Mozilla Public License 2.0

[FR] Listener callback: provide group ID #2728

Open maxsharabayko opened 1 year ago

maxsharabayko commented 1 year ago

Problem Statement

The SRT listener callback currently (as of SRT v1.5.2) provides no way to find out the SRT group ID of the socket about to be accepted.

typedef int srt_listen_callback_fn(void* opaq, SRTSOCKET ns, int hsversion, const struct sockaddr* peeraddr, const char* streamid);
SRT_API int srt_listen_callback(SRTSOCKET lsn, srt_listen_callback_fn* hook_fn, void* hook_opaque);

Solution 1 (Preferred)

Extend the listener callback with the Group ID value or, better, with a key-value array to allow further extensions in the future.

The Group ID value can be used to limit the number of group connections on a listener socket.
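
As a rough illustration of the key-value idea, the sketch below hands a hook an array of tagged values instead of fixed positional arguments. Every name in it (`HookKey`, `HookParam`, `find_param`) is hypothetical and not part of the SRT API; it only shows how a consumer could look up a group ID and fall back gracefully when a key is absent.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>

// Hypothetical keys for illustration only; not part of the SRT API.
enum class HookKey : int { LocalGroupId = 1, PeerGroupId = 2 };

// One entry of the hypothetical key-value array handed to the callback.
struct HookParam
{
    HookKey      key;
    std::int64_t value;
};

// Returns the value stored under `key`, or `fallback` if the key is absent.
// New keys can be appended later without changing this signature.
inline std::int64_t find_param(const HookParam* params, std::size_t count,
                               HookKey key, std::int64_t fallback)
{
    assert(params != nullptr || count == 0);
    for (std::size_t i = 0; i < count; ++i)
    {
        if (params[i].key == key)
            return params[i].value;
    }
    return fallback;
}
```

A callback written against such an array keeps working when the library starts passing additional keys, which is the extensibility argument made above.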

Solution 2 (Discouraged)

Calling srt_groupof(ns) returns SRT_INVALID_SOCK because the handshake (HS) has not been parsed yet, and no group has been created or assigned to the socket. Creating a fake group ID value for the socket about to be accepted may be a workaround, but it would add unnecessary and potentially confusing logic.

ethouris commented 1 year ago

Are you sure it is possible to obtain this number when this is the first connection in the group?

jeandube commented 3 months ago

What I really need is to distinguish a new group connection from a new link of an already connected group.

ethouris commented 3 months ago

Well, the handshake structure is passed to the runAcceptHook method, and it is already used to extract the group type, which can be obtained later. Of course, only the PEER's group ID is available this way, but a function could also be called to find the group that has that peer ID, and that group ID could be made available through the srt_groupof call, similar to how this is done for the group type. Of course, for the very first call in the group it would return an invalid ID, but I think that's not a problem.

maxsharabayko commented 3 months ago

(Elaborating on @ethouris's comment) As a hack, a group connection may be distinguished from a regular connection attempt via the SRTO_GROUPTYPE option. A regular connection will have the value 0, while a group member connection will have a non-zero group type value.
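
A minimal sketch of that check, assuming the integer has already been read for SRTO_GROUPTYPE on the socket passed to the listener callback (for example with srt_getsockflag). The `ConnKind` enum and `classify_connection` helper are hypothetical names used only for illustration.

```cpp
// Hypothetical classification of an incoming connection attempt.
enum class ConnKind { Regular, GroupMember };

// `group_type` is assumed to be the value read for SRTO_GROUPTYPE:
// 0 for a regular connection, non-zero for a group member connection.
constexpr ConnKind classify_connection(int group_type)
{
    return group_type == 0 ? ConnKind::Regular : ConnKind::GroupMember;
}
```

This only separates group members from regular connections; it says nothing about whether a member belongs to a new group or to one that is already connected.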

Regarding the group ID: at the point of the listener callback, we only know the peer's group ID, but not the local one, unless a local group already exists. There is a corner case, though, when both members connect over separate listener ports in parallel. Then there would be two listener callback calls, both BEFORE a local group is created, meaning both would be treated as a new group connection request.

Example 1: both members connect in parallel.

SRT.cn: @600335716: runAcceptHook: peer group ID @1332354199 (does not exist).
SRT.cn: @600335715: runAcceptHook: peer group ID @1332354199 (does not exist).
SRT.gm: addGroup: @1674077536  <--- Local group has been created!!!

Example 2: one member connects a bit later.

SRT.cn: @803205604: runAcceptHook: remote group ID 1904656706 (does not exist).
SRT.gm: addGroup: @1876947425 <--- Local group has been created!!!
SRT.cn: @803205603: runAcceptHook: remote group ID 1904656706 (exists) <--- Second member connects

jeandube commented 3 months ago

The group type does not distinguish an added link from a new group. The peer group ID is a good solution and probably better than the local one because of the listen_callback race condition. It tells that only one group will be created. It helps count both group links and accepted connections.
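
The counting idea can be sketched as follows, assuming the peer group ID is somehow made available to the application at callback time. `PeerGroupTracker` is a hypothetical helper, not part of the SRT API. The sketch omits locking, and it keys on the peer group ID alone, so two different peers that happen to pick the same ID would be merged; a real application would likely need to include the peer address in the key.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <unordered_map>

// Hypothetical helper; not part of the SRT API.
// Counts member links per peer group ID as they are reported.
class PeerGroupTracker
{
public:
    enum class Event { NewGroup, NewLink };

    // Records one incoming member connection and reports whether its
    // peer group ID is seen for the first time.
    Event on_member(std::int32_t peer_group_id)
    {
        const int links = ++m_links[peer_group_id];
        return links == 1 ? Event::NewGroup : Event::NewLink;
    }

    // Forgets one link, for example after the member socket is closed.
    void on_member_closed(std::int32_t peer_group_id)
    {
        const auto it = m_links.find(peer_group_id);
        assert(it != m_links.end() && "closing a link that was never recorded");
        if (it != m_links.end() && --it->second == 0)
            m_links.erase(it);
    }

    // Number of distinct peer groups currently tracked.
    std::size_t group_count() const { return m_links.size(); }

    // Number of links recorded for the given peer group ID.
    int link_count(std::int32_t peer_group_id) const
    {
        const auto it = m_links.find(peer_group_id);
        return it == m_links.end() ? 0 : it->second;
    }

private:
    std::unordered_map<std::int32_t, int> m_links;
};
```

Because the key is the peer's ID, two callbacks that arrive before any local group exists still map to the same entry, so only the first one is reported as a new group.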

maxsharabayko commented 3 months ago

Proposal: Adding local and remote group IDs to the listen callback.

This could be added under #ifdef SRT_160_API_PREVIEW in the next release, and replace the previous callback in v1.6.0.

Drawbacks:

Git diff: listener callback with group IDs

```diff
diff --git a/srtcore/core.cpp b/srtcore/core.cpp
index 1612830..4d8b388 100644
--- a/srtcore/core.cpp
+++ b/srtcore/core.cpp
@@ -11850,6 +11850,8 @@ bool srt::CUDT::runAcceptHook(CUDT *acore, const sockaddr* peer, const CHandShak
     bool have_group = false;
     SRT_GROUP_TYPE gt = SRT_GTYPE_UNDEFINED;
 #endif
+    SRTSOCKET grpid = SRT_INVALID_SOCK;
+    SRTSOCKET peergrpid = SRT_INVALID_SOCK;
 
     // This tests if there are any extensions.
     if (hspkt.getLength() > CHandShake::m_iContentSize + 4 && IsSet(ext_flags, CHandShake::HS_EXT_CONFIG))
@@ -11890,6 +11892,11 @@ bool srt::CUDT::runAcceptHook(CUDT *acore, const sockaddr* peer, const CHandShak
                 {
                     uint32_t gd = groupdata[GRPD_GROUPDATA];
                     gt = SRT_GROUP_TYPE(SrtHSRequest::HS_GROUP_TYPE::unwrap(gd));
+
+                    peergrpid = groupdata[GRPD_GROUPID];
+                    CUDTGroup* gp = uglobal().findPeerGroup_LOCKED(peergrpid);
+                    if (gp)
+                        grpid = gp->id();
                 }
             }
 #endif
@@ -11922,7 +11929,7 @@ bool srt::CUDT::runAcceptHook(CUDT *acore, const sockaddr* peer, const CHandShak
     acore->m_RejectReason = SRT_REJX_FALLBACK;
     try
     {
-        int result = CALLBACK_CALL(m_cbAcceptHook, acore->m_SocketID, hs.m_iVersion, peer, target);
+        int result = CALLBACK_CALL(m_cbAcceptHook, acore->m_SocketID, grpid, peergrpid, hs.m_iVersion, peer, target);
         if (result == -1)
             return false;
     }
diff --git a/srtcore/srt.h b/srtcore/srt.h
index 614a85a..00c97d1 100644
--- a/srtcore/srt.h
+++ b/srtcore/srt.h
@@ -762,8 +762,15 @@ static inline int srt_bind_peerof (SRTSOCKET u, UDPSOCKET sys_udp_sock) { retur
 SRT_API int srt_listen (SRTSOCKET u, int backlog);
 SRT_API SRTSOCKET srt_accept (SRTSOCKET u, struct sockaddr* addr, int* addrlen);
 SRT_API SRTSOCKET srt_accept_bond (const SRTSOCKET listeners[], int lsize, int64_t msTimeOut);
-typedef int srt_listen_callback_fn (void* opaq, SRTSOCKET ns, int hsversion, const struct sockaddr* peeraddr, const char* streamid);
+
+//#ifdef SRT_160_API_PREVIEW
+typedef int srt_listen_callback_fn(void* opaq, SRTSOCKET ns, SRTSOCKET grpid, SRTSOCKET peergrpid, int hsversion, const struct sockaddr* peeraddr, const char* streamid);
 SRT_API int srt_listen_callback(SRTSOCKET lsn, srt_listen_callback_fn* hook_fn, void* hook_opaque);
+//#else
+//typedef int srt_listen_callback_fn (void* opaq, SRTSOCKET ns, int hsversion, const struct sockaddr* peeraddr, const char* streamid);
+//SRT_API int srt_listen_callback(SRTSOCKET lsn, srt_listen_callback_fn* hook_fn, void* hook_opaque);
+//#endif
+
 typedef void srt_connect_callback_fn (void* opaq, SRTSOCKET ns, int errorcode, const struct sockaddr* peeraddr, int token);
 SRT_API int srt_connect_callback(SRTSOCKET clr, srt_connect_callback_fn* hook_fn, void* hook_opaque);
 SRT_API int srt_connect (SRTSOCKET u, const struct sockaddr* name, int namelen);
```

srt-xtransmit diff

```diff
diff --git a/submodule/srt b/submodule/srt
index cf13200..72303d7 160000
--- a/submodule/srt
+++ b/submodule/srt
@@ -1 +1 @@
-Subproject commit cf132005044232689cdc890a266b976e74333ca6
+Subproject commit 72303d7934f9c6b1cbe23c438672f0eba0f318cb-dirty
diff --git a/xtransmit/misc.cpp b/xtransmit/misc.cpp
index 1bf6a55..11420fc 100644
--- a/xtransmit/misc.cpp
+++ b/xtransmit/misc.cpp
@@ -30,7 +30,7 @@ shared_sock_t create_connection(const vector& parsed_urls, shared_soc
         listening_sock = make_shared(parsed_urls);
         socket::srt_group* s = dynamic_cast(listening_sock.get());
         const bool accept = s->mode() == socket::srt_group::LISTENER;
-        if (accept) {
+        if (accept && !is_listening) {
             s->listen();
         }
         shared_sock_t connection = accept ? s->accept() : s->connect();
diff --git a/xtransmit/srt_socket_group.cpp b/xtransmit/srt_socket_group.cpp
index ddca1bb..f0ca3a2 100644
--- a/xtransmit/srt_socket_group.cpp
+++ b/xtransmit/srt_socket_group.cpp
@@ -425,7 +425,7 @@ int socket::srt_group::on_listen_callback(SRTSOCKET sock)
     return 0;
 }
 
-int socket::srt_group::listen_callback_fn(void* opaq, SRTSOCKET sock, int hsversion,
+int socket::srt_group::listen_callback_fn(void* opaq, SRTSOCKET sock, SRTSOCKET grpid, int hsversion,
     const struct sockaddr* peeraddr, const char* streamid)
 {
     if (opaq == nullptr)
@@ -434,16 +434,21 @@ int socket::srt_group::listen_callback_fn(void* opaq, SRTSOCKET sock, int hsvers
         return 0;
     }
 
+    int optval = 0;
+    int optlen = sizeof optval;
+    srt_getsockflag(sock, SRTO_GROUPTYPE, (void*) &optval, &optlen);
+
     netaddr_any sa(peeraddr);
-    sockaddr host_sa = {};
-    int host_sa_len = sizeof host_sa;
-    srt_getsockname(sock, &host_sa, &host_sa_len);
-    netaddr_any host(&host_sa, host_sa_len);
-    spdlog::trace(LOG_SRT_GROUP "Accepted member socket @{}, host IP {}, remote IP {}", sock, host.str(), sa.str());
+    netaddr_any host_sa;
+    int host_sa_len = host_sa.storage_size();
+    srt_getsockname(sock, host_sa.get(), &host_sa_len);
+    netaddr_any host(host_sa.get(), host_sa_len);
 
     // TODO: this group may no longer exist. Use some global array to track valid groups.
     socket::srt_group* group = reinterpret_cast(opaq);
+    spdlog::trace(LOG_SRT_GROUP "@{} Accepted member socket @{}, host IP {}, remote IP {}, GT {}.", grpid, sock, host.str(), sa.str(), optval);
+
     return group->on_listen_callback(sock);
 }
diff --git a/xtransmit/srt_socket_group.hpp b/xtransmit/srt_socket_group.hpp
index 9fbf3f4..bf294dc 100644
--- a/xtransmit/srt_socket_group.hpp
+++ b/xtransmit/srt_socket_group.hpp
@@ -70,7 +70,7 @@ private:
     void on_connect_callback(SRTSOCKET sock, int error, const sockaddr*, int token);
     static void connect_callback_fn(void* opaq, SRTSOCKET sock, int error, const sockaddr* peer, int token);
     int on_listen_callback(SRTSOCKET sock);
-    static int listen_callback_fn(void* opaq, SRTSOCKET sock, int hsversion,
+    static int listen_callback_fn(void* opaq, SRTSOCKET sock, SRTSOCKET grpid, int hsversion,
         const struct sockaddr* peeraddr, const char* streamid);
 
     using options = std::map;
```

ethouris commented 3 months ago

I think there's not much we can do about that corner case when connections to the same group are being added perfectly simultaneously. That said, I think the probability of all connections reporting at exactly the same moment decreases as the number of connections grows.

Note that you can request to be notified about each new member connection and simply break the connection forcibly if it exceeds the limit.
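
A minimal sketch of such a post-accept limit, under the assumption that the application is told about each new member and each departure. `MemberLimit` is a hypothetical policy object, not part of the SRT API, and the notification mechanism itself is left out; when `admit()` returns false the caller would break the connection, for example by closing the member socket.

```cpp
#include <cassert>
#include <cstddef>

// Hypothetical policy object; not part of the SRT API.
class MemberLimit
{
public:
    explicit MemberLimit(std::size_t max_members)
        : m_max(max_members)
    {
        assert(max_members > 0);
    }

    // Called when the application learns about a new member connection.
    // Returns true if the member fits within the limit; false means the
    // caller should break this connection.
    bool admit()
    {
        if (m_active >= m_max)
            return false;
        ++m_active;
        return true;
    }

    // Called when a previously admitted member goes away.
    void release()
    {
        assert(m_active > 0);
        if (m_active > 0)
            --m_active;
    }

    // Number of members currently admitted.
    std::size_t active() const { return m_active; }

private:
    std::size_t m_max;
    std::size_t m_active = 0;
};
```

Since the check runs after the connection is reported rather than inside the listener callback, it does not depend on the group ID being known at callback time.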

ethouris commented 3 months ago

BTW, changing the callback function signature is a bad idea. It introduces a hard backward ABI incompatibility.