AgoraIO-Extensions / Agora-Golang-Server-SDK

5 stars 0 forks source link

内存泄漏问题: 使用Agora Golang Server SDK后观察到持续的内存增长 #4

Open conanlh opened 4 days ago

conanlh commented 4 days ago

在使用Agora Golang Server SDK后,我的应用程序出现了内存泄漏。通过长时间运行和监控,发现内存使用量持续增长且没有释放的迹象。使用的sdk版本是release 1.3。

我的项目主要是对聊天中的视频帧进行处理,使用到了RtcConnectionEventHandler和RtcConnectionVideoFrameObserver。

我之前使用的C++ sdk进行监听,但是发现存在内存泄漏问题,我以为是自己代码内存没管理好,就换成了golang的sdk,发现还是有内存泄漏问题。 image 我使用pprof分析过我的项目,go端并没有发现明显的内存泄漏。 所以怀疑是不是c++端的sdk存在内存泄漏问题,或者是不是我的使用方法存在问题。 下面是我的项目内存监控截图: image

下面是我的代码主要逻辑:

func AgoraListen(input *AgoraListenInput) error {
    logger.Info("ConnectChannel connect_channel start")

    channelMapLock.Lock()
    existingChannel, exists := channelMap[*input.ChannelId]
    logger.Info("ConnectChannel connect_channel channel exists: %v", exists)

    if exists {
        logger.Info("ConnectChannel connect_channel channel exists, restart: %s", *input.ChannelId)

        // 停止现有的释放定时器和监控 goroutine
        if existingChannel.ReleaseTimer != nil {
            existingChannel.ReleaseTimer.Stop()
        }
        if existingChannel.done != nil {
            close(existingChannel.done) // 发送停止信号给旧的监控 goroutine
        }

        // 创建新的释放定时器和 done 通道
        existingChannel.ReleaseTimer = time.NewTimer(2 * time.Minute)
        existingChannel.done = make(chan struct{})

        // 重新启动监控
        go monitorChannel(*input.ChannelId, existingChannel.ReleaseTimer, existingChannel.DisconnectSignal, existingChannel.done)

        channelMapLock.Unlock()
        logger.Info("Channel %s already exists, restarted release timer", *input.ChannelId)
        return nil
    }
    channelMapLock.Unlock()

    conCfg := agoraservice.RtcConnectionConfig{
        SubAudio:       false,
        SubVideo:       true,
        ClientRole:     2,
        ChannelProfile: 1,

        SubAudioConfig: &agoraservice.SubscribeAudioConfig{
            SampleRate: 16000,
            Channels:   1,
        },
    }

    conSignal := make(chan struct{})
    disconnectSignal := make(chan struct{})
    lastFrameTime := time.Now()
    var mutex sync.Mutex

    conHandler := agoraservice.RtcConnectionEventHandler{
        OnConnected: func(con *agoraservice.RtcConnection, info *agoraservice.RtcConnectionInfo, reason int) {
            logger.Info("已连接")
            conSignal <- struct{}{}
        },
        OnDisconnected: func(con *agoraservice.RtcConnection, info *agoraservice.RtcConnectionInfo, reason int) {
            logger.Info("已断开连接")
        },
        OnUserJoined: func(con *agoraservice.RtcConnection, uid string) {
            logger.Info("用户加入 uid: %s", uid)
            UserJoined(uid)
        },
        OnUserLeft: func(con *agoraservice.RtcConnection, uid string, reason int) {
            logger.Info("用户离开 uid: %s, 原因: %d", uid, reason)
            UserLeft(uid)
        },
    }
    conCfg.ConnectionHandler = &conHandler
    conCfg.VideoFrameObserver = &agoraservice.RtcConnectionVideoFrameObserver{
        OnFrame: func(con *agoraservice.RtcConnection, channelId string, userId string, frame *agoraservice.VideoFrame) {
            mutex.Lock()
            lastFrameTime = time.Now()
            mutex.Unlock()
            VideoOnFrame(con, channelId, userId, frame)
        },
    }
    conCfg.AudioFrameObserver = &agoraservice.RtcConnectionAudioFrameObserver{
        OnPlaybackAudioFrameBeforeMixing: func(con *agoraservice.RtcConnection, channelId string, userId string, frame *agoraservice.PcmAudioFrame) {
            mutex.Lock()
            lastFrameTime = time.Now()
            mutex.Unlock()
            logger.Info("播放混音前的音频帧,来自用户 %s\n", userId)
        },
    }
    con := agoraservice.NewConnection(&conCfg)
    logger.Info("ConnectChannel connect_channel 开始连接")

    conResult := con.Connect(*input.Token, *input.ChannelId, "0")
    logger.Info("ConnectChannel connect_channel 连接结果: %d", conResult)

    timer := time.NewTimer(2 * time.Minute)
    defer timer.Stop()

    select {
    case <-conSignal:
        logger.Info("ConnectChannel connect_channel 已连接")
    case <-timer.C:
        logger.Info("ConnectChannel connect_channel 连接超时")
        return fmt.Errorf("connection timeout after 2 minutes")
    }

    channelMapLock.Lock()
    defer channelMapLock.Unlock()

    releaseTimer := time.NewTimer(2 * time.Minute)
    done := make(chan struct{})
    channelInfo := &ChannelInfo{
        Conn: con,
        Params: &ConnectChannelParams{
            Input:       input,
            LeftUsers:   []string{},
            JoinedUsers: []string{},
        },
        DisconnectSignal: disconnectSignal,
        ReleaseTimer:     releaseTimer,
        done:             done,
        AllJoinedTime:    nil,
    }
    channelMap[*input.ChannelId] = channelInfo
    // 立即开始监控,不等待 UserJoined
    go monitorChannel(*input.ChannelId, releaseTimer, disconnectSignal, done)

    go func() {
        ticker := time.NewTicker(10 * time.Second)
        defer ticker.Stop()
        for {
            select {
            case <-ticker.C:
                channelMapLock.RLock()
                channelInfo, exists := channelMap[*input.ChannelId]
                if !exists {
                    channelMapLock.RUnlock()
                    return
                }
                allJoinedTime := channelInfo.AllJoinedTime
                channelMapLock.RUnlock()

                mutex.Lock()
                var elapsed time.Duration
                if allJoinedTime != nil {
                    if lastFrameTime.Before(*allJoinedTime) {
                        elapsed = time.Since(*allJoinedTime)
                    } else {
                        elapsed = time.Since(lastFrameTime)
                    }
                }
                mutex.Unlock()

                if allJoinedTime != nil && elapsed > time.Minute {
                    logger.Info("超过一分钟没有收到帧,断开连接")
                    err := ReleaseChannel(*input.ChannelId)
                    if err != nil {
                        logger.Error("断开连接失败: %v", err)
                    }
                    return
                }
            case <-disconnectSignal:
                return
            }
        }
    }()

    logger.Info("ConnectChannel connect_channel 完成")

    return nil
}

func ReleaseChannel(channelId string) error {
    channelMapLock.Lock()
    defer channelMapLock.Unlock()

    channel, exists := channelMap[channelId]

    if !exists || channel == nil {
        return fmt.Errorf("channel %s does not exist", channelId)
    }

    if channel.DisconnectSignal != nil {
        close(channel.DisconnectSignal)
    }

    // 断开连接
    result := channel.Conn.Disconnect()
    if result != 0 {
        logger.Error("Failed to disconnect channel %s: %v", channelId, result)
        return fmt.Errorf("failed to disconnect channel %s: %v", channelId, result)
    }

    // 释放连接资源
    channel.Conn.Release()

    // 从channelMap中删除
    delete(channelMap, channelId)

    C.auto_malloc_trim()
    logger.Info("Channel %s released and removed from channelMap", channelId)

    return nil
}

func VideoOnFrame(con *agoraservice.RtcConnection, channelId string, userId string, frame *agoraservice.VideoFrame) {
    channelInfo := channelMap[channelId]
    if channelInfo == nil || channelInfo.Params == nil || channelInfo.Params.Input == nil {
        return
    }
    now := time.Now().UnixMilli()
    starChatId := strconv.Itoa(*channelInfo.Params.Input.StarChatId)
    if userId == starChatId && (channelInfo.LastFrameProcessorTime == nil || now-*channelInfo.LastFrameProcessorTime > 3*1000) {
        logger.Info("VideoOnFrame %s, %d\n", userId, frame.Timestamp)
        image_channel.ImageProcessChan <- &image_channel.ChannelFrame{
            Frame:     frame,
            UserId:    *channelInfo.Params.Input.UserId,
            ChannelId: *channelInfo.Params.Input.ChannelId,
            StarId:    *channelInfo.Params.Input.StarId,
            Product:   *channelInfo.Params.Input.Product,
            OrderId:   *channelInfo.Params.Input.OrderId,
        }
        channelInfo.LastFrameProcessorTime = &now
    }
}
package frame_process

func I420ToRGBA(y, u, v []byte, width, height, yStride, uStride, vStride int, rgba []byte) {
    C.I420ToARGB(
        (*C.uint8_t)(unsafe.Pointer(&y[0])),
        C.int(yStride),
        (*C.uint8_t)(unsafe.Pointer(&v[0])),
        C.int(uStride),
        (*C.uint8_t)(unsafe.Pointer(&u[0])),
        C.int(vStride),
        (*C.uint8_t)(unsafe.Pointer(&rgba[0])),
        C.int(width*4),
        C.int(width),
        C.int(height),
    )
}

func imageProcessingPipeline(inputChan <-chan *image_channel.ChannelFrame) {

    for channelFrame := range inputChan {
        logger.Info("imageProcessingPipeline YuvToPngInMemory start: %v", channelFrame.ChannelId)

        frame := channelFrame.Frame
        // 对收到的 VideoFrame 进行 YUV 到 BGR 转换
        buffer, error := YuvToPngInMemory(frame)
        logger.Info("imageProcessingPipeline YuvToPngInMemory finish: %v", channelFrame.ChannelId)

        if error != nil {
            logger.Info("imageProcessingPipeline YuvToPngInMemory error: %v", error)
            break
        }

        go image_process.ProcessImage(buffer, frame.Timestamp, channelFrame.ChannelId, channelFrame.UserId, channelFrame.StarId, channelFrame.Product, channelFrame.OrderId)

    }
}

// YuvToPngInMemory 函数:将 YUV 数据直接转换为内存中的 PNG 数据
func YuvToPngInMemory(yuvFrame *agoraservice.VideoFrame) ([]byte, error) {
    width := yuvFrame.Width
    height := yuvFrame.Height
    yStride := yuvFrame.YStride
    uStride := yuvFrame.UStride
    vStride := yuvFrame.VStride
    yBuffer, uBuffer, vBuffer := extractYUVBuffers(yuvFrame)
    channels := 4
    var pngSize C.int

    rgbaPool := sync.Pool{
        New: func() interface{} {
            return make([]byte, width*height*4)
        },
    }

    rgba := rgbaPool.Get().([]byte)
    defer rgbaPool.Put(rgba)
    I420ToRGBA(yBuffer, uBuffer, vBuffer, width, height, yStride, uStride, vStride, rgba)

    cImg := C.stbi_write_png_to_mem(
        (*C.uchar)(unsafe.Pointer(&rgba[0])), // 确保rgba是C类型的unsigned char*
        C.int(width*channels),                // 确保stride_in_bytes是C类型的int
        C.int(width),                         // 确保width是C类型的int
        C.int(height),                        // 确保height是C类型的int
        C.int(channels),                      // 确保channels是C类型的int
        &pngSize,                             // &C.int类型的指针
    )
    defer C.free(unsafe.Pointer(cImg)) // 确保 C 分配的内存被释放

    // 将C返回的img转换为Go的[]byte
    img := C.GoBytes(unsafe.Pointer(cImg), pngSize)

    return img, nil
}
lihuize123123 commented 3 days ago

您的业务上,只用SDK收video frame吗?您可以先注释掉业务处理video frame的部分,看看只接收video frame,不做处理时,随着channel创建和销毁,内存的增长情况;预期应该是增加到一定程度就会相对稳定(因为go的GC机制)。

conanlh commented 3 days ago

您的业务上,只用SDK收video frame吗?您可以先注释掉业务处理video frame的部分,看看只接收video frame,不做处理时,随着channel创建和销毁,内存的增长情况;预期应该是增加到一定程度就会相对稳定(因为go的GC机制)。

感谢回答。我的项目暂时只接受video frame并进行处理。之前我有试过注释掉video frame的处理逻辑,发现内存也是随着连接的创建而增长,不知道是不是我的连接数太少,而没有增长到稳定的内存消耗,后面我再试一下注释掉业务逻辑处理video frame的逻辑看看内存情况。另外,根据你们的预期,内存大概会增长到多少后保持稳定?

lihuize123123 commented 2 days ago

release1.x的版本没有做过类似测试,不过我们一些比较大的客户都没有反馈过类似问题。

lihuize123123 commented 2 days ago

release2.x我们是做过类似测试,内存没有显著提高。如果您还在集成阶段,建议您用2.x版本。

conanlh commented 2 days ago

release2.x我们是做过类似测试,内存没有显著提高。如果您还在集成阶段,建议您用2.x版本。

好的,谢谢