tvkitchen / countertop

The entry point for developers who want to set up a TV Kitchen.
https://tv.kitchen
GNU Lesser General Public License v3.0

Track stream time #45

Closed: slifty closed this 4 years ago

slifty commented 4 years ago

Task

Description

Ingestion engines need to be able to attach time information to payloads.

There is some discussion at #42 around handling the concept of time, and this can be done in parallel with that exploration.

The point of this issue is far narrower: how do we know how much content has been processed so far from a given input? (e.g. for a given payload of data added to kafka, how can we know how far into the stream or file that payload points?)

It seems ffmpeg will output "progress" on a regular cadence to a different pipe than the data. This is explored a bit in the discussion of Issue #44, and is probably going to be our best bet.

Another option would be to buffer the ffmpeg output and run it through ffprobe on a regular basis. That feels... horrible!

Relevant Resources / Research

None yet.

Related Issues

Issue #42: discussion around handling the concept of time
Issue #44: an exploration of libraries that can be used to track time

slifty commented 4 years ago

Looks like progress is not nearly granular enough.

e.g.


frame=  366 fps=0.0 q=31.0 size=    1187kB time=00:00:13.59 bitrate= 715.2kbits/s speed=27.1x
frame=  779 fps=777 q=31.0 size=    2456kB time=00:00:27.36 bitrate= 735.1kbits/s speed=27.3x
frame= 1192 fps=792 q=31.0 size=    3660kB time=00:00:41.15 bitrate= 728.5kbits/s speed=27.4x
frame= 1634 fps=815 q=24.8 size=    4873kB time=00:00:55.86 bitrate= 714.5kbits/s speed=27.9x
frame= 2102 fps=839 q=24.8 size=    6229kB time=00:01:11.46 bitrate= 714.0kbits/s speed=28.5x
frame= 2486 fps=827 q=24.8 size=    7507kB time=00:01:24.28 bitrate= 729.7kbits/s speed=  28x
frame= 2944 fps=840 q=31.0 size=    8827kB time=00:01:39.54 bitrate= 726.5kbits/s speed=28.4x
frame= 3043 fps=840 q=31.0 Lsize=    9134kB time=00:01:41.53 bitrate= 737.0kbits/s speed=  28x
Laurian commented 4 years ago

maybe this would not be an issue if the payload were fixed

slifty commented 4 years ago

Indeed -- if we could have each payload represent a single frame (and specify the frame rate) I think we would be in fine shape. I'll look into that now.
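
If each payload really did represent a single frame at a known frame rate, stream time would become simple arithmetic. A quick sketch (names here are illustrative, not actual project code):

// With one frame per payload and a known fps, position in the stream
// is just frameIndex / fps
const FRAME_RATE = 30

const getPayloadSeconds = (frameIndex) => frameIndex / FRAME_RATE

console.log(getPayloadSeconds(450)) // 15 seconds into the stream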

Laurian commented 4 years ago

and have some metadata attached just to know what's up; btw look at how AWS does this https://docs.aws.amazon.com/kinesisvideostreams/latest/dg/how-data.html

slifty commented 4 years ago

the metadata filter may be of interest.

slifty commented 4 years ago

I made some progress with metadata exploration, and indeed it will spew out frame numbers. The problem is that the output is buffered, so node gets something like 80 frames' worth of updates at once.

    getFfmpegSettings = () => [
        '-y', // overwrite output without asking
        '-fflags', 'nobuffer', // reduce input buffering
        '-loglevel', 'info',
        '-i', this.getInputLocation(),
        '-f', 'mpegts', // wrap the output in an MPEG-TS container
        // print per-frame signalstats metadata (frame number / pts included) to pipe 3
        '-vf', 'fps=fps=30,signalstats,metadata=print:key=lavfi.signalstats.YDIF:file=\'pipe\\:3\'',
        '-', // write the A/V output to stdout
    ]
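
For reference, the metadata filter's output on pipe 3 is line-oriented, so pulling the latest frame info out of a (buffered) chunk could look something like this. A sketch only; the line format in the comments is from memory of metadata=print output and worth double checking:

// Parse a chunk of metadata filter output. Lines look roughly like:
//   frame:366  pts:549000  pts_time:6.1
//   lavfi.signalstats.YDIF=0.013
// Because of the buffering described above, one chunk may describe many frames.
const parseMetadataChunk = (chunk) => {
    const matches = chunk.toString().matchAll(/frame:(\d+)\s+pts:(\d+)\s+pts_time:([\d.]+)/g)
    let latest = null
    for (const match of matches) {
        latest = {
            frame: parseInt(match[1], 10),
            pts: parseInt(match[2], 10),
            ptsTime: parseFloat(match[3]),
        }
    }
    return latest // the most recent frame described in this chunk
}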

I spoke with @traceypooh over slack since she is truly an expert on this stuff, and she mentioned that we could / should look into piping the ffmpeg output through ffprobe to split the stream into packets (which will have the presentation timestamp (PTS) and decode timestamp (DTS) as part of them)

slifty commented 4 years ago

I've been digging into ffprobe this morning (and have a followup conversation with @traceypooh this afternoon), but here's what I've faced so far.

First: simply piping ffmpeg output into ffprobe isn't working as I would expect. I'm not sure if I'm missing something fundamental, or if maybe mpegts is just not something ffprobe knows how to work with.

Second: the output of ffprobe seems to split up the streams. This is not something I want to do at this point in the pipeline, since it would make certain types of appliances much harder (or rather, we would have to make an appliance that re-combines the streams).


Given the above, my next experiment is the following:

  1. Ingestion engines should never pass an input string to ffmpeg.

  2. Instead, ingestion engines will always pipe data to ffmpeg.

  3. Ingestion engines will FIRST pipe data to ffprobe, while also locally buffering the data sent so far. When the ingestion engine gets packet data from ffprobe, it will note the time, and flush the buffer to ffmpeg.

This makes me very uncomfortable and feels gross -- I'm worried that things will be too far out of sync -- but at this point I honestly just want some kind of baseline of a data stream that has timestamps.

slifty commented 4 years ago

ffprobe moves much faster than ffmpeg (no surprise), so I have to be a little more thoughtful about control flows.

I did a really nasty test (that works) that would throttle data being passed to both ffmpeg and ffprobe so that it would only send new data every 50ms. The ffprobe output was tracked and the PTS was parsed. The PTS was then used in the ffmpeg-to-kafka handler to timestamp the messages.

This worked! But it's a little too horrible and fragile (the 50 ms is arbitrary, for instance).

Here's my plan now:

  1. Read data from input stream. Write it to ffmpeg AND a buffer.
  2. Once ffmpeg returns a value, PAUSE the input stream, flush the buffer to ffprobe.
  3. Once ffprobe returns a value, RESUME the input stream.

Repeat.

Let's see if that works too.
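
A minimal sketch of that plan, using Node stream pause/resume as the gate (flags and filenames are illustrative, not actual project code):

const { spawn } = require('child_process')
const fs = require('fs')

const inputStream = fs.createReadStream('in.ts') // illustrative input
const ffmpegProcess = spawn('ffmpeg', ['-i', '-', '-f', 'mpegts', '-'])
const ffprobeProcess = spawn('ffprobe', ['-show_packets', '-of', 'csv', 'pipe:0'])

// Step 1: write input to ffmpeg AND keep a copy in a buffer
let buffer = Buffer.alloc(0)
inputStream.on('data', (rawData) => {
    ffmpegProcess.stdin.write(rawData)
    buffer = Buffer.concat([buffer, rawData])
})

// Step 2: once ffmpeg produces output, pause the input and flush the buffer to ffprobe
ffmpegProcess.stdout.on('data', (mpegtsData) => {
    inputStream.pause()
    ffprobeProcess.stdin.write(buffer)
    buffer = Buffer.alloc(0)
})

// Step 3: once ffprobe reports back, it is safe to resume reading input
ffprobeProcess.stdout.on('data', (probeData) => {
    inputStream.resume()
})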

reefdog commented 4 years ago

If you have any need to throttle the input of a local file, you can use the -re input flag (it must go before the -i) to read the video at its native realtime rate rather than as fast as the data can be loaded from disk. That's not ideal for the final version of course, but may help during exploration.
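
For example (illustrative filenames): ffmpeg -re -i in.ts -f mpegts - would read in.ts at its native rate instead of at full speed.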

slifty commented 4 years ago

Appreciated @reefdog though I suspect that will throttle a bit too much. ffmpeg is able to process this file at around 26x speed so I would want to throttle to exactly that amount (so ffprobe is moving at 26x speed too, instead of the 100x speed I think it's moving at)

chriszs commented 4 years ago

Okay, jammed together some Beamcoder examples until they worked. The syntax could be prettier (i.e. stream semantics instead of a loop), but you get the gist:

const beamcoder = require('beamcoder');
const fs = require('fs');

(async function () {
    const inStream = fs.createReadStream('in.ts');
    const demuxStream = await beamcoder.demuxerStream({});
    inStream.pipe(demuxStream);

    const demuxer = await demuxStream.demuxer({ options: {} });

    const outStream = fs.createWriteStream('out.ts');
    const muxerStream = beamcoder.muxerStream({});
    muxerStream.pipe(outStream);

    const muxer = muxerStream.muxer({ format_name: 'mpegts' });

    for (const stream of demuxer.streams) {
        muxer.newStream(stream);
    }

    await muxer.openIO();
    await muxer.writeHeader();

    let packet = {};
    while (packet !== null) {
        packet = await demuxer.read();
        if (packet) {
            console.log(JSON.stringify(packet));
            await muxer.writeFrame(packet);
        }
    }

    await muxer.writeTrailer();
})();

(Edit: Cleaned this up a bit. Now requires ts files for streaming. Made a sample one with ffmpeg -i in.mp4 -c copy in.ts.)

For a given in.ts it outputs an out.ts that looks identical, as well as a line like this for every packet (where packets are typically frames or audio):

{
  "type": "Packet",
  "pts": 6222848,
  "dts": 6222848,
  "size": 396,
  "stream_index": 0,
  "flags": {
    "KEY": true,
    "CORRUPT": false,
    "DISCARD": false,
    "TRUSTED": false,
    "DISPOSABLE": false
  },
  "duration": 1029,
  "pos": 9722577
}
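
For a sense of scale: MPEG-TS PTS/DTS values tick at 90 kHz, so converting that packet's pts to seconds is a single division (quick sketch; for other containers you'd want the stream's actual time_base rather than a hardcoded 90000):

// MPEG-TS timestamps are in 90,000 ticks-per-second units
const pts = 6222848
console.log(pts / 90000) // ~69.14 seconds into the stream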
traceypooh commented 4 years ago

ooh niiiiiice! 😍

traceypooh commented 4 years ago

testing for mpegts is key - nice job!

chriszs commented 4 years ago

Thanks, I had to switch to ts because the mp4 muxer choked on streaming output. Turns out it needs random access. Supposedly you can configure it to work with fragments, but I wasn't getting very far and figured ts was what you're using anyway.

reefdog commented 4 years ago

This is so cool @chriszs!

The MPEG-TS input requirement is nbd; we'd just put a remuxer step in front of it to convert other container formats into MPEG-TS.

Do we know if this solution would have similar limitations on encoding formats of the underlying audio/video streams? We'd originally planned to normalize the encoding formats of the A/V streams to a single consistent standard, so that's still on the table, but I was (naively?) hoping that every part of the pipeline could basically accept any encoding format (within reason) so we could skip a normalizing pass.

That discussion is outside of the scope of this issue, but I'm just curious if your experimentation revealed any obvious encoding format limitations using Beamcoder.

chriszs commented 4 years ago

Seems like there aren't inherent format limits in Beamcoder; it should handle most of what ffmpeg handles.

In this case, I switched to ts output because I couldn't get it to produce a usable mp4 while streaming ("muxer does not support non seekable output" was the error I got). Then I switched to ts input because the output was glitching otherwise; presumably there's some difference between an mp4 and mpegts that just copying the frames wasn't accounting for (the result was hilarious but not usable).

So assuming the format can survive streaming, it should pass through this okay. If it's like the default, non-fragmented mp4 format and needs "seekable output," there may be some way to either change the format to make it work or, failing that, do some sort of conversion. Not sure what's involved in that.
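
(For reference, the usual way to get a streamable mp4 out of ffmpeg is the fragmentation movflags, e.g. ffmpeg -i in.mp4 -c copy -movflags frag_keyframe+empty_moov out.mp4 -- untested in this pipeline, but that's roughly what the "configure it to work with fragments" path would look like.)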

There's also a non-streaming Beamcoder method designed for file writing, which the first revision of my example used. That worked fine with mp4.

slifty commented 4 years ago

Chris that's amazing!

The past 24 hours have been very good for this issue... there are five paths, and I'm going to try to summarize them and the risks / tradeoffs I see in them:

1) Beamcoder

As @chriszs showed above, it is possible to use beamcoder to directly work with ffmpeg libraries for muxing and demuxing. This means we wouldn't have to fight with pipes and buffers when trying to extract the PTS from the mpegts stream. Chris's exploration dramatically de-risked this path! (thank you again)

Questions / Tradeoffs

The answer to the format-limitations question above sounds like it's "nope!" but even if it's "maybe" I think in the worst case we could fall back on using an ffmpeg command line to do the transcoding and wrapping in mpegts, and then beamcoder to just demux -> PTS parse -> remux the ffmpeg output (see the sketch below).

This doesn't make me worry!
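
To make that fallback concrete, here's a sketch that reuses the demuxerStream pattern from Chris's example, with ffmpeg doing the transcode/wrap (untested; input arguments are illustrative):

const beamcoder = require('beamcoder');
const { spawn } = require('child_process');

(async function () {
    // ffmpeg handles the transcoding and mpegts wrapping...
    const ffmpeg = spawn('ffmpeg', ['-i', 'in.mp4', '-f', 'mpegts', '-']);

    // ...and beamcoder only demuxes its stdout to read timestamps
    const demuxStream = await beamcoder.demuxerStream({});
    ffmpeg.stdout.pipe(demuxStream);
    const demuxer = await demuxStream.demuxer({ options: {} });

    let packet = await demuxer.read();
    while (packet !== null) {
        console.log(packet.pts, packet.dts); // PTS/DTS without fighting pipes and buffers
        packet = await demuxer.read();
    }
})();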

It's GPL 3.0 -- I don't know what that means exactly, to be honest. Would we have to be GPL 3.0 too?

2) ffmpeg filter

Thanks to @mhirsch's advice I looked into the metadata ffmpeg filter, and did get it to pipe out frame-level data in parallel to the ffmpeg output. The issue I faced was that the metadata output was being buffered to the point that I was getting dozens / hundreds of frames of data at a time, which was not granular enough for this problem.

@mhirsch mentioned that there are tools to force data events from pipes more frequently.

Questions / Tradeoffs

Assuming the buffer problem can be solved, the big downside of this approach is that the two pipes are not explicitly synchronized. We would be using the "latest" output from the metadata pipe to parse the DTS, but it is not actually driven by the "current" mpegts packet that our recorder is sending to kafka.

This opens the door for drift, and makes accuracy of the extracted times forever dubious.

3) ffprobe pipe

After facing the buffer problem, I looked into an ffprobe pipe solution, in the same spirit of (2) above -- reading the DTS from a parallel pipe.

This worked better than my first experiment with the filter since it didn't have the buffering problems, but it has all the same problems as (2), plus it takes a few moments for ffprobe to start spewing out data.

4) Custom C MPEGTS parser

@traceypooh is also amazing and has a little module she wrote for parsing mpegts streams. She took this issue as an excuse to refine it a bit and might walk out the other end with a little tool that can parse streams and spit back data!

Assuming this tool can take parts of streams (instead of having to have data piped to it) we might be able to create a synchronized pipeline that avoids the sync concerns of (2) and (3).

5) Node implementation of custom C

Lol we probably don't want to do this, but listing it anyway. We could write our own nodeJs mpegts parser (or find one already written) and pipe the ffmpeg output through that to extract DTS.

slifty commented 4 years ago

TL;DR it's looking like beamcoder is the most promising and robust option at this point, and it won't have the kinds of sync concerns that (2) and (3) face. My biggest concern is the license (?) -- can we use it while still publishing under Apache 2.0? I just don't actually know...

I'm gonna try integrating beamcoder into AbstractIngestionEngine and see how it plays. @chriszs would it be OK if I mark you as a co-author for whatever commits come out of it?

(4) and (5) aren't ready today so they are things we would probably want to consider a little more down the line.

chriszs commented 4 years ago

Sure, that’s fine.

slifty commented 4 years ago

Oh, leaving this here just so it isn't lost for the ages:


        // Spawn ffmpeg (transcode to mpegts) and ffprobe (timestamp extraction) in parallel
        this.ffmpegProcess = spawn('ffmpeg', this.getFfmpegSettings())
        this.ffprobeProcess = spawn('ffprobe', this.getFfprobeSettings())
        this.inputStream = this.getInputStream()

        // Forward raw input data to ffmpeg
        this.inputStream.on('data', (rawData) => {
            this.inputStream.pause()
            if (rawData) {
                this.ffmpegProcess.stdin.write(rawData)
            }
            this.inputStream.resume()
        })

        // Mirror ffmpeg's mpegts output into ffprobe
        this.ffmpegProcess.stdout.on('data', (mpegtsData) => {
            this.ffmpegProcess.stdout.pause()
            this.ffprobeProcess.stdin.write(mpegtsData)
            console.log(this.currentPts)
            this.ffmpegProcess.stdout.resume()
        })

        // Track the most recent timestamp parsed from ffprobe's csv output
        this.ffprobeProcess.stdout.on('data', (probeData) => {
            this.ffprobeProcess.stdout.pause()
            const lastLine = probeData.toString().trim().split('\n').slice(-1).pop()
            const lastPacket = lastLine.split(',')
            const pts = lastPacket[4]
            console.log(`FFPROBE: ${pts}`)
            this.currentPts = pts
            this.ffprobeProcess.stdout.resume()
        })

This is the ffprobe pipe experiment I mentioned in (3) above.
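
The getFfprobeSettings referenced there isn't shown; a plausible version consistent with the CSV parsing above (an assumption on my part, not the project's actual code) would be:

    getFfprobeSettings = () => [
        '-show_packets', // one record per packet
        '-of', 'csv', // csv output: section name first, then the packet fields
        'pipe:0', // read the mpegts stream from stdin
    ]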

mhirsch commented 4 years ago

If synchronizing pipes is a challenge, why not avoid splitting the output into two pipes at all? Use stdout and parse out the different messages with some kind of regex.

slifty commented 4 years ago

@mhirsch that's a great point!

... but wait, what if the mpegts encoding happens to randomly contain a perfect metadata string? (this is a very funny joke)

Blast. Now I'm a little more torn on the tradeoffs...

I'm going to implement both, starting with beamcoder. We can compare the PRs.

chriszs commented 4 years ago

Another thing to mention is that the packet object returned by Beamcoder includes the packet data in a field called data. It's not represented in the JSON, but it's there.
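
e.g., continuing the read loop from the example above, a single packet gives both halves of what this issue needs (the send call is placeholder pseudocode):

const packet = await demuxer.read()
if (packet !== null) {
    const seconds = packet.pts / 90000 // mpegts 90 kHz clock
    const payload = packet.data // Buffer of raw packet bytes (omitted from the JSON dump)
    // producer.send(payload, seconds) -- however the kafka side wants it
}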

slifty commented 4 years ago

What a saga.

So this morning Chris flagged that we should look at node-based demuxers one more time.

Lo and behold, ts-demuxer does exactly what we want, without quite the weight of beamcoder.

        const demuxer = new TSDemuxer((packet) => {
            switch (packet.stream_id) {
                case 0xE0: // Video packet (PES stream ids 0xE0-0xEF are video)
                    this.currentDts = packet.dts
                    break;
            }
        });

I think this is the winning strategy for now -- though we may well find beamcoder useful down the line (especially in appliances).

I'm going to finally create a PR to knock this out for good.