bytedeco / javacv

Java interface to OpenCV, FFmpeg, and more
Other
7.6k stars 1.59k forks source link

CPU utilization stays low and transcoding is very slow #2291

Open nocarethancare opened 1 month ago

nocarethancare commented 1 month ago

When I use a grabber and a recorder to transcode an RTMP live stream, the CPU utilization of the Java program stays low and transcoding is very slow. How can I increase the CPU utilization of JavaCV? I am using version 1.5.10. My code: package com.mango5g.media.cloud.service.listener;

import lombok.extern.slf4j.Slf4j; import me.zhyd.oauth.utils.StringUtils; import org.bytedeco.ffmpeg.global.avcodec; import org.bytedeco.ffmpeg.global.avutil; import org.bytedeco.javacv.*; import org.bytedeco.opencv.global.opencv_imgcodecs; import org.bytedeco.opencv.opencv_core.Mat;

import java.io.IOException; import java.nio.ShortBuffer; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicBoolean;

@Slf4j public class StreamEngine { private final long restartLength; private String MAJOR_STREAM_URL;

// URL of the "ready" stand-in stream. The constructor stores its readyU argument here.
private String READY_STREAM_URL;
// Source handed to the slave streamer. The constructor stores its readyUrl argument here
// (note the mismatch between the parameter name and this field's name).
private String SLAVE_STREAM_URL;
// Destination the recorder writes to.
private String OUTPUT_STREAM_URL;
// Output recorder; null until run() creates it, and reset to null if restarting it fails.
private FFmpegFrameRecorder recorder = null;
// Set to true by stop(), and by processStream when the recorder cannot be restarted.
private volatile Boolean stopFlag = false;
// Set to true once a frame has been grabbed in processStream.
private volatile Boolean allreadyFlag = false;
// String mirror of stopFlag ("true"/"false"); checked when deciding whether to retry a restart.
private String stopStr = "false";
// Written in several places but never read in the code visible here.
private Boolean flag = true;
// Grabber for the ready (stand-in) stream, used while the major stream is down and no slave is active.
private FFmpegFrameGrabber readyGrabber;
// Grabber for the slave stream, used while the major stream is down and slaveFlag is true.
private FFmpegFrameGrabber slaveGrabber;
// True when a slave stream is configured and was opened successfully.
private Boolean slaveFlag = false;
// Path suffixes for stand-in streams. The names appear to encode audio sample rate
// (441 / 480) and frame rate (25 / 30) — TODO confirm against the media server layout.
private static final String r1_441_25 = "/migu/nosignal/r1_441_25";
private static final String r2_441_30 = "/migu/nosignal/r2_441_30";
private static final String r3_480_25 = "/migu/nosignal/r3_480_25";
private static final String r4_480_30 = "/migu/nosignal/r4_480_30";

// Helper that publishes the slave stream; its type is not shown in this file.
private FFmpegStreamer fFmpegStreamer;
// Name of the live channel; appended to the slave base URL by the constructor.
private String liveName;

// "<slaveU>/<liveName>", or null when liveName is empty.
private String slaveStreamRtmpUrl;

// Passed to the slave streamer; presumably the path of the ffmpeg executable — TODO confirm.
private String ffmpegbin;

/**
 * Returns the readiness flag, which becomes {@code true} once a frame has
 * been grabbed by the relay loop.
 *
 * @return the current value of {@code allreadyFlag}
 */
public boolean getAllreadyFlag() {
    final Boolean ready = this.allreadyFlag;
    return ready;
}

public StreamEngine(String inputUrl, String outputUrl, String readyUrl, String readyU, String slaveU, String liveName, String ffmpegbin, Long restartLength) {
    this.MAJOR_STREAM_URL = inputUrl;
    this.OUTPUT_STREAM_URL = outputUrl;
    this.SLAVE_STREAM_URL = readyUrl;
    this.READY_STREAM_URL = readyU;
    this.liveName = liveName;
    this.ffmpegbin = ffmpegbin;
    this.restartLength = restartLength;
    if (StringUtils.isNotEmpty(this.liveName)) {
        this.slaveStreamRtmpUrl = slaveU + "/" + liveName;
    }
    log.info("=======inputUrl:{}===outputUrl:{}==readyUrl:{}===readyU:{}=========", inputUrl, outputUrl, readyUrl, readyU);
}

/**
 * Opens the major input, the optional slave input and the ready (stand-in)
 * input, configures the output recorder from the major stream's parameters,
 * and relays frames until a stop is requested.
 *
 * <p>All resources that were actually opened (recorder, grabbers, restart
 * executor) are released before this method returns, even when start-up
 * fails half-way.
 *
 * @throws RuntimeException if releasing the recorder or a grabber fails; the
 *                          remaining resources are still released first
 */
public void run() {
    log.info("=========liveName:{}== stopFlag:{}==hash:{}================", liveName, stopFlag, stopFlag.hashCode());

    ExecutorService executor = Executors.newSingleThreadExecutor();
    AtomicBoolean majorStreamActive = new AtomicBoolean(true);
    FFmpegFrameGrabber majorGrabber = null;
    try {
        majorGrabber = new FFmpegFrameGrabber(MAJOR_STREAM_URL);
        slaveFlag = StringUtils.isNotEmpty(SLAVE_STREAM_URL);
        majorGrabber.setOption("stimeout", "5000000");
        majorGrabber.setOption("threads", "8");
        majorGrabber.start();
        log.info("========majorStream==准备就绪=====");

        if (slaveFlag) {
            fFmpegStreamer = new FFmpegStreamer();
            fFmpegStreamer.startStreaming(ffmpegbin, SLAVE_STREAM_URL, slaveStreamRtmpUrl, majorGrabber);
            // Presumably gives the streamer time to start publishing before we connect — TODO confirm.
            Thread.sleep(2000);
            slaveGrabber = new FFmpegFrameGrabber(slaveStreamRtmpUrl);
            try {
                slaveGrabber.setOption("stimeout", "3000000");
                slaveGrabber.start();
                log.info("============用户垫片备用流连接成功========");
            } catch (FrameGrabber.Exception e) {
                // No usable slave stream: fall back to the ready stream instead.
                slaveFlag = false;
                log.info("============用户垫片备用流不存在========");
            }
        }

        int sampleRate = majorGrabber.getSampleRate();
        double frameRate = majorGrabber.getFrameRate();
        log.info("========main:sampleRate->{}===frameRate:{}=========", sampleRate, frameRate);
        // TODO (not enabled yet): choose a stand-in variant matching the major stream, i.e.
        // READY_STREAM_URL + r3_480_25 / r4_480_30 for 48 kHz audio and r1_441_25 / r2_441_30
        // otherwise (25 fps vs. any other frame rate). Until then the plain URL is always used.
        readyGrabber = new FFmpegFrameGrabber(READY_STREAM_URL);
        try {
            readyGrabber.setOption("stimeout", "3000000");
            readyGrabber.start();
            log.info("========readyStream连接成功==");
        } catch (Exception e) {
            log.error("========readyStream连接失败==:{}", e.getMessage());
        }

        int audioChannels = majorGrabber.getAudioChannels();
        log.info("====majorGrabber.getAudioChannels:{}", audioChannels);
        recorder = new FFmpegFrameRecorder(OUTPUT_STREAM_URL, majorGrabber.getImageWidth(), majorGrabber.getImageHeight(), audioChannels);
        configureRecorder(recorder, majorGrabber, audioChannels);
        recorder.start();
        String tbn = recorder.getVideoOption("preset");
        log.info("======推流编码器已经启动==preset:{}===", tbn);

        this.Execute(majorGrabber, recorder, "major", majorStreamActive, executor);
    } catch (InterruptedException e) {
        // Preserve the interrupt so whoever owns this thread can observe it.
        Thread.currentThread().interrupt();
        log.error("Stream engine interrupted, liveName:{}", liveName, e);
    } catch (Exception e) {
        log.error("Stream engine failed, liveName:{}", liveName, e);
    } finally {
        releaseResources(majorGrabber, executor);
    }
}

/**
 * Applies encoder, muxer and audio settings to the output recorder, copying
 * rates and codec choices from the major stream.
 */
private void configureRecorder(FFmpegFrameRecorder output, FFmpegFrameGrabber majorGrabber, int audioChannels) {
    // The preset must be a bare preset name; the tune is a separate option (set below).
    output.setVideoOption("preset", "ultrafast");
    // NOTE(review): "vsync" and the format-level "threads" below are kept as in the
    // original; whether they reach the encoder should be confirmed.
    output.setOption("vsync", "1");
    output.setVideoOption("allow_sw", "1");
    // Keyframe interval; may affect start-up latency and video quality.
    output.setGopSize(15);
    output.setVideoOption("tune", "zerolatency");
    // Rate-control buffer size.
    output.setVideoOption("bufsize", "10485760");
    output.setVideoOption("crf", "30");
    output.setSampleRate(majorGrabber.getSampleRate());
    output.setFrameRate(majorGrabber.getFrameRate());
    output.setVideoBitrate(majorGrabber.getVideoBitrate());
    output.setFormat("flv");
    output.setOption("threads", "8");

    int videoCodec = majorGrabber.getVideoCodec();
    String videoCodecName = majorGrabber.getVideoCodecName();
    log.info("=====源头 编码:{}=={}==", videoCodecName, videoCodec);
    // Constant-first comparison: the codec name may be null when the source has no video stream.
    if ("hevc".equals(videoCodecName) || videoCodec == avcodec.AV_CODEC_ID_HEVC) {
        output.setVideoCodecName("libx265"); // use the software encoder
        log.info("====h265 软编====");
    } else {
        output.setVideoCodec(videoCodec);
    }

    int audioBitrate = majorGrabber.getAudioBitrate();
    log.info("===1majorGrabber 音频参数:Channels:{}==AudioBitrate:{}==AudioCodec:{}===", audioChannels, audioBitrate, majorGrabber.getAudioCodecName());
    output.setPixelFormat(avutil.AV_PIX_FMT_YUV420P);
    if (audioChannels != 0) {
        output.setAudioChannels(audioChannels);
        if (audioBitrate == 0) {
            // The source did not report a bitrate; fall back to 128 kbit/s.
            audioBitrate = 128000;
        }
    }
    output.setAudioBitrate(audioBitrate);
    output.setAudioCodec(avcodec.AV_CODEC_ID_AAC);
    log.info("===2majorGrabber 音频参数:Channels:{}==AudioBitrate:{}==AudioCodec:{}===", audioChannels, audioBitrate, majorGrabber.getAudioCodecName());
}

/**
 * Releases everything run() may have opened. Each resource is released
 * independently so one failure cannot leak the others; the first failure is
 * rethrown once all releases have been attempted.
 */
private void releaseResources(FFmpegFrameGrabber majorGrabber, ExecutorService executor) {
    log.info("===============释放资源中=============");
    // Without this the single restart thread would outlive run().
    executor.shutdownNow();

    Exception failure = null;
    FFmpegFrameRecorder output = recorder;
    if (output != null) {
        try {
            log.info("===============释放资源中= recorder.stop============");
            output.stop();
            log.info("===============释放资源中= recorder.release============");
            output.release();
        } catch (Exception e) {
            log.error("Failed to release recorder", e);
            failure = e;
        }
    }
    failure = stopGrabber(majorGrabber, "majorGrabber", failure);
    failure = stopGrabber(readyGrabber, "readyGrabber", failure);
    failure = stopGrabber(slaveGrabber, "slaveGrabber", failure);

    if (failure != null) {
        throw new RuntimeException(failure);
    }
    log.info("===============java成功释放cv资源=============");
}

/**
 * Stops one grabber if it exists and returns the failure to report: the
 * earlier one when present (with any new failure attached as suppressed),
 * otherwise the new one, otherwise null.
 */
private Exception stopGrabber(FFmpegFrameGrabber grabber, String label, Exception earlierFailure) {
    if (grabber == null) {
        return earlierFailure;
    }
    try {
        log.info("===============释放资源中= {}.stop============", label);
        grabber.stop();
    } catch (Exception e) {
        log.error("Failed to stop {}", label, e);
        if (earlierFailure == null) {
            return e;
        }
        earlierFailure.addSuppressed(e);
    }
    return earlierFailure;
}

/**
 * Relay loop: until a stop is requested, forwards one frame at a time from
 * the major stream while it is active, and otherwise from the slave stream
 * (when {@code slaveFlag} is set) or the ready stream.
 *
 * <p>Implemented as a single loop rather than by self-recursion, so the call
 * stack does not grow each time the source switches between major and
 * fallback.
 *
 * @param majorGrabber      grabber for the major stream
 * @param recorder          unused here; frames are written through the {@code recorder} field
 * @param major             unused; kept for signature compatibility
 * @param majorStreamActive true while the major stream should be used as the source
 * @param executor          executor handed on to {@code processStream} for restart attempts
 */
private void Execute(FFmpegFrameGrabber majorGrabber, FFmpegFrameRecorder recorder, String major, AtomicBoolean majorStreamActive, ExecutorService executor) throws InterruptedException, IOException {
    // Both conditions are re-evaluated before every frame, exactly as the
    // original nested loops did.
    while (!stopFlag) {
        if (majorStreamActive.get()) {
            processStream(majorGrabber, "major", majorStreamActive, executor);
        } else if (slaveFlag) {
            processStream(slaveGrabber, "slave", majorStreamActive, executor);
        } else {
            processStream(readyGrabber, "ready", majorStreamActive, executor);
        }
    }
}

/**
 * Grabs one frame from {@code grabber} and writes it to the recorder.
 *
 * <p>On any failure the fallback grabber is restarted. If the failing stream
 * is the major one, it is additionally marked inactive and an asynchronous
 * restart of it is scheduled. A failure whose message contains "-32" also
 * triggers a stop/start of the recorder; if that fails, the engine is asked
 * to stop.
 *
 * @param grabber           source to read the frame from
 * @param streamType        "major", "slave" or "ready"
 * @param majorStreamActive cleared when the major stream fails
 * @param executor          executor used to schedule the major-stream restart
 */
private void processStream(FFmpegFrameGrabber grabber, String streamType, AtomicBoolean majorStreamActive, ExecutorService executor) {
    try {
        Frame frame = grabber.grabFrame();
        if (frame == null) {
            throw new Exception("========Failed to grab frame from " + streamType + " stream");
        }
        this.allreadyFlag = true;
        if ("slave".equals(streamType)) {
            log.info("==slave===:{}", frame);
        }
        recorder.record(frame);
    } catch (Exception e) {
        // The message can be null (e.g. for a NullPointerException); guard it so
        // the recovery logic below cannot itself throw.
        String message = e.getMessage();
        log.error("========Error recording frame from {} stream:{}", streamType, message);
        flag = true;
        if (message != null && message.contains("-32")) {
            try {
                recorder.stop();
                recorder.start();
            } catch (Exception ex) {
                log.info("==restart recorder stop error===:{}", ex.getMessage());
                recorder = null;
                stopFlag = true;
                stopStr = "true";
            }
        }

        boolean majorFailed = "major".equals(streamType);
        if (majorFailed) {
            log.info("========Major stream failed, attempting to restart...");
        }
        restartFallbackGrabber();
        if (majorFailed) {
            majorStreamActive.set(false);
            restartMajorStream(grabber, majorStreamActive, executor, slaveFlag);
        }
    }
}

/**
 * Restarts whichever fallback source is in use: the slave grabber
 * synchronously, or the ready grabber on a separate thread.
 */
private void restartFallbackGrabber() {
    if (slaveFlag) {
        log.info("======slaveGrabber.restart start===");
        try {
            slaveGrabber.restart();
        } catch (Exception ex) {
            log.info("==restart slave stream error===:{}", ex.getMessage());
        }
        log.info("======slaveGrabber.restart end===");
        return;
    }
    new Thread(() -> {
        try {
            log.info("======readyGrabber.restart start===");
            readyGrabber.restart();
            log.info("======readyGrabber.restart end===");
        } catch (Exception ex) {
            log.info("==restart ready stream error===:{}", ex.getMessage());
        }
    }).start();
}

// Wall-clock time (ms) of the first failed restart attempt in the current
// outage; 0 means no outage is being tracked.
private long restartTime_start = 0;

/**
 * Schedules an attempt to restart the major grabber on {@code executor}.
 *
 * <p>On success the major stream is marked active again. On failure the
 * attempt is rescheduled, unless a stop was requested, or no slave stream is
 * in use and the outage has lasted longer than {@code restartLength}
 * milliseconds.
 *
 * @param grabber   the major grabber to restart
 * @param active    set to true once the restart succeeds
 * @param executor  executor the attempt (and any retry) runs on
 * @param slaveFlag whether a slave stream is in use; when true the retry time limit does not apply
 */
private void restartMajorStream(FFmpegFrameGrabber grabber, AtomicBoolean active, ExecutorService executor, boolean slaveFlag) {
    executor.submit(() -> {
        if (restartTime_start == 0) {
            restartTime_start = System.currentTimeMillis();
        }
        try {
            grabber.restart();
        } catch (Exception e) {
            log.info("========Failed to restart major stream, will try again...");
            log.debug("Major stream restart failure", e);
            log.info("=========liveName:{}== stopFlag:{}==hash:{}================", liveName, stopFlag, stopFlag.hashCode());
            if ((!slaveFlag) && (System.currentTimeMillis() - restartTime_start) > restartLength) {
                log.info("======major stream restart for whole day ====");
                log.info("======now to stop ====");
                return;
            }
            if (this.stopFlag || stopStr.equals("true")) {
                return;
            }
            restartMajorStream(grabber, active, executor, slaveFlag);
            return;
        }

        active.set(true);
        flag = true;
        restartTime_start = 0;
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            // The restart itself succeeded; an interrupted pause must not be
            // treated as a failure that triggers another restart.
            Thread.currentThread().interrupt();
        }
        log.info("========Major stream restarted successfully.");
    });
}

/**
 * Feeds the recorder with a static image and silent audio, each from its own
 * thread, for as long as the major stream is inactive and no stop has been
 * requested.
 *
 * @param recorder          recorder the placeholder video and audio are written to
 * @param grabber           unused; kept for signature compatibility
 * @param majorStreamActive both threads end once this becomes true
 * @throws IOException if the placeholder image cannot be read
 */
private void processImageAndSilentAudioStream(FFmpegFrameRecorder recorder, FFmpegFrameGrabber grabber, AtomicBoolean majorStreamActive) throws InterruptedException, IOException {
    // Static placeholder image.
    String imageFile = "/Users/nocare/Desktop/imgtestpro/J48HSW25_511_1709715180016.jpeg";
    OpenCVFrameConverter.ToMat converter = new OpenCVFrameConverter.ToMat();

    // Fail early instead of recording from an empty image.
    Mat image = opencv_imgcodecs.imread(imageFile);
    if (image == null || image.empty()) {
        throw new IOException("Unable to read placeholder image: " + imageFile);
    }
    Frame frame = converter.convert(image);

    // Video thread.
    new Thread(() -> {
        while (!majorStreamActive.get()) {
            if (stopFlag) break;
            try {
                long startMs = System.currentTimeMillis();
                log.info("=========video=======");
                log.info("=========video ok=======");
                long endMs = System.currentTimeMillis();
                Thread.sleep(40); // pace to the frame rate
                log.info("Using frame from ready stream (static image) initTime:" + (endMs - startMs));
                recorder.record(frame);
                log.info("=======recorder.timestamp:{}==", recorder.getTimestamp());
            } catch (InterruptedException e) {
                // Treat an interrupt as a request to end this thread.
                Thread.currentThread().interrupt();
                return;
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }
    }).start();

    // Audio thread.
    new Thread(() -> {
        // A freshly allocated buffer is zero-filled, i.e. silence.
        ShortBuffer sBuf = ShortBuffer.allocate(1024);
        while (!majorStreamActive.get()) {
            if (stopFlag) break;
            try {
                long startMs = System.currentTimeMillis();
                log.info("=========audio=======");
                log.info("=========audio  ok=======");
                long endMs = System.currentTimeMillis();
                Thread.sleep(23); // pace to the audio frame duration
                log.info("Using frame from ready stream (silent audio) initTime:" + (endMs - startMs));
                // clear() exposes the whole buffer (position 0, limit = capacity).
                // Calling flip() here would set the limit to 0 and hand the
                // recorder an empty buffer.
                sBuf.clear();
                recorder.recordSamples(44100, 2, sBuf);
                log.info("=======recorder.timestamp:{}==", recorder.getTimestamp());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }
    }).start();

}

/**
 * Builds an audio frame whose single sample buffer contains only zeros.
 *
 * @param sampleRate  value stored in the frame's {@code sampleRate}
 * @param numChannels value stored in the frame's {@code audioChannels}
 * @param frameSize   number of 16-bit samples in the buffer
 * @return the silent audio frame
 */
public Frame createSilentAudioFrame(int sampleRate, int numChannels, int frameSize) {
    final Frame result = new Frame();

    // A new short[] is zero-initialised, which is exactly silence.
    final ShortBuffer zeros = ShortBuffer.wrap(new short[frameSize]);

    result.audioChannels = numChannels;
    result.sampleRate = sampleRate;
    result.samples = new ShortBuffer[]{zeros};
    return result;
}

/**
 * Requests the engine to stop: raises both stop markers and, when a slave
 * streamer was created, stops it as well.
 */
public void stop() {
    this.stopFlag = true;
    this.stopStr = "true";
    log.info("============停止 落地任务 stopFlag:{}============", stopFlag);

    final FFmpegStreamer streamer = this.fFmpegStreamer;
    if (streamer == null) {
        return;
    }
    streamer.stopStreaming();
}

}