ffmpeginteropx / FFmpegInteropX

FFmpeg decoding library for Windows 10 UWP and WinUI 3 Apps
Apache License 2.0

Decode Live H264 Byte Arrays #358

Closed: briotto-deloitte closed this issue 1 year ago

briotto-deloitte commented 1 year ago

Hi, I am trying to decode raw h264 data that is being sent as part of FlatBuffer messages sent over WebSocket (this is required because there is other information about those frames embedded within the overall message). Because of that, I essentially need to create a method that takes in chunks (byte[]) of h264 data and feeds them to a decoder that will process it into a displayable format. Overall, this feels similar to https://github.com/microsoft/FFmpegInterop/issues/165 and I've tried to implement a solution based off of that. My current process is to

However, I am running into the same issue that person hit: it plays whatever is in the first packet and then stalls. I assume this is because of what was mentioned here:

You absolutely cannot use InMemoryRandomAccessStream for your scenario. As I wrote, you need to get the IInputStream from your WebSocket. Then have ffmpeg read directly from that IInputStream, by providing a custom FileStreamRead implementation. Only this will make sure that ffmpeg will not read beyond what is available.

I also see this as a suggestion:

After thinking about it some more, the easiest thing that might be done would be to mimic the way we wrap the RandomAccessStream into an IStream interface and wrap the WebSocket inputstream. We could then have a CreateMediaStreamSource taking a WebSocket as an input parameter.

I could also resend the data to myself over TCP and call CreateFFmpegInteropMSSFromUri on that, but that seems less than ideal for many reasons. Based on my situation, what would be the best path forward?

brabebhin commented 1 year ago

Hi,

The IRandomAccessStream wrapper over the web socket is the way forward. Not much has changed in terms of interfacing in the past 6 years.

briotto-deloitte commented 1 year ago

Hi,

I don't think I can quite do that since I am not using a stream WebSocket and I have to extract out the actual h264 data from the messages. Am I mistaken in that? So I assume I have to create some class that writes the incoming byte[] to an IInputStream (kind of simulating a WebSocket) and then put an IRandomAccessStream around that? Or perhaps just implement my own IRandomAccessStream that provides a method for writing byte[] to it? Thanks for the help, I appreciate it. Do you know of any examples of IRandomAccessStream wrappers that would help in doing that?

briotto-deloitte commented 1 year ago

This is what I have right now:

class MemoryRandomAccessStream : IRandomAccessStream
{
    private Stream m_InternalStream;

    public MemoryRandomAccessStream(Stream stream)
    {
        m_InternalStream = stream;
    }

    public MemoryRandomAccessStream(byte[] bytes)
    {
        m_InternalStream = new MemoryStream(bytes);
    }

    public IInputStream GetInputStreamAt(ulong position)
    {
        m_InternalStream.Seek((long)position, SeekOrigin.Begin);

        return this.m_InternalStream.AsInputStream();
    }

    public IOutputStream GetOutputStreamAt(ulong position)
    {
        this.m_InternalStream.Seek((long)position, SeekOrigin.Begin);

        return this.m_InternalStream.AsOutputStream();
    }

    public ulong Size
    {
        get { return (ulong)this.m_InternalStream.Length; }
        set { this.m_InternalStream.SetLength((long)value); }
    }

    public bool CanRead
    {
        get { return true; }
    }

    public bool CanWrite
    {
        get { return true; }
    }

    public IRandomAccessStream CloneStream()
    {
        throw new NotSupportedException();
    }

    public ulong Position
    {
        get { return (ulong)this.m_InternalStream.Position; }
    }

    public void Seek(ulong position)
    {
        this.m_InternalStream.Seek((long)position, 0);
    }

    public void Dispose()
    {
        this.m_InternalStream.Dispose();
    }

    public Windows.Foundation.IAsyncOperationWithProgress<IBuffer, uint> ReadAsync(IBuffer buffer, uint count, InputStreamOptions options)
    {
        var inputStream = this.GetInputStreamAt(0);
        return inputStream.ReadAsync(buffer, count, options);
    }

    public Windows.Foundation.IAsyncOperation<bool> FlushAsync()
    {
        var outputStream = this.GetOutputStreamAt(0);
        return outputStream.FlushAsync();
    }

    public Windows.Foundation.IAsyncOperationWithProgress<uint, uint> WriteAsync(IBuffer buffer)
    {
        var outputStream = this.GetOutputStreamAt(0);
        return outputStream.WriteAsync(buffer);
    }
}

I instantiate that with a MemoryStream, which I write the byte arrays to as they come in. That seems to stop the FFmpegInteropMSS from erroring out, and I seem to keep getting frames output. However, the video display doesn't update past the first frame. I assume something is wrong with my implementation such that it's not actually getting new frames.

brabebhin commented 1 year ago

MemoryStream can be exposed directly as IRandomAccessStream. My guess is that your FFmpegMediaSource runs out of frames and reports end of stream. You can see debug messages in your Output window.

lukasf commented 1 year ago

It is not going to work like that. See discussion #245. We currently don't have a solution for your scenario.

First, when we use an IRandomAccessStream to create the media source, we expect it to be seekable (hence the name). But your stream is not seekable. FFmpeg already tries to seek several times during init. This will fail, and you won't even be able to start playing.

Second, we expect a continuous stream, not a stream that returns only a single data sample. Creating a stream only over the first sample does not work.

If we'd want to support this scenario in our lib, as a first step we'd need explicit support for IInputStream. Then we could configure FFmpeg to not use seeking. The IInputStream API is much simpler to implement, compared to IRandomAccessStream.

What remains is the issue that you need to provide a continuous stream of data. So even if we'd provide an IInputStream API, you'd need some infrastructure, e.g. an async queue. You push samples to the queue as they arrive. When data is read from the IInputStream, you need to await the async queue until it has data, then return the next chunk of data.
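The async queue idea described above could be sketched roughly as follows. This is only a minimal sketch and not part of FFmpegInteropX: `AsyncChunkQueue`, `Enqueue`, and `DequeueAsync` are hypothetical names, and it assumes a single reader that consumes chunks in arrival order.

```csharp
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper (not a library type): an unbounded async queue of byte chunks.
public sealed class AsyncChunkQueue
{
    private readonly ConcurrentQueue<byte[]> _chunks = new ConcurrentQueue<byte[]>();

    // Counts queued chunks so that DequeueAsync can await instead of spinning.
    private readonly SemaphoreSlim _available = new SemaphoreSlim(0);

    // Producer side: call this from the message handler with the extracted H.264 bytes.
    public void Enqueue(byte[] chunk)
    {
        _chunks.Enqueue(chunk);
        _available.Release();
    }

    // Consumer side: the returned task completes only once a chunk is available.
    public async Task<byte[]> DequeueAsync(CancellationToken token = default(CancellationToken))
    {
        await _available.WaitAsync(token).ConfigureAwait(false);
        byte[] chunk;
        _chunks.TryDequeue(out chunk); // always succeeds: Enqueue adds before Release
        return chunk;
    }
}
```

A read handler would `await queue.DequeueAsync()` and copy the returned chunk into the caller's buffer, while the WebSocket message handler would call `Enqueue` for every incoming sample.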

lukasf commented 1 year ago

Please note: Socket APIs are highly optimized and used heavily throughout web and cloud. Pushing your data to a TCP or UDP socket and opening that with our lib should work out-of-the-box and I don't think you will see any performance issues. But this only works on Win32 apps, not for UWP, due to local loopback restrictions.

brabebhin commented 1 year ago

I think it's possible to go down that route, you just need to optimize your stream.

The reads need to be blocking - if you don't have enough data in your stream, you need to block the read until you get it.

As for the seek, you can buffer data and seek back into the stream when needed, or ahead (blocking as for reads). For a demo this can work, but you may want to buffer to disk instead.

But this is essentially equivalent to a plain socket stream.
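The blocking-read and seek-back idea above could be sketched like this. It is a minimal sketch under assumptions: `BlockingBuffer`, `Append`, and `ReadAt` are hypothetical names, everything is kept in memory rather than on disk, and a read waits until the full requested range has arrived.

```csharp
using System.IO;
using System.Threading;

// Hypothetical helper: an append-only in-memory buffer whose reads block until data exists.
public sealed class BlockingBuffer
{
    private readonly MemoryStream _data = new MemoryStream();
    private readonly object _gate = new object();

    public long Length
    {
        get { lock (_gate) { return _data.Length; } }
    }

    // Producer: append a chunk at the end and wake any waiting reader.
    public void Append(byte[] chunk)
    {
        lock (_gate)
        {
            _data.Seek(0, SeekOrigin.End);
            _data.Write(chunk, 0, chunk.Length);
            Monitor.PulseAll(_gate);
        }
    }

    // Consumer: read count bytes starting at an absolute position.
    // Blocks without spinning until the whole requested range has been appended.
    // Reading at an earlier position works because all data is kept.
    public int ReadAt(long position, byte[] buffer, int offset, int count)
    {
        lock (_gate)
        {
            while (position + count > _data.Length)
            {
                Monitor.Wait(_gate); // releases the lock until Append pulses
            }
            _data.Seek(position, SeekOrigin.Begin);
            return _data.Read(buffer, offset, count);
        }
    }
}
```

Because nothing is ever discarded here, memory use grows for as long as the stream runs, which is why buffering to disk or trimming old data becomes relevant for anything beyond a demo.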

briotto-deloitte commented 1 year ago

Interesting, ok, that's helpful. So I think I am making some progress with implementing my own IRandomAccessStream. I am at least able to get it to play all the way through (I build up ~10-20 samples at the beginning to get it going because it definitely seeks out pretty far when it starts up). I made it so that it'll provide previous data if it tries to read past the end and if it tries to seek past the end it just won't do anything. However, the video definitely is extremely distorted doing it that way, so I must need to tinker with how I'm doing it or just block the read until the data gets in there like you said.

briotto-deloitte commented 1 year ago

Here is where my implementation is at right now

class MemoryRandomAccessStream : IRandomAccessStream
{
    private Stream m_InternalStream;

    private ulong _readPosition;

    public MemoryRandomAccessStream(Stream stream)
    {
        m_InternalStream = stream;
    }

    public MemoryRandomAccessStream(byte[] bytes)
    {
        m_InternalStream = new MemoryStream(bytes);
    }

    public IInputStream GetInputStreamAt(ulong position)
    {
        m_InternalStream.Seek((long)position, SeekOrigin.Begin);

        return this.m_InternalStream.AsInputStream();
    }

    public IOutputStream GetOutputStreamAt(ulong position)
    {
        this.m_InternalStream.Seek((long)position, SeekOrigin.Begin);

        return this.m_InternalStream.AsOutputStream();
    }

    public ulong Size
    {
        get { return (ulong)this.m_InternalStream.Length; }
        set { this.m_InternalStream.SetLength((long)value); }
    }

    public bool CanRead
    {
        get { return true; }
    }

    public bool CanWrite
    {
        get { return true; }
    }

    public IRandomAccessStream CloneStream()
    {
        throw new NotSupportedException();
    }

    public ulong Position
    {
        get { return (ulong)this.m_InternalStream.Position; }
    }

    public void Seek(ulong position)
    {
        this.m_InternalStream.Seek((long)(position > Size ? Position : position), SeekOrigin.Begin);
        Debug.Log($"attempted to seek to {position}. Instead went to {Position}");
    }

    public void Dispose()
    {
        this.m_InternalStream.Dispose();
    }

    public Windows.Foundation.IAsyncOperationWithProgress<IBuffer, uint> ReadAsync(IBuffer buffer, uint count, InputStreamOptions options)
    {
        Debug.Log($"requesting {count} bytes from position {Position} from stream with size {Size}");
        if ((Position + count) > Size)
        {
            Debug.Log($"Requesting past end. Providing bytes provided previously");
            var inputStream = GetInputStreamAt(Size - count);
            return inputStream.ReadAsync(buffer, count, options);
        }
        else
        {
            Debug.Log($"providing requested data");
            var inputStream = this.GetInputStreamAt(Position);
            return inputStream.ReadAsync(buffer, count, options);
        }
    }

    public Windows.Foundation.IAsyncOperation<bool> FlushAsync()
    {
        var outputStream = this.GetOutputStreamAt(0);
        return outputStream.FlushAsync();
    }

    public Windows.Foundation.IAsyncOperationWithProgress<uint, uint> WriteAsync(IBuffer buffer)
    {
        var outputStream = this.GetOutputStreamAt(Size);
        return outputStream.WriteAsync(buffer);
    }
}

brabebhin commented 1 year ago

You may want to implement a producer-consumer queue behind the stream. This can help block the reads until you have enough data.
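A producer-consumer queue behind the stream could be sketched with `BlockingCollection<T>` from the .NET base class library. This is only an illustration: `ChunkPipe` and its members are hypothetical names, it assumes a single reader thread, and it is forward-only (no seeking).

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;

// Hypothetical helper: forward-only pipe between a chunk producer and one reader.
public sealed class ChunkPipe
{
    private readonly BlockingCollection<byte[]> _queue = new BlockingCollection<byte[]>();
    private byte[] _current = Array.Empty<byte>(); // chunk currently being drained
    private int _currentOffset;                    // bytes of _current already handed out

    // Producer: enqueue one chunk of H.264 data.
    public void Write(byte[] chunk)
    {
        _queue.Add(chunk);
    }

    // Producer: signal that no more data will arrive.
    public void Complete()
    {
        _queue.CompleteAdding();
    }

    // Consumer: blocks until count bytes were copied, or returns fewer bytes
    // if the producer completed before enough data arrived.
    public int Read(byte[] buffer, int offset, int count)
    {
        int copied = 0;
        while (copied < count)
        {
            if (_currentOffset == _current.Length)
            {
                byte[] next;
                // Waits for the next chunk; returns false once completed and empty.
                if (!_queue.TryTake(out next, Timeout.Infinite)) break;
                _current = next;
                _currentOffset = 0;
                continue;
            }
            int n = Math.Min(count - copied, _current.Length - _currentOffset);
            Buffer.BlockCopy(_current, _currentOffset, buffer, offset + copied, n);
            _currentOffset += n;
            copied += n;
        }
        return copied;
    }
}
```

Since this pipe cannot seek, it would only fit a consumer that reads strictly sequentially; the seeking done during initialization, as discussed in this thread, would still need a buffered prefix.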

briotto-deloitte commented 1 year ago

Ok, so blocking on the read allows it to work without messing with the seek or anything.

class MemoryRandomAccessStream : IRandomAccessStream
{
    private Stream m_InternalStream;

    private ulong _readPosition;

    public MemoryRandomAccessStream(Stream stream)
    {
        m_InternalStream = stream;
    }

    public MemoryRandomAccessStream(byte[] bytes)
    {
        m_InternalStream = new MemoryStream(bytes);
    }

    public IInputStream GetInputStreamAt(ulong position)
    {
        m_InternalStream.Seek((long)position, SeekOrigin.Begin);

        return this.m_InternalStream.AsInputStream();
    }

    public IOutputStream GetOutputStreamAt(ulong position)
    {
        this.m_InternalStream.Seek((long)position, SeekOrigin.Begin);

        return this.m_InternalStream.AsOutputStream();
    }

    public ulong Size
    {
        get { return (ulong)this.m_InternalStream.Length; }
        set { this.m_InternalStream.SetLength((long)value); }
    }

    public bool CanRead
    {
        get { return true; }
    }

    public bool CanWrite
    {
        get { return true; }
    }

    public IRandomAccessStream CloneStream()
    {
        throw new NotSupportedException();
    }

    public ulong Position
    {
        get { return (ulong)this.m_InternalStream.Position; }
    }

    public void Seek(ulong position)
    {
        m_InternalStream.Seek((long)position, 0);
    }

    public void Dispose()
    {
        this.m_InternalStream.Dispose();
    }

    public Windows.Foundation.IAsyncOperationWithProgress<IBuffer, uint> ReadAsync(IBuffer buffer, uint count, InputStreamOptions options)
    {
        Debug.Log($"requesting {count} bytes from position {Position} from stream with size {Size}");
        var initialPosition = Position;
        var attempted = false;
        // Busy-wait (spins the CPU) until enough bytes have been written to satisfy the request.
        while (count + initialPosition > Size)
        {
            if (!attempted)
            {
                Debug.Log($"failed to read data, waiting");
                attempted = true;
            }
        }
        Debug.Log($"providing requested data. Required multiple attempts: {attempted}");
        var inputStream = GetInputStreamAt(initialPosition);
        return inputStream.ReadAsync(buffer, count, options);
    }

    public Windows.Foundation.IAsyncOperation<bool> FlushAsync()
    {
        var outputStream = this.GetOutputStreamAt(0);
        return outputStream.FlushAsync();
    }

    public Windows.Foundation.IAsyncOperationWithProgress<uint, uint> WriteAsync(IBuffer buffer)
    {
        var outputStream = this.GetOutputStreamAt(Size);
        return outputStream.WriteAsync(buffer);
    }
}

I'm sure this could be cleaned up by having the producer-consumer queue. I also probably should figure out how to not just have the MemoryStream grow forever. But this at least plays correctly!

brabebhin commented 1 year ago

You may also want to implement the .NET Stream class directly, and use the AsRandomAccessStream converter method to obtain the proper interface.

As for the grow forever part, I think ffmpeg shouldn't need seeks past the initial setup, unless you allow users to seek manually into the stream. This basically allows you to discard old frames to disk.
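One way to keep the buffer from growing forever is to drop data behind the read position while keeping absolute stream offsets intact. The following is a rough sketch only: `SlidingWindowBuffer` and its members are hypothetical names, it is not thread-safe, and it relies on the assumption stated above that no backward seeks happen after the initial setup.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper: keeps only recent bytes but still addresses them by absolute offset.
public sealed class SlidingWindowBuffer
{
    private readonly List<byte> _window = new List<byte>();
    private long _start; // absolute offset of _window[0]

    public long Start { get { return _start; } }
    public long End { get { return _start + _window.Count; } }

    public void Append(byte[] chunk)
    {
        _window.AddRange(chunk);
    }

    // Drop everything before absolutePosition (clamped to the buffered range).
    public void DiscardBefore(long absolutePosition)
    {
        long clamped = Math.Min(Math.Max(absolutePosition, _start), End);
        _window.RemoveRange(0, (int)(clamped - _start));
        _start = clamped;
    }

    // Copy up to count bytes starting at absolutePosition; returns the number copied.
    public int ReadAt(long absolutePosition, byte[] buffer, int offset, int count)
    {
        if (absolutePosition < _start)
            throw new InvalidOperationException("Requested data was already discarded.");
        int index = (int)Math.Min(absolutePosition - _start, _window.Count);
        int n = Math.Min(count, _window.Count - index);
        _window.CopyTo(index, buffer, offset, n);
        return n;
    }
}
```

Calling `DiscardBefore(position - keepBytes)` after each read would retain a fixed amount of history, which is one way to allow seeking back a limited distance.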

briotto-deloitte commented 1 year ago

As in, rather than implementing the IRandomAccessStream interface, create a class inheriting from System.IO.Stream, override the virtual methods to get functionality similar to what I did, and pass MyCustomStream.AsRandomAccessStream() into FFmpegInteropMSS?

For the grow forever part, the server is already saving data to disk, so I shouldn't really need to do that. So I guess I can probably just toss stuff after the initial setup. If I want to get fancy, I may allow them to seek back some X number of seconds or optionally write to disk.

Thanks for the help!

brabebhin commented 1 year ago

Yes, implementing Stream is more natural in the C#/.NET world. You don't need to deal with the IAsyncOperation stuff.

Thanks for sharing the demo code.

lukasf commented 1 year ago

I think it would still be better if we'd configure ffmpeg to treat the stream as non-seekable. Then you wouldn't need to do any buffering for initialization and just hand out data.

Sooner or later we should add this as a feature. It is a scenario I would like to support in the lib.

briotto-deloitte commented 1 year ago

I made a brief attempt at overriding MemoryStream to accomplish the same thing, but it didn't work as easily as I expected, so I abandoned that for now.

brabebhin commented 1 year ago

You don't need to override MemoryStream; you need to derive from Stream instead of implementing IRandomAccessStream. Then the rest of the logic remains the same. Your "Stream" will also contain a "MemoryStream" just like your current IRandomAccessStream.
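A Stream-derived class that contains a MemoryStream, as described above, might look roughly like this. It is a sketch under assumptions rather than a tested integration: `LiveH264Stream` and `Append` are hypothetical names, the read waits for the full requested count (mirroring the condition in the code posted earlier, but with `Monitor.Wait` instead of a busy loop), and nothing is ever discarded.

```csharp
using System;
using System.IO;
using System.Threading;

// Hypothetical class: a readable, seekable Stream fed by Append calls.
public sealed class LiveH264Stream : Stream
{
    private readonly MemoryStream _inner = new MemoryStream();
    private readonly object _gate = new object();
    private long _readPosition;

    public override bool CanRead { get { return true; } }
    public override bool CanSeek { get { return true; } }
    public override bool CanWrite { get { return false; } }
    public override long Length { get { lock (_gate) { return _inner.Length; } } }

    public override long Position
    {
        get { lock (_gate) { return _readPosition; } }
        set { Seek(value, SeekOrigin.Begin); }
    }

    // Producer: append incoming bytes and wake blocked readers.
    public void Append(byte[] chunk)
    {
        lock (_gate)
        {
            _inner.Position = _inner.Length;
            _inner.Write(chunk, 0, chunk.Length);
            Monitor.PulseAll(_gate);
        }
    }

    // Blocks until count bytes are available at the current read position.
    public override int Read(byte[] buffer, int offset, int count)
    {
        lock (_gate)
        {
            while (_readPosition + count > _inner.Length) Monitor.Wait(_gate);
            _inner.Position = _readPosition;
            int n = _inner.Read(buffer, offset, count);
            _readPosition += n;
            return n;
        }
    }

    // Seeking only moves the read cursor; a later Read blocks if data is not there yet.
    public override long Seek(long offset, SeekOrigin origin)
    {
        lock (_gate)
        {
            long target = origin == SeekOrigin.Begin ? offset
                        : origin == SeekOrigin.Current ? _readPosition + offset
                        : _inner.Length + offset;
            if (target < 0) throw new IOException("Cannot seek before the start of the stream.");
            _readPosition = target;
            return _readPosition;
        }
    }

    public override void Flush() { }
    public override void SetLength(long value) { throw new NotSupportedException(); }
    public override void Write(byte[] buffer, int offset, int count) { throw new NotSupportedException(); }

    protected override void Dispose(bool disposing)
    {
        if (disposing) _inner.Dispose();
        base.Dispose(disposing);
    }
}
```

In a UWP app, an instance of such a class could then be converted with the `AsRandomAccessStream()` extension method mentioned in this thread before handing it to the library; how the library reacts to the blocking reads is something to verify in practice.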

brabebhin commented 1 year ago

@lukasf I can work on that once the current PRs are merged.