membraneframework / membrane_core

The core of the Membrane Framework, advanced multimedia processing framework
https://membrane.stream
Apache License 2.0
1.22k stars 34 forks source link

Add support for SRT ingest (instead of RTMP) #808

Open JustJoostNL opened 1 month ago

JustJoostNL commented 1 month ago

Hi there,

Would it be possible to add support for SRT ingest instead of RTMP? SRT is way better than RTMP in terms of latency.

mat-hek commented 1 month ago

Hi, we currently use WebRTC for low-latency ingest. Regarding SRT, there are no plans for now, but my friend created Elixir bindings: https://github.com/Qizot/ex_libsrt. They should be easy to wrap in Membrane elements.

JustJoostNL commented 1 month ago

@mat-hek Thanks for sharing that library. Would you be able to give a small example of how to integrate it with Membrane?

mat-hek commented 1 month ago

Ok, so I played with it for a while and got a POC of a server. It only handles the happy path and a single stream, but it seems to work. It uses FFmpeg as a client, receives the stream via SRT, and saves it to out.mp4.

# Dependencies of this script: the SRT bindings plus the Membrane plugins used
# for MPEG-TS demuxing, H264/AAC parsing, MP4 muxing and file output.
dependencies = [
  {:ex_libsrt, github: "Qizot/ex_libsrt"},
  # small fix for kim-company/membrane_mpeg_ts_plugin
  {:membrane_mpeg_ts_plugin, github: "mat-hek/membrane_mpeg_ts_plugin", branch: "patch-1"},
  :membrane_aac_plugin,
  :membrane_h26x_plugin,
  :membrane_mp4_plugin,
  :membrane_file_plugin
]

Mix.install(dependencies)

defmodule Membrane.SRT.Source do
  @moduledoc """
  Proof-of-concept Membrane source that starts an SRT server and forwards the
  received payloads through its `:output` pad as a `Membrane.RemoteStream`.

  Only the happy path of a single stream is handled: every connect request is
  accepted, and the output stream ends when a connection is closed. Anything
  arriving after that point is dropped.

  ## Options

    * `:port` - port the SRT server listens on
    * `:ip` - address the SRT server binds to, `"0.0.0.0"` by default
  """
  use Membrane.Source

  require Membrane.Logger

  alias ExLibSRT.Server

  def_output_pad(:output, accepted_format: Membrane.RemoteStream, flow_control: :push)

  def_options(port: [spec: :inet.port_number()], ip: [spec: String.t(), default: "0.0.0.0"])

  # Starts the SRT server and announces the stream format on the output pad.
  #
  # `handle_init` is not overridden in this module, so the state received here
  # still carries the element options (`:ip` and `:port`). From now on the state
  # holds the server handle and a flag telling whether the stream has ended.
  @impl true
  def handle_playing(_ctx, state) do
    {:ok, server} = Server.start(state.ip, state.port)
    Membrane.Logger.debug("SRT server started on #{state.ip}:#{state.port}")

    {[stream_format: {:output, %Membrane.RemoteStream{}}],
     %{server: server, end_of_stream?: false}}
  end

  # Accepts every incoming connect request (no filtering by address or stream id).
  @impl true
  def handle_info({:srt_server_connect_request, address, stream_id}, _ctx, state) do
    Membrane.Logger.debug(
      "Accepting SRT connection from #{inspect(address)}, stream id: #{inspect(stream_id)}"
    )

    :ok = Server.accept_awaiting_connect_request(state.server)
    {[], state}
  end

  # Data that arrives once the stream has ended must not be sent: emitting a
  # buffer on a pad after its end of stream is an error in Membrane.
  def handle_info({:srt_data, _conn_id, _payload}, _ctx, %{end_of_stream?: true} = state) do
    {[], state}
  end

  # Wraps each received payload in a buffer and pushes it downstream.
  def handle_info({:srt_data, _conn_id, payload}, _ctx, state) do
    {[buffer: {:output, %Membrane.Buffer{payload: payload}}], state}
  end

  # End of stream may be sent only once, so further closed connections are ignored.
  def handle_info({:srt_server_conn_closed, _conn_id}, _ctx, %{end_of_stream?: true} = state) do
    {[], state}
  end

  # The first closed connection finishes the output stream.
  def handle_info({:srt_server_conn_closed, _conn_id}, _ctx, state) do
    {[end_of_stream: :output], %{state | end_of_stream?: true}}
  end

  # Any other message is only logged.
  def handle_info(message, _ctx, state) do
    Membrane.Logger.debug("Unhandled message: #{inspect(message)}")
    {[], state}
  end
end

defmodule Pipeline do
  @moduledoc """
  Receives an MPEG-TS stream over SRT, demuxes it and stores the H264 and AAC
  tracks in `out.mp4`.

  The pipeline starts with the SRT source and the MPEG-TS demuxer only. The
  parsers, the MP4 muxer and the file sink are spawned once the demuxer reports
  the program map table (PMT). The pipeline terminates when the sink reaches the
  end of stream.

  ## Options

    * `:port` - port the SRT source listens on
  """
  use Membrane.Pipeline

  require Membrane.Logger

  @impl true
  def handle_init(_ctx, opts) do
    spec =
      child(%Membrane.SRT.Source{port: opts[:port]})
      |> child(:demuxer, Membrane.MPEG.TS.Demuxer)

    # `tracks_linked?` guards against building the muxing part twice.
    {[spec: spec], %{tracks_linked?: false}}
  end

  # Builds the muxing part of the pipeline from the streams listed in the PMT.
  # Handled only once: spawning `:mp4` and `:sink` again would fail because of
  # the duplicated child names, so later PMT notifications fall through to the
  # catch-all clause below.
  @impl true
  def handle_child_notification(
        {:mpeg_ts_pmt, pmt},
        :demuxer,
        _ctx,
        %{tracks_linked?: false} = state
      ) do
    muxer_spec =
      child(:mp4, Membrane.MP4.Muxer.ISOM)
      |> child(:sink, %Membrane.File.Sink{location: "out.mp4"})

    streams_spec = Enum.flat_map(pmt.streams, &stream_spec/1)

    {[spec: [muxer_spec | streams_spec]], %{state | tracks_linked?: true}}
  end

  # Notifications that are not needed here are ignored instead of crashing
  # the pipeline with a `FunctionClauseError`.
  def handle_child_notification(notification, child, _ctx, state) do
    Membrane.Logger.debug(
      "Ignoring notification #{inspect(notification)} from #{inspect(child)}"
    )

    {[], state}
  end

  @impl true
  def handle_element_end_of_stream(:sink, _pad, _ctx, state) do
    {[terminate: :normal], state}
  end

  def handle_element_end_of_stream(_element, _pad, _ctx, state) do
    {[], state}
  end

  # Returns a list with the link from the demuxer, through a matching parser,
  # to the MP4 muxer, or an empty list when the stream type is not supported.
  # NOTE(review): assumes the demuxer tolerates streams whose output pad is
  # never linked - confirm against the demuxer implementation.
  defp stream_spec({id, %{stream_type: type}}) do
    case parser_for(type) do
      nil ->
        Membrane.Logger.warning(
          "Skipping stream #{inspect(id)} of unsupported type #{inspect(type)}"
        )

        []

      parser ->
        [
          get_child(:demuxer)
          |> via_out(Pad.ref(:output, {:stream_id, id}))
          |> child(parser)
          |> get_child(:mp4)
        ]
    end
  end

  # Maps a stream type from the PMT to the parser preparing it for the MP4 muxer.
  defp parser_for(:H264), do: %Membrane.H264.Parser{output_stream_structure: :avc1}

  defp parser_for(:AAC),
    do: %Membrane.AAC.Parser{out_encapsulation: :none, output_config: :esds}

  defp parser_for(_other), do: nil
end

# Start the pipeline and monitor its supervisor, so the script can block until
# the pipeline is gone.
{:ok, pipeline_supervisor, _pipeline_pid} = Membrane.Pipeline.start_link(Pipeline, port: 1234)
Process.monitor(pipeline_supervisor)

# Presumably gives the SRT source time to start listening before FFmpeg connects.
Process.sleep(1000)

sample_url =
  "https://raw.githubusercontent.com/membraneframework/static/gh-pages/samples/big-buck-bunny/bun33s.mp4"

# FFmpeg acts as the SRT client: it remuxes the sample to MPEG-TS without
# re-encoding and sends it to the local server. A non-zero exit status fails
# the match below.
ffmpeg_command = "ffmpeg -re -i #{sample_url} -c copy -f mpegts srt://127.0.0.1:1234"
{_ffmpeg_output, 0} = System.shell(ffmpeg_command)

# Block until the monitored supervisor goes down.
receive do
  {:DOWN, _monitor_ref, _kind, ^pipeline_supervisor, _exit_reason} -> :ok
end