ethand91 / mediasoup-ios-client

Mediasoup 3 iOS Client
ISC License

onProduce will block the application thread and freeze the application if callback not invoked immediately #67

Open xplatsolutions opened 4 years ago

xplatsolutions commented 4 years ago

I have the SendTransportListener implementation below.

class MediasoupSendTransportHandler : NSObject, SendTransportListener {

  fileprivate weak var delegate: SendTransportListener?
  private var parent: MediasoupRoomManager

  init(parent: MediasoupRoomManager) {
    self.parent = parent
  }

  func onProduce(_ transport: Transport!, kind: String!, rtpParameters: String!, appData: String!, callback: ((String?) -> Void)!) {
    print("SendTransport::onProduce " + kind + " rtpParameters = " + rtpParameters)
    // Pass the callback along so it can be invoked with the producer ID once the async server response arrives
    parent.onLocalTransportProduce(transport: transport, kind: kind, rtpParameters: rtpParameters, appData: appData, callback: callback)
//    callback("**") // Uncommenting this line (any random string works) unblocks everything
  }

  func onConnect(_ transport: Transport!, dtlsParameters: String!) {
    print("SendTransport::onConnect dtlsParameters = " + dtlsParameters)
    parent.onLocalTransportConnect(transport: transport, dtlsParameters: dtlsParameters) // Async HTTP REST request, irrelevant for this problem since it is always working fine
  }

  func onConnectionStateChange(_ transport: Transport!, connectionState: String!) {
    print("SendTransport::onConnectionStateChange connectionState = " + connectionState)
  }

}

The onLocalTransportProduce method sends an async HTTP REST request to the server and retrieves the created producer ID.

func onLocalTransportProduce(transport: Transport, kind: String, rtpParameters: String, appData: String, callback: ((String?) -> Void)!) {
    let transportId = transport.getId()
    let jsonBody: JSON = ["kind": kind, "rtpParameters": JSON(parseJSON: rtpParameters)]
    // This is a Combine publisher using Alamofire; for more details see below
    mediaApi.createProducer(roomId: roomId, broadcasterId: broadcasterId, transportId: transportId!, bodyJson: jsonBody)
      .sink(receiveCompletion: { completion in
        self.onReceiveCompletion(label: "mediaApi.createProducer \(transportId!)", completion: completion)
      }, receiveValue: { result in
        let producerId = result["id"].stringValue
        // Calling this twice leads to an error about the state of the promise (see below).
        // Commenting it out avoids the error, but then the ID is never passed to libMediasoupClient.
        callback(producerId)
      })
      .store(in: &disposables)
  }

libc++abi.dylib: terminating with uncaught exception of type std::__1::future_error: The state of the promise has already been set.

Alamofire HTTP request to create the producer.

func createProducer(roomId: String, broadcasterId: String, transportId: String, bodyJson: JSON) -> AnyPublisher<JSON, MediasoupServerError> {
    let parameters : Parameters = bodyJson.dictionaryObject ?? [:]
    return session
      .request(
        makeCreateProducerUrlPath(roomId: roomId, broadcasterId: broadcasterId, transportId: transportId),
        method: .post,
        parameters: parameters,
        encoding: JSONEncoding.default)
      .publishData()
      .tryMap { result -> JSON in
        try JSON(data: result.data!)
    }
    .mapError { (error) -> MediasoupServerError in
      .network(description: error.localizedDescription)
    }
    .eraseToAnyPublisher()
  }
}

Any idea how to invoke the callback in the subscriber without getting the promise-state error and, of course, without blocking the main thread?

xplatsolutions commented 4 years ago

I was looking at the mediasoup-broadcaster-demo, and it looks like the handler there uses an async HTTP request that returns a future. Maybe we can change the design of the SendTransportListener somehow to avoid blocking the thread and to support async requests.

Disclaimer: I am not a C++ programmer :)

ethand91 commented 4 years ago

The OnProduce listener already returns a promise https://github.com/ethand91/mediasoup-ios-client/blob/master/mediasoup-client-ios/include/wrapper/TransportWrapper.h#L80

You would need to make sure that the callback is only called once. I'm assuming Alamofire sends requests on a background thread?
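
One way to guarantee the "only called once" requirement is to wrap the callback before handing it to any async code. The sketch below is a minimal illustration, not part of mediasoup-ios-client: `makeOnce` is a hypothetical helper name, and the closure type simply mirrors the `((String?) -> Void)` callback that onProduce receives.

```swift
import Foundation

/// Wraps a callback so that only the first invocation is forwarded.
/// `makeOnce` is a hypothetical helper, not an API of mediasoup-ios-client.
func makeOnce(_ callback: @escaping (String?) -> Void) -> (String?) -> Void {
    let lock = NSLock()
    var called = false
    return { value in
        // Check and set the flag under a lock so concurrent callers are safe.
        lock.lock()
        let alreadyCalled = called
        called = true
        lock.unlock()

        // Drop repeat calls instead of setting the promise a second time.
        if alreadyCalled { return }
        callback(value)
    }
}

// Usage: record every value that actually reaches the wrapped callback.
var received: [String?] = []
let once = makeOnce { received.append($0) }
once("producer-1")
once("producer-2") // ignored, the callback has already fired
```

This only protects against the "promise has already been set" error; it does not by itself address the freeze described in this issue.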

xplatsolutions commented 4 years ago

@ethand91 the problem is with that promise callback, let me explain further to make sure you understand so we can find a solution.

Yes, Alamofire or URLSession will create a background thread; technically, all async requests will.

I have tried both URLSession and Alamofire and the result is the same: as soon as a background thread is created and the callback is passed on to another delegate, the application freezes. If no async request is started (so no background thread is involved), the completion handler is invoked fine. So this looks like a threading race condition, or some other interaction between the future and the change of thread context in Swift.

I even tried a couple of the synchronous URLSession solutions out there with no luck; the application still freezes.

  1. onProduce method invoked
  2. Invoke an async request to the server inside onProduce
func onProduce(_ transport: Transport!, kind: String!, rtpParameters: String!, appData: String!, callback: ((String?) -> Void)!) {
    self.parent.onLocalTransportProduce(transport: transport, kind: kind, rtpParameters: rtpParameters, appData: appData, completionHandler: {(res: JSON) in
      let producerId = res["id"].stringValue
      callback(producerId) // callback passed in completion handler will freeze the application
    })
    // callback(producerId) // callback invoked in onProduce context and application will not freeze
  }

I tried a semaphore-based solution like the one below, but the problem remains the same: as soon as you pass the callback to another delegate function, the application freezes forever. The semaphore also makes the application very slow while invoking onConsume and onProduce.

func onProduce(_ transport: Transport!, kind: String!, rtpParameters: String!, appData: String!, callback: ((String?) -> Void)!) {
    print("SendTransport::onProduce " + kind + " rtpParameters = " + rtpParameters)
    let semaphore: DispatchSemaphore = DispatchSemaphore.init(value: 0)
    let queue: DispatchQueue = DispatchQueue.global()
    queue.async {
      self.parent.onLocalTransportProduce(transport: transport, kind: kind, rtpParameters: rtpParameters, appData: appData, completionHandler: {(res: JSON) in
        let producerId = res["id"].stringValue
        callback(producerId)
        print("Completion onLocalTransportProduce")
        semaphore.signal()
      })
    }

    // callback("**") // again, uncomment the callback invocation here and everything will work fine, but the application with the semaphore will be super slow
    _ = semaphore.wait(timeout: .now() + 100.0)
  }

Not invoking the callback at all, neither by passing it on nor in the method's own context, results in the obvious error below.

[ERROR] transport_wrapper::+[TransportWrapper nativeProduce:listener:track:encodings:codecOptions:appData:]() | The associated promise has been destructed prior to the associated state becoming ready.
2020-07-15 03:54:34.122181-0400 Phone[2934:1160855] *** Terminating app due to uncaught exception 'RuntimeException', reason: 'The associated promise has been destructed prior to the associated state becoming ready.'
*** First throw call stack:
(0x1bfad5794 0x1bf7f7bcc 0x1034feefc 0x10237a2a0 0x102379b70 0x1023795f0 0x10238336c 0x102377a10 0x1e9745bec 0x1e9745dd0 0x1e97cb728 0x1e97cbca4 0x10244ce24 0x102511f7c 0x1024457c0 0x102ca6338 0x102ca7730 0x102cb5710 0x1bfa537fc 0x1bfa4e6d0 0x1bfa4dce8 0x1c9b9838c 0x1c3b7c444 0x10236af10 0x1bf8d58f0)
libc++abi.dylib: terminating with uncaught exception of type NSException

I'm out of ideas but will keep researching how we can make this work. Open to ideas :)

ethand91 commented 4 years ago

"background thread is created and callback future is passed in another delegate"

Ok, now I understand. Thanks for the explanation.

I'm assuming the below also fails?

func onProduce(_ transport: Transport!, kind: String!, rtpParameters: String!, appData: String!, callback: ((String?) -> Void)!) {
     self.parent.onLocalTransportProduce(transport: transport, kind: kind, rtpParameters: rtpParameters, appData: appData, completionHandler: {(res: JSON) in
      let producerId = res["id"].stringValue

      // FAIL?
      DispatchQueue.main.async {
        callback(producerId) // callback passed in completion handler will freeze the application
      }
    })
    // callback(producerId) // callback invoked in onProduce context and application will not freeze
  }

The sample project I created uses sockets, but in order to debug this I'll create another sample using REST.

xplatsolutions commented 4 years ago

Yes, tried that before, and just now again, it freezes the application.

OK, let me know if you need any help, I'll be happy to assist you. I'd suggest simply using this sample media server. You join using a WebSocket, but then you have a REST API to work with.

I can provide assistance with code for this if you have any problems.

xplatsolutions commented 4 years ago

To add more context, although I don't think it matters: I am using SwiftUI and pass a model around to set the RTCVideoTrack when it is created. That seems to be fine, though, since the local Transport.Produce completes without problems.

NullIsOne commented 4 years ago

Same issue for me. Freezing after onProduce.

I'm using Socket.IO wrapped in RxSwift for async callbacks.

tspecht commented 3 years ago

Has anyone had success working around this issue?

darnfish commented 3 years ago

Still facing this, any luck?

darnfish commented 3 years ago

Found a solution using Alamofire! Calling request.response(queue: someQueue) on the Alamofire request lets the completion handler run on a thread that works :~)

Here's my full implementation, you'll need to change around a couple things to work with your Mediasoup server / setup, but this is a good place to start if you're facing this issue:

func handleOnProduce(_ transport: Transport!, kind: String!, rtpParameters: String!, appData: String!) -> String? {
  guard let transport = transport else {
    print("transport not found")

    return nil
  }

  guard let transportId = transport.getId() else {
    print("transport id not found")

    return nil
  }

  guard let rtpParameters = rtpParameters else {
    print("rtp parameters not found")

    return nil
  }

  guard let rtp = parseStringToDict(rtpParameters) else {
    print("error parsing rtp parameters")

    return nil
  }

  guard let kind = kind else {
    print("kind not found")

    return nil
  }

  let queue: DispatchQueue = .init(label: "produce", qos: .userInitiated)
  let globalQueue: DispatchQueue = .global()

  let semaphore: DispatchSemaphore = .init(value: 0)

  let parameters: [String: Any] = ["rtp": rtp, "kind": kind, "transportId": transportId]
  var producerId: String?

  globalQueue.async {
    AF.request("http://192.168.1.132:4321/transport/produce", method: .post, parameters: parameters, encoding: JSONEncoding.default).response(queue: queue) { response in
      // Always wake the waiting thread, even on the early-return paths below,
      // so a failed response does not stall until the semaphore times out
      defer { semaphore.signal() }

      guard let data = response.data else {
        print("response data is sus")

        return
      }

      guard let producerIdStr = String.init(data: data, encoding: .utf8) else {
        print("error serializing string")

        return
      }

      producerId = producerIdStr
    }
  }

  _ = semaphore.wait(timeout: .now() + 3)

  guard let producerId = producerId else {
    print("producer id not found")

    return nil
  }

  return producerId
}

func onProduce(_ transport: Transport!, kind: String!, rtpParameters: String!, appData: String!, callback: ((String?) -> Void)!) {
  print("SendTransportHandler::onProduce")

  guard let producerId = handleOnProduce(transport, kind: kind, rtpParameters: rtpParameters, appData: appData) else {
    callback(nil)

    return
  }

  callback(producerId)
}
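
A possible explanation for why delivering the response on a separate queue helps, offered as an assumption rather than something confirmed in this thread: if the native produce call blocks its calling thread until the callback fires, then any completion handler scheduled onto that same blocked thread or queue can never run. The sketch below simulates this with plain Dispatch and no mediasoup or Alamofire code; `blockingProduce`, `fakeCreateProducer`, and both queue labels are hypothetical stand-ins.

```swift
import Foundation

// Stand-in for the native produce call (assumed behavior): it invokes the
// listener, then blocks the calling thread until the callback delivers an ID.
func blockingProduce(onProduce: (@escaping (String?) -> Void) -> Void) -> String? {
    let done = DispatchSemaphore(value: 0)
    var producerId: String?
    onProduce { id in
        producerId = id
        done.signal()
    }
    // Give up after one second so the demo cannot hang forever.
    _ = done.wait(timeout: .now() + 1)
    return producerId
}

// Stand-in for an async server request whose completion is delivered on `queue`.
func fakeCreateProducer(deliverOn queue: DispatchQueue, completion: @escaping (String) -> Void) {
    DispatchQueue.global().asyncAfter(deadline: .now() + 0.1) {
        queue.async { completion("producer-123") }
    }
}

let callerQueue = DispatchQueue(label: "caller")       // plays the role of the blocked thread
let responseQueue = DispatchQueue(label: "responses")  // separate queue, as with response(queue:)

// Completion delivered on a different queue than the blocked one: the callback arrives.
let ok: String? = callerQueue.sync {
    blockingProduce { callback in
        fakeCreateProducer(deliverOn: responseQueue) { callback($0) }
    }
}

// Completion delivered on the blocked queue itself: it cannot run until the wait gives up.
let stuck: String? = callerQueue.sync {
    blockingProduce { callback in
        fakeCreateProducer(deliverOn: callerQueue) { callback($0) }
    }
}

print("separate queue:", ok ?? "nil", "| same queue:", stuck ?? "nil")
```

If this reading is right, it would also be consistent with the earlier DispatchQueue.main.async attempt freezing when onProduce is reached from the main thread, but that has not been verified against the library here.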