yapingcat / gomedia

Golang library for RTMP, MPEG-TS, MPEG-PS, FLV, MP4, Ogg, RTSP
MIT License
402 stars 69 forks source link

Rtmp Client Fallback Stream #159

Open hasnhasan opened 1 week ago

hasnhasan commented 1 week ago

I relay the broadcast that arrives at my RTMP server to another client. If the incoming broadcast to the server is interrupted, I send a fallback video until the broadcast comes back. I get the following errors.

[h264 @ 0x7f79a20414c0] cabac_init_idc 32 overflow
[h264 @ 0x7f79a20414c0] decode_slice_header error
[h264 @ 0x7f79a20414c0] no frame!
[h264 @ 0x7f79b2046c80] co located POCs unavailable
[NULL @ 0x7f79617045c0] illegal reordering_of_pic_nums_idc 4
[h264 @ 0x7f7982219d40] illegal modification_of_pic_nums_idc 4
[h264 @ 0x7f7982219d40] decode_slice_header error
[h264 @ 0x7f7982219d40] no frame!

package main

import (
    "fmt"
    "io"
    "log"
    "net"
    "net/url"
    "os"
    "sync"

    "github.com/yapingcat/gomedia/go-codec"
    "github.com/yapingcat/gomedia/go-flv"
    "github.com/yapingcat/gomedia/go-rtmp"
)

// RTMPRelay holds the state of one relayed stream: the publisher side
// (conn, server), the outgoing client, and the bookkeeping used when the
// relay switches to its fallback file.
type RTMPRelay struct {
	// conn is the connection of the current publisher.
	conn net.Conn
	// server is the RTMP server handle attached to conn.
	server *rtmp.RtmpServerHandle
	// client is the RTMP client every relayed frame is written to.
	client *rtmp.RtmpClient
	// fallbackFLV is the path of the FLV file opened by startFallback.
	fallbackFLV string
	// mutex is locked around writes to isFallback and around the timestamp
	// write-back at the end of startFallback.
	mutex sync.Mutex
	// isFallback is set by startFallback and cleared by stopFallback.
	isFallback bool
	// lastPts and lastDts are the timestamps of the most recent video frame
	// written to client.
	lastPts uint32
	lastDts uint32
}

// Broadcast maps a stream name to the relay serving that stream.
type Broadcast map[string]*RTMPRelay

var (
	// Broadcasts is the package-wide table of relays, keyed by stream name.
	Broadcasts = make(Broadcast)
	// mtx is held by Broadcast.Find while it reads the map.
	mtx sync.Mutex
)

// Find returns the relay registered under name, or nil when no relay is
// registered. The lookup runs while holding mtx.
func (b *Broadcast) Find(name string) *RTMPRelay {
	mtx.Lock()
	defer mtx.Unlock()
	// Indexing a map with a missing key yields the zero value, which for a
	// pointer element is nil.
	return (*b)[name]
}

// NewRtmpClient dials the RTMP server named by rtmpUrl and returns a
// publishing client bound to that connection.
//
// The port defaults to 1935 when rtmpUrl does not carry one. A goroutine is
// started that feeds everything read from the connection into the client; it
// closes the connection and exits on the first read or Input error.
//
// NewRtmpClient panics if rtmpUrl cannot be parsed and returns nil if the TCP
// connection cannot be established.
func NewRtmpClient(rtmpUrl string) *rtmp.RtmpClient {
	u, err := url.Parse(rtmpUrl)
	if err != nil {
		panic(err)
	}
	host := u.Host
	if u.Port() == "" {
		host += ":1935"
	}
	clientConn, err := net.Dial("tcp4", host)
	if err != nil {
		// Continuing with a nil connection would panic on the first write,
		// so report the failure and give the caller a nil client instead.
		fmt.Println("connect failed", err)
		return nil
	}

	client := rtmp.NewRtmpClient(rtmp.WithComplexHandshake(), rtmp.WithEnablePublish())
	client.SetOutput(func(data []byte) error {
		_, err := clientConn.Write(data)
		return err
	})

	client.Start(rtmpUrl)
	go func() {
		// The goroutine owns the read side; release the socket when it ends.
		defer clientConn.Close()
		buf := make([]byte, 4096)
		for {
			// err is local to the goroutine so it is not shared with the
			// enclosing function.
			n, err := clientConn.Read(buf)
			if n > 0 {
				// Read may return data together with an error; consume the
				// data before acting on the error.
				if inErr := client.Input(buf[:n]); inErr != nil {
					log.Printf("rtmp client input error: %v", inErr)
					return
				}
			}
			if err != nil {
				return
			}
		}
	}()
	return client
}

// NewRTMPRelay attaches an RTMP server handle to conn and starts a goroutine
// that feeds the bytes read from conn into that handle.
//
// When the peer publishes a stream, the relay registered under that stream
// name in Broadcasts is reused (its fallback is stopped and it is re-pointed
// at conn), or a new relay is created and registered. Every received frame is
// forwarded to the relay's client. When reading from conn fails, conn is
// closed and the relay switches to its fallback file.
//
// The returned relay is an empty placeholder: the relay that carries the
// stream only exists once the publish callback has fired. The returned error
// is always nil.
func NewRTMPRelay(conn net.Conn) (*RTMPRelay, error) {
	relay := &RTMPRelay{}

	// active is the relay selected by the publish callback. It is kept apart
	// from the returned value so the return statement does not read a
	// variable that the callbacks below reassign.
	// NOTE(review): presumably the callbacks run inside handle.Input, i.e. on
	// the read goroutine below — confirm against the library.
	var active *RTMPRelay

	handle := rtmp.NewRtmpServerHandle()
	handle.OnPublish(func(app, streamName string) rtmp.StatusCode {
		published := Broadcasts.Find(streamName)
		if published == nil {
			published = &RTMPRelay{
				conn:        conn,
				server:      handle,
				client:      NewRtmpClient("rtmp://127.0.0.1/live/SyXTJSpx1e"),
				isFallback:  false,
				fallbackFLV: "videos/test.flv",
				lastPts:     uint32(0),
				lastDts:     uint32(0),
			}
		} else {
			published.stopFallback()
			published.conn = conn
			published.server = handle
		}
		// Find reads the map under mtx, so the write must hold it as well.
		mtx.Lock()
		Broadcasts[streamName] = published
		mtx.Unlock()
		active = published
		return rtmp.NETSTREAM_PUBLISH_START
	})

	handle.SetOutput(func(b []byte) error {
		_, err := conn.Write(b)
		return err
	})

	handle.OnFrame(func(cid codec.CodecID, pts, dts uint32, frame []byte) {
		if active == nil || active.client == nil {
			// Nothing published yet, or no outgoing client to write to.
			return
		}
		if cid == codec.CODECID_VIDEO_H264 || cid == codec.CODECID_VIDEO_H265 {
			// startFallback writes these fields under the same mutex.
			active.mutex.Lock()
			active.lastDts = dts
			active.lastPts = pts
			active.mutex.Unlock()
		}
		active.client.WriteFrame(cid, frame, pts, dts)
	})

	go func() {
		buf := make([]byte, 4096)
		for {
			n, err := conn.Read(buf)
			if n > 0 {
				// Read may return data together with an error; consume the
				// data before acting on the error.
				if inErr := handle.Input(buf[0:n]); inErr != nil {
					log.Printf("Package reading error: %v", inErr)
					conn.Close()
					return
				}
			}
			if err != nil {
				// Any read error ends this connection, not only io.EOF;
				// looping on a failed connection would spin forever.
				if err != io.EOF {
					log.Printf("connection read error: %v", err)
				}
				conn.Close()
				if active != nil && active.client != nil {
					log.Println("Broadcast interrupted, switching to fallback")
					active.startFallback()
				}
				return
			}
		}
	}()

	return relay, nil
}

// startFallback marks the relay as being in fallback mode and forwards the
// frames of the fallback FLV file to r.client until the file ends, a read
// fails, or stopFallback clears the flag.
//
// Every frame, audio as well as video, is shifted by the dts of the last
// relayed video frame so that timestamps keep increasing across the switch.
// The same offset is added to pts and dts so that their difference is
// unchanged. When the file has been read to its end, the timestamps of the
// last forwarded video frame are stored back into r.lastPts and r.lastDts.
//
// NOTE(review): frames are forwarded as fast as the file can be read and the
// file is sent only once; there is no pacing against the timestamps. Confirm
// whether the downstream server expects real-time delivery.
func (r *RTMPRelay) startFallback() {
	if r.client == nil {
		// No outgoing client to write the fallback frames to.
		return
	}

	fd, err := os.Open(r.fallbackFLV)
	if err != nil {
		fmt.Println(err)
		return
	}
	defer fd.Close()

	r.mutex.Lock()
	r.isFallback = true
	base := r.lastDts
	// Start from the current values so a file without video frames does not
	// reset the stored timestamps to zero.
	lastPts, lastDts := r.lastPts, r.lastDts
	r.mutex.Unlock()

	// active reads the flag under the mutex that stopFallback writes it under.
	active := func() bool {
		r.mutex.Lock()
		defer r.mutex.Unlock()
		return r.isFallback
	}

	f := flv.CreateFlvReader()
	f.OnFrame = func(cid codec.CodecID, frame []byte, pts, dts uint32) {
		if !active() {
			return
		}

		pts += base
		dts += base
		if cid == codec.CODECID_VIDEO_H264 || cid == codec.CODECID_VIDEO_H265 {
			lastPts, lastDts = pts, dts
		}

		r.client.WriteFrame(cid, frame, pts, dts)
	}

	cache := make([]byte, 4096)
	for {
		if !active() {
			// The live stream is back; it owns lastPts and lastDts again, so
			// they are not overwritten here.
			return
		}
		n, err := fd.Read(cache)
		if n > 0 {
			f.Input(cache[0:n])
		}
		if err != nil {
			// io.EOF is the normal end of the file; only report real errors.
			if err != io.EOF {
				fmt.Println(err)
			}
			break
		}
	}

	r.mutex.Lock()
	r.lastDts = lastDts
	r.lastPts = lastPts
	r.mutex.Unlock()
}

// stopFallback clears the isFallback flag while holding r.mutex.
// startFallback checks this flag before forwarding each frame and before
// each file read.
func (r *RTMPRelay) stopFallback() {
	r.mutex.Lock()
	defer r.mutex.Unlock()
	r.isFallback = false
}

// main listens on TCP port 1936 and starts a relay for every accepted
// connection. It exits if the listener cannot be created.
func main() {
	listen, err := net.Listen("tcp4", ":1936")
	if err != nil {
		// Without a listener the Accept call below would panic on nil.
		log.Fatalf("listen on :1936: %v", err)
	}
	for {
		conn, err := listen.Accept()
		if err != nil {
			// Do not hand a nil connection to the relay; keep serving.
			log.Printf("accept: %v", err)
			continue
		}
		go NewRTMPRelay(conn)
	}
}
hasnhasan commented 1 week ago

@yapingcat I think the “picture numbers” need to be changed in f.OnFrame. I would be very grateful if you could help me with this 🙏