eclipse / paho.mqtt.rust

paho.mqtt.rust
Other
525 stars 102 forks source link

Can't get a second message with my subscriber #232

Open EvilWatermelon opened 3 months ago

EvilWatermelon commented 3 months ago

The first message comes without any problem but after that I'm getting a StreamError: Closed(..) and no other error messages. What does this mean and how can I fix this problem?

pub async fn mqtt_connect() -> Result<AsyncClient, paho_mqtt::errors::Error> {

    let mqtt_url: String = std::env::var("MQTT_CON").expect("MQTT_CON must be set.");
    let host: String = std::env::args()
        .nth(1)
        .unwrap_or_else(|| mqtt_url);
    let id: &str = "isumis_backend";
    /*
     Create the client. Use an ID for a persistent session.
     A real system should try harder to use a unique ID.
    */
    let create_opts: CreateOptions = mqtt::CreateOptionsBuilder::new()
        .server_uri(host)
        .client_id(id)
        .finalize();

    // Create the client connection
    let cli: AsyncClient = AsyncClient::new(create_opts).unwrap_or_else(|e| {
        error!("Error creating the client: {}", e);
        process::exit(1);
    });

    // Define the set of options for the connection
    let lwt: Message = Message::new("test", "Async subscriber lost connection", mqtt::QOS_1);

    let conn_opts: ConnectOptions = mqtt::ConnectOptionsBuilder::new()
        .clean_session(false)
        .properties(mqtt::properties![mqtt::PropertyCode::SessionExpiryInterval => 3600])
        //.keep_alive_interval(Duration::from_secs(15))
        .will_message(lwt)
        .finalize();

    // Make the connection to the broker
    info!("Connecting to the MQTT server...");
    cli.connect(conn_opts).await?;

    match cli.is_connected() {
        true => {
            warn!("Connection unsuccessful, reconnecting...");
            cli.reconnect();
        },
        false => info!("Connection to MQTT successful.")
    };

    Ok(cli)
}

pub fn subscriber() -> () {

    if let Err(err) = block_on(async {

        let mut cli: AsyncClient = mqtt_connect().await?;

        // Get message stream before connecting.
        let strm: mqtt::AsyncReceiver<Option<Message>> = cli.get_stream(25);

        info!("Subscribing to topics: {:?}", TOPICS);
        //let sub_opts: Vec<mqtt::SubscribeOptions> = vec![mqtt::SubscribeOptions::with_retain_as_published(); TOPICS.len()];
        //cli.subscribe_many_with_options(TOPICS, QOS, &sub_opts, None)
        cli.subscribe_many(TOPICS, QOS).await?;

        // Just loop on incoming messages.
        info!("Waiting for messages...");

        /*
         Note that we're not providing a way to cleanly shut down and
         disconnect. Therefore, when you kill this app (with a ^C or
         whatever) the server will get an unexpected drop and then
         should emit the LWT message.
        */
        message_loop(cli, strm).await;

        // Explicit return type for the async block
        Ok::<(), mqtt::Error>(())
    }) {
        error!("{}", err);
    }
}

async fn message_loop(cli: AsyncClient, mut strm: mqtt::AsyncReceiver<Option<Message>>) -> () {

    while let Some(msg_opt) = strm.next().await {
        if let Some(sub_msg) = msg_opt {

            if sub_msg.retained() {
                info!("(R) ");
            }

            let msg: &str = match std::str::from_utf8(sub_msg.payload()) {
                Ok(v) => v,
                Err(e) => panic!("Invalid UTF-8 sequence: {e}"),
            };

            let payload_json: Value = serde_json::from_str(msg).unwrap();

            info!("New message: {}", &payload_json);

            match sub_msg.topic() {
                TRAFFIC => network_traffic(payload_json)
                    .await
                    .expect("Could not handle network traffic"),
                FILE_INFO => handle_file_info(msg)
                    .await
                    .expect("Could not handle scanned app results"),
                BLACKLIGHT => handle_blacklight(payload_json)
                    .await
                    .expect("Could not handle blacklight results"), 
                &_ => error!("No suitable topic name found")
            };

        } else {
            // A "None" means we were disconnected. Try to reconnect...
            warn!("Lost connection. Attempting reconnect.");
            while let Err(err) = cli.reconnect().await {
                error!("Error reconnecting: {}", err);
                actix_web::rt::time::sleep(Duration::from_millis(1000)).await;
            }
        }
    }
}

I recieve a part of the message and then I get the error

2024-06-28T12:49:07.915945141+02:00 DEBUG paho_mqtt_c - 20240628 124906.261 m->c->connect_state = 0
2024-06-28T12:49:07.916177041+02:00 DEBUG paho_mqtt_c - 20240628 124906.261 11 my_backend <- PUBLISH msgid: 0 qos: 0 retained: 0 payload len(1172): {"host":"example.com
2024-06-28T12:49:07.916257531+02:00 DEBUG paho_mqtt_c - 20240628 124906.261 Calling messageArrived for client isumis_backend, queue depth 0
2024-06-28T12:49:07.916308994+02:00 DEBUG paho_mqtt::async_client - Message arrived. Client: 0x7fb9f0024160, topic: 0x7fb9f0026c10 len 31 cmsg: 0x7fb9f00240a0: MQTTAsync_message { struct_id: [77, 81, 84, 77], struct_version: 1, payloadlen: 1172, payload: 0x7fb9ec002610, qos: 0, retained: 0, dup: 0, msgid: 0, properties: MQTTProperties { count: 0, max_count: 0, length: 0, array: 0x0 } }
2024-06-28T12:49:07.916444203+02:00 ERROR paho_mqtt::async_client - Stream error: Closed(..)