asticode / go-astiav

Golang ffmpeg and libav C bindings
MIT License

Request for help on astiav.IOInterrupter #80

Closed tihomirptotev closed 3 weeks ago

tihomirptotev commented 3 weeks ago

I have an application that spawns several goroutines, each reading from an HTTP URL using go-astiav v0.21.0 and saving the data to a buffer for further processing. For a particular URL, the connection remains open, but no data is received, causing the inputFormatContext.OpenInput method to block for several minutes before returning. To solve this, I create an interrupter using inputFormatContext.SetInterruptCallback() and call interrupter.Interrupt() in another goroutine if the OpenInput method has not returned after a timeout period. This solution works fine.

However, for other HTTP streams, I often encounter an "Immediate exit requested" error from inputFormatContext.ReadFrame without any FFmpeg logs. If I do not set the interrupter callback, no errors are returned from the ReadFrame method. Can you help me with this error, please?

Thanks!

This is the application code:

func (r *StreamReader) readHttpSource(url string) error {

astiav.SetLogLevel(astiav.LogLevelDebug)
astiav.SetLogCallback(func(c astiav.Classer, l astiav.LogLevel, fmt, msg string) {
    //  implementation here
})

inputFormatContext := astiav.AllocFormatContext()
if inputFormatContext == nil {
    return errors.New("ffmpeg input format context is nil")
}
defer inputFormatContext.Free()

inputFlags := astiav.NewFormatContextFlags(astiav.FormatContextFlagDiscardCorrupt, astiav.FormatContextFlagGenPts, astiav.FormatContextFlagNonblock)
inputFormatContext.SetFlags(inputFlags)

outputFormatContext, err := astiav.AllocOutputFormatContext(nil, "mpegts", "")
if err != nil {
    return errs.NewWithMsg("alloc output format context", err)
}
defer outputFormatContext.Free()

ioCtx, err := astiav.AllocIOContext(int(r.contentBuffer.Capacity()), true, nil, nil, r.write)
if err != nil {
    return errs.NewWithMsg("alloc io context", err)
}
defer ioCtx.Free()
outputFormatContext.SetPb(ioCtx)

var errTimeout error
openReturned := make(chan struct{}, 1)
defer close(openReturned)

interrupter := inputFormatContext.SetInterruptCallback()

go func() {
    t := time.NewTimer(r.cfg.Runtime.SourceReader.ConnectTimeout)
    defer t.Stop()
    select {
    case <-openReturned:
        log.Info().Str("name", r.cfg.Name).Msg("source open returned")
        return
    case <-t.C:
        log.Warn().Str("name", r.cfg.Name).Str("source", url).Msg("open input timeout")
        errTimeout = ErrReadSourceTimeout
        interrupter.Interrupt()
        return
    }
}()

err = inputFormatContext.OpenInput(url, astiav.FindInputFormat(OutputFormatName), nil)
openReturned <- struct{}{}
if err != nil {
    err = handleError(err)
    return errs.NewWithMsg("open input", err)
}
defer inputFormatContext.CloseInput()

if errTimeout != nil {
    return errTimeout
}

// Find stream info
if err = inputFormatContext.FindStreamInfo(nil); err != nil {
    return errs.NewWithMsg("find stream info", err)
}

inputStreams := make(map[int]*astiav.Stream)
outputStreams := make(map[int]*astiav.Stream)

for _, is := range inputFormatContext.Streams() {
    inputStreams[is.Index()] = is
    outStream := outputFormatContext.NewStream(nil)
    if outStream == nil {
        return errors.New("can not add stream to output format context")
    }

    if err = is.CodecParameters().Copy(outStream.CodecParameters()); err != nil {
        return errs.NewWithMsg("copy codec parameters", err)
    }

    outStream.CodecParameters().SetCodecTag(0)

    outputStreams[is.Index()] = outStream
}

if err = outputFormatContext.WriteHeader(nil); err != nil {
    return errs.NewWithMsg("write header", err)
}

pkt := astiav.AllocPacket()
defer pkt.Free()

for {
    if err = inputFormatContext.ReadFrame(pkt); err != nil {
        log.Error().Err(err).Type("error type", err).Msg("read frame error type")
        return errs.NewWithMsg("read frame", err)
    }
    //  more code here
}

}

asticode commented 3 weeks ago

I'll be honest, I don't see anything wrong in the code you've shared 🤔 Are you using the interrupter anywhere else?

tihomirptotev commented 3 weeks ago

This function is the only place I create and use the interrupter.
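One way to check this at runtime, rather than by reading the code, is to wrap the interrupter so that every Interrupt() call is counted. The sketch below assumes only that the interrupter exposes an Interrupt() method; the interrupter interface, tracedInterrupter, and noopInterrupter are hypothetical names used for illustration and are not part of go-astiav.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// interrupter is a hypothetical stand-in for the value returned by
// inputFormatContext.SetInterruptCallback(); only Interrupt() is assumed.
type interrupter interface {
	Interrupt()
}

// tracedInterrupter forwards Interrupt() to the wrapped interrupter and
// counts how many times it was called. It is safe for concurrent use.
type tracedInterrupter struct {
	calls atomic.Int64
	inner interrupter
}

// Interrupt records the call, then delegates to the wrapped interrupter.
func (t *tracedInterrupter) Interrupt() {
	t.calls.Add(1)
	t.inner.Interrupt()
}

// Calls reports how many times Interrupt() has been invoked so far.
func (t *tracedInterrupter) Calls() int64 {
	return t.calls.Load()
}

// noopInterrupter is a placeholder used only to make this demo runnable.
type noopInterrupter struct{}

func (noopInterrupter) Interrupt() {}

func main() {
	ti := &tracedInterrupter{inner: noopInterrupter{}}
	fmt.Println("calls before:", ti.Calls())
	ti.Interrupt()
	fmt.Println("calls after:", ti.Calls())
}
```

If the read loop logs Calls() whenever ReadFrame fails, a count of zero would suggest the error did not originate from this function's own Interrupt() call.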

asticode commented 3 weeks ago

Immediate exit requested can come from several places, one of which is interrupter.Interrupt() being called. So it looks like interrupter.Interrupt() is being called for your other HTTP streams, but I don't see how that would happen in the code you've shared 🤔
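To rule out the timeout goroutine as the source, it may help to make the watchdog strictly bounded to the open call. In the posted code, errTimeout is shared between two goroutines without synchronization, and the watchdog may still be in its select when OpenInput returns; whether that explains the error reported here is not established in this thread. The following is a minimal sketch under those assumptions: interrupter, errOpenTimeout, openWithTimeout, fakeInterrupter, and simulateOpen are hypothetical names, and the open callback stands in for inputFormatContext.OpenInput.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// interrupter is a hypothetical stand-in for the value returned by
// inputFormatContext.SetInterruptCallback(); only Interrupt() is assumed.
type interrupter interface {
	Interrupt()
}

// errOpenTimeout is a hypothetical sentinel playing the role of
// ErrReadSourceTimeout in the original post.
var errOpenTimeout = errors.New("open input timeout")

// openWithTimeout runs open and calls i.Interrupt() if open has not returned
// within timeout. It waits for the watchdog goroutine to exit before
// returning, so Interrupt() cannot fire later, during the read loop.
func openWithTimeout(i interrupter, timeout time.Duration, open func() error) error {
	done := make(chan struct{})   // closed once open has returned
	exited := make(chan struct{}) // closed once the watchdog has finished
	timedOut := false             // written by the watchdog, read only after <-exited

	go func() {
		defer close(exited)
		t := time.NewTimer(timeout)
		defer t.Stop()
		select {
		case <-done:
		case <-t.C:
			timedOut = true
			i.Interrupt()
		}
	}()

	err := open()
	close(done)
	<-exited // after this point the watchdog can no longer call Interrupt()

	if timedOut {
		return errOpenTimeout
	}
	return err
}

// fakeInterrupter simulates the library side for the demo: Interrupt()
// unblocks a pending simulated open.
type fakeInterrupter struct{ stop chan struct{} }

func (f *fakeInterrupter) Interrupt() { close(f.stop) }

// simulateOpen pretends that opening the input takes openDelay unless interrupted.
func simulateOpen(openDelay, timeout time.Duration) error {
	f := &fakeInterrupter{stop: make(chan struct{})}
	return openWithTimeout(f, timeout, func() error {
		select {
		case <-time.After(openDelay):
			return nil
		case <-f.stop:
			return errors.New("immediate exit requested")
		}
	})
}

func main() {
	fmt.Println("fast open:", simulateOpen(10*time.Millisecond, time.Second))
	fmt.Println("slow open:", simulateOpen(time.Second, 10*time.Millisecond))
}
```

Because the caller blocks on the exited channel, the timeout flag is read only after the watchdog has finished, so it needs no mutex or atomic, and every Interrupt() call is guaranteed to happen before the first ReadFrame.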