pmalhaire / xk6-mqtt

k6 mqtt extension
Apache License 2.0
46 stars 37 forks source link

isConnected not working #26

Open InRiPa opened 3 months ago

InRiPa commented 3 months ago

I believe, the isConnected() function is not working properly. I think once connected, the isConnected() function will always return True, no matter the actual status.

What I did:

  1. Start the broker
  2. Run the script
  3. While the script is running, shutting down the broker

What did I expect: isConnected() returns false

What happened: isConnected() returns true always

For QoS 1 and 2, I can work around it, using the publish() function. But for QoS 0 even the publish() function doesn't throw an error. Which makes it hard to test a broker, if it actually bails out.

Following a test script, maybe I did something wrong:

import { Counter, Gauge, Rate, Trend } from 'k6/metrics';
import {
    check, fail
} from 'k6';

export const options = {
    discardResponseBodies: true,
    scenarios: {
        contacts: {
            executor: 'constant-arrival-rate',
            // How long the test lasts
            duration: '60s',
            // How many iterations per timeUnit
            rate: 1,
            // Start `rate` iterations per second
            timeUnit: '1s',
            // Pre-allocate 2 VUs before starting the test
            preAllocatedVUs: 2,
            // Spin up a maximum of 50 VUs to sustain the defined
            // constant arrival rate.
            maxVUs: 50,
        },
    },
};

const mqtt = require('k6/x/mqtt');

const rnd_count = 2000;
// create random number to create a new topic at each run
let rnd = Math.round(Math.random() * rnd_count);
// conection timeout (ms)
let connectTimeout = 2000
// publish timeout (ms)
let publishTimeout = 2000
// connection close timeout (ms)
let closeTimeout = 2000

// Mqtt topic one per VU
const k6Topic = `benchmark/${rnd}/${__VU}`;
// Connect IDs one connection per VU
const k6PubId = `k6-pub-${rnd}-${__VU}`;

const host = "mqtt://192.666.666.666"; // ip to reach broker
const port = "1883";

const failedMsgs = new Counter('failed_messages');
const failedConnects = new Counter('failed_connects');
const connectRate = new Rate('connect_rate');

// create publisher client
let publisher = new mqtt.Client(
    // The list of URL of  MQTT server to connect to
    [host + ":" + port],
    // A username to authenticate to the MQTT server
    "",
    // Password to match username
    "",
    // clean session setting
    false,
    // Client id for reader
    k6PubId,
    // timeout in ms
    connectTimeout,
)
let err;
let connect_status;

// initial connect MUST happen, otherwise test will fail
try {
    console.log(`before Connect: ${publisher.isConnected()}`)
    connect_status = publisher.connect()
    console.info(`init Connect response: ${connect_status}`)
}
catch (error) {
    err = error
}

if (err != undefined) {
    console.error("publish connect error:", err)
    // you may want to use fail here if you want only to test successfull connection only
    fail("fatal could not make initial connect to broker for publish")
}

export default function () {

    // Message content one per ITER
    const k6Message = `{"temperature":20,"timeStamp":${Math.round(new Date().getTime() / 1000)}}`;

    console.log(`Current Connect status: ${publisher.isConnected()}`);
    connectRate.add(
        check(publisher, {
            "is publisher connected": publisher => publisher.isConnected()
        })
    );

    if (!publisher.isConnected()) {
        failedConnects.add(1);
        publisher.connect()

    } else {
        // publish count messages
        let err_publish;
        try {
            publisher.publish(
                // topic to be used
                k6Topic,
                // The QoS of messages
                1,
                // Message to be sent
                k6Message,
                // retain policy on message
                false,
                // timeout in ms
                publishTimeout,
                // async publish handlers if needed
                (obj) => { // success
                    console.log(obj.type) // publish
                    console.log(obj.topic) // published topic
                },
                (err) => { // failure
                    console.log(err.type)  // error
                    console.log(err.message)
                }
            );
        } catch (error) {
            failedMsgs.add(1);
            err_publish = error
        }
        console.log(err_publish)
    }
}

export function teardown() {
    console.log("CLOSING SESSION")
    // closing both connections at VU close
    publisher.close(closeTimeout);
}
pmalhaire commented 3 months ago

I'll test this

pmalhaire commented 2 weeks ago

I confirm the issue and I'll fix in a bit of time.