AtherEnergy / rumqtt

Pure Rust MQTT client
The Unlicense
202 stars 72 forks source link

How to use with actix #146

Open cmnstmntmn opened 5 years ago

cmnstmntmn commented 5 years ago

does anyone have an example of running both actix and rumqtt?

flxo commented 5 years ago

No, but I used both.

In theory this should be an Actor implementation with a rumqtt::MqttClient that implements Handler<(String, QoS, Bytes or Vec<u8>)> for publishes.

The notifications need to be received in a dedicated thread: crossbeam::Receiver doesn't impl Stream, so you cannot implement StreamHandler<..> for your MQTT client actor. Fire up a thread that iterates over the notifications (this may block) and do_send them to your actor. If you want to go for the StreamHandler, you need to implement a Stream that calls try_recv on the notification handle in poll. Not sure what additional things need to be done to notify the correct tasks.

There is a project actix-broker that might help to distribute the Notifications in your actor system - I never used it.

mojzu commented 5 years ago

I've been working on a proof of concept that uses actix/rumqtt. The actor implementation is below; as the TODO notes show, it's pretty rough, with no real error handling or graceful exit yet. But provided the mailbox/notification channel sizes are large enough, it seems to be able to handle the deluge of messages from a public MQTT server.

use actix::prelude::*;
use futures::future;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

/// Errors produced while converting incoming MQTT messages.
///
/// Used as the error half of `Publish::from_mqtt311_publish` and of the
/// `Publish` message result.
#[derive(Debug, Fail)]
pub enum Error {
    /// The payload bytes could not be decoded as a UTF-8 string; wraps the
    /// standard library's `FromUtf8Error`.
    #[fail(display = "MqttError::FromUtf8 {}", _0)]
    FromUtf8(std::string::FromUtf8Error),
}

/// MQTT user, password configuration.
///
/// When present in `Configuration::user_password`, both values are handed to
/// `rumqtt::SecurityOptions::UsernamePassword` when the client options are built.
#[derive(Debug, Clone)]
pub struct UserPasswordConfiguration {
    /// User name passed as the first `UsernamePassword` argument.
    pub user: String,
    /// Password passed as the second `UsernamePassword` argument.
    pub password: String,
}

/// MQTT topic configuration.
///
/// One entry per subscription made when the actor starts.
#[derive(Debug, Clone)]
pub struct TopicConfiguration {
    /// Topic (filter) string passed to `MqttClient::subscribe`.
    pub topic: String,
    /// Quality of service as a raw number, converted with `rumqtt::QoS::from_u8`.
    /// NOTE(review): presumably must be 0, 1 or 2; the actor unwraps the
    /// conversion, so a value it rejects would panic at start-up — confirm.
    pub qos: u8,
}

/// MQTT configuration.
///
/// Holds everything the actor needs to build `rumqtt::MqttOptions`, size its
/// mailbox and subscribe to topics.
#[derive(Debug, Clone)]
pub struct Configuration {
    /// Capacity applied to the actor mailbox via `set_mailbox_capacity`.
    pub mailbox_capacity: usize,
    /// Client identifier passed to `rumqtt::MqttOptions::new`.
    pub client_id: String,
    /// Broker host passed to `rumqtt::MqttOptions::new`.
    pub host: String,
    /// Broker port passed to `rumqtt::MqttOptions::new`.
    pub port: u16,
    /// Optional credentials; `None` leaves the security options untouched.
    pub user_password: Option<UserPasswordConfiguration>,
    /// Topics subscribed to when the actor starts.
    pub topics: Vec<TopicConfiguration>,
    /// Value given to `rumqtt::ReconnectOptions::AfterFirstSuccess`
    /// (presumably a delay in seconds, judging by the `_s` suffix — confirm).
    /// Note the field name is spelled with three `n`s.
    pub reconnnect_s: u64,
    /// Value given to `set_keep_alive` (presumably seconds — confirm).
    pub keep_alive_s: u16,
    /// Value given to `set_clean_session`.
    pub clean_session: bool,
    /// Value given to `set_notification_channel_capacity`.
    pub notification_channel_capacity: usize,
}

impl Default for Configuration {
    /// Demo-oriented defaults: the public `test.mosquitto.org` broker on port
    /// 1883, no credentials, and a single wildcard (`#`) subscription at QoS 1.
    fn default() -> Self {
        // Single catch-all subscription used by the default configuration.
        let catch_all = TopicConfiguration {
            topic: String::from("#"),
            qos: 1,
        };

        Self {
            mailbox_capacity: 512,
            client_id: String::from("a7g87sdg7dsg7sd9g70s"),
            host: String::from("test.mosquitto.org"),
            port: 1883,
            user_password: None,
            topics: vec![catch_all],
            reconnnect_s: 5,
            keep_alive_s: 10,
            clean_session: false,
            notification_channel_capacity: 512,
        }
    }
}

/// MQTT actor.
///
/// Owns the rumqtt client and the background thread that forwards broker
/// notifications into the actor's mailbox as `Publish` messages.
pub struct Actor {
    /// MQTT configuration, shared behind an `Arc`.
    pub configuration: Arc<Configuration>,
    /// MQTT client handle.
    /// Created with None and assigned Some in actor `started` function.
    /// Kept in the actor state so the client is not dropped.
    pub mqtt_client: Option<rumqtt::MqttClient>,
    /// MQTT notifications thread handle.
    /// Created with None and assigned Some in actor `started` function.
    /// Kept in the actor state so the handle is not dropped.
    pub thread_handle: Option<std::thread::JoinHandle<()>>,
    /// Publish message counter, incremented once per handled `Publish`.
    pub publish_counter: AtomicUsize,
}

impl Actor {
    /// Start a supervised instance of the actor and return its address.
    ///
    /// `configuration` is cloned into an `Arc`; the client and thread handles
    /// begin as `None` and are filled in by the `started` lifecycle hook.
    pub fn start(configuration: &Configuration) -> Addr<Actor> {
        let shared = Arc::new(configuration.clone());
        Supervisor::start(move |_| Actor {
            configuration: shared,
            mqtt_client: None,
            thread_handle: None,
            publish_counter: AtomicUsize::new(0),
        })
    }

    /// Build the `rumqtt::MqttOptions` described by the actor's configuration.
    ///
    /// Security options are only set when a user/password pair is configured.
    pub fn mqtt_client_options(&self) -> rumqtt::MqttOptions {
        let config = &self.configuration;

        // TODO: Check client disconnection/reconnection/error handling.
        let reconnect = rumqtt::ReconnectOptions::AfterFirstSuccess(config.reconnnect_s);

        let base = rumqtt::MqttOptions::new(
            config.client_id.clone(),
            config.host.clone(),
            config.port,
        );
        let base = base.set_keep_alive(config.keep_alive_s);
        let base = base.set_clean_session(config.clean_session);
        let base = base.set_reconnect_opts(reconnect);
        let base = base.set_notification_channel_capacity(config.notification_channel_capacity);

        if let Some(credentials) = &config.user_password {
            base.set_security_opts(rumqtt::SecurityOptions::UsernamePassword(
                credentials.user.clone(),
                credentials.password.clone(),
            ))
        } else {
            base
        }
    }

    /// Attempt a non-blocking send of `publish` to the actor at `addr`.
    ///
    /// A failed send is logged at error level and the message is discarded.
    pub fn try_send_publish(addr: &Addr<Actor>, publish: Publish) -> () {
        if let Err(err) = addr.try_send(publish) {
            // TODO: How to handle send errors.
            error!("{}", err);
        }
    }

    /// Spawn the thread that drains the rumqtt notification channel.
    ///
    /// Publish notifications are converted to `Publish` and forwarded to `addr`;
    /// every other notification is only logged. The thread ends when the
    /// notification iterator is exhausted.
    pub fn thread_spawn_notifications(
        &self,
        notifications: rumqtt::Receiver<rumqtt::Notification>,
        addr: Addr<Actor>,
    ) -> std::thread::JoinHandle<()> {
        // TODO: How to handle process exit while messages being handled (ack later?).
        std::thread::spawn(move || {
            for notification in notifications {
                match notification {
                    rumqtt::Notification::Publish(raw) => {
                        // Payloads that are neither JSON nor valid UTF8 fail the
                        // conversion and are intentionally dropped.
                        if let Ok(message) = Publish::from_mqtt311_publish(raw) {
                            Actor::try_send_publish(&addr, message);
                        }
                    }
                    // TODO: Handle other notification types.
                    _ => info!("notification: {:?}", notification),
                }
            }
        })
    }
}

// Marks `Actor` as supervisable so it can be launched through `Supervisor::start`.
// No trait methods are overridden here, so whatever defaults the trait provides apply.
impl actix::Supervised for Actor {
    // TODO: Implement restarting.
}

impl actix::Actor for Actor {
    type Context = Context<Self>;

    fn started(&mut self, ctx: &mut Context<Self>) {
        info!("MqttActor::started");

        // Increase mailbox capacity for increased message throughput.
        ctx.set_mailbox_capacity(self.configuration.mailbox_capacity);

        // MQTT client options.
        let options = self.mqtt_client_options();

        // Start MQTT client and get notifications receiver.
        let (mut mqtt_client, notifications) = rumqtt::MqttClient::start(options).unwrap();

        // Spawn thread to iterate over notifications.
        let address = ctx.address();
        let thread_handle = self.thread_spawn_notifications(notifications, address);

        // Subscribe to topic(s).
        for (_i, x) in self.configuration.topics.iter().enumerate() {
            mqtt_client
                .subscribe(x.topic.to_owned(), rumqtt::QoS::from_u8(x.qos).unwrap())
                .unwrap()
        }

        // Set client and thread handle in actor state to prevent drop.
        self.mqtt_client = Some(mqtt_client);
        self.thread_handle = Some(thread_handle);
    }

    // TODO: Implement signals handling for graceful actor stop.
    fn stopping(&mut self, _ctx: &mut Context<Self>) -> Running {
        info!("MqttActor::stopping");
        let client = self.mqtt_client.as_mut().unwrap();
        client.pause().unwrap();
        Running::Stop
    }

    fn stopped(&mut self, _ctx: &mut Context<Self>) {
        info!("MqttActor::stopped");
    }
}

/// MQTT publish payload type.
///
/// `Publish::from_mqtt311_publish` tries JSON first and falls back to text.
#[derive(Debug, Serialize, Deserialize)]
pub enum PublishPayload {
    /// Payload that deserialised successfully as JSON.
    Json(serde_json::Value),
    /// Payload that was not JSON but was valid UTF-8.
    Text(String),
}

/// MQTT publish type.
///
/// Owned, serialisable form of an incoming `mqtt311::Publish`; it is the
/// message type delivered to the actor.
#[derive(Debug, Serialize, Deserialize)]
pub struct Publish {
    /// Message topic.
    pub topic: String,
    /// Message payload, JSON or text.
    pub payload: PublishPayload,
    /// Message quality of service, as the raw number from `QoS::to_u8`.
    pub qos: u8,
    /// Message duplicate flag.
    pub duplicate: bool,
    /// Message retain flag.
    pub retain: bool,
    /// Message packet identifier, `None` when the source packet carried none.
    pub pkid: Option<u16>,
}

impl Publish {
    /// Try to convert `mqtt311::Publish` to `Publish`.
    pub fn from_mqtt311_publish(publish: mqtt311::Publish) -> Result<Publish, Error> {
        // First try to deserialise to JSON.
        let payload = match serde_json::from_slice(publish.payload.as_ref()) {
            Ok(value) => Ok(PublishPayload::Json(value)),
            // If deserialisation fails, try to parse UTF8 string.
            Err(_err) => match String::from_utf8(publish.payload.to_vec()) {
                Ok(value) => Ok(PublishPayload::Text(value)),
                Err(err) => Err(Error::FromUtf8(err)),
            },
        }?;
        let pkid = match publish.pkid {
            Some(pkid) => Some(pkid.0),
            None => None,
        };

        Ok(Publish {
            topic: publish.topic_name,
            payload,
            qos: publish.qos.to_u8(),
            duplicate: publish.dup,
            retain: publish.retain,
            pkid,
        })
    }
}

/// Makes `Publish` an actix message whose handlers reply with `Ok(())` or an `Error`.
impl actix::Message for Publish {
    type Result = Result<(), Error>;
}

impl actix::Handler<Publish> for Actor {
    type Result = ResponseActFuture<Self, (), Error>;

    fn handle(&mut self, _msg: Publish, _ctx: &mut Context<Self>) -> Self::Result {
        // TODO: Better metrics collection.
        // info!("publish: {:?}", msg);
        let counter = self.publish_counter.fetch_add(1, Ordering::Relaxed);
        if counter % 1000 == 0 {
            info!("publish_counter: {}", counter);
        }

        // TODO: Implement parsing/messages to other actors.
        // TODO: How to handle errors in handler.
        let placeholder = future::ok(());
        let wrapped = actix::fut::wrap_future(placeholder);
        Box::new(wrapped)
    }
}