webrtc-sdk / Specs

WebRTC SDK for iOS/mac (Cocopods Specs)
MIT License
26 stars 18 forks source link

Data loss occurs in the RTCDataChannel #7

Closed Gaubee closed 9 months ago

Gaubee commented 9 months ago

webrtc-sdk/Specs/114.5735.08 my demo is simple, open RTCDataChannel by webrtc-kmp and safari. then 10MB of data is sent, 64kb at one time. In theory, 157 times need to be sent to complete the transmission, but on the native side, often only 42 to 44 times can be received.

The same DEMO is correct on Android. I've read the source code for webrtc-kmp and I don't find it problematic. So one can only suspect that something is wrong here.

suspend fun connectDataChannel(): DataChannel {
    val peer = PeerConnection()
    val dataChannel = peer.createDataChannel(
      "default",
      ordered = true,
      /** maxRetransmits 和 maxRetransmitTimeMs 不能同时配置 */
      maxRetransmits = 2,
//      maxRetransmitTimeMs = 3000
    )
      ?: throw Exception("fail to create data channel")
    val offer = peer.createOffer(OfferAnswerOptions())
//
//    dataChannel.onError.onEach {
//      println("data channel error:$it")
//    }.launchIn(scope)
//    dataChannel.onOpen.onEach {
//      println("data channel open")
//    }.launchIn(scope)
//    dataChannel.onClose.onEach {
//      println("data channel close")
//    }.launchIn(scope)
//    dataChannel.onMessage.onEach {
//      println("data channel message: ${it.size}")
//    }.launchIn(scope)
    peer.setLocalDescription(offer)
    var lock = CompletableDeferred<Unit>()
    ws.onMessage { msg ->
      if (msg.text?.startsWith("icecandidate:") == true) {
        val iceCandidateJson = msg.text.split(':', limit = 2)[1]
        val iceCandidate =
          Json { ignoreUnknownKeys = true }.decodeFromString<IceCandidateData>(iceCandidateJson)
            .toIceCandidate()
        peer.addIceCandidate(iceCandidate)
      } else if (msg.text?.startsWith("remote:") == true) {
        val des = Json.decodeFromString<SessionDescriptionData>(msg.text.split(':', limit = 2)[1])
          .toSessionDescription()
        peer.setRemoteDescription(des)
      } else if (msg.text == "open-channel") {
        lock.complete(Unit)
      }
    }
    ws.postMessage("data-channel:${Json.encodeToString(SessionDescriptionData.from(offer))}")
    peer.onIceCandidate.onEach { iceCandidate ->
      ws.postMessage("icecandidate:${Json.encodeToString(IceCandidateData.from(iceCandidate))}")
    }.launchIn(scope)
//    dataChannel.onOpen.first()
    lock.await()
    println("okk:${dataChannel.readyState}")
    return dataChannel
  }

// ...
  suspend fun testNative2Js2() {
    val dataChannel = channel2Deferred.await()
    var sendSize = 0;
    println("Native2Js2: ${dataChannel.readyState}/${dataChannel.bufferedAmount}")
    val dur = measureTime {
      while (sendSize < totalSize) {
        dataChannel.send(ByteArray(unitSize))
        sendSize += unitSize
      }
      val lastMsg = "echo:${randomUUID()}".toUtf8ByteArray();
      dataChannel.send(lastMsg)
    }

    println("native2js2:[$dur] ${mbpres(sendSize, dur)}")
  }

  suspend fun testJs2Native2(
  ) {
    val dataChannel = channel2Deferred.await()
    var gotSize = 0;
    val endMessageContent = "end-channel-post".toUtf8ByteArray()
    val dur = measureTime {
      var lock = CompletableDeferred<Unit>()
      val job = dataChannel.onMessage.onEach { msg ->
        println("endMessageContentsize: ${msg.size}")
        if (msg.contentEquals(endMessageContent)) { /// !!!! HERE !!!! I have not been able to receive this message, and I have only received 42-44 of the 157 data.
          lock.complete(Unit)
        } else {
          gotSize += msg.size
          dataChannel.send(gotSize.toLittleEndianByteArray())
        }
      }.launchIn(scope)
      ws.postMessage("start-channel-post")
      lock.await()
      job.cancel()
    }
    println("js2native2:[$dur] ${mbpres(gotSize, dur)}")
  }

js code:

ws.addEventListener("message", async (event) => {
  const data = event.data;
//  ... 
if (typeof data === "string") {
    if (data.startsWith("data-channel:")) {
      const peer = new RTCPeerConnection();
      peerRs.resolve(peer);
      await peer.setRemoteDescription(JSON.parse(data.substring("data-channel:".length)));
      peer.ondatachannel = (e) => {
        channel = e.channel;
        channel.bufferedAmountLowThreshold = Math.ceil(unitSize / 2);
        console.log("open-channel", channel);
        channel.onmessage = (e) => {
          const { data: arraybuffer } = e;
          if (arraybuffer instanceof ArrayBuffer) {
            if (arraybuffer.byteLength < 1024) {
              const data = decoder.decode(arraybuffer);
              if (data.startsWith("echo:")) {
                channel?.send(arraybuffer);
              }
            }
          }
        };
        ws.send("open-channel");
      };
      peer.onicecandidate = (e) => {
        const candidate = e.candidate;
        if (candidate) {
          ws.send(`icecandidate:${JSON.stringify(candidate)}`);
        }
      };

      const answer = await peer.createAnswer();
      peer.setLocalDescription(answer);
      ws.send(`remote:${JSON.stringify(answer)}`);
    } else if (data.startsWith("icecandidate:")) {
      const peer = await peerRs.promise;
      const candidate = JSON.parse(data.substring("icecandidate:".length));
      peer.addIceCandidate(candidate);
    } else if (data === "start-channel-post" && channel !== null) {
     /// !!!!! HERE !!!! send message to native
      let i = 1;
      for (let sendSize = 0; sendSize < totalSize; sendSize += unitSize) {
        channel.send(new Uint8Array(unitSize));
        console.log("channel.bufferedAmount", channel.bufferedAmount);
        if (channel.bufferedAmount > channel.bufferedAmountLowThreshold) {
          const { promise, resolve } = promiseWithResolvers();
          channel.onbufferedamountlow = resolve;
          await promise;
        }
        const cur = i++;
        if (cur === 44) {
          await new Promise((cb) => setTimeout(cb, 1000));
        }
      }
      channel.send(encoder.encode("end-channel-post"));
      console.log("end channel post");
    }

I don't think there's anything wrong with this code, and on Android, it works correctly, so I'm just Posting it here for reference.

Can you give me some inspiration?

davidzhao commented 9 months ago

Sorry, we are not familiar with this problem. if it's an issue with libwebrtc or webrtc-kmp, raising it with their respective repos would be the best.