Monibuca / engine

Monibuca 核心引擎,包含流媒体核心转发逻辑,需要配合功能插件一起组合运行
MIT License
937 stars 199 forks source link

megts 包的顺序可能是乱的,目前的解析存在问题 MpegTsStream.Feed #65

Closed mars79668 closed 1 year ago

mars79668 commented 1 year ago
diff --git a/codec/mpegts/mpegts.go b/codec/mpegts/mpegts.go
index 2b1b4e4..9f0298a 100644
--- a/codec/mpegts/mpegts.go
+++ b/codec/mpegts/mpegts.go
@@ -71,15 +71,15 @@ const (
    STREAM_TYPE_PRIVATE_DATA     = 0x06
    STREAM_TYPE_MHEG             = 0x07

-   STREAM_TYPE_H264  = 0x1B
-   STREAM_TYPE_H265  = 0x24
-   STREAM_TYPE_AAC   = 0x0F
-   STREAM_TYPE_G711A = 0x90
-   STREAM_TYPE_G711U = 0x91
+   STREAM_TYPE_H264   = 0x1B
+   STREAM_TYPE_H265   = 0x24
+   STREAM_TYPE_AAC    = 0x0F
+   STREAM_TYPE_G711A  = 0x90
+   STREAM_TYPE_G711U  = 0x91
    STREAM_TYPE_G722_1 = 0x92
    STREAM_TYPE_G723_1 = 0x93
-   STREAM_TYPE_G726  = 0x94
-   STREAM_TYPE_G729  = 0x99
+   STREAM_TYPE_G726   = 0x94
+   STREAM_TYPE_G729   = 0x99

    STREAM_TYPE_ADPCM = 0x11
    STREAM_TYPE_PCM   = 0x0A
@@ -543,50 +543,74 @@ func (s *MpegTsStream) ReadPMT(packet *MpegTsPacket, pr io.Reader) (err error) {
    }
    return
 }
+
+type streamStatus struct {
+   firstTsPkt *MpegTsPacket
+   frame      int64
+   tsPktArr   []MpegTsPacket
+}
+
 func (s *MpegTsStream) Feed(ts io.Reader, onStream func(MpegTsPmtStream), onPES func(MpegTsPESPacket)) error {
-   var frame int64
-   var tsPktArr []MpegTsPacket
+
+   streamsStatus := make(map[uint16]*streamStatus)
    for {
        packet, err := ReadTsPacket(ts)
        if err == io.EOF {
            // 文件结尾 把最后面的数据发出去
-           pesPkt, err := TsToPES(tsPktArr)
-           if err != nil {
-               return err
+           for _, ss := range streamsStatus {
+
+               if len(ss.tsPktArr) > 0 {
+                   pesPkt, err := TsToPES(ss.tsPktArr)
+                   if err != nil {
+                       return err
+                   }
+                   onPES(pesPkt)
+               }
            }
-           onPES(pesPkt)
            return nil
        }
        if err != nil {
            return err
        }
        pr := bytes.NewReader(packet.Payload)
-       err = s.ReadPAT(&packet, pr)
-       if err != nil {
-           return err
+
+       if PID_PAT == packet.Header.Pid {
+           err = s.ReadPAT(&packet, pr)
+           if err != nil {
+               return err
+           }
        }
        err = s.ReadPMT(&packet, pr)
        if err != nil {
            return err
        }
+
        // 在读取PMT中已经将所有的音视频PES的索引信息全部保存了起来
        // 接着读取所有TS包里面的PID,找出PID==elementaryPID的TS包,就是音视频数据
        for _, v := range s.pmt.Stream {
            if v.ElementaryPID == packet.Header.Pid {
+               ss := streamsStatus[v.ElementaryPID]
+               if ss == nil {
+                   ss = &streamStatus{}
+                   streamsStatus[v.ElementaryPID] = ss
+               }
                if packet.Header.PayloadUnitStartIndicator == 1 {
-                   if frame != 0 {
-                       pesPkt, err := TsToPES(tsPktArr)
+                   if ss.frame != 0 {
+                       pesPkt, err := TsToPES(ss.tsPktArr)
                        if err != nil {
                            return err
                        }
                        onPES(pesPkt)
-                       tsPktArr = nil
+                       ss.tsPktArr = nil
+                   }
+
+                   if ss.firstTsPkt == nil {
+                       ss.firstTsPkt = &packet
+                       onStream(v)
                    }
-                   s.firstTsPkt = &packet
-                   onStream(v)
-                   frame++
+                   ss.frame++
                }
-               tsPktArr = append(tsPktArr, packet)
+               ss.tsPktArr = append(ss.tsPktArr, packet)
            }
        }
    }
langhuihui commented 1 year ago

顺序是乱的是指什么?你的意思需要在feed里面排序?

mars79668 commented 1 year ago

TsPacket 可能不是连续的, 及 当前TsPacket是stream1(audio)的包,下一个TsPacket有些视频里可能是 stream2(video)的包, 所以解析的时候 需要 使用 ElementaryPID 判断当前包是那个stream 的包,无需进行排序

mars79668 commented 1 year ago

另外还有个问题,就是ts包的处理没有考虑和视频流播放速度,应保存1x 处理速度,目前方式处理速度太快, 播放会出现丢帧卡顿,没有声音,应该要根据pts 做延时处理,我还在学习中,不知对不对; ffmpeg 推流可以使用 -re 参数保持1x 速度推流

langhuihui commented 1 year ago

可以配置SpeedLimit 全局参数来保持流速,TsPacket前后确实可能是不同的Stream的,但是PES包应该是连续的,在onPES里面会对类型做判断

mars79668 commented 1 year ago

tsPktArr 应该保存的是 同一个 stream 的 ,否则 两个 stream 的数据混再一起, 后续解析就会有问题

mars79668 commented 1 year ago

TsPacket 不连续,会导致 pes包解析会乱, 我这里遇到的问题是 将视频数据的解析到音频数据,导致音频后续解析出错 我的解决方法是通过 v.ElementaryPID 记录pes包解析状态,保证乱序包切换 可以将ts包解析到正确的pes 包去

langhuihui commented 1 year ago

什么情况下tspacket会不连续呢?除非udp包,hls都是tcp的啊

mars79668 commented 1 year ago

我们这里有比较老的编码设备输出的 hls 就有这种情况

langhuihui commented 1 year ago

已经兼容这种情况了