membraneframework / membrane_core

The core of the Membrane Framework, an advanced multimedia processing framework
https://membrane.stream
Apache License 2.0

RTMP plugin delivers end_of_stream on TCP disconnection #792

Open dmorn opened 2 months ago

dmorn commented 2 months ago

This issue is also related to

The RTMP source delivers end of stream when the TCP connection is dropped, see here. For us this logic is plain wrong: a connection drop is an error, whereas an actual end of stream is, in AMF terms, a deleteStream message.

We turn output HLS streams into VOD when the transmission ends, and this cannot be undone: once players see that tag, they stop reloading the playlist.

Can we add an option that tunes this behaviour? Or, even better, change it like this (this is how we deal with it at work):

--- a/lib/membrane_rtmp_plugin/rtmp/source/source.ex
+++ b/lib/membrane_rtmp_plugin/rtmp/source/source.ex
@@ -224,8 +224,9 @@ defmodule Membrane.RTMP.Source do
   def handle_info({:socket_closed, _socket}, ctx, state) do
     cond do
       ctx.pads.output.end_of_stream? -> {[], state}
-      ctx.pads.output.start_of_stream? -> {[end_of_stream: :output], state}
-      true -> {[notify_parent: :unexpected_socket_closed, end_of_stream: :output], state}
+      # This might be a connection error. Only deleteStream message signals that
+      # the transmission is finished.
+      true -> {[notify_parent: :unexpected_socket_closed], state}
     end
   end

Basically, we just notify the parent of the event; the pipeline can then decide how the rest of the children should react.
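A minimal sketch of what the parent side could look like with the patch above applied. The module name `ReconnectingPipeline` and the child name `:rtmp_source` are hypothetical, and removing the source is only one possible reaction; spawning and linking a replacement is left out.

```elixir
Mix.install([:membrane_core])

defmodule ReconnectingPipeline do
  use Membrane.Pipeline

  # Assumption: the RTMP source was spawned under the (hypothetical) name
  # :rtmp_source. With the patch above, a dropped TCP connection reaches the
  # pipeline only as this notification, so the pipeline decides what to do.
  @impl true
  def handle_child_notification(:unexpected_socket_closed, :rtmp_source, _ctx, state) do
    # Here we just remove the dead source; a real pipeline would probably
    # also spawn a new one and link it to the remaining children.
    {[remove_children: :rtmp_source], state}
  end

  # Ignore every other child notification.
  @impl true
  def handle_child_notification(_notification, _child, _ctx, state) do
    {[], state}
  end
end
```

The point of the design is that the source only reports what happened on the socket, and the policy (reconnect, tear down, finalize the playlist) lives in the pipeline.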

mat-hek commented 2 months ago

Since dropping the connection means we won't send anything anymore from the source, the end of stream is quite expected. I assume that the logic is to remove the element on the connection drop and spawn a new one - in that case, some further element should take care of intercepting the end_of_stream and be linked to the newly spawned source.

dmorn commented 2 months ago

> Since dropping the connection means we won't send anything anymore from the source

A connection drop is a transport-layer problem. It signals a network fault, which in my opinion is unrelated to the end-of-stream event: end of stream means that everything is done, which is not the case here.

> I assume that the logic is to remove the element on the connection drop and spawn a new one

Thanks to crash groups & dynamic pads this is very easy to accomplish and it is actually how we're handling it.

> element should take care of intercepting the end_of_stream

This element then has to know whether that was a false positive or not. I really think this is very confusing. This is an RTMP plugin, and RTMP has a message that signals the real end_of_stream, so let's just stick with it @mat-hek!

mat-hek commented 1 month ago

> > I assume that the logic is to remove the element on the connection drop and spawn a new one
>
> Thanks to crash groups & dynamic pads this is very easy to accomplish and it is actually how we're handling it.

Hmm, let's assume we don't send the end_of_stream. You get the unexpected_socket_closed notification and remove the RTMP source and possibly other elements. When you do that, the first thing that happens is that end_of_stream is generated on the input pad just after the removed elements, so it seems the behaviour is the same anyways 🤔

dmorn commented 1 month ago

Not if the removed elements and the remaining ones are connected via a dynamic pad, right?

mat-hek commented 1 month ago

I’d have to check, but I think in this case as well

mat-hek commented 1 week ago

Hi @dmorn, any update on this? BTW I checked and the end of stream is always generated:

Mix.install([:membrane_core])

defmodule Source do
  use Membrane.Source
  def_output_pad :output, flow_control: :manual, accepted_format: _any

  @impl true
  def handle_demand(:output, _size, _unit, _ctx, state) do
    {[], state}
  end
end

defmodule Sink do
  use Membrane.Sink
  def_input_pad :input, availability: :on_request, accepted_format: _any

  @impl true
  def handle_end_of_stream(pad, _ctx, state) do
    IO.puts("End of stream on pad #{inspect(pad)}")
    {[], state}
  end
end

defmodule Run do
  import Membrane.ChildrenSpec
  require Membrane.Pad, as: Pad

  def run() do
    p = Membrane.RCPipeline.start_link!()

    Membrane.RCPipeline.exec_actions(p,
      spec: [
        child(:sink, Sink),
        child(:src1, Source) |> via_in(Pad.ref(:input, 1)) |> get_child(:sink),
        child(:src2, Source) |> via_in(Pad.ref(:input, 2)) |> get_child(:sink)
      ]
    )

    Process.sleep(1000)
    Membrane.RCPipeline.exec_actions(p, remove_children: [:src1])
    Process.sleep(1000)
  end
end

Run.run()