pion / webrtc

Pure Go implementation of the WebRTC API
https://pion.ly
MIT License
13.33k stars 1.62k forks source link

Audio/Video configuration issues detected when streaming to Twitch WebRTC #2498

Closed onthegit closed 1 year ago

onthegit commented 1 year ago

When I try the following code to stream to twitch via webrtc the stream appears on twitch inspector and after 5-10 seconds it fails with an error (on the inpsector twitch website ) . Can someone help?

Audio/Video configuration issues detected. 

Code:

package main

import (
    "bytes"
    "errors"
    "fmt"
    "io"
    "log"
    "math/rand"
    "net/http"
    "net/url"
    "os"
    "os/signal"
    "syscall"

    lksdk "github.com/livekit/server-sdk-go"
    "github.com/pion/interceptor"
    "github.com/pion/interceptor/pkg/intervalpli"
    "github.com/pion/webrtc/v3"
)

const API_KEY string = "devkey"
const API_SECRET string = "secret"

var (
    host      = "ws://127.0.0.1:7880"
    apiKey    = API_KEY
    apiSecret = API_SECRET
    roomName  = "room"
    identity  = "identity2"
)

func main() {

    if host == "" || apiKey == "" || apiSecret == "" || roomName == "" || identity == "" {
        fmt.Println("invalid arguments.")
        return
    }

    //establish webrtc whip connection
    wh := New("https://g.webrtc.live-video.net:4443/v2/offer", "")

    err := wh.Connect()

    if err != nil {
        log.Fatal(err)
    }

    defer wh.Close()

    room, err := lksdk.ConnectToRoom(host, lksdk.ConnectInfo{
        APIKey:              apiKey,
        APISecret:           apiSecret,
        RoomName:            roomName,
        ParticipantIdentity: identity,
    }, &lksdk.RoomCallback{
        ParticipantCallback: lksdk.ParticipantCallback{
            OnTrackSubscribed: wh.onTrackSubscribed,
        },
    })
    if err != nil {
        panic(err)
    }

    sigChan := make(chan os.Signal, 1)
    signal.Notify(sigChan, syscall.SIGINT)

    <-sigChan
    room.Disconnect()
}

func New(endpoint string, token string) *whip {
    wh := &whip{
        endpoint: endpoint,
        token:    token,
    }

    return wh
}

type whip struct {
    pc *webrtc.PeerConnection
    //resourceURL from the whip endpoint used to delete it later
    resourceURL string
    //whip endpoint
    endpoint string
    token    string
}

func (w *whip) setupWhip() error {
    log.Println(w.pc.LocalDescription().SDP)

    var sdp = []byte(w.pc.LocalDescription().SDP)

    //TODO: change
    client := &http.Client{}

    req, err := http.NewRequest("POST", w.endpoint, bytes.NewBuffer(sdp))
    if err != nil {
        return errors.New("Unexpected error building http request. " + err.Error())
    }

    req.Header.Add("Content-Type", "application/sdp")
    if w.token != "" {
        req.Header.Add("Authorization", "Bearer "+w.token)
    }

    resp, err := client.Do(req)
    if err != nil {
        return errors.New("Failed http POST request. " + err.Error())
    }

    defer resp.Body.Close()

    body, err := io.ReadAll(resp.Body)

    if err != nil {
        return errors.New("Failed http POST request. " + err.Error())
    }
    // log.Println(string(body))

    if resp.StatusCode != 201 {
        return errors.New(fmt.Sprintf("Non Successful POST: %d", resp.StatusCode) + string(body))
    }

    resourceUrl, err := url.Parse(resp.Header.Get("Location"))
    if err != nil {
        return errors.New("Failed to parse resource url. " + err.Error())
    }

    base, err := url.Parse(w.endpoint)
    if err != nil {
        return errors.New("Failed to parse base url. " + err.Error())
    }

    w.resourceURL = base.ResolveReference(resourceUrl).String()

    answer := webrtc.SessionDescription{}
    answer.Type = webrtc.SDPTypeAnswer
    answer.SDP = string(body)

    err = w.pc.SetRemoteDescription(answer)
    if err != nil {
        return errors.New("PeerConnection could not SetRemoteDescription answer: " + err.Error())
    }

    return nil
}

// adds a track to currently established webrtc whip connection
func (w *whip) AddTrack(remoteTrack *webrtc.TrackRemote) error {
    if w.pc == nil {
        return errors.New("peer connection is nil")
    }

    // if (w.pc.ConnectionState()) != webrtc.PeerConnectionStateConnected {
    //  return errors.New("peer connection not connected")
    // }

    outputTrack, err := webrtc.NewTrackLocalStaticRTP(remoteTrack.Codec().RTPCodecCapability, remoteTrack.Kind().String(), "pion_q")

    if err != nil {
        return err
    }

    rtpSender, err := w.pc.AddTrack(outputTrack)
    if err != nil {
        return err
    }

    w.processRTCP(rtpSender)

    err = w.renegotiate()
    if err != nil {
        return err
    }

    go w.forwardTrack(remoteTrack, outputTrack)

    return nil
}

func (w *whip) processRTCP(rtpSender *webrtc.RTPSender) {
    if rtpSender != nil {
        // Read incoming RTCP packets
        // Before these packets are returned they are processed by interceptors. For things
        // like NACK this needs to be called.
        go func() {
            rtcpBuf := make([]byte, 1500)
            for {
                if _, _, rtcpErr := rtpSender.Read(rtcpBuf); rtcpErr != nil {
                    return
                }
            }
        }()
    }
}

// https://github.com/pion/webrtc/blob/master/examples/play-from-disk-renegotation/main.go#L162
// incomingTrack is webrtc.TrackRemote because it comes from another webrtc connection (that is not whep/whip), livekit sends TrackRemote too.
func (w *whip) forwardTrack(incomingTrack *webrtc.TrackRemote, outputTrack *webrtc.TrackLocalStaticRTP) {

    for {
        rtp, _, err := incomingTrack.ReadRTP()
        if err != nil {
            log.Println(err.Error())
            return
        }

        if err := outputTrack.WriteRTP(rtp); err != nil {
            log.Println(err.Error())
            return
        }
        // fmt.Println("writing rtp", rtp.PayloadType)
    }
}

// renegotiate will sync local peerconnection state with the remote peer connection
func (w *whip) renegotiate() error {
    //renegotiate
    if w.pc == nil {
        return errors.New("peer connection is nil")
    }

    // if (w.pc.ConnectionState()) != webrtc.PeerConnectionStateConnected {
    //  return errors.New("peer connection not connected")
    // }

    offer, err := w.pc.CreateOffer(nil)

    if err != nil {
        return err
    }

    if err := w.pc.SetRemoteDescription(offer); err != nil {
        return err
    }

    // Create channel that is blocked until ICE Gathering is complete
    gatherComplete := webrtc.GatheringCompletePromise(w.pc)

    answer, err := w.pc.CreateAnswer(nil)
    if err != nil {
        return err
    }

    if err = w.pc.SetLocalDescription(answer); err != nil {
        return err
    }

    // Block until ICE Gathering is complete, disabling trickle ICE
    // we do this because we only can exchange one signaling message
    // in a production application you should exchange ICE Candidates via OnICECandidate
    <-gatherComplete
    return nil
}

// closes whip and then the peerconnection
func (w *whip) Close() error {
    req, err := http.NewRequest("DELETE", w.resourceURL, nil)
    if err != nil {
        return errors.New("Unexpected error building http request. " + err.Error())
    }
    req.Header.Add("Authorization", "Bearer "+w.token)

    client := &http.Client{}

    _, err = client.Do(req)
    if err != nil {
        return errors.New("Failed http DELETE request. " + err.Error())
    }

    return w.pc.Close()
}

// Cponnect will create the peer connection for the WHIP endpoint and setup and connect the webrtc connection for WHIP.
func (w *whip) Connect() error {

    if w.pc != nil {
        err := w.pc.Close()
        if err != nil {
            return err
        }
    }

    mediaEngine := webrtc.MediaEngine{}

    if err := mediaEngine.RegisterCodec(webrtc.RTPCodecParameters{
        RTPCodecCapability: webrtc.RTPCodecCapability{MimeType: webrtc.MimeTypeOpus, ClockRate: 48000, Channels: 2, SDPFmtpLine: "minptime=10;useinbandfec=1", RTCPFeedback: nil},
        PayloadType:        111,
    }, webrtc.RTPCodecTypeAudio); err != nil {
        return err
    }

    //copied from mediaengine registerdefaultcodecs
    videoRTCPFeedback := []webrtc.RTCPFeedback{{"goog-remb", ""}, {"ccm", "fir"}, {"nack", ""}, {"nack", "pli"}}

    //VP8 not supported by twitch webrtc
    // if err := mediaEngine.RegisterCodec(webrtc.RTPCodecParameters{
    //  RTPCodecCapability: webrtc.RTPCodecCapability{MimeType: webrtc.MimeTypeVP8, ClockRate: 90000, Channels: 0, SDPFmtpLine: "", RTCPFeedback: videoRTCPFeedback},
    //  PayloadType:        96,
    // },
    //  webrtc.RTPCodecTypeVideo); err != nil {
    //  return err
    // }

    if err := mediaEngine.RegisterCodec(webrtc.RTPCodecParameters{
        RTPCodecCapability: webrtc.RTPCodecCapability{MimeType: webrtc.MimeTypeH264, ClockRate: 90000, Channels: 0, SDPFmtpLine: "level-asymmetry-allowed=1;packetization-mode=1;profile-level-id=42e01f", RTCPFeedback: videoRTCPFeedback},
        PayloadType:        125,
    },
        webrtc.RTPCodecTypeVideo); err != nil {
        return err
    }

    // mediaEngine.RegisterDefaultCodecs()

    config := webrtc.Configuration{
        ICEServers: []webrtc.ICEServer{
            {
                URLs: []string{"stun:stun.l.google.com:19302"},
            },
        },
    }
    // settings := webrtc.SettingEngine{}
    // settings.SetNetworkTypes([]webrtc.NetworkType{webrtc.NetworkTypeUDP4})

    // Create a InterceptorRegistry. This is the user configurable RTP/RTCP Pipeline.
    // This provides NACKs, RTCP Reports and other features. If you use `webrtc.NewPeerConnection`
    // this is enabled by default. If you are manually managing You MUST create a InterceptorRegistry
    // for each PeerConnection.
    i := &interceptor.Registry{}

    // Use the default set of Interceptors
    if err := webrtc.RegisterDefaultInterceptors(&mediaEngine, i); err != nil {
        return err
    }

    // Register a intervalpli factory
    // This interceptor sends a PLI every 3 seconds. A PLI causes a video keyframe to be generated by the sender.
    // This makes our video seekable and more error resilent, but at a cost of lower picture quality and higher bitrates
    // A real world application should process incoming RTCP packets from viewers and forward them to senders
    intervalPliFactory, err := intervalpli.NewReceiverInterceptor()
    if err != nil {
        return err
    }
    i.Add(intervalPliFactory)

    pc, err := webrtc.NewAPI(
        webrtc.WithMediaEngine(&mediaEngine),
        webrtc.WithInterceptorRegistry(i),
    ).NewPeerConnection(config)

    if err != nil {
        return errors.New("PeerConnection could not set local offer: " + err.Error())
    }

    _, err = pc.AddTransceiverFromKind(webrtc.RTPCodecTypeVideo, webrtc.RtpTransceiverInit{
        Direction: webrtc.RTPTransceiverDirectionSendonly,
    })

    if err != nil {
        return errors.New("PeerConnection could not set local offer: " + err.Error())
    }

    _, err = pc.AddTransceiverFromKind(webrtc.RTPCodecTypeAudio, webrtc.RtpTransceiverInit{
        Direction: webrtc.RTPTransceiverDirectionSendonly,
    })

    if err != nil {
        return errors.New("PeerConnection could not set local offer: " + err.Error())
    }

    offer, err := pc.CreateOffer(nil)
    if err != nil {
        return errors.New("PeerConnection could not create offer: " + err.Error())
    }

    err = pc.SetLocalDescription(offer)
    if err != nil {
        return errors.New("PeerConnection could not set local offer. " + err.Error())
    }

    // Block until ICE Gathering is complete, disabling trickle ICE
    // we do this because we only can exchange one signaling message
    // in a production application you should exchange ICE Candidates via OnICECandidate
    gatherComplete := webrtc.GatheringCompletePromise(pc)
    <-gatherComplete

    w.pc = pc

    return w.setupWhip()
}

func (w *whip) onTrackSubscribed(track *webrtc.TrackRemote, publication *lksdk.RemoteTrackPublication, rp *lksdk.RemoteParticipant) {

    log.Println("onTrackSubscribed", track.Codec())

    err := w.AddTrack(track)
    if err != nil {
        log.Fatal(err.Error())
    }
}
Sean-Der commented 1 year ago

Sorry @onthegit I have been so busy.

Were you able to make this work? I have some time today to help if not.