SvenskaSpel / locust-plugins

A set of useful plugins/extensions for Locust
Apache License 2.0
559 stars 138 forks source link

subscribe to a topic fails #195

Open kamyarz-aws opened 2 weeks ago

kamyarz-aws commented 2 weeks ago

import os
import ssl
import time

import base64
import json
from datetime import datetime
from datetime import timezone
import random

from locust import task, TaskSet
from locust.user.wait_time import constant_throughput, between
from locust_plugins.users.mqtt import MqttUser
import logging
logging.basicConfig(level=logging.DEBUG)

class OneTpsPublish(MqttUser):
    protocol_version = 3
    host = "XXXXX.amazonaws.com"
    port = 8883

    unique_topic = "locust/test/topic"
    context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    context.load_cert_chain("XXX.crt", "mqtt/XXX.key", None)
    context.load_verify_locations("mqtt/AmazonRootCA1.pem")

    def on_start(self):
        self.client.tls_set_context(self.context)
        self.client.subscribe(self.unique_topic)
        time.sleep(2)

    def on_connect(self, client, userdata, flags, rc):
        time.sleep(1)

    @task
    def publish(self):
        payload = '{"message": "Hello, 123!"}'
        self.client.publish(self.unique_topic, payload)

I have this script trying to test against an aws iot message broker.

It does successfully connect and publish, however it is unable to subscribe to a topic.

if I place the subscribe to happen on_connect, no failure would be reported.

I see in the codebase of the mqtt user, that there is a comment on line 25 that says # indicates a failure to subscribe.

Does this mean it is normal for subscribe to fail ??

The same does NOT happen when I use a public message broker like hiveMQ.

kamyarz-aws commented 2 weeks ago

one of my suspicions for the reason behind this behaviour is that the keep_alive value is not set when calling self.client.connect_async through locust_plugins.