nareix / joy4

Golang audio/video library and streaming server
MIT License
2.67k stars 499 forks source link

forward input stream to multiple servers (push) #8

Open thesyncim opened 8 years ago

thesyncim commented 8 years ago

Hi again! Could you please point out what I'm doing wrong?

package main

import (
    "log"
    "strings"
    "sync"
    "time"

    "github.com/nareix/joy4/av/avutil"
    "github.com/nareix/joy4/av/pktque"
    "github.com/nareix/joy4/av/pubsub"
    "github.com/nareix/joy4/format"
    "github.com/nareix/joy4/format/rtmp"
)

// init calls format.RegisterAll before main runs.
// NOTE(review): presumably this makes every joy4 container/protocol handler
// available to the rest of the program — confirm against the format package.
func init() {
    format.RegisterAll()
}

// sm maps a publish stream key (looked up in proxyServer) to the list of
// RTMP URLs that the incoming stream is pushed to.
var sm = map[string][]string{
    "azorestv": {
        "rtmp://opinion.azorestv.com:1936/proxy/one",
        "rtmp://opinion.azorestv.com:1936/proxy/two",
    },
}

// main runs the RTMP proxy server; it returns only when proxyServer does.
func main() {
    proxyServer()
}

// proxyServer starts an rtmp.Server whose publish handler forwards the
// incoming stream to every destination URL listed in sm for the stream key.
// It blocks in server.ListenAndServe.
func proxyServer() {
    server := &rtmp.Server{}

    server.HandlePublish = func(conn *rtmp.Conn) {

        // The stream key is the third "/"-separated segment of the publish path.
        // NOTE(review): segs[2] is indexed without a length check, so a path
        // with fewer than three segments panics.
        segs := strings.Split(conn.URL.Path, "/")
        key := segs[2]
        streamMap, ok := sm[key]
        if !ok {
            log.Println("invalid key")
            return
        }

        // NOTE(review): this zero-value queue is closed immediately and then
        // replaced by pubsub.NewQueue below; its purpose is unclear.
        que := &pubsub.Queue{}
        que.Close()

        streams, err := conn.Streams()
        if err != nil {
            log.Println(err)
            return
        }

        que = pubsub.NewQueue(streams)
        que.SetMaxDuration(time.Second * 10)

        // Feed the queue from the publisher connection in the background.
        go func() {
            err := avutil.CopyPackets(que, conn)
            if err != nil {
                log.Println(err)
            }
        }()

        wait := &sync.WaitGroup{}

        // Start one pusher goroutine per destination URL.
        // NOTE(review): the closure captures the loop variable i instead of
        // receiving it as an argument; before Go 1.22 all goroutines share
        // the same i.
        // NOTE(review): wait.Add(1) is only called inside pushStream, i.e. in
        // the new goroutine, so wait.Wait() below may run before any Add.
        for i := range streamMap {
            go func() {
                err := pushStream(que, streamMap[i], wait)
                if err != nil {
                    log.Println(err)
                }

            }()
        }
        wait.Wait()

    }

    // NOTE(review): the value returned by ListenAndServe is not inspected.
    server.ListenAndServe()

}

// pushStream forwards packets read from sub to the RTMP server at dsturl.
//
// sub is the queue to read from, dsturl is the destination passed to
// rtmp.Dial, and wait is the WaitGroup the caller blocks on. It returns the
// rtmp.Dial error when the connection cannot be opened, otherwise the result
// of avutil.CopyFile.
func pushStream(sub *pubsub.Queue, dsturl string, wait *sync.WaitGroup) error {

    // NOTE(review): Add(1) runs here, inside the pushing goroutine, so the
    // caller's wait.Wait() can execute before the counter is incremented.
    defer wait.Done()
    wait.Add(1)

    // NOTE(review): presumably a reader positioned at the newest data in the
    // queue — confirm against the pubsub package.
    origin := sub.Latest()
    filters := pktque.Filters{}

    // Route packets through a FixTime filter (StartFromZero, MakeIncrement)
    // before they are written to the destination.
    filters = append(filters, &pktque.FixTime{StartFromZero: true, MakeIncrement: true})
    demuxer := &pktque.FilterDemuxer{
        Filter:  filters,
        Demuxer: origin,
    }

    dst, err := rtmp.Dial(dsturl)
    if err != nil {
        return err
    }
    // Close the destination connection when copying ends.
    defer dst.Close()

    return avutil.CopyFile(dst, demuxer)
}

thanks again!

nareix commented 8 years ago

there's a closure bug:

        for i := range streamMap {
            go func(i int) {
                err := pushStream(que, streamMap[i], wait)
                if err != nil {
                    log.Println(err)
                }

            }(i)
        }
nareix commented 8 years ago

and wait.Add(1) should be called before go func():

        for i := range streamMap {
            wait.Add(1)
            go func(i int) {
                err := pushStream(que, streamMap[i], wait)
                if err != nil {
                    log.Println(err)
                }
            }(i)
        }
thesyncim commented 8 years ago

Oops... thanks a lot! This is very powerful!!

thesyncim commented 8 years ago

Using OBS Studio, I sometimes get a data race report with -race enabled:

WARNING: DATA RACE
Write at 0x00c420078670 by main goroutine:
  github.com/nareix/joy4/format/rtmp.(*Server).ListenAndServe()
      /home/thesyncim/gocode/src/github.com/nareix/joy4/format/rtmp/rtmp.go:107 +0x2b2
  main.proxyServer()
      /home/thesyncim/gocode/src/github.com/thesyncim/exp/rtmp/proxy.go:79 +0xa7
  main.main()
      /home/thesyncim/gocode/src/github.com/thesyncim/exp/rtmp/proxy.go:24 +0x2f

Previous write at 0x00c420078670 by goroutine 6:
  github.com/nareix/joy4/format/rtmp.(*Server).ListenAndServe.func1()
      /home/thesyncim/gocode/src/github.com/nareix/joy4/format/rtmp/rtmp.go:118 +0x81

Goroutine 6 (finished) created at:
  github.com/nareix/joy4/format/rtmp.(*Server).ListenAndServe()
      /home/thesyncim/gocode/src/github.com/nareix/joy4/format/rtmp/rtmp.go:122 +0xa6f
  main.proxyServer()
      /home/thesyncim/gocode/src/github.com/thesyncim/exp/rtmp/proxy.go:79 +0xa7
  main.main()
      /home/thesyncim/gocode/src/github.com/thesyncim/exp/rtmp/proxy.go:24 +0x2f

thesyncim commented 8 years ago

Using the code from the example above, I'm unable to push a stream to a YouTube/SRS server (only go-oryx can handle the stream).

wireshark file: push_youtube.pcapng.zip

nareix commented 8 years ago

thanks, I'll check it

nareix commented 8 years ago

Fixed the data race: https://github.com/nareix/joy4/commit/4918d07822bbeb50f2c1396078f1a88d6a5f8a95

thesyncim commented 8 years ago

You can reproduce the issue with:

ffmpeg -re -i bbb_sunflower_1080p_60fps_normal.mp4 -c:v libx264 -b:v 2M -pix_fmt yuv420p -c:a:0 aac -strict -2 -b:a:0 480k -f flv rtmp://localhost/live/azorestv

package main

import (
    "log"
    "strings"
    "sync"

    "github.com/nareix/joy4/av/avutil"
    "github.com/nareix/joy4/av/pubsub"
    "github.com/nareix/joy4/format"
    "github.com/nareix/joy4/format/rtmp"

)

// init calls format.RegisterAll and then configures the standard logger to
// prefix each message with the time and the short source file name.
func init() {
    format.RegisterAll()

    const flags = log.Ltime | log.Lshortfile
    log.SetFlags(flags)
}

// sm maps a publish stream key (looked up in proxyServer) to the list of
// RTMP URLs that the incoming stream is pushed to.
var sm = map[string][]string{
    "azorestv": {
        "rtmp://opinion.azorestv.com:1936/live/test",
    },
}

// main runs the RTMP proxy server; it returns only when proxyServer does.
func main() {
    proxyServer()
}

// proxyServer runs an RTMP server that forwards each published stream to
// every destination URL configured in sm for the stream's key.
//
// The key is the third "/"-separated segment of the publish URL path (for
// example "azorestv" in "/live/azorestv"). A publisher whose path has no such
// segment, or whose key is not in sm, is rejected by returning from the
// handler. proxyServer blocks in ListenAndServe and logs the error it returns.
func proxyServer() {
    server := &rtmp.Server{}
    //rtmp.Debug=true

    server.HandlePublish = func(conn *rtmp.Conn) {
        // The path is supplied by the remote publisher, so validate its shape
        // before indexing: a path such as "/live" has no key segment and
        // would otherwise panic with an index-out-of-range error.
        segs := strings.Split(conn.URL.Path, "/")
        if len(segs) < 3 {
            log.Println("invalid key")
            return
        }

        dsts, ok := sm[segs[2]]
        if !ok {
            log.Println("invalid key")
            return
        }

        streams, err := conn.Streams()
        if err != nil {
            log.Println(err)
            return
        }

        que := pubsub.NewQueue(streams)

        wait := &sync.WaitGroup{}
        for _, dst := range dsts {
            // Add before starting the goroutine so Wait below cannot run
            // ahead of it; pushStream calls wait.Done itself.
            wait.Add(1)
            go func(dst string) {
                if err := pushStream(que, dst, wait); err != nil {
                    log.Println(err)
                }
            }(dst)
        }

        // Feed the queue from the publisher in the background.
        go func() {
            if err := avutil.CopyPackets(que, conn); err != nil {
                log.Println(err)
            }
            // The publisher is gone: close the queue so the pushers reading
            // from it can finish and wait.Wait can return.
            // NOTE(review): assumes Close unblocks pending readers — confirm
            // against the pubsub package.
            que.Close()
        }()

        wait.Wait()
    }

    if err := server.ListenAndServe(); err != nil {
        log.Println(err)
    }
}

// pushStream copies the stream held in sub to the RTMP server at dsturl.
//
// wait.Done is called when the function returns, so the caller must have
// called wait.Add(1) beforehand. The return value is the rtmp.Dial error when
// the destination cannot be reached, otherwise whatever avutil.CopyFile
// returns.
func pushStream(sub *pubsub.Queue, dsturl string, wait *sync.WaitGroup) error {
    defer wait.Done()

    // NOTE(review): presumably a reader positioned at the newest data in the
    // queue — confirm against the pubsub package.
    cursor := sub.Latest()

    remote, dialErr := rtmp.Dial(dsturl)
    if dialErr != nil {
        return dialErr
    }
    // Release the outgoing connection once copying has ended.
    defer remote.Close()

    return avutil.CopyFile(remote, cursor)
}

Feel free to use this server (SRS 3.0) to debug the issue. Thank you!