pion / webrtc

Pure Go implementation of the WebRTC API
https://pion.ly
MIT License
13.43k stars 1.63k forks source link

Cannot recieve Audio #2895

Open fear-the-reaper opened 2 weeks ago

fear-the-reaper commented 2 weeks ago

Your environment.

What did you do?

I'm trying to get a basic example running using pion + gofiber. Where I just want to receive the audio from my browser and echo it back. Here's the go code:

package webrtc

import (
    "encoding/json"
    "log"

    "github.com/gofiber/contrib/websocket"
    "github.com/pion/webrtc/v3"
)

type Client struct {
    Id         string
    Conn       *websocket.Conn
    PeerConn   *webrtc.PeerConnection
    AudioTrack *webrtc.TrackLocalStaticRTP
}

type SDPType struct {
    Type string `json:"type"`
    Sdp  string `json:"sdp"`
    // Add other fields here if necessary
}

type Message struct {
    Type string                  `json:"type"`
    SDP  SDPType                 `json:"sdp,omitempty"`
    ICE  webrtc.ICECandidateInit `json:"ice,omitempty"`
}

type IceCandidateResponse struct {
    Type string `json:"type"`
    ICE  string `json:"ice"`
}

type AnswerResponse struct {
    Type string `json:"type"`
    SDP  string `json:"sdp"`
}

var Peers = make(map[string]*Client)

func Signaling(c *websocket.Conn) {

    m := &webrtc.MediaEngine{}
    if err := m.RegisterCodec(webrtc.RTPCodecParameters{
        RTPCodecCapability: webrtc.RTPCodecCapability{MimeType: webrtc.MimeTypeOpus, ClockRate: 48000, Channels: 0, SDPFmtpLine: "", RTCPFeedback: nil},
    }, webrtc.RTPCodecTypeAudio); err != nil {
        panic(err)
    }

    log.Println("New WebSocket connection established")

    config := webrtc.Configuration{
        ICEServers: []webrtc.ICEServer{
            {
                URLs: []string{"stun:stun.l.google.com:19302"},
            },
        },
    }

    peerConnection, err := webrtc.NewAPI(webrtc.WithMediaEngine(m)).NewPeerConnection(config)
    if err != nil {
        log.Println("Error creating peer connection:", err)
        return
    }

    if _, err = peerConnection.AddTransceiverFromKind(webrtc.RTPCodecTypeAudio); err != nil {
        panic(err)
    }

    // Create a new track for sending audio back
    audioTrack, _ := webrtc.NewTrackLocalStaticRTP(
        webrtc.RTPCodecCapability{MimeType: webrtc.MimeTypeOpus},
        "audio",
        "pion",
    )

    peerConnection.AddTrack(audioTrack)
    defer peerConnection.Close()

    peerConnection.OnICECandidate(func(candidate *webrtc.ICECandidate) {
        if candidate == nil {
            log.Println("ICE Gathering complete")
            return
        }
        log.Println("New ICE candidate:", candidate.String())

        candidateJSON, err := json.Marshal(candidate.ToJSON())
        if err != nil {
            log.Println("Error marshalling ICE candidate:", err)
            return
        }

        iceCandidateMsg := IceCandidateResponse{
            Type: "ice",
            ICE:  string(candidateJSON),
        }
        if err := c.WriteJSON(iceCandidateMsg); err != nil {
            log.Println("Error sending ICE candidate:", err)
        }
    })

    peerConnection.OnTrack(func(track *webrtc.TrackRemote, receiver *webrtc.RTPReceiver) {
        log.Println("Received track:", track.ID(), "of kind:", track.Kind())

        if track.Kind() == webrtc.RTPCodecTypeAudio {
            log.Println("Received audio track")

            // Send the received audio back to the client
            go func() {
                buf := make([]byte, 1400)
                for {
                    n, _, err := track.Read(buf)
                    if err != nil {
                        log.Println(err)
                        log.Println("Error reading track data:", err)
                        return
                    }

                    // Write the audio data back to the outgoing track
                    if _, err := audioTrack.Write(buf[:n]); err != nil {
                        log.Println("Error writing to audio track:", err)
                        return
                    }
                }
            }()
        }

        // Here you would typically handle the incoming audio data
        // For example, you could forward it to other peers or process it
    })

    peerConnection.OnICEConnectionStateChange(func(state webrtc.ICEConnectionState) {
        log.Println("ICE Connection State changed:", state.String())
    })

    peerConnection.OnSignalingStateChange(func(state webrtc.SignalingState) {
        log.Println("Signaling State changed:", state.String())
    })

    peerConnection.OnConnectionStateChange(func(pcs webrtc.PeerConnectionState) {
        log.Println("Connection State changed:", pcs.String())
    })

    for {
        var msg Message
        if err := c.ReadJSON(&msg); err != nil {
            log.Println("Error reading message:", err)
            return
        }
        log.Printf("Received message of type: %s", msg.Type)

        switch msg.Type {
        case "offer":
            log.Println("Received offer")
            offer := webrtc.SessionDescription{
                Type: webrtc.SDPTypeOffer,
                SDP:  msg.SDP.Sdp,
            }
            if err := peerConnection.SetRemoteDescription(offer); err != nil {
                log.Println("Error setting remote description:", err)
                continue
            }
            log.Println("Set remote description successfully")

            answer, err := peerConnection.CreateAnswer(nil)
            if err != nil {
                log.Println("Error creating answer:", err)
                continue
            }
            log.Println("Created answer")

            if err := peerConnection.SetLocalDescription(answer); err != nil {
                log.Println("Error setting local description:", err)
                continue
            }
            log.Println("Set local description successfully")

            answerMsg := AnswerResponse{
                Type: "answer",
                SDP:  answer.SDP,
            }
            if err := c.WriteJSON(answerMsg); err != nil {
                log.Println("Error sending answer:", err)
            } else {
                log.Println("Sent answer to client")
            }

        case "ice":
            log.Println("Received ICE candidate")
            if err := peerConnection.AddICECandidate(msg.ICE); err != nil {
                log.Println("Error adding ICE candidate:", err)
            } else {
                log.Println("Added ICE candidate successfully")
            }
        }
    }
}

Here's the client js

const startButton = document.getElementById('startButton');
let pc;
let ws;
startButton.onclick = start;

function start() {
    try {
        startButton.disabled = true;

        ws = new WebSocket('ws://localhost:3000/ws');

        ws.onopen = () => {
            console.log("WebSocket connection established");
            setupPeerConnection().then(() => {
                console.log("Peer connection setup completed");
            }).catch(error => {
                console.error("Error in setupPeerConnection:", error);
                startButton.disabled = false;
            });
        };

        ws.onmessage = handleWebSocketMessage;
        ws.onerror = (error) => {
            console.error("WebSocket error:", error);
        };
        ws.onclose = () => {
            console.log("WebSocket connection closed");
        };
    } catch (error) {
        console.error("Error in start function:", error);
        startButton.disabled = false;
    }
}

function setupPeerConnection() {
    return navigator.mediaDevices.getUserMedia({ audio: true, video: false })
        .then(stream => {
            console.log("Got user media stream:", stream);

            const configuration = {
                iceServers: [{ urls: 'stun:stun.l.google.com:19302' }]
            };
            pc = new RTCPeerConnection(configuration);
            console.log("Created RTCPeerConnection");

            stream.getTracks().forEach(track => {
                console.log("Adding track to peer connection:", track);
                pc.addTrack(track, stream);
            });

            pc.onicecandidate = event => {
                console.log("ICE candidate event:", event.candidate);
                if (event.candidate) {
                    ws.send(JSON.stringify({
                        type: 'ice',
                        ice: event.candidate
                    }));
                }
            };

            pc.oniceconnectionstatechange = () => {
                console.log("ICE connection state:", pc.iceConnectionState);
            };

            pc.onicegatheringstatechange = () => {
                console.log("ICE gathering state:", pc.iceGatheringState);
            };

            pc.onsignalingstatechange = () => {
                console.log("Signaling state:", pc.signalingState);
            };

            pc.ontrack = event => {
                console.log("Received remote track:", event.track);
                const audio = new Audio();
                audio.srcObject = event.streams[0];
                audio.onloadedmetadata = () => {
                    console.log("Audio metadata loaded, attempting to play...");
                    audio.play()
                        .then(() => console.log("Remote audio playback started"))
                        .catch(e => console.error("Error playing remote audio:", e));
                };
            };

            return pc.createOffer();
        })
        .then(offer => {
            console.log("Created offer:", offer);
            return pc.setLocalDescription(offer);
        })
        .then(() => {
            console.log("Set local description");
            ws.send(JSON.stringify({ type: 'offer', sdp: pc.localDescription }));
            console.log("Sent offer to server");
        })
        .catch(error => {
            console.error("Error in setupPeerConnection:", error);
            throw error; // Rethrow the error so it can be caught in the start function
        });
}

function handleWebSocketMessage(event) {
    console.log("Received WebSocket message:", event.data);
    const message = JSON.parse(event.data);
    switch (message.type) {
        case 'answer':
            console.log("Received answer from server:", message.sdp);
            const answerDescription = new RTCSessionDescription({
                type: 'answer',
                sdp: message.sdp
            });
            pc.setRemoteDescription(answerDescription)
                .then(() => console.log("Set remote description successfully"))
                .catch(error => console.error("Error setting remote description:", error));
            break;
        case 'ice':
            console.log("Received ICE candidate from server:", message.ice);
            const candidate = JSON.parse(message.ice);
            pc.addIceCandidate(new RTCIceCandidate(candidate))
                .then(() => console.log("Added ICE candidate successfully"))
                .catch(error => console.error("Error adding ICE candidate:", error));
            break;
        default:
            console.warn("Unhandled message type:", message.type);
    }
}

What did you expect?

I expected that I would receive the audio but that was not the case

What happened?

All the connections happen i.e it gathers all the ICE candidates, accepts the offer and answer but the OnTrack method is only called once. I read in this issue that it is supposed to be called once but now my question is how do I invoke it again?

Thank you for helping me out! Sorry for the noob question.

fear-the-reaper commented 1 week ago

Okay so with a help of a friend I made a basic echo webRTC that works

package webrtc

import (
    "encoding/json"
    "fmt"
    "log"
    "sync"
    "time"

    "github.com/gofiber/contrib/websocket"
    "github.com/pion/rtcp"
    "github.com/pion/webrtc/v3"
)

type Message struct {
    Event string `json:"event"`
    Data  string `json:"data"`
}

type Client struct {
    Id         string
    Conn       *websocket.Conn
    Pc         *webrtc.PeerConnection
    AudioTrack *webrtc.TrackLocalStaticRTP
}

var (
    clients      = make(map[string]*Client)
    clientsMutex sync.Mutex
)

func Signaling(conn *websocket.Conn) {

    clientID := fmt.Sprintf("client-%d", time.Now().UnixNano())
    client := &Client{
        Id:   clientID,
        Conn: conn,
    }

    clientsMutex.Lock()
    clients[clientID] = client
    clientsMutex.Unlock()

    defer func() {
        clientsMutex.Lock()
        delete(clients, clientID)
        clientsMutex.Unlock()
    }()

    for {
        var msg Message
        err := conn.ReadJSON(&msg)
        if err != nil {
            log.Println("Error reading message:", err)
            return
        }

        switch msg.Event {
        case "offer":
            handleOffer(client, msg.Data)
        case "candidate":
            handleIceCandidate(client, msg.Data)
        }

    }

}

// handles the offer from the client!
func handleOffer(client *Client, offerString string) {
    // Create a new RTCPeerConnection
    config := webrtc.Configuration{
        ICEServers: []webrtc.ICEServer{
            {
                URLs: []string{"stun:stun.l.google.com:19302"},
            },
        },
    }

    pc, err := webrtc.NewPeerConnection(config)
    if err != nil {
        log.Println(err)
        return
    }

    client.Pc = pc

    // Create an audio track
    audioTrack, err := webrtc.NewTrackLocalStaticRTP(webrtc.RTPCodecCapability{MimeType: "audio/opus"}, "audio", "pion")
    if err != nil {
        log.Println(err)
        return
    }

    client.AudioTrack = audioTrack

    _, err = pc.AddTrack(audioTrack)
    if err != nil {
        log.Println(err)
        return
    }

    // Sending an ICE candidate to the client
    pc.OnICECandidate(func(c *webrtc.ICECandidate) {
        if c == nil {
            return
        }

        candidateString, err := json.Marshal(c.ToJSON())
        if err != nil {
            log.Println(err)
            return
        }

        client.Conn.WriteJSON(Message{
            Event: "candidate",
            Data:  string(candidateString),
        })
    })

    // Checking the connection state at each step
    pc.OnConnectionStateChange(func(s webrtc.PeerConnectionState) {
        log.Printf("Client %s Peer Connection State has changed: %s\n", client.Id, s.String())
        if s == webrtc.PeerConnectionStateFailed {
            if err := pc.Close(); err != nil {
                log.Println(err)
            }
        }
    })

    // Recveive the remote track from the client!
    pc.OnTrack(func(remoteTrack *webrtc.TrackRemote, receiver *webrtc.RTPReceiver) {
        log.Printf("Client %s new track: %d\n", client.Id, remoteTrack.SSRC())

        go func() {
            ticker := time.NewTicker(time.Second * 3)
            for range ticker.C {
                errSend := pc.WriteRTCP([]rtcp.Packet{&rtcp.PictureLossIndication{MediaSSRC: uint32(remoteTrack.SSRC())}})
                if errSend != nil {
                    log.Println(errSend)
                }
            }
        }()

        for {
            rtp, _, readErr := remoteTrack.ReadRTP()
            if readErr != nil {
                log.Println(readErr)
                return
            }

            clientsMutex.Lock()
            for _, c := range clients {
                if writeErr := c.AudioTrack.WriteRTP(rtp); writeErr != nil {
                    log.Printf("Error writing RTP to client: %v\n", writeErr)
                }
            }
            clientsMutex.Unlock()
        }
    })

    // Set the remote SessionDescription
    offer := webrtc.SessionDescription{}
    if err := json.Unmarshal([]byte(offerString), &offer); err != nil {
        log.Println(err)
        return
    }

    if err := pc.SetRemoteDescription(offer); err != nil {
        log.Println(err)
        return
    }

    // Create answer
    answer, err := pc.CreateAnswer(nil)
    if err != nil {
        log.Println(err)
        return
    }

    // Set the local SessionDescription
    if err := pc.SetLocalDescription(answer); err != nil {
        log.Println(err)
        return
    }

    // Send the answer
    answerString, err := json.Marshal(answer)
    if err != nil {
        log.Println(err)
        return
    }

    client.Conn.WriteJSON(Message{
        Event: "answer",
        Data:  string(answerString),
    })

}

// handles adding the ice candidate from the client!
func handleIceCandidate(client *Client, candidateString string) {
    var candidate webrtc.ICECandidateInit
    if err := json.Unmarshal([]byte(candidateString), &candidate); err != nil {
        log.Println(err)
        return
    }

    if err := client.Pc.AddICECandidate(candidate); err != nil {
        log.Println(err)
    }
}

But I still don't know what I did wrong in my previous implementation other than making the client map since I was testing this out locally and just a single connection. lmk if anyone can help!