jinleileiking / learning-notes

Notes for my learning.
GNU General Public License v3.0
1 stars 2 forks source link

Livekit server #5

Open jinleileiking opened 1 year ago

jinleileiking commented 1 year ago

浏览器连接到room(不推流)

sequenceDiagram
    Browser-->>Server: WS connected
    Server-->>Browser: join
    Server-->>Server: Participant state  JOINING -> JOINED
    Server-->>Server: ICE garther state  -> gathering
    Server-->>Browser: offer 
    Server-->>Server: ICE garther state  -> complete
    Server-->>Browser: trickle * n ?
    Browser-->>Server: answer
    Server-->>Server: ICE connection state  -> checking
    Browser-->>Server: trickle
    Server-->>Server: ICE connection state  -> connected
    Server-->>Server: Peer connection state  -> connected
    Browser-->>Server: UpdateSubscriptionPermissions
    Server-->>Server: Participant state  JOINING -> ACTIVE
    Browser-->>Server: Addtrack

对应关系:

service-redis:6379> hgetall room_participants:my-first-room
1) "user1"
2) "\n\x0fPA_SEay4PEgSJqA\x12\x05user1\x18\x02\"\xb1\x01\n\x11TR_VCYBLYmHHfLWw8\x10\x01(\x80\n0\xd0\x058\x01H\x01R\x12\b\x01\x10\x80\x05\x18\xe8\x02 \xe0\xa7\x12(\xbb\xf0\xe7\xe0\x03R\x12\b\x02\x10\x80\n\x18\xd0\x05 \xa0\xe1g(\xdc\x9d\x95\xf7\tR\x10\x10\xc0\x02\x18\xb4\x01 \xc0\xa9\a(\x82\xd0\xa0\xd3\x01Z\tvideo/VP8b\x012jH\n\tvideo/VP8\x12\x012\"\x12\b\x01\x10\x80\x05\x18\xe8\x02 \xe0\xa7\x12(\xbb\xf0\xe7\xe0\x03\"\x12\b\x02\x10\x80\n\x18\xd0\x05 \xa0\xe1g(\xdc\x9d\x95\xf7\t\"\x10\x10\xc0\x02\x18\xb4\x01 \xc0\xa9\a(\x82\xd0\xa0\xd3\x01\"#\n\x11TR_AMNeTZiyhvC5HyH\x02Z\taudio/redb\x0100\xcb\xff\x91\xa3\x06J\x05user1P\x13Z\x06\b\x01\x10\x01\x18\x01h\x01"
image

room有user的tracks ssrc信息

jinleileiking commented 1 year ago
❯ livekit-cli get-participant  --room my-first-room --identity user1 --api-key mykey --api-secret mysecret --url https://lkali.visionular.com   | jq .
{
  "sid": "PA_SEay4PEgSJqA",
  "identity": "user1",
  "state": 2,
  "tracks": [
    {
      "sid": "TR_VCYBLYmHHfLWw8",
      "type": 1,
      "width": 1280,
      "height": 720,
      "simulcast": true,
      "source": 1,
      "layers": [
        {
          "quality": 1,
          "width": 640,
          "height": 360,
          "bitrate": 300000,
          "ssrc": 1008334907
        },
        {
          "quality": 2,
          "width": 1280,
          "height": 720,
          "bitrate": 1700000,
          "ssrc": 2665828060
        },
        {
          "width": 320,
          "height": 180,
          "bitrate": 120000,
          "ssrc": 443033602
        }
      ],
      "mime_type": "video/VP8",
      "mid": "2",
      "codecs": [
        {
          "mime_type": "video/VP8",
          "mid": "2",
          "layers": [
            {
              "quality": 1,
              "width": 640,
              "height": 360,
              "bitrate": 300000,
              "ssrc": 1008334907
            },
            {
              "quality": 2,
              "width": 1280,
              "height": 720,
              "bitrate": 1700000,
              "ssrc": 2665828060
            },
            {
              "width": 320,
              "height": 180,
              "bitrate": 120000,
              "ssrc": 443033602
            }
          ]
        }
      ]
    },
    {
      "sid": "TR_AMNeTZiyhvC5Hy",
      "source": 2,
      "mime_type": "audio/red",
      "mid": "0"
    }
  ],
  "joined_at": 1684307915,
  "name": "user1",
  "version": 19,
  "permission": {
    "can_subscribe": true,
    "can_publish": true,
    "can_publish_data": true
  },
  "is_publisher": true
}
jinleileiking commented 1 year ago

收到多码率推流,选一路给播放端

redis 收到加入消息, 建立pc

buffer是很早之前用room factory建立的


 (r *RoomManager) StartSession
  rtcConf := *r.rtcConfig
  rtcConf.SetBufferFactory(room.GetBufferFactory())
  SetBufferFactory
    c.BufferFactory = factory
    c.SettingEngine.BufferFactory = factory.GetOrNew
  participant, err = rtc.NewParticipant(rtc.ParticipantParams
  Config:                  &rtcConf

rtc.NewParticipant
  params:                    params

 (p *ParticipantImpl) setupTransportManager()
NewTransportManager
NewPCTransport
 t.createPeerConnection

在收到包后建立reader

(r *ReadStreamSRTP) init r.buffer = r.session.bufferFactory(packetio.RTPBufferPacket, ssrc) = Factory).GetOrNew

runtime/debug.Stack()
        /usr/local/go/src/runtime/debug/stack.go:24 +0x65
runtime/debug.PrintStack()
        /usr/local/go/src/runtime/debug/stack.go:16 +0x19
github.com/livekit/livekit-server/pkg/sfu/buffer.(*Factory).GetOrNew(0xc0001a1840, 0x1, 0x49108d09)
        /workspace/pkg/sfu/buffer/factory.go:53 +0x4b
github.com/pion/srtp/v2.(*ReadStreamSRTP).init(0xc0011d6ac0, {0x1332960?, 0xc0025ee8f0?}, 0x49108d09)
        /go/pkg/mod/github.com/pion/srtp/v2@v2.0.10/stream_srtp.go:53 +0x17d
github.com/pion/srtp/v2.(*session).getOrCreateReadStream(0xc0025ee8f0, 0x1d52000?, {0x1332960, 0xc0025ee8f0}, 0x12022b8)
        /go/pkg/mod/github.com/pion/srtp/v2@v2.0.10/session.go:79 +0x12e
github.com/pion/srtp/v2.(*SessionSRTP).decrypt(0xc0025ee8f0, {0xc001d52000, 0x472, 0x2000})
        /go/pkg/mod/github.com/pion/srtp/v2@v2.0.10/session_srtp.go:170 +0xa5
github.com/pion/srtp/v2.(*session).start.func1()
        /go/pkg/mod/github.com/pion/srtp/v2@v2.0.10/session.go:142 +0x12e
created by github.com/pion/srtp/v2.(*session).start
        /go/pkg/mod/github.com/pion/srtp/v2@v2.0.10/session.go:121 +0x278
goroutine 716 [running]:
runtime/debug.Stack()
        /usr/local/go/src/runtime/debug/stack.go:24 +0x65
runtime/debug.PrintStack()
        /usr/local/go/src/runtime/debug/stack.go:16 +0x19
github.com/livekit/livekit-server/pkg/sfu/buffer.(*Factory).GetOrNew(0xc0001a1840, 0x2, 0x49108d09)
        /workspace/pkg/sfu/buffer/factory.go:53 +0x4b
github.com/pion/srtp/v2.(*ReadStreamSRTCP).init(0xc0011d6b80, {0x1332930?, 0xc001dec000?}, 0x49108d09)
        /go/pkg/mod/github.com/pion/srtp/v2@v2.0.10/stream_srtcp.go:117 +0x17d
github.com/pion/srtp/v2.(*session).getOrCreateReadStream(0xc001dec000, 0x40dc07?, {0x1332930, 0xc001dec000}, 0x12022b0)
        /go/pkg/mod/github.com/pion/srtp/v2@v2.0.10/session.go:79 +0x12e
github.com/pion/srtp/v2.(*SessionSRTCP).OpenReadStream(0x30?, 0x1b19da0?)
        /go/pkg/mod/github.com/pion/srtp/v2@v2.0.10/session_srtcp.go:82 +0x2c
github.com/pion/webrtc/v3.(*DTLSTransport).streamsForSSRC(0xc002796b00, 0x19c0d8?, {{0x0, 0x0}, 0xc000d453e0, 0x49108d09, 0x60, {0xc0025e9590, 0x3, 0x3}, ...})
        /go/pkg/mod/github.com/pion/webrtc/v3@v3.1.50/dtlstransport.go:488 +0x225
github.com/pion/webrtc/v3.(*PeerConnection).handleIncomingSSRC(0xc0004fc480, {0x132b0a0, 0xc0011d6ac0}, 0x49108d09)
        /go/pkg/mod/github.com/pion/webrtc/v3@v3.1.50/peerconnection.go:1565 +0x785
github.com/pion/webrtc/v3.(*PeerConnection).undeclaredRTPMediaProcessor.func1({0x132b0a0?, 0xc0011d6ac0?}, 0x1d6f7b8?)
        /go/pkg/mod/github.com/pion/webrtc/v3@v3.1.50/peerconnection.go:1645 +0x54
created by github.com/pion/webrtc/v3.(*PeerConnection).undeclaredRTPMediaProcessor
        /go/pkg/mod/github.com/pion/webrtc/v3@v3.1.50/peerconnection.go:1644 +0x276

(r *RedisRouter) handleRTCMessage
router.OnNewParticipantRTC(r.StartSession)
(r *RedisRouter) startParticipantRTC
   go func() 
        r.onNewParticipant

 (r *RoomManager) StartSession
NewParticipant
 p := &ParticipantImpl 
params:                    params
(p *ParticipantImpl) setupTransportManager()
NewTransportManager 
Config:                  p.params.Config,
NewTransportManager(params TransportManagerParams)
NewPCTransport(Config:                  params.Config)

t := &PCTransport
   params:                   params

(t *PCTransport) createPeerConnection()
newPeerConnection(t.params
  se := params.Config.SettingEngine
  api := webrtc.NewAPI
    webrtc.WithSettingEngine(se)
 (api *API) NewPeerConnection --------- [pion]

uptrack收到数据ontrack,forward downtrack

tm.OnPublisherTrack(p.onMediaTrack)
   t.publisher.OnTrack(f)   -----------------pc.ontrack

pc.Ontrack = (p *ParticipantImpl) onMediaTrack
(p *ParticipantImpl) mediaTrackReceived
  (p *ParticipantImpl) addMediaTrack
    mt := NewMediaTrack
    BufferFactory:       p.params.Config.BufferFactory --- 这个就是room的buffer factory
    (u *UpTrackManager) AddPublishedTrack(mt)
  (t *MediaTrack) AddReceiver [[[[[[[[[[[]]]]]]]]]]]]]]]]
    buff, rtcpReader := t.params.BufferFactory.GetBufferPair(uint32(track.SSRC()))
    wr.(*sfu.WebRTCReceiver).AddUpTrack(track, buff)
      w.buffers[layer] = buff
      go (w *WebRTCReceiver) forwardRTP

forwardRTP
  for {
       buf := w.buffers[layer]         ----------- 从WebRTCReceiver读取buffer
       buf.ReadExtended(pktBuf)
         w.downTrackSpreader.Broadcast
            (d *DownTrack) WriteRTP
               tp, err := d.forwarder.GetTranslationParams(extPkt, layer)  ----- 这里会看要不要发送,不是想要的layer会丢弃
  }

AddReceiver 配置rtcpreader的onpacket

 rtcpReader.OnPacket(func(bytes []byte)

收rtcp

调用api时指定buffer的回调。。

NewAPI SettingEngine.BufferFactory

---- pion
(pc *PeerConnection) SetRemoteDescription
pc.ops.Enqueue(func() startTransports 

.....  
 (pc *PeerConnection) startTransports
(t *DTLSTransport) Start
(t *DTLSTransport) startSRTP()
--- rtcp
NewSessionSRTCP
 (s *session) start
go (s *SessionSRTCP) decrypt
(r *ReadStreamSRTCP) write
 r.buffer.Write =   buffer.(*RTCPReader).Write
 github.com/livekit/livekit-server/pkg/rtc.(*MediaTrack).AddReceiver.func1({0xc001d44000, 0x28, 0x2000})
        /workspace/pkg/rtc/mediatrack.go:205 +0x55
github.com/livekit/livekit-server/pkg/sfu/buffer.(*RTCPReader).Write(0xc001638dd0?, {0xc001d44000?, 0x13328d0?, 0xc0008a2b80?})
        /workspace/pkg/sfu/buffer/rtcpreader.go:26 +0x7c
github.com/pion/srtp/v2.(*ReadStreamSRTCP).write(0xc001744fd0?, {0xc001d44000?, 0x28, 0xc001744fd0?})
        /go/pkg/mod/github.com/pion/srtp/v2@v2.0.10/stream_srtcp.go:30 +0x30
github.com/pion/srtp/v2.(*SessionSRTCP).decrypt(0xc001744fd0, {0xc001d44000?, 0x10000c000010348?, 0xc001c3a568?})
        /go/pkg/mod/github.com/pion/srtp/v2@v2.0.10/session_srtcp.go:176 +0x17c
github.com/pion/srtp/v2.(*session).start.func1()
        /go/pkg/mod/github.com/pion/srtp/v2@v2.0.10/session.go:142 +0x12e
created by github.com/pion/srtp/v2.(*session).start
        /go/pkg/mod/github.com/pion/srtp/v2@v2.0.10/session.go:121 +0x278
jinleileiking commented 1 year ago

客户端发起换layer

HandleParticipantSignal participant.UpdateSubscribedTrackSettings = (p *ParticipantImpl) UpdateSubscribedTrackSettings

UpdateSubscribedTrackSettings (t SubscribedTrack) UpdateSubscriberSettings (t SubscribedTrack) UpdateVideoLayer() t.MediaTrack().GetQualityForDimension buffer.VideoQualityToSpatialLayer t.DownTrack().SetMaxSpatialLayer

jinleileiking commented 1 year ago

收remb 进行abr

pion....

(d *DownTrack) Bind 注册onpacket

(r *RTCPReader) Write f(p) = Bind

rr.OnPacket d.handleRTCP(pkt) (d DownTrack) handleRTCP (s StreamAllocator) onREMB = (*StreamAllocator).onREMB s.postEvent streamAllocatorSignalEstimate

handleNewEstimateInNonProbe (c ChannelObserver) AddEstimate (t TrendDetector) AddValue (t *TrendDetector) updateDirection()

github.com/livekit/livekit-server/pkg/sfu.(*StreamAllocator).onREMB(0xc0001f6a20, 0xc0002c9b80, 0xc001f14860)
        /workspace/pkg/sfu/streamallocator.go:357 +0x194
github.com/livekit/livekit-server/pkg/sfu.(*DownTrack).handleRTCP(0xc0002c9b80, {0xc001a68000?, 0xc001bcae38?, 0xc0ae66?})
        /workspace/pkg/sfu/downtrack.go:1179 +0x4bf
github.com/livekit/livekit-server/pkg/sfu.(*DownTrack).Bind.func1({0xc001a68000?, 0x10040c0?, 0xc001a55e30?})
        /workspace/pkg/sfu/downtrack.go:321 +0x2e
github.com/livekit/livekit-server/pkg/sfu/buffer.(*RTCPReader).Write(0xc001bcadd0?, {0xc001a68000?, 0x1332790?, 0xc001f8e500?})
        /workspace/pkg/sfu/buffer/rtcpreader.go:26 +0x7c
github.com/pion/srtp/v2.(*ReadStreamSRTCP).write(0xc001e5e6e0?, {0xc001a68000?, 0x38, 0xc001e5e6e0?})
        /go/pkg/mod/github.com/pion/srtp/v2@v2.0.10/stream_srtcp.go:30 +0x30
github.com/pion/srtp/v2.(*SessionSRTCP).decrypt(0xc001e5e6e0, {0xc001a68000?, 0xc001597b30?, 0x100000001?})
        /go/pkg/mod/github.com/pion/srtp/v2@v2.0.10/session_srtcp.go:176 +0x17c
github.com/pion/srtp/v2.(*session).start.func1()
        /go/pkg/mod/github.com/pion/srtp/v2@v2.0.10/session.go:142 +0x12e
created by github.com/pion/srtp/v2.(*session).start
        /go/pkg/mod/github.com/pion/srtp/v2@v2.0.10/session.go:121 +0x278
jinleileiking commented 1 year ago

收到推流rtp,发送 twcc

ithub.com/livekit/livekit-server/pkg/rtc.(*ParticipantImpl).mediaTrackReceived.func1({0xc0003d0f30, 0x2c, 0x2c})
        /workspace/pkg/rtc/participant.go:1668 +0xcf
github.com/livekit/mediatransportutil/pkg/twcc.(*Responder).Push(0xc0025d2180, 0x397, 0x1769fcb3b2bb3af7, 0x1)
        /go/pkg/mod/github.com/livekit/mediatransportutil@v0.0.0-20221007030528-7440725c362b/pkg/twcc/twcc.go:81 +0x1fb
github.com/livekit/livekit-server/pkg/sfu/buffer.(*Buffer).processHeaderExtensions(0xc001333d40, 0xc0013e0230, 0x42d?)
        /workspace/pkg/sfu/buffer/buffer.go:487 +0x1db
github.com/livekit/livekit-server/pkg/sfu/buffer.(*Buffer).calc(0xc001333d40, {0xc00164a000, 0x42d, 0x2000}, 0xc08ff6?)
        /workspace/pkg/sfu/buffer/buffer.go:404 +0x1e5
github.com/livekit/livekit-server/pkg/sfu/buffer.(*Buffer).Write(0xc001333d40, {0xc00164a000, 0x42d, 0x2000})
        /workspace/pkg/sfu/buffer/buffer.go:250 +0x14f
github.com/pion/srtp/v2.(*ReadStreamSRTP).write(0xc001615980?, {0xc00164a000?, 0x42d, 0x2000?})
        /go/pkg/mod/github.com/pion/srtp/v2@v2.0.10/stream_srtp.go:64 +0x30
github.com/pion/srtp/v2.(*SessionSRTP).decrypt(0xc0003e0b00, {0xc00164a000, 0x437, 0x2000})
        /go/pkg/mod/github.com/pion/srtp/v2@v2.0.10/session_srtp.go:187 +0x15d
github.com/pion/srtp/v2.(*session).start.func1()
        /go/pkg/mod/github.com/pion/srtp/v2@v2.0.10/session.go:142 +0x12e
created by github.com/pion/srtp/v2.(*session).start
        /go/pkg/mod/github.com/pion/srtp/v2@v2.0.10/session.go:121 +0x278
jinleileiking commented 1 year ago

接收端,server收到handleRemoteDescriptionReceived

github.com/livekit/livekit-server/pkg/sfu/buffer.(*Factory).GetOrNew(0xc0014f9580, 0x2, 0x88317c71)
        /workspace/pkg/sfu/buffer/factory.go:53 +0x4b
github.com/livekit/livekit-server/pkg/sfu.(*DownTrack).Bind(0xc0002c2580, {{0xc000ccc000, 0x20}, {{0xc0018357a0, 0x2, 0x2}, {0xc00021b340, 0x4, 0x4}}, 0x88317c71, ...})
        /workspace/pkg/sfu/downtrack.go:319 +0x4d6

github.com/pion/webrtc/v3.(*RTPSender).Send(0xc0017f8080, {{{0xc001835320, 0x2, 0x2}, {0xc00021b4a0, 0x4, 0x4}}, {0xc0011b6140, 0x1, 0x1}})
        /go/pkg/mod/github.com/pion/webrtc/v3@v3.1.50/rtpsender.go:305 +0x515
github.com/pion/webrtc/v3.(*PeerConnection).startRTPSenders(0xc001beeb40?, {0xc0000125e8, 0x1, 0xc000012030?})
        /go/pkg/mod/github.com/pion/webrtc/v3@v3.1.50/peerconnection.go:1441 +0x1bd
github.com/pion/webrtc/v3.(*PeerConnection).SetRemoteDescription(0xc0004af8c0, {0x3, {0xc000fcd500, 0x6f6}, 0xc0004ac5a0})
        /go/pkg/mod/github.com/pion/webrtc/v3@v3.1.50/peerconnection.go:1155 +0x719

github.com/livekit/livekit-server/pkg/rtc.(*PCTransport).setRemoteDescription(0xc001174c00, {0x0?, {0xc000fcc700?, 0x0?}, 0x0?})
        /workspace/pkg/rtc/transport.go:1614 +0x31b
github.com/livekit/livekit-server/pkg/rtc.(*PCTransport).handleRemoteAnswerReceived(0xc001174c00, 0x0?)
        /workspace/pkg/rtc/transport.go:1709 +0x34
github.com/livekit/livekit-server/pkg/rtc.(*PCTransport).handleRemoteDescriptionReceived(0x11a0aed?, 0xc001fcb000?)
        /workspace/pkg/rtc/transport.go:1583 +0x49
github.com/livekit/livekit-server/pkg/rtc.(*PCTransport).handleEvent(0xc001174c00, 0xc002b5df68)
        /workspace/pkg/rtc/transport.go:1265 +0x1ee
github.com/livekit/livekit-server/pkg/rtc.(*PCTransport).processEvents(0xc001174c00)
        /workspace/pkg/rtc/transport.go:1237 +0xc8
created by github.com/livekit/livekit-server/pkg/rtc.NewPCTransport
jinleileiking commented 1 year ago

(d *DownTrack) Bind

        /usr/local/go/src/runtime/debug/stack.go:24 +0x65
runtime/debug.PrintStack()
        /usr/local/go/src/runtime/debug/stack.go:16 +0x19
github.com/livekit/livekit-server/pkg/sfu/buffer.(*Factory).GetOrNew(0xc00017dbc0, 0x2, 0x4ef54e6d)
        /workspace/pkg/sfu/buffer/factory.go:53 +0x4b
github.com/livekit/livekit-server/pkg/sfu.(*DownTrack).Bind(0xc001581440, {{0xc0004803f0, 0x20}, {{0xc001a47b00, 0x2, 0x2}, {0xc001ec8c60, 0x4, 0x4}}, 0x4ef54e6d, ...})
        /workspace/pkg/sfu/downtrack.go:357 +0x80a
github.com/pion/webrtc/v3.(*RTPSender).Send(0xc000589400, {{{0xc001a47620, 0x2, 0x2}, {0xc001ec8dc0, 0x4, 0x4}}, {0xc000425460, 0x1, 0x1}})
        /go/pkg/mod/github.com/pion/webrtc/v3@v3.2.9/rtpsender.go:308 +0x515
github.com/pion/webrtc/v3.(*PeerConnection).startRTPSenders(0xc001a40a40?, {0xc000012970, 0x1, 0xc0001344d0?})
        /go/pkg/mod/github.com/pion/webrtc/v3@v3.2.9/peerconnection.go:1458 +0x1bd
github.com/pion/webrtc/v3.(*PeerConnection).SetRemoteDescription(0xc001581200, {0x3, {0xc000af9500, 0x6f5}, 0xc0010e4360})
        /go/pkg/mod/github.com/pion/webrtc/v3@v3.2.9/peerconnection.go:1169 +0x719
github.com/livekit/livekit-server/pkg/rtc.(*PCTransport).setRemoteDescription(0xc00102f880, {0xc000f2de58?, {0xc000af8700?, 0x0?}, 0x0?})
        /workspace/pkg/rtc/transport.go:1783 +0x330
github.com/livekit/livekit-server/pkg/rtc.(*PCTransport).handleRemoteAnswerReceived(0xc00102f880, 0x0?)
        /workspace/pkg/rtc/transport.go:1892 +0x34
github.com/livekit/livekit-server/pkg/rtc.(*PCTransport).handleRemoteDescriptionReceived(0x12ec64b?, 0xc000b23800?)
        /workspace/pkg/rtc/transport.go:1752 +0x49
github.com/livekit/livekit-server/pkg/rtc.(*PCTransport).handleEvent(0xc00102f880, 0xc000f2df68)
        /workspace/pkg/rtc/transport.go:1417 +0x1ee
github.com/livekit/livekit-server/pkg/rtc.(*PCTransport).processEvents(0xc00102f880)
        /workspace/pkg/rtc/transport.go:1389 +0xd5
created by github.com/livekit/livekit-server/pkg/rtc.NewPCTransport
        /workspace/pkg/rtc/transport.go:389 +0x265
goroutine 723 [running]:
runtime/debug.Stack()
        /usr/local/go/src/runtime/debug/stack.go:24 +0x65
runtime/debug.PrintStack()
        /usr/local/go/src/runtime/debug/stack.go:16 +0x19
github.com/livekit/livekit-server/pkg/sfu.NewDownTrack({0xc0018da300, 0x1, 0x1}, {0x1488c38, 0xc00054e1e0}, 0xc000130000, {0xc00004b3f0, 0xf}, 0x1f4, {0x14800e0, ...})
        /workspace/pkg/sfu/downtrack.go:266 +0x192
github.com/livekit/livekit-server/pkg/rtc.(*MediaTrackSubscriptions).AddSubscriber(0xc000d57ce0, {0x1494818, 0xc001863c00}, 0xc00054e1e0)
        /workspace/pkg/rtc/mediatracksubscriptions.go:104 +0x46a
github.com/livekit/livekit-server/pkg/rtc.(*MediaTrackReceiver).AddSubscriber(0xc0018a6e00, {0x1494818, 0xc001863c00})
        /workspace/pkg/rtc/mediatrackreceiver.go:492 +0x70f
github.com/livekit/livekit-server/pkg/rtc.(*SubscriptionManager).subscribe(0xc000c1a180, 0xc0002948c0)
        /workspace/pkg/rtc/subscriptionmanager.go:484 +0x5ec
github.com/livekit/livekit-server/pkg/rtc.(*SubscriptionManager).reconcileSubscription(0xc000c1a180, 0xc0002948c0)
        /workspace/pkg/rtc/subscriptionmanager.go:305 +0x165
github.com/livekit/livekit-server/pkg/rtc.(*SubscriptionManager).reconcileWorker(0xc000c1a180)
        /workspace/pkg/rtc/subscriptionmanager.go:405 +0x1f2
created by github.com/livekit/livekit-server/pkg/rtc.NewSubscriptionManager
        /workspace/pkg/rtc/subscriptionmanager.go:89 +0x1a5
jinleileiking commented 1 year ago

push 

(r *Room) Join
  participant.OnTrackPublished(r.onTrackPublished)
...

(*PeerConnection).onTrack = (p *ParticipantImpl) onMediaTrack
(*ParticipantImpl).mediaTrackReceived
  mt, ok := p.getPublishedTrackBySdpCid(track.ID()).(*MediaTrack)
  mt.AddReceiver(rtpReceiver, track, p.twcc, mid)
     newWR := sfu.NewWebRTCReceiver(receiver, track, ...)
  go p.handleTrackPublished(mt) = (*ParticipantImpl).handleTrackPublished
    (r *Room) onTrackPublished
      (r *RoomTrackManager) AddTrack

在engine里配置buffer,当rtp有包时,自动调用buffer.write写入buffer

created by github.com/pion/srtp/v2.(*session).start
github.com/pion/srtp/v2.(*session).start.func1()
github.com/pion/srtp/v2.(*SessionSRTP).decrypt(0xc00274a300, {0xc001870000, 0x3f9, 0x2000})
github.com/pion/srtp/v2.(*ReadStreamSRTP).write(0xc00196f7a0?, {0xc001870000?, 0x3ef, 0x2000?})
github.com/livekit/livekit-server/pkg/sfu/buffer.(*Buffer).Write(0xc001911c00, {0xc001870000, 0x3ef, 0x2000})

pull

rtc.(*Room).Join.func1({0x1494858, 0xc0015bb400}, 0x1c38900?)
  rtc.(*ParticipantImpl).updateState
    rtc.(*ParticipantImpl).updateState.func1()
      rtc.(*Room).subscribeToExistingTracks(0xc000320000, {0x1494858, 0xc0015bb400})
        for _, track := range op.GetPublishedTracks()
          rtc.(*SubscriptionManager).SubscribeToTrack(track.ID())
            (m *SubscriptionManager) queueReconcile

pull 取得pub track并newdowntrack 挂载到和客户端pc

(r *RoomManager) StartSession
NewParticipant
(p *ParticipantImpl) setupSubscriptionManager()
rtc.NewSubscriptionManager
go {
    rtc.(*SubscriptionManager).reconcileWorker()
        case trackID := <-m.reconcileCh:
        rtc.(*SubscriptionManager).reconcileSubscription()
            rtc.(*SubscriptionManager).subscribe
              res := m.params.TrackResolver(m.params.Participant.Identity(), trackID)
                (r *Room) ResolveMediaTrackForSubscriber
                  info := r.trackManager.GetTrackInfo(trackID) (map[livekit.TrackID]*TrackInfo)
                  res.Track = info.Track
              track := res.Track  (Track                MediaTrack)
              track.AddSubscriber = rtc.(*MediaTrackReceiver).AddSubscriber(0xc0013ca820, {0x134aba8, 0xc00116e800})
                rtc.(*MediaTrackSubscriptions).AddSubscriber(0xc00037c620, {0x134aba8, 0xc00116e800}, 0xc0012093b0)
                  receivers := t.receiversShadow
                  sfu.NewDownTrack ---------------------
                  rtc.(*TransportManager).AddTrackToSubscriber(downTrack, addTrackParams)
                    rtc.(*PCTransport).AddTrack(0xc000ca1800, {0x1337e50, 0xc000222580}, {0x8?})  
    }

pc 建立

(r *RedisRouter) handleRTCMessage
router.OnNewParticipantRTC(r.StartSession)

(r *RedisRouter) startParticipantRTC
  go func() 
    r.onNewParticipant =   (r *RoomManager) StartSession
      NewParticipant  [p := &ParticipantImpl   params: params]
        (p *ParticipantImpl) setupTransportManager() 
          NewTransportManager  [Config: p.params.Config]  [NewTransportManager(params TransportManagerParams)]
            NewPCTransport(Config:                  params.Config)  [t := &PCTransport {  params:                   params}]
              (t *PCTransport) createPeerConnection()
                newPeerConnection(t.params
                    se := params.Config.SettingEngine
                    api := webrtc.NewAPI
                      webrtc.WithSettingEngine(se)
                   (api *API) NewPeerConnection --------- [pion]

receive nack

retransmitPackets
  n, err := d.receiver.ReadRTP(pktBuff, uint8(meta.layer), meta.sourceSeqNo)

------------

(wh *WhepHandler) Subscribe
  (s *Subscriber) subscribe
    trackChan := s.LocalStore.Subscribe(ctx, s)
    go addDownTracks(ctx, pc, trackChan, addTrackDone)
      for tl := range trackChan {
        pc.AddTransceiverFromTrack
jinleileiking commented 1 year ago
pub pc

publish推流
(*PeerConnection).onTrack
(p *ParticipantImpl) onMediaTrack
(p *ParticipantImpl) mediaTrackReceived
(t *MediaTrack) AddReceiver
(t *MediaTrackReceiver) SetupReceiver