Red5 / red5-server-common

Red5 Servers common classes
Apache License 2.0
47 stars 106 forks source link

Fix for RTMPProtocolDecoder failure when media chunks cross RTMPT requests #45

Closed jessecrossen closed 7 years ago

jessecrossen commented 8 years ago

The bug can be reproduced as follows:

1) Connect to Red5 using RTMPT or RTMPS:

var nc:NetConnection = new NetConnection();
nc.connect('rtmpt://example.com:5080/application');

2) After the connection is made, start recording video using a buffer:

var ns:NetStream = new NetStream(nc);
ns.bufferTime(60);
ns.attachCamera(camera);
ns.attachAudio(mic);
ns.publish('streamid', 'record');

3) Record until there are few seconds of buffer (it helps to slow down your upload speed with tc or similar), then stop and wait for the buffer to drain as follows:

// disconnect sources to allow buffer to drain
ns.attachCamera(null);
ns.attachAudio(null);
// wait for the buffer to be empty and close the stream
setInterval(function():void {
  if (! (ns.bufferLength > 0.0)) {
    ns.close();
  }
}, 500);

4) Note that Red5 does not close and finalize the stream until the connection closes.

After examining trace output, I understand the mechanism like this:

When the connection begins draining in step 3, it's no longer bound by real-time constraints, so chunks are sent as fast as possible. There seems to be a size limit of 8192 bytes per request, and the Flash client will fill this completely if it can. Since this is not an even multiple of common chunk sizes (e.g. 258 bytes), partial chunks appear at the ends of requests.

In the main loop of RTMPProtocolDecoder.decodeBuffer, an instance of RTMPDecodeState is used to track state. This state persists across calls, and the only place it's cleared is here:

if (state.canStartDecoding(remaining)) {
  log.trace("Can start decoding");
  state.startDecoding();
} else {
  log.trace("Cannot start decoding");
  break;
}

Now, suppose a large buffer is decoded and goes into the buffered state on its final chunk. After the final chunk has been processed, the state is left in RTMPDecodeState.DECODER_BUFFER for the next chunk, since we exit the loop and state.startDecoding() is not called again. If the next buffer is sufficiently large, there's no problem, but suppose we get a small buffer containing the closeStream command, which is what happens in the reproduction method above. The state.canStartDecoding test fails, and we wind up waiting until more data comes in, which it won't since recording is finished, so the closeStream call is never parsed.

I can't claim to understand this state machine well enough to suggest a perfect fix, but I've made a patch that seems to work by calling state.startDecoding() before entering the decode loop, like so:

// get the local decode state
RTMPDecodeState state = conn.getDecoderState();
state.startDecoding(); // <= add this line to initialize the state
log.trace("RTMP decode state {}", state);

This does seem to fix the issue without causing other problems. Since the unparsed portion of the previous buffer should be available at this point, I think the only consequence will be to re-read the chunk header at the start of the buffer, and only on the first pass through the loop. However, I haven't tested extensively enough to rule out unintended side effects, which is why I'm filing this as an issue rather than a fork and pull request.

jessecrossen commented 8 years ago

My proposed fix actually causes errors that end the connection in some cases. I'll investigate this, but I believe my analysis of the problem is still close to the mark.

mondain commented 8 years ago

Thanks for digging into this, we await your final analysis.

jessecrossen commented 8 years ago

@mondain I believe I have found the root cause.

I recorded a problematic series of inputs to BaseRTMPTConnection.decode and simulated its action in TestRTMPProtocolDecoder. However, the problem did not occur under test. This suggested that the problem had nothing to do with buffer content. I then instrumented the method by modifying the end like this:

        log.warn("start {}: {}", System.identityHashCode(this.getDecoderState()), this.getDecoderState());
        List<?> result = decoder.decodeBuffer(this, buffer);
        log.warn("finish {}: {}", System.identityHashCode(this.getDecoderState()), this.getDecoderState());
        return(result);

This reveals output like the following from the end of a recording session (simplified for clarity):

start 1398356736: RTMPDecodeState [sessionId=2BBSZBQNDCHY0, decoderState=Buffer, decoderBufferAmount=258]
finish 1398356736: RTMPDecodeState [sessionId=2BBSZBQNDCHY0, decoderState=Ok, decoderBufferAmount=0]
...
start 1690723177: RTMPDecodeState [sessionId=2BBSZBQNDCHY0, decoderState=Buffer, decoderBufferAmount=258]
finish 1690723177: RTMPDecodeState [sessionId=2BBSZBQNDCHY0, decoderState=Buffer, decoderBufferAmount=258]
...
start 1376815404: RTMPDecodeState [sessionId=2BBSZBQNDCHY0, decoderState=Buffer, decoderBufferAmount=258]
finish 1376815404: RTMPDecodeState [sessionId=2BBSZBQNDCHY0, decoderState=Buffer, decoderBufferAmount=258]

So we can see that although the first of these left the state with no pending data, the second two did not. Note that all three have different identity hashes for the state object. Filtering for one of these, we can see that state is consistent for each instance of RTMPDecodeState:

start 1690723177: RTMPDecodeState [sessionId=2BBSZBQNDCHY0, decoderState=Ok, decoderBufferAmount=0]
finish 1690723177: RTMPDecodeState [sessionId=2BBSZBQNDCHY0, decoderState=Ok, decoderBufferAmount=0]
start 1690723177: RTMPDecodeState [sessionId=2BBSZBQNDCHY0, decoderState=Ok, decoderBufferAmount=0]
finish 1690723177: RTMPDecodeState [sessionId=2BBSZBQNDCHY0, decoderState=Ok, decoderBufferAmount=0]
start 1690723177: RTMPDecodeState [sessionId=2BBSZBQNDCHY0, decoderState=Ok, decoderBufferAmount=0]
finish 1690723177: RTMPDecodeState [sessionId=2BBSZBQNDCHY0, decoderState=Buffer, decoderBufferAmount=265]
start 1690723177: RTMPDecodeState [sessionId=2BBSZBQNDCHY0, decoderState=Buffer, decoderBufferAmount=265]
finish 1690723177: RTMPDecodeState [sessionId=2BBSZBQNDCHY0, decoderState=Buffer, decoderBufferAmount=258]
start 1690723177: RTMPDecodeState [sessionId=2BBSZBQNDCHY0, decoderState=Buffer, decoderBufferAmount=258]
finish 1690723177: RTMPDecodeState [sessionId=2BBSZBQNDCHY0, decoderState=Buffer, decoderBufferAmount=258]

I believe this is a design flaw in how the decode state is stored. From RTMPConnection.java:

    // keeps track of the decode state
    protected transient ThreadLocal<RTMPDecodeState> decoderState = new ThreadLocal<RTMPDecodeState>() {

        @Override
        protected RTMPDecodeState initialValue() {
            return new RTMPDecodeState(getSessionId());
        }

    };

So although there's only one instance of the buffer per connection instance, the state which contains information essential for parsing that buffer is stored on a per-thread basis, and evidently threads are being allocated from a pool. If this design is meant to ensure thread safety, it's undermined by the fact that BaseRTMPTConnection.decode does not itself appear to be thread-safe, i.e. there's nothing guarding against flipping the buffer while it's being decoded, or multiple decoders addressing the same buffer at the same time.

I can see at least two ways to resolve this:

  1. Treat the decoder state as local to RTMPProtocolDecoder.decodeBuffer, and re-acquire state from the buffer if needed.
  2. Guarantee there can only be one RTMPDecodeState instance per RTMPConnection instance.

However, I'm not sure what's best given the wider context of the project.

mondain commented 8 years ago

I'd say number 2 is best: Guarantee there can only be one RTMPDecodeState instance per RTMPConnection instance.

mondain commented 7 years ago

Modified to fit option 2 in 1.0.9-SNAPSHOT