shepeliev / webrtc-kmp

WebRTC Kotlin Multiplatform SDK
Apache License 2.0
167 stars 34 forks source link

Data loss occurs in the RTCDataChannel #99

Open Gaubee opened 10 months ago

Gaubee commented 10 months ago

COPY ISSUES FROM webrtc-sdk/Specs/issues/7

webrtc-sdk/Specs/114.5735.08 my demo is simple, open RTCDataChannel by webrtc-kmp and safari. then 10MB of data is sent, 64kb at one time. In theory, 157 times need to be sent to complete the transmission, but on the native side, often only 42 to 44 times can be received.

The same DEMO is correct on Android. I've read the source code for webrtc-kmp and I don't find it problematic. So one can only suspect that something is wrong here.

suspend fun connectDataChannel(): DataChannel {
    val peer = PeerConnection()
    val dataChannel = peer.createDataChannel(
      "default",
      ordered = true,
      /** maxRetransmits 和 maxRetransmitTimeMs 不能同时配置 */
      maxRetransmits = 2,
//      maxRetransmitTimeMs = 3000
    )
      ?: throw Exception("fail to create data channel")
    val offer = peer.createOffer(OfferAnswerOptions())
//
//    dataChannel.onError.onEach {
//      println("data channel error:$it")
//    }.launchIn(scope)
//    dataChannel.onOpen.onEach {
//      println("data channel open")
//    }.launchIn(scope)
//    dataChannel.onClose.onEach {
//      println("data channel close")
//    }.launchIn(scope)
//    dataChannel.onMessage.onEach {
//      println("data channel message: ${it.size}")
//    }.launchIn(scope)
    peer.setLocalDescription(offer)
    var lock = CompletableDeferred<Unit>()
    ws.onMessage { msg ->
      if (msg.text?.startsWith("icecandidate:") == true) {
        val iceCandidateJson = msg.text.split(':', limit = 2)[1]
        val iceCandidate =
          Json { ignoreUnknownKeys = true }.decodeFromString<IceCandidateData>(iceCandidateJson)
            .toIceCandidate()
        peer.addIceCandidate(iceCandidate)
      } else if (msg.text?.startsWith("remote:") == true) {
        val des = Json.decodeFromString<SessionDescriptionData>(msg.text.split(':', limit = 2)[1])
          .toSessionDescription()
        peer.setRemoteDescription(des)
      } else if (msg.text == "open-channel") {
        lock.complete(Unit)
      }
    }
    ws.postMessage("data-channel:${Json.encodeToString(SessionDescriptionData.from(offer))}")
    peer.onIceCandidate.onEach { iceCandidate ->
      ws.postMessage("icecandidate:${Json.encodeToString(IceCandidateData.from(iceCandidate))}")
    }.launchIn(scope)
//    dataChannel.onOpen.first()
    lock.await()
    println("okk:${dataChannel.readyState}")
    return dataChannel
  }

// ...
  suspend fun testNative2Js2() {
    val dataChannel = channel2Deferred.await()
    var sendSize = 0;
    println("Native2Js2: ${dataChannel.readyState}/${dataChannel.bufferedAmount}")
    val dur = measureTime {
      while (sendSize < totalSize) {
        dataChannel.send(ByteArray(unitSize))
        sendSize += unitSize
      }
      val lastMsg = "echo:${randomUUID()}".toUtf8ByteArray();
      dataChannel.send(lastMsg)
    }

    println("native2js2:[$dur] ${mbpres(sendSize, dur)}")
  }

  suspend fun testJs2Native2(
  ) {
    val dataChannel = channel2Deferred.await()
    var gotSize = 0;
    val endMessageContent = "end-channel-post".toUtf8ByteArray()
    val dur = measureTime {
      var lock = CompletableDeferred<Unit>()
      val job = dataChannel.onMessage.onEach { msg ->
        println("endMessageContentsize: ${msg.size}")
        if (msg.contentEquals(endMessageContent)) { /// !!!! HERE !!!! I have not been able to receive this message, and I have only received 42-44 of the 157 data.
          lock.complete(Unit)
        } else {
          gotSize += msg.size
          dataChannel.send(gotSize.toLittleEndianByteArray())
        }
      }.launchIn(scope)
      ws.postMessage("start-channel-post")
      lock.await()
      job.cancel()
    }
    println("js2native2:[$dur] ${mbpres(gotSize, dur)}")
  }

js code:

ws.addEventListener("message", async (event) => {
  const data = event.data;
//  ... 
if (typeof data === "string") {
    if (data.startsWith("data-channel:")) {
      const peer = new RTCPeerConnection();
      peerRs.resolve(peer);
      await peer.setRemoteDescription(JSON.parse(data.substring("data-channel:".length)));
      peer.ondatachannel = (e) => {
        channel = e.channel;
        channel.bufferedAmountLowThreshold = Math.ceil(unitSize / 2);
        console.log("open-channel", channel);
        channel.onmessage = (e) => {
          const { data: arraybuffer } = e;
          if (arraybuffer instanceof ArrayBuffer) {
            if (arraybuffer.byteLength < 1024) {
              const data = decoder.decode(arraybuffer);
              if (data.startsWith("echo:")) {
                channel?.send(arraybuffer);
              }
            }
          }
        };
        ws.send("open-channel");
      };
      peer.onicecandidate = (e) => {
        const candidate = e.candidate;
        if (candidate) {
          ws.send(`icecandidate:${JSON.stringify(candidate)}`);
        }
      };

      const answer = await peer.createAnswer();
      peer.setLocalDescription(answer);
      ws.send(`remote:${JSON.stringify(answer)}`);
    } else if (data.startsWith("icecandidate:")) {
      const peer = await peerRs.promise;
      const candidate = JSON.parse(data.substring("icecandidate:".length));
      peer.addIceCandidate(candidate);
    } else if (data === "start-channel-post" && channel !== null) {
     /// !!!!! HERE !!!! send message to native
      let i = 1;
      for (let sendSize = 0; sendSize < totalSize; sendSize += unitSize) {
        channel.send(new Uint8Array(unitSize));
        console.log("channel.bufferedAmount", channel.bufferedAmount);
        if (channel.bufferedAmount > channel.bufferedAmountLowThreshold) {
          const { promise, resolve } = promiseWithResolvers();
          channel.onbufferedamountlow = resolve;
          await promise;
        }
        const cur = i++;
        if (cur === 44) {
          await new Promise((cb) => setTimeout(cb, 1000));
        }
      }
      channel.send(encoder.encode("end-channel-post"));
      console.log("end channel post");
    }

I don't think there's anything wrong with this code, and on Android, it works correctly, so I'm just Posting it here for reference.

Can you give me some inspiration?

shepeliev commented 10 months ago

@Gaubee sorry, I'm not able to look deeper your demo right now. However, there PR #98 has been merged recently. It is not published yet. So it might be a cause if you were using 0.114.4.

Gaubee commented 10 months ago

@shepeliev I have review this commit. This commit is just a question of the js platform, but in my example, I wrote the js-code myself. The problem I found was on IOS, not JS.

Gaubee commented 10 months ago

I did this test because I was trying to set up faster communication between WKWebView and native instead of WKScriptMessageHandler based communication. I have tested that WebSocket's speed does not meet the demand, and it has a large speed gap between up and down, which seems to be a hard-coded limitation. That's why I'm trying to give WebRTC point-to-point connectivity.

Maybe I should be suspicious of the WKWebView problem?

tamimattafi commented 2 months ago

Having the same issue with iOS data channel, most of the time the data channel is closed with no events or feedback

tamimattafi commented 2 months ago

iOS data channel (From onDataChannel callback) stops receiving messages in a short time after the connection was established. While debugging, I found out that didReceiveMessageWithBuffer which is inside DataChannel, stops getting called. No signs of any crash or error.

Maybe this is related https://groups.google.com/g/discuss-webrtc/c/9NObqxnItCg

tamimattafi commented 2 weeks ago

Hello @shepeliev! Can we please have a patch release with this fix https://github.com/shepeliev/webrtc-kmp/pull/123? I'm confident it will fix @Gaubee's problem as well

shepeliev commented 2 weeks ago

@tamimattafi, I've just published it. @Gaubee, could you check if it solves the issue, please?

tamimattafi commented 2 weeks ago

@tamimattafi, I've just published it. @Gaubee, could you check if it solves the issue, please?

Thank you! @shepeliev