Closed ivanjaros closed 4 years ago
I have created this splitter. it works, kinda. it will split the stream but only some files are playable. i think the issue is that it is not being split when GOP is done so the segment without key frame is unplayable. i have tried to use the FixmeFilter but I got the same result.
package main
import (
"bytes"
"errors"
"github.com/nareix/joy4/av"
"github.com/nareix/joy4/format/ts"
"io"
"sync"
"time"
)
// muxer that splits the continuous stream into individual TS segments
type StreamSplitter struct {
src av.Demuxer
headers []av.CodecData
name string
seq uint
files chan StreamFile
muxer *ts.Muxer
file *StreamFile
lock sync.Mutex
ticker *time.Ticker
done chan struct{}
}
// name identifies the video stream id(user account id or stream id if one account can run multiple streams)
func NewSplitter(name string) *StreamSplitter {
return &StreamSplitter{
name: name,
files: make(chan StreamFile, 1),
muxer: ts.NewMuxer(nil), // nil since we do not have active "file", yet
done: make(chan struct{}, 1),
}
}
func (ss *StreamSplitter) WritePacket(p av.Packet) error {
ss.lock.Lock()
defer ss.lock.Unlock()
if err := ss.ensureFile(); err != nil {
return err
}
return ss.muxer.WritePacket(p)
}
// caller has to handle lock on its own
func (ss *StreamSplitter) ensureFile() error {
if ss.file == nil {
ss.seq++
ss.file = &StreamFile{
Sequence: ss.seq,
Name: ss.name,
Data: bytes.NewBuffer(nil),
}
ss.muxer.SetWriter(ss.file.Data)
if err := ss.muxer.WriteHeader(ss.headers); err != nil {
return err
}
}
return nil
}
// caller has to handle lock on its own
func (ss *StreamSplitter) closeFile() {
if ss.file == nil {
return
}
ss.muxer.WriteTrailer()
select {
case ss.files <- *ss.file:
default:
}
ss.file = nil
}
func (ss *StreamSplitter) Files() <-chan StreamFile {
return ss.files
}
func (ss *StreamSplitter) Listen(src av.Demuxer) error {
if ss.src != nil {
return errors.New("already listening")
}
// streams are essentially the headers identifying a/v/data(?) stream type
// don't know why it is mislabeled as "streams"
streams, err := src.Streams()
if err != nil {
return err
}
ss.src = src
ss.headers = streams
ss.ticker = time.NewTicker(time.Second)
go ss.run()
return nil
}
func (ss *StreamSplitter) rotate() {
ss.lock.Lock()
defer ss.lock.Unlock()
ss.closeFile()
ss.ensureFile()
}
func (ss *StreamSplitter) run() {
for {
select {
case <-ss.Done():
return
case <-ss.ticker.C:
ss.rotate()
default:
packet, err := ss.src.ReadPacket()
if err == nil {
ss.WritePacket(packet)
continue
}
if err == io.EOF {
ss.stop()
return
}
// if we are here, log the unknown error
}
}
}
func (ss *StreamSplitter) stop() {
ss.lock.Lock()
defer ss.lock.Unlock()
ss.ticker.Stop()
ss.closeFile()
close(ss.files)
close(ss.done)
}
func (ss *StreamSplitter) Done() <-chan struct{} {
return ss.done
}
type StreamFile struct {
Sequence uint
Name string
Data *bytes.Buffer
}
package main
import (
"fmt"
"github.com/nareix/joy4/format"
"github.com/nareix/joy4/format/rtmp"
"os"
"path/filepath"
"sync"
)
func init() {
format.RegisterAll()
}
func main() {
server := &rtmp.Server{}
wg := new(sync.WaitGroup)
server.HandlePublish = func(conn *rtmp.Conn) {
splitter := NewSplitter(filepath.Base(conn.URL.RequestURI()))
splitter.Listen(conn)
wg.Add(1)
go func() {
defer wg.Done()
for {
data, ok := <-splitter.Files()
if ok == false {
return
}
f, err := os.OpenFile(fmt.Sprintf("%s-%010d.ts", data.Name, data.Sequence), os.O_CREATE|os.O_TRUNC, 0755)
if err != nil {
fmt.Println(err)
}
if _, err := data.Data.WriteTo(f); err != nil {
fmt.Println(err)
}
if err := f.Close(); err != nil {
fmt.Println(err)
}
}
}()
}
server.ListenAndServe()
// wait for all files to be written into storage
wg.Wait()
}
Are you trying to make hls ?
Are you trying to make hls ?
i want to chunk the stream into 1 second segments and push them to cdn where they can be consumed by clients as hls or dash, later on.
Interesting idea, but why you need cdn ? easy way to make some cache in you app and store only few segments.
Interesting idea, but why you need cdn ? easy way to make some cache in you app and store only few segments.
i am working on a video streaming platform as a pet project. something like dlive.
i can help with some features if you want.
ok, si I have figured it out. instead of using ticker, i just check if the packet is key frame and if it, then i will close the active file and open up a new one, making a new ts chunk. so each ts video file is one gop. i am not sure what dictates this but it equals to 2 seconds per chunk with the last one having the remaining data. so it is not on 1 second basis, as i wanted, but this works for me just fine.
i am using obs so maybe the 2 seconds per gop is most likely due to the way obs is sending out data. maybe changing bitrate or encoding settings witl change this result. anyway, so far i am fine with this.
I would like to chunk the incoming stream into 1 second long TS files, so I can then publish them as HLS playlist for clients to consume over HTTP. What would be the correct way to do this? The avutil.CopyFile will just process the entire stream and CopyPackets will be missing the header and trailer so I am not exactly sure how to approach this.
PS: thanks for this library. I was looking for a well written RTMP server and I found only todostreaming/rtmp which lacks any kind of frame/packet handling. I have looked at the joy5 but that one seems quite unfamiliar to joy4 so I am sticking with this version for now.
This is my testing code:
I tried splitting the packets in various ways but it always failed.