git clone https://github.com/JustDoIt0910/tinyMQTT.git
cd tinyMQTT
mkdir build && cd build
cmake ..
make
编译生成
在broker可执行文件目录下添加 tinymqtt.conf 配置文件:
# server port
port=1883
# location of the password file
password_file=pwd.conf
# if allow anonymous login, the broker won't check the username and password
allow_anonymous=true
# the maximum number of unacknowalegded publish packet
inflight_window=1
# the number of io thread
io_threads=4
可以使用 tinymqtt_pwd 工具添加用户:
./tinymqtt_pwd -c ./pwd.conf -A new_user
如果设置了allow_anonymous 为true, broker将不再检查用户密码。
测试对比 tinyMQTT 和 mosquitto 在广播场景下的性能。测试一、二均使用 qos 0 消息。
测试一:建立 1000 订阅者,订阅 bench/1 主题,建立 5 个发布者,每个发布者每秒向 bench/1 主题发送 50 条 payload 为 16B 的消息。预期订阅速率为 25W msg/s
测试二:建立 1000 订阅者,订阅 bench/1 主题,建立 5 个发布者,每个发布者每秒向 bench/1 主题发送 100 条 payload 为 16B 的消息。预期订阅速率为 50W msg/s
测试工具与 broker 在同一机器。
操作系统 | CPU数 | 内存 | 测试工具 |
---|---|---|---|
Ubuntu 22.04 虚拟机 | 8 | 6GB | emqtt-bench |
25W 测试:
# 订阅方
./emqtt-bench sub -c 1000 -i 1 -t bench/1 -V 4
# 发布方
./emqtt-bench pub -c 5 -I 20 -t bench/1 -s 16 -V 4
mosquitto:
实际订阅速率为 13W 左右,没有达到预期。
tinyMQTT:
实际订阅速率达到且稳定在 25W。
50W 测试:
# 订阅方
./emqtt-bench sub -c 1000 -i 1 -t bench/1 -V 4
# 发布方
./emqtt-bench pub -c 5 -I 10 -t bench/1 -s 16 -V 4
mosquitto:
订阅速率基本没变,可见 13W 已经是 mosquitto 极限。
tinyMQTT
实际订阅速率在 40W ~ 50W 之间,接近 50W。
mosquitto 是单线程的,没法利用多核 cpu,所以 tinyMQTT 采用了 one epoll per thread 的多 IO 线程设计,但除 IO 任务之外的其余逻辑,如查询订阅树、mqtt session 管理等仍由唯一的逻辑线程执行。逻辑线程与 IO 线程之间通过类似 Actor 模式的消息传递方式通信,无共享内存。
客户端提供了同步和异步两种使用方式,详见example中的client_sync.c和client_async.c,,默认是采用同步模式的,所有操作(connect/subscribe/unsubscribe/publish)都是同步阻塞的,如果要持续监听消息,可以在程序最后调用tinymqtt_loop(),这会阻塞当前线程,收到消息会调用on_message回调。
如果要使用异步模式,需要先调用tinymqtt_loop_threaded(),这会单独创建一个io线程,此后所有操作都是非阻塞的,操作请求将会被内部队列转移到io线程执行,结果都以回调函数形式进行通知。
client_sync.c
void on_message(char* topic, char* message, uint8_t qos, uint8_t retain)
{
tlog_info("received message [%s] topic=%s, qos=%u, retain=%u", message, topic, qos, retain);
}
int main()
{
tiny_mqtt* mqtt = tinymqtt_new("192.168.3.7", 1883);
connect_options ops = {
"username",
"password",
"tmq_client_test_client",
1,
60,
NULL
};
int res = tinymqtt_connect(mqtt, &ops);
if(res == CONNECTION_ACCEPTED)
{
tinymqtt_subscribe(mqtt, "test/sub", 1);
//tinymqtt_unsubscribe(mqtt, "test/sub");
tinymqtt_set_message_callback(mqtt, on_message);
tinymqtt_publish(mqtt, "test/pub", "hello!", 1, 0);
}
tinymqtt_loop(mqtt);
tinymqtt_destroy(mqtt);
return 0;
}
client_async.c
void on_message(char* topic, char* message, uint8_t qos, uint8_t retain)
{
tlog_info("received message [%s] topic=%s, qos=%u, retain=%u", message, topic, qos, retain);
}
void on_connect(tiny_mqtt* mqtt, int return_code)
{
if(return_code == CONNECTION_ACCEPTED)
{
tlog_info("CONNECTION_ACCEPTED");
tinymqtt_subscribe(mqtt, "test/sub", 2);
}
else
tlog_error("connect falied. return code=%d", return_code);
}
void on_disconnect(tiny_mqtt* mqtt)
{
tlog_info("disconnected");
tinymqtt_quit(mqtt);
}
int main()
{
tiny_mqtt* mqtt = tinymqtt_new("192.168.3.7", 1883);
connect_options ops = {
"username",
"password",
"tmq_client_test_client",
1,
60,
NULL
};
tinymqtt_set_message_callback(mqtt, on_message);
tinymqtt_set_connect_callback(mqtt, on_connect);
tinymqtt_set_disconnect_callback(mqtt, on_disconnect);
tinymqtt_loop_threaded(mqtt);
tinymqtt_connect(mqtt, &ops);
tinymqtt_async_wait(mqtt);
tinymqtt_destroy(mqtt);
return 0;
}