asticode / go-astiav

Golang ffmpeg and libav C bindings
MIT License
351 stars 38 forks source link

Could not get rid of a blocking ReadFrame. #72

Closed Mr3h1v closed 1 week ago

Mr3h1v commented 1 week ago

Even i have set inputFormatCtx.SetInterruptCallback() and called Interrupt method of the result.

asticode commented 1 week ago

Can you share some code? It's hard to help you without having more information

Mr3h1v commented 1 week ago

Here is example code. udp ts stream will end after about 30s, and call Interrupt at 35th second, but app still blocking.

package main

import (
    "errors"
    "fmt"
    "log"
    "net"
    "time"

    "github.com/asticode/go-astiav"
)

var (
    cb     astiav.IOInterrupter
    input  = "0.0.0.0:10001"
    output = "output.flv"
)

func main() {
    time.AfterFunc(35*time.Second, func() {
        log.Println("try to interrupt")
        cb.Interrupt()
    })

    addr, _ := net.ResolveUDPAddr("udp", input)
    conn, _ := net.ListenUDP("udp", addr)
    defer conn.Close()

    // Alloc packet
    pkt := astiav.AllocPacket()
    defer pkt.Free()

    // Alloc input format context
    inputFormatContext := astiav.AllocFormatContext()
    if inputFormatContext == nil {
        log.Fatal(errors.New("main: input format context is nil"))
    }
    defer inputFormatContext.Free()

    // Set interrupt callback
    cb = inputFormatContext.SetInterruptCallback()

    // Alloc io context
    ioContext, err := astiav.AllocIOContext(4096, false, conn.Read, nil, nil)
    if err != nil {
        log.Fatal(fmt.Errorf("main: allocating io context failed: %w", err))
    }
    defer ioContext.Free()

    // Store io context
    inputFormatContext.SetPb(ioContext)

    // Open input
    if err := inputFormatContext.OpenInput("", astiav.FindInputFormat("mpegts"), nil); err != nil {
        log.Fatal(fmt.Errorf("main: opening input failed: %w", err))
    }
    defer inputFormatContext.CloseInput()

    // Find stream info
    if err := inputFormatContext.FindStreamInfo(nil); err != nil {
        log.Fatal(fmt.Errorf("main: finding stream info failed: %w", err))
    }

    // Alloc output format context
    outputFormatContext, err := astiav.AllocOutputFormatContext(nil, "flv", output)
    if err != nil {
        log.Fatal(fmt.Errorf("main: allocating output format context failed: %w", err))
    }
    if outputFormatContext == nil {
        log.Fatal(errors.New("main: output format context is nil"))
    }
    defer outputFormatContext.Free()

    // Loop through streams
    inputStreams := make(map[int]*astiav.Stream)  // Indexed by input stream index
    outputStreams := make(map[int]*astiav.Stream) // Indexed by input stream index
    for _, is := range inputFormatContext.Streams() {
        // Only process audio or video
        if is.CodecParameters().MediaType() != astiav.MediaTypeAudio &&
            is.CodecParameters().MediaType() != astiav.MediaTypeVideo {
            continue
        }

        // Add input stream
        inputStreams[is.Index()] = is

        // Add stream to output format context
        os := outputFormatContext.NewStream(nil)
        if os == nil {
            log.Fatal(errors.New("main: output stream is nil"))
        }

        // Copy codec parameters
        if err = is.CodecParameters().Copy(os.CodecParameters()); err != nil {
            log.Fatal(fmt.Errorf("main: copying codec parameters failed: %w", err))
        }

        // Reset codec tag
        os.CodecParameters().SetCodecTag(0)

        // Add output stream
        outputStreams[is.Index()] = os
    }

    // If this is a file, we need to use an io context
    if !outputFormatContext.OutputFormat().Flags().Has(astiav.IOFormatFlagNofile) {
        // Open io context
        ioContext, err := astiav.OpenIOContext(output, astiav.NewIOContextFlags(astiav.IOContextFlagWrite))
        if err != nil {
            log.Fatal(fmt.Errorf("main: opening io context failed: %w", err))
        }
        defer ioContext.Close() //nolint:errcheck

        // Update output format context
        outputFormatContext.SetPb(ioContext)
    }

    // Write header
    if err = outputFormatContext.WriteHeader(nil); err != nil {
        log.Fatal(fmt.Errorf("main: writing header failed: %w", err))
    }

    // Loop through packets
    for {
        // Read frame
        if err = inputFormatContext.ReadFrame(pkt); err != nil {
            if errors.Is(err, astiav.ErrEof) {
                break
            }
            log.Fatal(fmt.Errorf("main: reading frame failed: %w", err))
        }

        // Get input stream
        inputStream, ok := inputStreams[pkt.StreamIndex()]
        if !ok {
            pkt.Unref()
            continue
        }

        // Get output stream
        outputStream, ok := outputStreams[pkt.StreamIndex()]
        if !ok {
            pkt.Unref()
            continue
        }

        // Update packet
        pkt.SetStreamIndex(outputStream.Index())
        pkt.RescaleTs(inputStream.TimeBase(), outputStream.TimeBase())
        pkt.SetPos(-1)

        // Write frame
        if err = outputFormatContext.WriteInterleavedFrame(pkt); err != nil {
            log.Fatal(fmt.Errorf("main: writing interleaved frame failed: %w", err))
        }
    }

    // Write trailer
    if err = outputFormatContext.WriteTrailer(); err != nil {
        log.Fatal(fmt.Errorf("main: writing trailer failed: %w", err))
    }

    // Success
    log.Println("success")
}
asticode commented 1 week ago

FYI the very same code (minus the udp binding) works properly if you replace inputFormatContext.OpenInput("", astiav.FindInputFormat("mpegts"), nil) with inputFormatContext.OpenInput("udp://"+input, astiav.FindInputFormat("mpegts"), nil) and remove the custom io context. Therefore if that fits your original use case, I'd recommend using this instead.

However, if you can't, the reason why this is blocking is because the conn.Read method is blocking and therefore the interrupt callback is never called (the interrupt callback is not called in parallel of the read function). To fix this problem, you need to find a way to provide a read function to AllocIOContext that doesn't block when the stream has stopped.

asticode commented 1 week ago

For instance replacing cb.Interrupt() with conn.Close() in your example actually unblocks everything 👍 Needless to say that you can remove the IO interrupter logic in that case

Mr3h1v commented 1 week ago

Thanks a lot. That is just a simplified example above. I must to use AllocIOContext because i only got an io.Reader In my real problem. I will find a way to close it.