ibc opened 8 months ago
I've added some console logs: https://github.com/versatica/mediasoup/pull/1353/commits/069f78e62c618afa37622e119b4ea0d522fc9da1
The test fails because the first sent message is never received by the data consumer:
npx jest --testPathPattern node/src/test/test-node-sctp.ts
console.log
TODO: Revert numMessages to 200
at node/src/test/test-node-sctp.ts:116:10
console.log
---- sending id 1
at node/src/test/test-node-sctp.ts:143:12
console.log
---- sending id 2
at node/src/test/test-node-sctp.ts:143:12
console.log
---- sending id 3
at node/src/test/test-node-sctp.ts:143:12
console.log
---- received id 2
at SCTPStreamReadable.<anonymous> (node/src/test/test-node-sctp.ts:175:13)
FAIL test/test-node-sctp.ts (7.075 s)
✕ ordered DataProducer delivers all SCTP messages to the DataConsumer (4267 ms)
● ordered DataProducer delivers all SCTP messages to the DataConsumer
id 2 in message should match numReceivedMessages 1
177 | if (id !== numReceivedMessages) {
178 | reject(
> 179 | new Error(
| ^
180 | `id ${id} in message should match numReceivedMessages ${numReceivedMessages}`
181 | )
182 | );
at SCTPStreamReadable.<anonymous> (node/src/test/test-node-sctp.ts:179:7)
UPDATE: Issue found. The problem is that, when the usrsctp send callback is called, we store the outgoing data into a map and then invoke a uv async (which will run some time later), and that uv async callback reads from that storage to send the data. The problem is that, if two sequential messages must be sent to a peer, the second one overrides the first one in the storage, so when the uv async callback is executed it only reads the second one. Here is the issue (see how onSendSctpData() is called twice sequentially):
---- test | sending id 1
---- test | sending id 2
RTC::Transport::OnSctpAssociationMessageReceived() | ------ [streamId:123, ppid:53, msg:'1']
RTC::DataProducer::ReceiveMessage() | ------ [msg:'1']
RTC::Router::OnTransportDataProducerMessageReceived() | ------ [ppid:53, msg:'1']
RTC::DataConsumer::SendMessage() | ------ [ppid:53, msg:'1']
RTC::Transport::OnDataConsumerSendMessage() | ------ [ppid:53, msg:'1']
RTC::PlainTransport::SendMessage() | ------ [ppid:53, msg:'1']
RTC::SctpAssociation::SendSctpMessage() | ------ [ppid:53, msg:'1']
DepUsrSCTP::onSendSctpData() | ****
DepUsrSCTP::SendSctpData() | **** previous store.data: 0x0 [len:0]
DepUsrSCTP::onSendSctpData() | ****
DepUsrSCTP::SendSctpData() | **** previous store.data: 0x600001a88040 [len:32]
DepUsrSCTP::onAsync() | ****
RTC::Transport::OnSctpAssociationMessageReceived() | ------ [streamId:123, ppid:53, msg:'2']
RTC::DataProducer::ReceiveMessage() | ------ [msg:'2']
RTC::Router::OnTransportDataProducerMessageReceived() | ------ [ppid:53, msg:'2']
RTC::DataConsumer::SendMessage() | ------ [ppid:53, msg:'2']
RTC::Transport::OnDataConsumerSendMessage() | ------ [ppid:53, msg:'2']
RTC::PlainTransport::SendMessage() | ------ [ppid:53, msg:'2']
RTC::SctpAssociation::SendSctpMessage() | ------ [ppid:53, msg:'2']
DepUsrSCTP::onSendSctpData() | ****
DepUsrSCTP::SendSctpData() | **** previous store.data: 0x0 [len:0]
DepUsrSCTP::onAsync() | ****
---- test | received id 2
RUNS test/test-node-sctp.ts
FAIL test/test-node-sctp.ts
✕ ordered DataProducer delivers all SCTP messages to the DataConsumer (1645 ms)
● ordered DataProducer delivers all SCTP messages to the DataConsumer
id 2 in message should match numReceivedMessages 1
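In code terms, the broken pattern looks roughly like this. It's a simplified sketch with illustrative names (SendOverUdp is an assumed transport hook; the real code keeps per-association stores in a map):

```cpp
#include <cstdint>
#include <cstring>
#include <uv.h>

// Assumed transport hook, not the real mediasoup function.
void SendOverUdp(const uint8_t* data, size_t len);

struct SendSctpDataStore
{
	uint8_t* data{ nullptr };
	size_t len{ 0u };
};

static SendSctpDataStore store;
static uv_async_t asyncHandle;

// usrsctp send callback; may fire twice before the uv loop wakes up.
void onSendSctpData(const uint8_t* data, size_t len)
{
	delete[] store.data;

	// BUG: a second call overwrites this single slot before onAsync() runs.
	store.data = new uint8_t[len];
	store.len  = len;
	std::memcpy(store.data, data, len);

	uv_async_send(&asyncHandle);
}

// uv async callback: only ever sees the last stored message.
void onAsync(uv_async_t* /*handle*/)
{
	SendOverUdp(store.data, store.len);
}
```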
The solution is to store all pending messages to be sent in a container.
Maybe related to this?:
https://docs.libuv.org/en/v1.x/async.html
libuv will coalesce calls to uv_async_send(), that is, not every call to it will yield an execution of the callback. For example: if uv_async_send() is called 5 times in a row before the callback is called, the callback will only be called once. If uv_async_send() is called again after the callback was called, it will be called again.
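That coalescing behavior explains the loss: the callback can run once for several sends, so the storage must hold every pending message rather than just the last one. A minimal sketch of the container-based approach (illustrative names, simplified from the real code; SendOverUdp is again an assumed hook):

```cpp
#include <cstdint>
#include <mutex>
#include <utility>
#include <vector>
#include <uv.h>

// Assumed transport hook, not the real mediasoup function.
void SendOverUdp(const uint8_t* data, size_t len);

struct SendSctpDataItem
{
	std::vector<uint8_t> data;
};

static std::vector<SendSctpDataItem> pendingItems;
static std::mutex pendingMutex;
static uv_async_t asyncHandle;

// usrsctp send callback; may run on a different thread than the uv loop.
void onSendSctpData(const uint8_t* data, size_t len)
{
	SendSctpDataItem item;

	item.data.assign(data, data + len);

	{
		const std::lock_guard<std::mutex> lock(pendingMutex);

		pendingItems.push_back(std::move(item));
	}

	// Coalescing is harmless now: one onAsync() run drains all items.
	uv_async_send(&asyncHandle);
}

// uv async callback: drain the whole queue, not a single slot.
void onAsync(uv_async_t* /*handle*/)
{
	std::vector<SendSctpDataItem> items;

	{
		const std::lock_guard<std::mutex> lock(pendingMutex);

		items.swap(pendingItems);
	}

	for (const auto& item : items)
	{
		SendOverUdp(item.data.data(), item.data.size());
	}
}
```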
Ok, so I've solved the problem as follows: https://github.com/versatica/mediasoup/pull/1353/commits/1a25bedd503f2a96f3d745f7f9af203e02688ab9
The test now works (for simplicity it's temporarily reduced to just 2 messages) and all logs are good:
DepUsrSCTP::SendSctpDataStore() | ---------- store constructor [fooId:1]
DepUsrSCTP::onSendSctpData() | **** onSendSctpData() called
DepUsrSCTP::SendSctpData() | -------- store.items.emplace_back()... [items.size:0]
DepUsrSCTP::SendSctpDataItem() | ---------- item constructor
DepUsrSCTP::SendSctpData() | -------- store.items.emplace_back() DONE [item.fooId:1, items.size:1]
DepUsrSCTP::onSendSctpData() | **** onSendSctpData() called
DepUsrSCTP::SendSctpData() | -------- store.items.emplace_back()... [items.size:1]
DepUsrSCTP::SendSctpDataItem() | ---------- item constructor
DepUsrSCTP::~SendSctpDataItem() | ---------- item destructor [fooId:1, data:0x6000014fc620]
DepUsrSCTP::SendSctpData() | -------- store.items.emplace_back() DONE [item.fooId:2, items.size:2]
DepUsrSCTP::onAsync() | **** onAsync() called
DepUsrSCTP::onAsync() | ------------- sending pening messages [items.size:2]
DepUsrSCTP::ClearItems() | ---------- store ClearItems() [fooId:1, items.size():2]
DepUsrSCTP::~SendSctpDataItem() | ---------- item destructor [fooId:2, data:0x7f78ff705690]
DepUsrSCTP::~SendSctpDataItem() | ---------- item destructor [fooId:1, data:0x6000014fc620]
DepUsrSCTP::onSendSctpData() | **** onSendSctpData() called
DepUsrSCTP::SendSctpData() | -------- store.items.emplace_back()... [items.size:0]
DepUsrSCTP::SendSctpDataItem() | ---------- item constructor
DepUsrSCTP::SendSctpData() | -------- store.items.emplace_back() DONE [item.fooId:3, items.size:1]
DepUsrSCTP::onAsync() | **** onAsync() called
DepUsrSCTP::onAsync() | ------------- sending pening messages [items.size:1]
DepUsrSCTP::ClearItems() | ---------- store ClearItems() [fooId:1, items.size():1]
DepUsrSCTP::~SendSctpDataItem() | ---------- item destructor [fooId:3, data:0x600003df8090]
TODO: Revert numMessages to 200
---- test | sending id 1
---- test | sending id 2
RTC::Transport::OnSctpAssociationMessageReceived() | ------ [streamId:123, ppid:53, msg:'1']
RTC::DataProducer::ReceiveMessage() | ------ [msg:'1']
RTC::DataConsumer::SendMessage() | ------ [ppid:53, msg:'1']
RTC::SctpAssociation::SendSctpMessage() | ------ [ppid:53, msg:'1']
DepUsrSCTP::onSendSctpData() | **** onSendSctpData() called
DepUsrSCTP::SendSctpData() | -------- store.items.emplace_back()... [items.size:0]
DepUsrSCTP::SendSctpDataItem() | ---------- item constructor
DepUsrSCTP::SendSctpData() | -------- store.items.emplace_back() DONE [item.fooId:4, items.size:1]
DepUsrSCTP::onSendSctpData() | **** onSendSctpData() called
DepUsrSCTP::SendSctpData() | -------- store.items.emplace_back()... [items.size:1]
DepUsrSCTP::SendSctpDataItem() | ---------- item constructor
DepUsrSCTP::SendSctpData() | -------- store.items.emplace_back() DONE [item.fooId:5, items.size:2]
DepUsrSCTP::onAsync() | **** onAsync() called
DepUsrSCTP::onAsync() | ------------- sending pening messages [items.size:2]
DepUsrSCTP::ClearItems() | ---------- store ClearItems() [fooId:1, items.size():2]
DepUsrSCTP::~SendSctpDataItem() | ---------- item destructor [fooId:5, data:0x600003fe0c00]
DepUsrSCTP::~SendSctpDataItem() | ---------- item destructor [fooId:4, data:0x600003fe08e0]
---- test | received id 1
RTC::Transport::OnSctpAssociationMessageReceived() | ------ [streamId:123, ppid:53, msg:'2']
RTC::DataProducer::ReceiveMessage() | ------ [msg:'2']
RTC::DataConsumer::SendMessage() | ------ [ppid:53, msg:'2']
RTC::SctpAssociation::SendSctpMessage() | ------ [ppid:53, msg:'2']
DepUsrSCTP::onSendSctpData() | **** onSendSctpData() called
DepUsrSCTP::SendSctpData() | -------- store.items.emplace_back()... [items.size:0]
DepUsrSCTP::SendSctpDataItem() | ---------- item constructor
DepUsrSCTP::SendSctpData() | -------- store.items.emplace_back() DONE [item.fooId:6, items.size:1]
DepUsrSCTP::onAsync() | **** onAsync() called
DepUsrSCTP::onAsync() | ------------- sending pening messages [items.size:1]
DepUsrSCTP::ClearItems() | ---------- store ClearItems() [fooId:1, items.size():1]
DepUsrSCTP::~SendSctpDataItem() | ---------- item destructor [fooId:6, data:0x600003ffeec0]
---- test | received id 2
PASS test/test-node-sctp.ts (5.395 s)
✓ ordered DataProducer delivers all SCTP messages to the DataConsumer (1608 ms)
And OF COURSE a double free bug that only happens in Rust, in CI, on Ubuntu and not on macOS, making me spend yet another 2 extra days on this: https://github.com/versatica/mediasoup/actions/runs/8172985292/job/22344414084?pr=1353
Are you making sure that threadA is not writing to a SendSctpDataStore at the same time the uv main thread is reading it?
> Are you making sure that threadA is not writing to a SendSctpDataStore at the same time the uv main thread is reading it?

The ClassDestroy() method should also use the mutex, since it clears the storage map, so every store is deallocated and deletes all items in its items array. Maybe that's it.

Also take into account that this PR doesn't yet implement the thing about running the Checker in a separate thread.
> Are you making sure that threadA is not writing to a SendSctpDataStore at the same time the uv main thread is reading it?
>
> The ClassDestroy() method should also use the mutex, since it clears the storage map, so every store is deallocated and deletes all items in its items array. Maybe that's it.

No, false alarm, it already calls const std::lock_guard<std::mutex> lock(GlobalSyncMutex).
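For reference, this is the locking discipline in question, sketched with the names that appear in the logs (simplified stand-in types, not the literal source; the map key type is made up):

```cpp
#include <cstdint>
#include <mutex>
#include <unordered_map>

// Simplified stand-in for the real type.
struct SendSctpDataStore { /* holds the pending items */ };

static std::mutex GlobalSyncMutex;
static std::unordered_map<uintptr_t, SendSctpDataStore> mapAsyncHandlerSendSctpData;

void ClassDestroy()
{
	// Same mutex as the writer (usrsctp send callback) and the reader (uv
	// async callback): clearing the map destroys every store and frees its
	// items, so it must not race with a thread still touching them.
	const std::lock_guard<std::mutex> lock(GlobalSyncMutex);

	mapAsyncHandlerSendSctpData.clear();
}
```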
I've done many changes in https://github.com/versatica/mediasoup/pull/1353/commits/ca5f222e914cde25622de368ca927a9b3dbaf1a8:

- Moved the DepUsrSCTP::Checker from the Worker constructor/destructor to DepUsrSCTP::ClassInit() and DepUsrSCTP::ClassDestroy(). This comes with an issue (see Issue 2 in the next comment).
- Made the DepUsrSCTP::Checker singleton static rather than thread_local static. Why? Because there must be only one, and not one per thread (in Rust).

When running any test in Node:
npx jest --testPathPattern node/src/test/test-node-sctp.ts
mediasoup:ERROR:Worker (stderr) DepLibUV::onWalk() | alive UV handle found (this shouldn't happen) [type:timer, active:0, closing:1, has_ref:1] +0ms
492 | for (const line of buffer.toString('utf8').split('\n')) {
493 | if (line) {
> 494 | workerLogger.error(`(stderr) ${line}`);
| ^
495 | }
496 | }
497 | });
This is because DepUsrSCTP::CloseChecker() is now called from DepUsrSCTP::ClassDestroy() rather than from the Worker::Close() method. Honestly not sure why it happens or what the difference is. I've added logs to a file in /tmp/ms_log.txt and the problem is clear:
---- Timer created
---- Timer created
---- onCloseTimer
---- WORKER CLOSED
---- DepUsrSCTP::ClassDestroy()
---- DepUsrSCTP::ClassDestroy() | calling mapAsyncHandlerSendSctpData.clear()
---- onWalk
---- onCloseTimer
As you can see, when onWalk() runs (which is defined in DepLibUV.cpp and is called by lib.cpp when the Worker instance ends), the timer of the Checker singleton hasn't yet been freed. It's done later.
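This matches documented libuv behavior: uv_close() is asynchronous, so a closed handle stays attached to the loop (reported as closing) until the loop runs again and fires the close callback. A minimal standalone sketch:

```cpp
#include <uv.h>

static void onCloseTimer(uv_handle_t* /*handle*/)
{
	// Runs on a later loop iteration, not inside uv_close().
}

static void onWalk(uv_handle_t* handle, void* /*arg*/)
{
	// Here uv_is_closing(handle) returns non-zero for the timer below:
	// the handle is still attached to the loop.
}

int main()
{
	uv_loop_t loop;
	uv_timer_t timer;

	uv_loop_init(&loop);
	uv_timer_init(&loop, &timer);

	uv_close(reinterpret_cast<uv_handle_t*>(&timer), onCloseTimer);

	// The timer is still registered in the loop at this point.
	uv_walk(&loop, onWalk, nullptr);

	// Only now does onCloseTimer() run and the handle detach.
	uv_run(&loop, UV_RUN_DEFAULT);
	uv_loop_close(&loop);

	return 0;
}
```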
After a meeting with Jose we have decided that, as originally planned, the timer of the usrsctp Checker singleton must run in a separate thread X, and when the first SctpAssociation is created or when the last one is destroyed (in any Worker thread), DepUsrSctp must use uv_async_send to tell the Checker singleton to start, stop or restart the timer. And the same when DepUsrSctp::HandleUsrSctpTimers() is called, if PR https://github.com/versatica/mediasoup/pull/1351 is merged.
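A rough sketch of that design, with made-up names (the actual start/stop/restart logic is still to be written). Worker threads publish a command and wake the Checker's loop via uv_async_send(), which libuv documents as safe to call from any thread:

```cpp
#include <atomic>
#include <uv.h>

enum class CheckerCommand : int { None, Start, Stop, Restart };

static std::atomic<CheckerCommand> command{ CheckerCommand::None };
static uv_async_t checkerAsync; // Initialized on the Checker thread's loop.

// Called from any Worker thread when the first SctpAssociation is created
// or the last one is destroyed.
void SignalChecker(CheckerCommand cmd)
{
	command.store(cmd);
	uv_async_send(&checkerAsync); // Thread-safe by libuv contract.
}

// Runs on the Checker thread's loop. If several signals were coalesced,
// only the last command matters, which is the desired behavior here.
static void OnCheckerAsync(uv_async_t* /*handle*/)
{
	switch (command.exchange(CheckerCommand::None))
	{
		case CheckerCommand::Start:   /* start the usrsctp timer */   break;
		case CheckerCommand::Stop:    /* stop the timer */            break;
		case CheckerCommand::Restart: /* restart the timer */         break;
		default: break;
	}
}
```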
Are you making sure that threadA is not writing to a SendSctpDataStore at the same time the uv main thread is reading it?
Running a test with ThreadSanitizer enabled may help confirm that.
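For illustration, this is the kind of unsynchronized access ThreadSanitizer flags: a deliberately racy toy program, unrelated to the mediasoup sources (build with -fsanitize=thread -g):

```cpp
#include <cstdio>
#include <thread>
#include <vector>

static std::vector<int> sharedItems; // No mutex, on purpose.

int main()
{
	std::thread writer([] {
		for (int i = 0; i < 1000; ++i)
			sharedItems.push_back(i); // Write without a lock.
	});

	std::thread reader([] {
		for (int i = 0; i < 1000; ++i)
			std::printf("size: %zu\n", sharedItems.size()); // Racy read.
	});

	writer.join();
	reader.join();

	return 0;
}
```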
WIP
Fixes https://github.com/versatica/mediasoup/issues/1352
Details
TODO
- None of these changes should take effect when in Node, so we need to pass (or NOT pass) some define, only from Rust, to enable this in the C++ code. We don't want to deal with UV async stuff when in Node because it's not needed at all, so let's see how to do it (see the sketch below).
- Missing thread X to initialize usrsctp and run the Checker singleton.
- And many other things.
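A hypothetical sketch of that gating. MS_RUST and both helper functions are invented names; the real define and entry points are still to be decided:

```cpp
#include <cstddef>
#include <cstdint>

void QueueSctpDataAndWakeLoop(const uint8_t* data, size_t len); // Invented.
void SendSctpDataDirectly(const uint8_t* data, size_t len);     // Invented.

void OnUsrSctpSend(const uint8_t* data, size_t len)
{
#ifdef MS_RUST
	// Rust: several Worker threads share usrsctp, so take the thread-safe
	// queue + uv_async_send() path added in this PR.
	QueueSctpDataAndWakeLoop(data, len);
#else
	// Node: single-threaded worker, send directly with no async hop.
	SendSctpDataDirectly(data, len);
#endif
}
```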