Closed oracularhades closed 11 months ago
Hi there! There are two things that stand out to me at a quick glance.
The first is that you are saving the raw bytes coming back over RTMP. An FLV
file requires more than just the raw FLV segments. If you see This wikipedia page you will see that valid FLV flies require a header.
After the header you then have the FLV packets themselves, but crucially the data
bytes sent by RTMP are not these directly. If you see the fields table you see: size of previous packet, packet type, payload size, timestamp lower, timestamp upper, stream id, and payload data.
The data
value being passed in only represents payload data
. It does not contain any of the information for the previous fields. Those fields are essentially handled by the RTMP protocol itself.
So, in order to save this data as a valid FLV file you will need to still do some work to conform to the FLV specification (Official specification is here. Specifically you'll need to
data
valuedata
bytesThat should work. Note that you are also spinning up a new file writer in a new thread for every packet received, which can cause ordering or some other weirdness (and isn't compatible with tag size part of the headers). So you'll really want to make sure each segment is written one at a time in a single threaded manner.
ah, yeah, I had a hunch that data variable was RTMP data and not raw video data. Thank you so much for your help! I'll close this issue for now and re-open it if I need help.
Sorry for immediately reopening, just remembered. "is written one at a time in a single threaded manner.", if I don't use a thread, then I get the below error. I suspect this is because the buffer is overflowing. To get around this, could I use the threading system but make sure the video data is put together 1 by 1 later? It seems the simplest way to handle not overflowing the buffer and would be fine on my setup because the server is only ingesting and does nothing else.
code:
fn handle_audio_video_data_received(
&mut self,
stream_key: String,
timestamp: RtmpTimestamp,
data: Bytes,
data_type: ReceivedDataType,
server_results: &mut Vec<ServerResult>,
) {
{
let file_path = "/Users/josh/Documents/video_file.flv";
let cloned_data = data.clone();
let mut file = match OpenOptions::new().create(true).append(true).open(file_path) {
Ok(file) => file,
Err(err) => {
eprintln!("Error opening the file: {}", err);
return;
}
};
// Write the video stream data to the end of the file.
if let Err(err) = file.write_all(&cloned_data) {
eprintln!("Error writing to the file: {}", err);
return;
}
let channel = match self.channels.get_mut(&stream_key) {
Some(channel) => channel,
None => return,
};
// If this is an audio or video sequence header we need to save it, so it can be
// distributed to any late coming watchers
match data_type {
ReceivedDataType::Video => {
if is_video_sequence_header(data.clone()) {
channel.video_sequence_header = Some(data.clone());
}
}
ReceivedDataType::Audio => {
if is_audio_sequence_header(data.clone()) {
channel.audio_sequence_header = Some(data.clone());
}
}
}
error:
Application options: AppOptions { log_io: false, pull: None, push: None }
Listening for connections
New connection (id 0)
Handshake successful!
Connection 0 requested connection to app ''
Event raised by connection 0: UnhandleableAmf0Command { command_name: "_checkbw", transaction_id: 2.0, command_object: Null, additional_values: [] }
Event raised by connection 0: UnhandleableAmf0Command { command_name: "releaseStream", transaction_id: 3.0, command_object: Null, additional_values: [Utf8String("placeholder_streamkey")] }
Event raised by connection 0: UnhandleableAmf0Command { command_name: "FCPublish", transaction_id: 4.0, command_object: Null, additional_values: [Utf8String("placeholder_streamkey")] }
Publish requested on app '' and stream key 'placeholder_streamkey'
New metadata received for app '' and stream key 'placeholder_streamkey'
Spent 553 ms (5% of time) doing work over 10 seconds (avg 17867 microseconds per iteration)
thread 'main' panicked at 'attempt to add with overflow', examples/mio_rtmp_server/src/main.rs:179:9
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace
user@users-MacBook-Air mio_rtmp_server %
Thanks :)
Which line is 179 (the one where the addition is causing an overflow)? Nothing in the branch you showed is obviously performing an add.
Sorry! Apologies for that. Line 179 here: https://github.com/KallDrexx/rust-media-libs/blob/master/examples/mio_rtmp_server/src/main.rs
So that panic is caused because total_ns
is overflowing. Essentially, the addition of the file writing in time is causing the timings to be larger than expected, causing a number that's too big for whatever total_ns
is being inferred as.
You have two options:
handle_audio_video_data_received
is called it passed that data over the mpsc channel, which gets received by the spawned thread and thus written to disk concurrently.Sorry, I know I'm probably missing something obvious here (new to Rust and not super familiar with FLV formatting), but do you know why
ffprobe version 5.1.2 Copyright (c) 2007-2022 the FFmpeg developers
built with Apple clang version 14.0.0 (clang-1400.0.29.202)
configuration: --prefix=/opt/homebrew/Cellar/ffmpeg/5.1.2_6 --enable-shared --enable-pthreads --enable-version3 --cc=clang --host-cflags= --host-ldflags= --enable-ffplay --enable-gnutls --enable-gpl --enable-libaom --enable-libaribb24 --enable-libbluray --enable-libdav1d --enable-libmp3lame --enable-libopus --enable-librav1e --enable-librist --enable-librubberband --enable-libsnappy --enable-libsrt --enable-libsvtav1 --enable-libtesseract --enable-libtheora --enable-libvidstab --enable-libvmaf --enable-libvorbis --enable-libvpx --enable-libwebp --enable-libx264 --enable-libx265 --enable-libxml2 --enable-libxvid --enable-lzma --enable-libfontconfig --enable-libfreetype --enable-frei0r --enable-libass --enable-libopencore-amrnb --enable-libopencore-amrwb --enable-libopenjpeg --enable-libspeex --enable-libsoxr --enable-libzmq --enable-libzimg --disable-libjack --disable-indev=jack --enable-videotoolbox --enable-neon
libavutil 57. 28.100 / 57. 28.100
libavcodec 59. 37.100 / 59. 37.100
libavformat 59. 27.100 / 59. 27.100
libavdevice 59. 7.100 / 59. 7.100
libavfilter 8. 44.100 / 8. 44.100
libswscale 6. 7.100 / 6. 7.100
libswresample 4. 7.100 / 4. 7.100
libpostproc 56. 6.100 / 56. 6.100
[flv @ 0x141f04480] Read FLV header error, input file is not a standard flv format, first PreviousTagSize0 always is 0
[flv @ 0x141f04480] Packet mismatch -1358950000 11 11
Input #0, flv, from '/Users/user/Documents/video_file.flv':
Duration: N/A, start: 0.000000, bitrate: N/A
might be happening here? (And yes, I saw your note about thread scheduling, thank you for that! I'm just trying to get this output to work first, thus there is the placeholder file check there so the FLV header doesn't get written twice.
fn handle_audio_video_data_received(
&mut self,
stream_key: String,
timestamp: RtmpTimestamp,
data: Bytes,
data_type: ReceivedDataType,
server_results: &mut Vec<ServerResult>,
) {
{
let file_path = "/Users/josh/Documents/video_file.flv";
let isOk = fs::metadata(&file_path).is_ok();
// Open the file in append mode (creates if it doesn't exist).
let mut file = match OpenOptions::new().create(true).append(true).open(file_path) {
Ok(file) => file,
Err(err) => {
eprintln!("Error opening the file: {}", err);
return;
}
};
if (isOk == false) {
println!("File does not exist.");
// Prepare the FLV header (9 bytes)
let flv_header: [u8; 9] = [
'F' as u8, 'L' as u8, 'V' as u8, // Signature
1, // Version
5, // Flags (Audio + Video)
0, 0, 0, 9, // Data offset
];
// Write the FLV header to the file
if let Err(err) = file.write_all(&flv_header) {
eprintln!("Error writing FLV header: {}", err);
return;
}
}
let timestamp_value = timestamp.value; // Access the value field
let timestamp_upper = ((timestamp_value >> 16) & 0xFF) as u8;
let timestamp_middle = ((timestamp_value >> 8) & 0xFF) as u8;
let timestamp_lower = (timestamp_value & 0xFF) as u8;
// Prepare the packet header (15 bytes)
let mut packet_header: [u8; 15] = [
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
];
// Previous packet size
packet_header[0] = ((data.len() >> 16) & 0xFF) as u8;
packet_header[1] = ((data.len() >> 8) & 0xFF) as u8;
packet_header[2] = (data.len() & 0xFF) as u8;
// Video or audio packet type
packet_header[3] = match data_type {
ReceivedDataType::Video => 9,
ReceivedDataType::Audio => 8,
};
// Timestamp
packet_header[4] = timestamp_upper;
packet_header[5] = timestamp_middle;
packet_header[6] = timestamp_lower;
// Write the packet header to the file
if let Err(err) = file.write_all(&packet_header) {
eprintln!("Error writing packet header: {}", err);
return;
}
// Write the payload data to the file
if let Err(err) = file.write_all(&data) {
eprintln!("Error writing payload data: {}", err);
return;
}
let channel = match self.channels.get_mut(&stream_key) {
Some(channel) => channel,
None => return,
};
Thanks :)
At a quick glance, it appears you are setting the "previous packet size" to the current packet size. The previous packet size is to know how to go to the previous FLV tag for reverse seeks, and as ffprobe shows, it thinks that isn't zero.
thanks for correcting me there! (and thank you so much for your help in general), I've now made it so the size is 0 on the first packet and anything after that is the last packet size, but I'm still getting a packet mismatch for some reason. Am I not meant to use 0 for the first packet's last packet size?
Edit: during this I realized I need to be adding +15 to account for the total packet size and not just the video bytes size.
fn handle_audio_video_data_received(
&mut self,
stream_key: String,
timestamp: RtmpTimestamp,
data: Bytes,
data_type: ReceivedDataType,
server_results: &mut Vec<ServerResult>,
) {
{
let packet_size_tracker = get_packet_size_tracker();
let file_path = "/Users/josh/Documents/video_file.flv";
let isOk = fs::metadata(&file_path).is_ok();
// Open the file in append mode (creates if it doesn't exist).
let mut file = match OpenOptions::new().create(true).append(true).open(file_path) {
Ok(file) => file,
Err(err) => {
eprintln!("Error opening the file: {}", err);
return;
}
};
if (isOk == false) {
println!("File does not exist.");
// Prepare the FLV header (9 bytes)
let flv_header: [u8; 9] = [
'F' as u8, 'L' as u8, 'V' as u8, // Signature
1, // Version
5, // Flags (Audio + Video)
0, 0, 0, 9, // Data offset
];
// Write the FLV header to the file
if let Err(err) = file.write_all(&flv_header) {
eprintln!("Error writing FLV header: {}", err);
return;
}
}
let timestamp_value = timestamp.value; // Access the value field
let timestamp_upper = ((timestamp_value >> 16) & 0xFF) as u8;
let timestamp_middle = ((timestamp_value >> 8) & 0xFF) as u8;
let timestamp_lower = (timestamp_value & 0xFF) as u8;
// Prepare the packet header (15 bytes)
let mut packet_header: [u8; 15] = [
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
];
// Get the previous packet size from the tracker
let last_packet_size = packet_size_tracker.get_previous_size(&stream_key.clone());
if !isOk {
println!("USING ZERO PACKET SIZE BECAUSE FILE DOESNT EXIST");
// Set previous packet size to 0 for the first packet
packet_header[0] = 0;
packet_header[1] = 0;
packet_header[2] = 0;
} else {
println!("Using last packet size of {}", last_packet_size);
// Get the size of the last packet from the stored variable
packet_header[0] = ((last_packet_size >> 16) & 0xFF) as u8;
packet_header[1] = ((last_packet_size >> 8) & 0xFF) as u8;
packet_header[2] = (last_packet_size & 0xFF) as u8;
}
println!("Current packet size: {}", data.len() as u32);
// Update the packet size tracker with the size of the current packet
packet_size_tracker.update_size(stream_key.clone(), data.len() as u32+15);
// Video or audio packet type
packet_header[3] = match data_type {
ReceivedDataType::Video => 9,
ReceivedDataType::Audio => 8,
};
// Timestamp
packet_header[4] = timestamp_upper;
packet_header[5] = timestamp_middle;
packet_header[6] = timestamp_lower;
// Write the packet header to the file
if let Err(err) = file.write_all(&packet_header) {
eprintln!("Error writing packet header: {}", err);
return;
}
// Write the payload data to the file
if let Err(err) = file.write_all(&data) {
eprintln!("Error writing payload data: {}", err);
return;
}
Oh I see, sorry, I missed the warning listed in FFProble again, mentioning that it's not zero. I'll download a FLV reader program and take a look at the file itself, because it should be zero and I'm not sure why it's not.
Hmmm...shouldn't the flv header and the flv packet header be on different lines?
Ok, so I updated my code to add new lines. Still getting that same ffmpeg warning. Am I writing the last packet size metadata to the file correctly? I'm definitely able to fetch it and store it correctly, but I might not be formatting it right?
here is a screenshot of hexfiend:
Hm, ok. I removed the new lines, and added a new header size slot to the array. Now the "Read FLV header error, input file is not a standard flv format, first PreviousTagSize0 always is 0" warning is gone, and I just get "Packet mismatch 700160 11 11".
fn handle_audio_video_data_received(
&mut self,
stream_key: String,
timestamp: RtmpTimestamp,
data: Bytes,
data_type: ReceivedDataType,
server_results: &mut Vec<ServerResult>,
) {
{
let packet_size_tracker = get_packet_size_tracker();
let file_path = "/Users/josh/Documents/video_file.flv";
let isOk = fs::metadata(&file_path).is_ok();
// Open the file in append mode (creates if it doesn't exist).
let mut file = match OpenOptions::new().create(true).append(true).open(file_path) {
Ok(file) => file,
Err(err) => {
eprintln!("Error opening the file: {}", err);
return;
}
};
if (isOk == false) {
println!("File does not exist.");
// Prepare the FLV header (9 bytes)
let flv_header: [u8; 9] = [
'F' as u8, 'L' as u8, 'V' as u8, // Signature
1, // Version
5, // Flags (Audio + Video)
0, 0, 0, 9, // Data offset
];
// Write the FLV header to the file
if let Err(err) = file.write_all(&flv_header) {
eprintln!("Error writing FLV header: {}", err);
return;
}
}
let timestamp_value = timestamp.value; // Access the value field
let timestamp_upper = ((timestamp_value >> 16) & 0xFF) as u8;
let timestamp_middle = ((timestamp_value >> 8) & 0xFF) as u8;
let timestamp_lower = (timestamp_value & 0xFF) as u8;
// Prepare the packet header (15 bytes)
let mut packet_header: [u8; 15] = [
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
];
// Get the previous packet size from the tracker
let last_packet_size = packet_size_tracker.get_previous_size(&stream_key.clone());
let previous_packet_size_bytes: [u8; 4] = [
((last_packet_size >> 24) & 0xFF) as u8,
((last_packet_size >> 16) & 0xFF) as u8,
((last_packet_size >> 8) & 0xFF) as u8,
(last_packet_size & 0xFF) as u8,
];
if !isOk {
println!("USING ZERO PACKET SIZE BECAUSE FILE DOESNT EXIST");
// Set previous packet size to 0 for the first packet
} else {
println!("Using last packet size of {}", last_packet_size);
// Get the size of the last packet from the stored variable
packet_header[0] = previous_packet_size_bytes[0];
packet_header[1] = previous_packet_size_bytes[1];
packet_header[2] = previous_packet_size_bytes[2];
packet_header[3] = previous_packet_size_bytes[3];
}
// Update the packet size tracker with the size of the current packet
packet_size_tracker.update_size(stream_key.clone(), data.len() as u32+15);
// Video or audio packet type
packet_header[4] = match data_type {
ReceivedDataType::Video => 9,
ReceivedDataType::Audio => 8,
};
// Timestamp
packet_header[5] = timestamp_upper;
packet_header[6] = timestamp_middle;
packet_header[7] = timestamp_lower;
println!("PACKET HEADER: {:?}", packet_header);
// Write the packet header to the file
if let Err(err) = file.write_all(&packet_header) {
eprintln!("Error writing packet header: {}", err);
return;
}
// Write the payload data to the file
if let Err(err) = file.write_all(&data) {
eprintln!("Error writing payload data: {}", err);
return;
};
In the corrupt binary file you sent me, the byte at offset 0x09 is 0A
, but since that's the first previous tag it should be zero. That's why ffprobe was saying the first previous tag size wasn't zero, because 0A != 00
. You can see that also in your second screenshot.
Likewise, the code you are writing is harder to read than it should be and I suggest using the byteorder crate. That automatically does the bit shifting conversion for you and your can focus your code on just writing the actual values directly in the correct endianess. See these tests as an example.
Likewise, I'm not sure why you are encoding the packet size as data.len() as u32 + 15
. Besides it being written confusingly (specifically the +15 part after the as u32
, the flv spec says that the header for each tag is u8+u24+u24+u8+u24
plus 1 more byte for audio or video specific data. So the length of each flv tag header seems wrong.
Likewise, packet header byte 4 you have saving the TagType
field, but then the next byte you have as the 3 timestamp
fields. However, DataSize
is supposed to come between tag type and timestamp. So your timestamp is probably being interpreted as data size, and thus it's seeing invalid audio or video tags.
Thank you so much for your patience with me! I got this working. Without your patience, and to be honest some ChatGPT and heavy Googling, this whole experience would have been way more painful since this is my first time using Rust and first time formatting FLV.
Thank you :)
@oracularhades any chance you can contribute an example to help others wanting to do similar things in the future?
Still polishing this code, but in the future potentially yeah
Thank you for all your help, I won't commit this, but I want to make this available for people to use it in their projects :) - My heavily ChatGPT'd and Google'd code lmao.
fn get_packet_size_tracker() -> &'static mut PacketSizeTracker {
// Define the PacketSizeTracker instance as static mut
static mut PACKET_SIZE_TRACKER: Option<PacketSizeTracker> = None;
// Use unsafe to manage the mutable reference to the instance
unsafe {
if PACKET_SIZE_TRACKER.is_none() {
PACKET_SIZE_TRACKER = Some(PacketSizeTracker::new());
}
PACKET_SIZE_TRACKER.as_mut().unwrap()
}
}
let file_id = "hi";
let input_file_path = format!("/{}.flv", file_id.clone());
let output_file_path = format!("/{}.mpd", file_id.clone());
let isOk = fs::metadata(&input_file_path).is_ok();
let mut file = match OpenOptions::new().create(true).append(true).open(input_file_path.clone()) {
Ok(file) => file,
Err(err) => {
eprintln!("Error opening the file: {}", err);
return;
}
};
let mut file_output: Vec<u8> = Vec::new();
let stream_key = stream_data.stream_key;
let timestamp: RtmpTimestamp = stream_data.timestamp;
let data: Bytes = stream_data.data;
let data_type: ReceivedDataType = stream_data.data_type;
let packet_size_tracker = get_packet_size_tracker();
let timestamp_value = timestamp.value; // Access the value field
let timestamp_upper = ((timestamp_value >> 16) & 0xFF) as u8;
let timestamp_middle = ((timestamp_value >> 8) & 0xFF) as u8;
let timestamp_lower = (timestamp_value & 0xFF) as u8;
// Prepare the packet header (15 bytes)
let mut packet_header: [u8; 15] = [
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
];
// Get the previous packet size from the tracker
let mut last_packet_size = packet_size_tracker.get_previous_size(&file_id.clone());
if (isOk == false) {
// Prepare the FLV header (9 bytes)
let flv_header: [u8; 9] = [
'F' as u8, 'L' as u8, 'V' as u8, // Signature
1, // Version
5, // Flags (Audio + Video)
0, 0, 0, 9, // Data offset
];
if let Err(err) = file_output.write_all(&flv_header) {
eprintln!("Error writing FLV header: {}", err);
return;
}
last_packet_size = 11 + data.len() as u32;
}
let previous_packet_size_bytes: [u8; 4] = [
((last_packet_size >> 24) & 0xFF) as u8,
((last_packet_size >> 16) & 0xFF) as u8,
((last_packet_size >> 8) & 0xFF) as u8,
(last_packet_size & 0xFF) as u8,
];
if (isOk == false) {
println!("USING ZERO PACKET SIZE BECAUSE FILE DOESNT EXIST");
// Set previous packet size to 0 for the first packet
} else {
packet_header[0] = previous_packet_size_bytes[0];
packet_header[1] = previous_packet_size_bytes[1];
packet_header[2] = previous_packet_size_bytes[2];
packet_header[3] = previous_packet_size_bytes[3];
}
// Update the packet size tracker with the size of the current packet
packet_size_tracker.update_size(file_id.clone(), 11 + data.len() as u32);
// Video or audio packet type
packet_header[4] = match data_type {
ReceivedDataType::Video => 9,
ReceivedDataType::Audio => 8,
};
let data_size = data.len() as u32;
let data_size_bytes: [u8; 3] = [
((data_size >> 16) & 0xFF) as u8,
((data_size >> 8) & 0xFF) as u8,
(data_size & 0xFF) as u8,
];
packet_header[5] = data_size_bytes[0];
packet_header[6] = data_size_bytes[1];
packet_header[7] = data_size_bytes[2];
// Timestamp
packet_header[8] = timestamp_upper;
packet_header[9] = timestamp_middle;
packet_header[10] = timestamp_lower;
// Write the packet header to the file
if let Err(err) = file_output.write_all(&packet_header) {
eprintln!("Error writing packet header: {}", err);
return;
}
if let Err(err) = file_output.write_all(&data) {
eprintln!("Error writing packet header: {}", err);
return;
}
// Write the payload data to the file
if let Err(err) = file.write_all(&file_output) {
eprintln!("Error writing payload data: {}", err);
return;
};
@LegNeato
This project is awesome, keep up the great work :)
Hello! Below is my modified code of the "handle_audio_video_data_received" function in the mio_rtmp_server example. When I write the incoming data to a file and try to play it back, not even ffmpeg is sure what it is. I'm sure I'm missing something obvious, but could someone help me out? Thanks.