staticfloat / Ogg.jl

Julia package to interface with Ogg containers and streams

API changes to support streaming #5

Open ssfrr opened 6 years ago

ssfrr commented 6 years ago

I need to work with some ogg/opus streams decoded on the fly from a web server, and I also need to support seeking into some very large files, so I've been thinking about an API that can support streaming. I figured this would also be a good time to see if we can get these packages interoperating with the JuliaAudio stuff.

I have a couple of ideas for a new API that I wanted to run by you before I dig too deeply into the implementation.

An OggDecoder could take a filename or IO stream:

```julia
dec = OggDecoder("filename.ogg")
# or
dec = OggDecoder(open("filename.ogg"))
```

An OggDecoder would itself be a sort of stream that you could read pages from. Generally, though, pages are only used internally; what we're really interested in is reading packets from a logical stream inside the ogg file and handing them to the codec decoder.

Internally we'd probably read pages something like this:

```julia
page = readpage(dec)
anotherpage = readpage(dec)
```
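For reference, every page starts with the fixed header described in RFC 3533: the "OggS" capture pattern, a version byte, header-type flags, a little-endian granule position, serial number, page sequence number, CRC, and a segment table of lacing values whose sum is the body length. The sketch below is one possible shape for readpage over any IO; OggPage, readpage, and the isbos/iseos/iscontinued helpers are hypothetical names, not the current Ogg.jl API, and the CRC is read but not verified.

```julia
# Hypothetical page type and reader following the RFC 3533 header layout.
struct OggPage
    flags::UInt8            # 0x01 = continued packet, 0x02 = BOS, 0x04 = EOS
    granulepos::Int64
    serial::UInt32
    sequence::UInt32
    checksum::UInt32        # read but NOT verified in this sketch
    lacing::Vector{UInt8}   # segment table (one lacing value per segment)
    body::Vector{UInt8}
end

isbos(p::OggPage) = (p.flags & 0x02) != 0
iseos(p::OggPage) = (p.flags & 0x04) != 0
iscontinued(p::OggPage) = (p.flags & 0x01) != 0

function readpage(io::IO)
    read(io, 4) == b"OggS" || error("missing OggS capture pattern")
    version = read(io, UInt8)
    version == 0x00 || error("unsupported stream structure version $version")
    flags = read(io, UInt8)
    # multi-byte header fields are little-endian on disk
    granulepos = ltoh(read(io, Int64))
    serial = ltoh(read(io, UInt32))
    sequence = ltoh(read(io, UInt32))
    checksum = ltoh(read(io, UInt32))
    nsegs = Int(read(io, UInt8))
    lacing = read(io, nsegs)
    length(lacing) == nsegs || error("truncated segment table")
    # the body length is the sum of all lacing values
    bodylen = mapreduce(Int, +, lacing; init=0)
    body = read(io, bodylen)
    length(body) == bodylen || error("truncated page body")
    return OggPage(flags, granulepos, serial, sequence, checksum, lacing, body)
end
```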

You could get a list of all multiplexed logical streams in a given file, each of which would be an OggPacketStream:

```julia
julia> streams(dec)
3-element Array{OggPacketStream,1}:
 OggPacketStream()
 OggPacketStream()
 OggPacketStream()
```

Each OggPacketStream would support readpacket(str::OggPacketStream) which would return the next packet for that stream. In a multiplexed stream I think we'd buffer any stream that wasn't getting read from (maybe a buffer_unused=true kwarg could disable that behavior).

```julia
str = streams(dec)[1]
pkt1 = readpacket(str)
pkt2 = readpacket(str)
```
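Internally, readpacket would have to rebuild packets from the page bodies using the lacing values: a lacing value of 255 means the packet continues into the next segment (possibly on the next page), and any smaller value ends it. A minimal sketch for a single logical stream, assuming pages have already been filtered by serial number; reassemble is a hypothetical helper, and unlike libogg's ogg_stream_packetout it does not check the continued-packet flag or page sequence numbers.

```julia
# Hypothetical: rebuild packets from (lacing, body) pairs of ONE logical stream.
# Returns the completed packets plus any unfinished trailing packet.
function reassemble(pages)
    packets = Vector{Vector{UInt8}}()
    partial = UInt8[]
    for (lacing, body) in pages
        pos = 1
        for len in lacing
            n = Int(len)
            append!(partial, view(body, pos:pos+n-1))
            pos += n
            if n < 255              # a lacing value below 255 terminates the packet
                push!(packets, partial)
                partial = UInt8[]
            end
        end
    end
    return packets, partial
end
```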

If it's useful, it would be pretty easy to also support an iteration API, so you could do for packet in packets(str) or collect(packets(str)).
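The iteration API could sit directly on top of readpacket via Julia's iteration protocol. In this sketch Packets and packets are hypothetical, and VecSource is a stand-in for an OggPacketStream that only provides readpacket and eof:

```julia
# Stand-in source: anything with `readpacket` and `eof` would work the same way.
mutable struct VecSource
    pkts::Vector{Vector{UInt8}}
    idx::Int
end
VecSource(pkts) = VecSource(pkts, 1)
Base.eof(s::VecSource) = s.idx > length(s.pkts)
function readpacket(s::VecSource)
    p = s.pkts[s.idx]
    s.idx += 1
    return p
end

# Hypothetical lazy wrapper: each iteration step pulls one packet from the source.
struct Packets{S}
    src::S
end
packets(src) = Packets(src)

Base.IteratorSize(::Type{<:Packets}) = Base.SizeUnknown()   # length not known up front
Base.eltype(::Type{<:Packets}) = Vector{UInt8}
Base.iterate(p::Packets, state=nothing) =
    eof(p.src) ? nothing : (readpacket(p.src), nothing)
```

Because the iterator consumes its source, iterating a second time does not restart the stream, which matches how a network stream behaves.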

So with that background, a full use-case might look something like this:

```julia
using SIUnits
using Ogg
using Opus
using PortAudio

buf = OggDecoder("somefile.opus") do oggdec
    # assume one stream
    str = streams(oggdec)[1]
    OpusDecoder(str) do opdec
        read(opdec)
    end
end

str = PortAudioStream()
write(str, buf)
```
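The do-block form used here relies on a constructor method that takes the block as its first argument. A sketch of that pattern, with Decoder as a hypothetical stand-in for OggDecoder or OpusDecoder: the try/finally closes the underlying IO even if the block throws, and the block's return value becomes the value of the whole call, which is how buf ends up holding the decoded audio.

```julia
# Hypothetical decoder wrapper used only to illustrate the do-block pattern.
mutable struct Decoder
    io::IO
    closed::Bool
end
Decoder(io::IO) = Decoder(io, false)

function Base.close(d::Decoder)
    if !d.closed
        close(d.io)
        d.closed = true
    end
    return nothing
end

# `Decoder(io) do dec ... end` lowers to `Decoder(f, io)`.
function Decoder(f::Function, io::IO)
    dec = Decoder(io)
    try
        return f(dec)       # the block's value is returned to the caller
    finally
        close(dec)          # always runs, even if `f` throws
    end
end

# A filename method can simply open the file and reuse the IO method.
Decoder(f::Function, filename::AbstractString) = Decoder(f, open(filename))
```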

Integration with FileIO would further simplify the nested decoding bit, so you could replace the first part with just buf = load("somefile.opus")

Some unanswered questions

  1. Seeking
  2. Chained files (ogg physical streams can be concatenated)
  3. How do we know which stream is which in a multiplexed file, and which decoder to pass each one to? Maybe we parse the Ogg Skeleton stream if the file includes one (how many files actually do?)
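One partial answer to question 3 that doesn't depend on Skeleton: the first packet of each logical stream (carried on its bos page) starts with a codec-specific signature, e.g. "OpusHead" for Opus (RFC 7845), 0x01 followed by "vorbis" for the Vorbis identification header, 0x80 followed by "theora" for Theora, 0x7F followed by "FLAC" for FLAC-in-Ogg, and "fishead\0" for a Skeleton stream. A hypothetical identify helper along those lines:

```julia
# Hypothetical table of first-packet signatures (only a few common codecs).
const CODEC_MAGIC = [
    (:vorbis,   UInt8[0x01; Vector{UInt8}("vorbis")]),
    (:opus,     Vector{UInt8}("OpusHead")),
    (:theora,   UInt8[0x80; Vector{UInt8}("theora")]),
    (:flac,     UInt8[0x7f; Vector{UInt8}("FLAC")]),
    (:skeleton, Vector{UInt8}("fishead\0")),
]

# Return the codec symbol whose signature prefixes `firstpacket`, or :unknown.
function identify(firstpacket::AbstractVector{UInt8})
    for (codec, magic) in CODEC_MAGIC
        n = length(magic)
        if length(firstpacket) >= n && view(firstpacket, 1:n) == magic
            return codec
        end
    end
    return :unknown
end
```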

cc @staticfloat

staticfloat commented 6 years ago

Hey Spencer, this all sounds great. I love the idea of just making this work through the general Iteration API, that definitely seems like the most "Julian" way to approach this.

I don't think we should delve too deeply into the buffering of streams. If the user wants to be able to go backward and look at pages from a different Ogg stream, they should use an IOStream that allows seeking back to the beginning, like a file or something, so that Ogg.jl only decodes the necessary pieces to get the information that the user has asked for.

With regards to seeking, I think we would just need to provide ways to seek to a particular page, then packet, then sample within a file. E.g. if I try to seek to sample N within Opus.jl, it needs a way to tell Ogg.jl to jump to the page containing the proper packet. Hopefully there's an easier way to do this than iterating through every single page.
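Jumping to an arbitrary byte offset usually lands in the middle of a page, so any seek needs to resynchronize by scanning forward for the next "OggS" capture pattern (libogg's ogg_sync_pageseek plays this role). A minimal sketch; seek_to_next_page is a hypothetical name, and a real version should also verify the page CRC, because the bytes "OggS" can appear inside packet data by chance.

```julia
# Hypothetical: seek to `offset`, then scan forward for "OggS".
# Leaves `io` positioned at the start of the capture pattern and returns that
# offset, or returns `nothing` if no further page start is found.
function seek_to_next_page(io::IO, offset::Integer)
    seek(io, offset)
    target = (UInt32('O') << 24) | (UInt32('g') << 16) | (UInt32('g') << 8) | UInt32('S')
    window = UInt32(0)      # last four bytes read, oldest in the high byte
    nread = 0
    while !eof(io)
        window = (window << 8) | read(io, UInt8)
        nread += 1
        if nread >= 4 && window == target
            pos = position(io) - 4
            seek(io, pos)
            return pos
        end
    end
    return nothing
end
```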

Chained files are a tough one; perhaps the best way to handle them is to allow an Ogg decoder to be created from an IOStream with an associated offset. How do other tools deal with this? E.g. do VLC or Audacity recognize concatenated ogg streams?

I have either forgotten or never even knew how to identify streams within an Ogg file. Your guess is as good as mine. :)

ssfrr commented 6 years ago

Regarding buffering, I want to make sure we handle multiplexed streams (e.g. audio+video playback), so I worry about throwing away pages that haven't been explicitly read, as there might be two tasks reading and they could read out-of-order.

Here's a mock of how audio/video playback might work, ignoring synchronization. The audio portion is pretty much how things would work; I'm less sure about video playback:

```julia
oggdec = OggDecoder("somevideo.ogg")
# conveniently assume we know which stream is which
audio = VorbisDecoder(streams(oggdec)[1])
video = TheoraDecoder(streams(oggdec)[2])

audioout = PortAudioStream()
videoout = SomeVideoWindow()

# for these blocks assume the write side will block the task when its input buffer is full
@async while !eof(audio)
    write(audioout, read(audio, 1024)) # write 1024 frames at a time
end

@async while !eof(video)
    write(videoout, read(video, 10)) # write 10 frames at a time
end
```

I'm assuming that read(src::VorbisSource, nframes) would internally be reading packets from the underlying ogg stream using readpacket and then decoding them, and readpacket(::OggPacketStream) will in turn be reading pages from the underlying IO stream. So if in reading from the audio stream we come across some pages in the video stream, we can buffer them for later, and then when the video stream needs them we hand them off. Or maybe there's a better way to think about the multiplexed stream case?
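The buffering described above could look roughly like this: each reader asks a shared demultiplexer for the next page of its own serial number, and pages belonging to other streams are queued rather than dropped. Demuxer and readpage! are hypothetical, and pages are modeled as (serial, payload) pairs taken from a vector instead of parsed from an IO.

```julia
# Hypothetical pull-based demultiplexer with per-serial queues.
mutable struct Demuxer
    source::Vector{Tuple{UInt32, Vector{UInt8}}}   # stand-in for pages read from an IO
    next::Int                                      # index of the next unread page
    queues::Dict{UInt32, Vector{Vector{UInt8}}}    # buffered pages per serial
end
Demuxer(source) = Demuxer(source, 1, Dict{UInt32, Vector{Vector{UInt8}}}())

# Next payload for `serial`; pages for other serials are buffered along the way.
# Returns `nothing` once the source is exhausted for this serial.
function readpage!(d::Demuxer, serial::UInt32)
    q = get!(() -> Vector{UInt8}[], d.queues, serial)
    isempty(q) || return popfirst!(q)
    while d.next <= length(d.source)
        s, payload = d.source[d.next]
        d.next += 1
        s == serial && return payload
        push!(get!(() -> Vector{UInt8}[], d.queues, s), payload)
    end
    return nothing
end
```

If readers ran on separate threads, or the underlying read could yield to another task partway through an update, this would need a lock (or a single task pumping pages into per-stream Channels).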

It looks like VLC has had a few issues with handling chained files, and there are valid use cases for them, so it's probably something we should support. Those issues also include some example .ogg files that use chaining, so we can use one as a test case to make sure we handle it properly.

I think seeking in Ogg files is often done with a binary search for the page with the right granulepos. In that case, seeking the Opus decoder (to a particular audio frame) would seek the stream (to a given page and packet), which in turn would seek the underlying IO stream. I guess this is another case where the stream abstraction is a little iffy: if you ask one stream to seek, it shouldn't affect the other streams within a multiplexed file.
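A sketch of that bisection, assuming a helper next_page(offset) that resynchronizes from a byte offset and returns (page_offset, granulepos) for the next page, or nothing past the end. It finds the first page whose granulepos is at least the target, which for a single logical stream with non-decreasing granule positions is roughly the page on which the target sample's packet completes. Real seeking also has to skip pages of other streams, handle pages with granulepos -1, and back up for any codec-specific pre-roll. bisect_granule and next_page are hypothetical.

```julia
# Hypothetical bisection over byte offsets. The predicate "the page found from
# `off` is missing or has granulepos >= target" is monotone in `off`, so we
# search for the smallest offset where it holds.
function bisect_granule(next_page, filesize::Integer, target::Integer)
    lo, hi = 0, Int(filesize)
    while lo < hi
        mid = (lo + hi) ÷ 2
        pg = next_page(mid)
        if pg === nothing || pg[2] >= target
            hi = mid          # the answer starts at or before `mid`
        else
            lo = mid + 1      # the page found from `mid` is still before `target`
        end
    end
    return next_page(lo)      # first page with granulepos >= target, or nothing
end
```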

Here's another way it could work:

```julia
oggdec = OggDecoder("somevideo.ogg")
streammap = Dict{Int32, PageSink}()

for page in pages(oggdec) # this could block the task (esp. if we're streaming from the net)
    if bos(page) # beginning-of-stream
        # isvorbis etc. are placeholders for some future codec detection fanciness. also
        # we'd need some handling for multiple audio tracks, etc.
        # PageSink{T} takes pages and writes packets to its argument::T
        # VorbisSink takes packets and writes audio samples to its argument
        # TheoraSink takes packets and writes video frames to its argument
        if isvorbis(page)
            streammap[page.serial] = PageSink(VorbisSink(PortAudioStream()))
        elseif isopus(page)
            streammap[page.serial] = PageSink(OpusSink(PortAudioStream()))
        elseif istheora(page)
            streammap[page.serial] = PageSink(TheoraSink(SomeVideoWindow()))
        end
    elseif eos(page) # end-of-stream
        # unmap stream, ready for chained stream if it exists
    end
    if page.serial in keys(streammap)
        write(streammap[page.serial], page) # this could block the task if the underlying audio/video buf is full
    end
end
```
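A runnable version of that routing loop, with plain data standing in for pages and sinks (Page, route and the make_sink callback are hypothetical). One detail worth noting: the eos page can still carry the last packet of a stream, so if the unmapping in the eos branch ran before the write at the bottom of the loop, that final page would be dropped. This version forwards the page first and removes the mapping afterwards.

```julia
# Hypothetical page model: just the fields the router needs.
struct Page
    serial::UInt32
    bos::Bool
    eos::Bool
    payload::Vector{UInt8}
end

# `make_sink(page)` returns a callable for a recognized stream, or `nothing`
# to ignore it. Returns whatever mappings are still open at the end.
function route(pages, make_sink)
    streammap = Dict{UInt32, Any}()
    for page in pages
        if page.bos
            sink = make_sink(page)
            if sink !== nothing
                streammap[page.serial] = sink
            end
        end
        if haskey(streammap, page.serial)
            streammap[page.serial](page.payload)   # forward before any unmapping
        end
        if page.eos
            delete!(streammap, page.serial)        # frees the serial for a chained link
        end
    end
    return streammap
end
```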

Another thought - I'm thinking that we should represent packets as just Vector{UInt8} when they get handed to the codec, so that the codec packages don't need to depend on Ogg.jl and could be getting their packets from anywhere.

staticfloat commented 6 years ago

Coming back to this after an incredibly long time of the notification email sitting at the bottom of my "julia stuff" email folder, I'm finally ready to re-engage.

After reading through this again, I think the underlying abstraction we need for the "buffering" issues is a buffered stream that supports multiple read positions; e.g. I have one underlying StreamIO object that I receive from the user, but I create N read positions from it, where N is the number of logical Ogg streams embedded within it. The buffered stream then simply holds on to all data between the earliest pointer and the latest pointer. This would, of course, use unbounded memory if the user never reads from one of the streams, but that can probably be handled intelligently (perhaps by supporting a close() method on an OggStream object or something?)
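A byte-level sketch of the multiple-read-positions idea: the source is read once, each cursor tracks an absolute offset, and data behind the slowest open cursor is discarded. That is also why an abandoned cursor pins memory until it is closed. SharedBuffer, opencursor!, closecursor!, trim!, and readfrom! are hypothetical names; a page-level version would retain pages rather than bytes.

```julia
# Hypothetical buffered stream with several independent read positions.
mutable struct SharedBuffer
    src::IO
    buf::Vector{UInt8}
    base::Int                  # absolute offset of buf[1]
    cursors::Dict{Int, Int}    # cursor id => absolute read offset
    nextid::Int
end
SharedBuffer(src::IO) = SharedBuffer(src, UInt8[], 0, Dict{Int, Int}(), 1)

function opencursor!(sb::SharedBuffer)
    id = sb.nextid
    sb.nextid += 1
    sb.cursors[id] = sb.base   # a new reader starts at the oldest retained byte
    return id
end

closecursor!(sb::SharedBuffer, id::Int) = (delete!(sb.cursors, id); trim!(sb))

# Drop everything before the slowest open cursor (or everything, if none are open).
function trim!(sb::SharedBuffer)
    keep_from = isempty(sb.cursors) ? sb.base + length(sb.buf) : minimum(values(sb.cursors))
    ndrop = keep_from - sb.base
    if ndrop > 0
        deleteat!(sb.buf, 1:ndrop)
        sb.base = keep_from
    end
    return nothing
end

# Read up to n bytes for cursor `id`, pulling from `src` only past the buffered end.
function readfrom!(sb::SharedBuffer, id::Int, n::Integer)
    pos = sb.cursors[id]
    need = pos + n - (sb.base + length(sb.buf))
    need > 0 && append!(sb.buf, read(sb.src, need))
    lo = pos - sb.base + 1
    hi = min(lo + n - 1, length(sb.buf))
    out = sb.buf[lo:hi]
    sb.cursors[id] = pos + length(out)
    trim!(sb)
    return out
end
```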

What kind of API do you foresee with chained files? Do we just fold one stream into another? Do we have a "stream discovery" callback that gets invoked whenever a new stream is discovered? How do we make this as simple as possible for the user?

> I'm thinking that we should represent packets as just Vector{UInt8} when they get handed to the codec, so that the codec packages don't need to depend on Ogg.jl and could be getting their packets from anywhere.

I 100% agree. I really don't like the idea of having a series of if statements like you have in your second example above; I'd like to keep the packets as plain 'ole data as much as possible.

ssfrr commented 6 years ago

> After reading through this again, I think the underlying abstraction that we need for the "buffering" issues is a buffered stream that supports multiple read positions; e.g. I have one underlying StreamIO object that I receive from the user, but I create N read positions from it where N is the number of logical Ogg streams that are embedded within it.

Yeah, this is nice. So if I only care about one logical stream in a multiplexed file, I would only create a read pointer for that one, and the underlying stream would just discard pages from the other streams. I think the only real API change from my first example would be that we'd need an explicit open on the logical streams that would register a reader.

With chained files, all the multiplexed logical streams need to end before the next set of streams can begin, and all the logical streams need to have their bos page before any of them have data. So in the case where the user gives a stream, OggDecoder wouldn't need explicit chaining support: it would just end when all the streams are done, and the user could check whether the bitstream had hit eof and, if not, open a new OggDecoder for the next set of streams. If the user gives a filename it's less obvious how to handle that, but maybe that's just a restriction we live with: when given a filename, OggDecoder only gets the first link in the chain. So the high-level flow would look like:

```julia
open("filename.ogg") do bitstream
    while !eof(bitstream)
        oggdec = OggDecoder(bitstream)
        open(streams(oggdec)[1]) do logstream
            while !eof(logstream)
                # handle packets of logical stream
            end
        end
    end
end
```
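OggDecoder can stop cleanly at the end of a link because a link is complete once every logical stream that began in it (via its bos page) has seen its eos page. A sketch of that bookkeeping, splitting a flat page sequence into chain links; split_links is hypothetical, and pages are modeled as (serial, bos, eos) tuples.

```julia
# Hypothetical: group pages into chain links. A link closes when every
# logical stream opened in it has reached its EOS page.
function split_links(pages)
    T = eltype(pages)
    links = Vector{Vector{T}}()
    current = T[]
    live = Set{UInt32}()                  # serials that have begun but not ended
    for pg in pages
        serial, bos, eos = pg
        bos && push!(live, UInt32(serial))
        push!(current, pg)
        if eos
            delete!(live, UInt32(serial))
            if isempty(live)              # all streams in this link are done
                push!(links, current)
                current = T[]
            end
        end
    end
    isempty(current) || push!(links, current)   # trailing, unterminated link
    return links
end
```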

This way in the inner loop you could either do read(logstream, N) directly to get N packets, or you might wrap logstream in a codec like VorbisDecoder(logstream) and then read audio samples directly from the audio decoder.

staticfloat commented 6 years ago

Yeah, that all seems really reasonable to me, and a really nice API. Nice work.