harlanc / xiu

A simple, high-performance and secure live media server in pure Rust (RTMP[cluster]/RTSP/WebRTC[whip/whep]/HTTP-FLV/HLS). 🦀
https://www.rustxiu.com
MIT License
1.6k stars 168 forks

Some tips after studying your code #121

Open shiqifeng2000 opened 2 months ago

shiqifeng2000 commented 2 months ago

Hi

I've recently been studying your code, the RTMP part. It seems you are using a "stream hub" as the media distributor and a stream_handler to handle metadata. I see that the StreamHubEvent::Publish event registers the stream with the hub, but why bother sending a "sender" and receiving another "sender"? Here's the code:

    pub async fn publish_to_channels(
        &mut self,
        app_name: String,
        stream_name: String,
        pub_id: Uuid,
        gop_num: usize,
    ) -> Result<(), SessionError> {
        // Install a cache for this stream on the stream handler.
        self.stream_handler
            .set_cache(Cache::new(app_name.clone(), stream_name.clone(), gop_num))
            .await;

        // One-shot channel on which the hub reports the outcome of the Publish event.
        let (event_result_sender, event_result_receiver) = oneshot::channel();
        let publish_event = StreamHubEvent::Publish {
            identifier: StreamIdentifier::Rtmp {
                app_name,
                stream_name,
            },
            info: self.get_publisher_info(pub_id),
            stream_handler: self.stream_handler.clone(),
            result_sender: event_result_sender,
        };

        // Hand the event to the stream hub; this fails only if the hub is gone.
        if self.event_producer.send(publish_event).is_err() {
            return Err(SessionError {
                value: SessionErrorValue::StreamHubEventSendErr,
            });
        }

        // First `?`: the reply channel was closed. Second `?`: the hub returned an error.
        let result = event_result_receiver.await??;
        // The result carries the sender that this session stores as its data_sender.
        self.data_sender = result.0.unwrap();
        Ok(())
    }

I guess there are many alternatives, such as a tokio::sync::broadcast channel, sending the tokio::sync::mpsc::Receiver, or maybe an Arc<RwLock/Mutex>. Could you share why it is designed like this?

harlanc commented 2 months ago

One channel is used for sending the Publish event to the streamhub, and the other is used for receiving the event execution result sent by the streamhub.

I want to use as few locks as possible in the project, which can be achieved by using channels.
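
The request/reply pattern described above can be sketched with the standard library alone. This is a minimal illustration, not xiu's actual code: `HubEvent`, `run_hub`, `publish`, and `demo` are hypothetical names, `std::sync::mpsc` stands in for both the tokio mpsc event channel and the oneshot reply channel, and a plain thread stands in for the hub task. The hub thread owns all stream state, so nothing needs a lock.

```rust
use std::collections::HashMap;
use std::sync::mpsc::{self, Receiver, Sender};
use std::thread;

type Frame = Vec<u8>;

// Hypothetical event type, loosely modelled on StreamHubEvent::Publish.
enum HubEvent {
    Publish {
        stream_name: String,
        // Reply channel: the hub answers with a sender for media frames,
        // or with an error message.
        result_sender: Sender<Result<Sender<Frame>, String>>,
    },
}

// The hub is the single owner of the stream table, so no Mutex/RwLock is needed.
// It returns the frames it received per stream once all senders are dropped.
fn run_hub(events: Receiver<HubEvent>) -> HashMap<String, Vec<Frame>> {
    let mut streams: HashMap<String, Receiver<Frame>> = HashMap::new();
    for event in events {
        match event {
            HubEvent::Publish { stream_name, result_sender } => {
                if streams.contains_key(&stream_name) {
                    let _ = result_sender.send(Err(format!("{} already published", stream_name)));
                } else {
                    // Create the data channel and hand its sending half back.
                    let (data_sender, data_receiver) = mpsc::channel();
                    streams.insert(stream_name, data_receiver);
                    let _ = result_sender.send(Ok(data_sender));
                }
            }
        }
    }
    // The event channel is closed: drain whatever each publisher sent.
    streams
        .into_iter()
        .map(|(name, rx)| (name, rx.into_iter().collect::<Vec<Frame>>()))
        .collect()
}

// Send a Publish event and wait for the hub's reply.
fn publish(events: &Sender<HubEvent>, stream_name: &str) -> Result<Sender<Frame>, String> {
    let (result_sender, result_receiver) = mpsc::channel();
    events
        .send(HubEvent::Publish { stream_name: stream_name.to_string(), result_sender })
        .map_err(|_| "hub is gone".to_string())?;
    // Outer error: the reply channel was closed. Inner result: the hub's answer.
    result_receiver
        .recv()
        .map_err(|_| "hub dropped the reply channel".to_string())?
}

fn demo() -> (Vec<Frame>, bool) {
    let (event_sender, event_receiver) = mpsc::channel();
    let hub = thread::spawn(move || run_hub(event_receiver));

    let data_sender = publish(&event_sender, "live/test").expect("first publish succeeds");
    let duplicate_rejected = publish(&event_sender, "live/test").is_err();

    data_sender.send(vec![1, 2, 3]).unwrap();
    data_sender.send(vec![4]).unwrap();

    // Dropping both senders lets the hub thread finish.
    drop(data_sender);
    drop(event_sender);
    let mut received = hub.join().unwrap();
    (received.remove("live/test").unwrap_or_default(), duplicate_rejected)
}

fn main() {
    let (frames, duplicate_rejected) = demo();
    println!("hub received {} frames; duplicate rejected: {}", frames.len(), duplicate_rejected);
}
```

Here `publish` unwraps the channel error with `?` and returns the hub's own `Result`, which is the same two-layer result that `event_result_receiver.await??` unwraps in the original code.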

Why use tokio::sync::broadcast? Do you mean replacing tokio::sync::mpsc? Could you explain the reasons?

shiqifeng2000 commented 2 months ago

> One channel is used for sending the Publish event to the streamhub, and the other is used for receiving the event execution result sent by the streamhub.

I guess an actix actor message would suit that well.

> I want to use as few locks as possible in the project, which can be achieved by using channels.

I understand; locks can block.

> Why use tokio::sync::broadcast? Do you mean replacing tokio::sync::mpsc? Could you explain the reasons?

A broadcast Sender implements Clone and can create new receivers with subscribe(), but receivers that lag behind lose the oldest items once the channel is full. In your case, unless you want to keep ALL the media data, you could simply create a broadcast channel and share or clone the sender with the stream hub. When a subscriber arrives, send it the sender so that it can create a receiver.

harlanc commented 2 months ago

> > One channel is used for sending the Publish event to the streamhub, and the other is used for receiving the event execution result sent by the streamhub.
>
> I guess an actix actor message would suit that well.

The publish/subscribe code is indeed not clear enough. I have been wondering whether it could be replaced by a simple RPC framework. I will consider your suggestions. Thanks.

> > Why use tokio::sync::broadcast? Do you mean replacing tokio::sync::mpsc? Could you explain the reasons?
>
> A broadcast Sender implements Clone and can create new receivers with subscribe(), but receivers that lag behind lose the oldest items once the channel is full. In your case, unless you want to keep ALL the media data, you could simply create a broadcast channel and share or clone the sender with the stream hub. When a subscriber arrives, send it the sender so that it can create a receiver.

The RTMP protocol is a little special: you should not send the publisher's real-time data immediately after a client subscribes. Some cached data (the AVC/AAC sequence headers) must be sent to the client first. So if we subscribe to a broadcast channel, can we insert that data into the channel first?

shiqifeng2000 commented 2 months ago

> The RTMP protocol is a little special: you should not send the publisher's real-time data immediately after a client subscribes. Some cached data (the AVC/AAC sequence headers) must be sent to the client first. So if we subscribe to a broadcast channel, can we insert that data into the channel first?

I think it's quite possible, though not by relying on the channel alone. When you publish the stream, you could send the side data (metadata and sequence headers) to the hub. When subscribing, you could pull all of it down; invoking a send_channel_data before setting up the subscription could do the job.
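
A minimal sketch of that idea, using only the standard library: the hub-side stream keeps the sequence headers it has seen and replays them to each new subscriber before any live frame. All names here (`Frame`, `Stream`, `publish`, `subscribe`, `demo`) are hypothetical and not taken from xiu, and the cache is simplified: it appends every header instead of replacing an older one of the same kind, and it does not model GOP caching.

```rust
use std::sync::mpsc::{self, Receiver, Sender};

// Hypothetical frame type: sequence headers must reach a player before media.
#[derive(Clone, Debug, PartialEq)]
enum Frame {
    SequenceHeader(&'static str),
    Media(u32),
}

// Hub-side state for one stream (assumed layout, for illustration only).
#[derive(Default)]
struct Stream {
    cached_headers: Vec<Frame>,
    subscribers: Vec<Sender<Frame>>,
}

impl Stream {
    // Called for every frame coming from the publisher.
    fn publish(&mut self, frame: Frame) {
        if let Frame::SequenceHeader(_) = frame {
            // Remember headers so late subscribers can still receive them.
            self.cached_headers.push(frame.clone());
        }
        // Fan out to live subscribers; drop those whose receiver is closed.
        self.subscribers.retain(|tx| tx.send(frame.clone()).is_ok());
    }

    // Called when a player subscribes.
    fn subscribe(&mut self) -> Receiver<Frame> {
        let (tx, rx) = mpsc::channel();
        // Replay the cached headers first, then register for live data.
        for header in &self.cached_headers {
            let _ = tx.send(header.clone());
        }
        self.subscribers.push(tx);
        rx
    }
}

fn demo() -> Vec<Frame> {
    let mut stream = Stream::default();
    stream.publish(Frame::SequenceHeader("avc"));
    stream.publish(Frame::SequenceHeader("aac"));
    stream.publish(Frame::Media(1)); // nobody is subscribed yet, so this is not delivered

    let rx = stream.subscribe();
    stream.publish(Frame::Media(2));

    // Dropping the stream closes the channel so the iteration below ends.
    drop(stream);
    rx.into_iter().collect()
}

fn main() {
    for frame in demo() {
        println!("{:?}", frame);
    }
}
```

Because the replay and the registration happen in the same `subscribe` call on state owned by one task, no live frame can slip in between the cached headers and the first real-time frame.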