MQTT is a lightweight publish/subscribe messaging protocol designed for M2M (machine-to-machine) telemetry in low-bandwidth environments. Fastapi-mqtt is a client for working with MQTT from FastAPI applications.
For more information about MQTT, see the MQTT website.
Fastapi-mqtt wraps the gmqtt module, an asynchronous Python MQTT client implementation. The module supports version 5.0 of the MQTT protocol.
Key features:
MQTT specification available through decorator methods using callbacks
MQTT settings available with pydantic
Authentication to broker with credentials
Unsubscribe from specific topics and publish to specific topics
pip install fastapi-mqtt
from contextlib import asynccontextmanager
from typing import Any

from fastapi import FastAPI
from gmqtt import Client as MQTTClient

from fastapi_mqtt import FastMQTT, MQTTConfig

mqtt_config = MQTTConfig()
fast_mqtt = FastMQTT(config=mqtt_config)

@asynccontextmanager
async def _lifespan(_app: FastAPI):
    # Connect to the broker on application startup, disconnect on shutdown
    await fast_mqtt.mqtt_startup()
    yield
    await fast_mqtt.mqtt_shutdown()

app = FastAPI(lifespan=_lifespan)

@fast_mqtt.on_connect()
def connect(client: MQTTClient, flags: int, rc: int, properties: Any):
    client.subscribe("/mqtt")  # subscribing mqtt topic
    print("Connected: ", client, flags, rc, properties)

@fast_mqtt.subscribe("mqtt/+/temperature", "mqtt/+/humidity", qos=1)
async def home_message(client: MQTTClient, topic: str, payload: bytes, qos: int, properties: Any):
    print("temperature/humidity: ", topic, payload.decode(), qos, properties)

@fast_mqtt.on_message()
async def message(client: MQTTClient, topic: str, payload: bytes, qos: int, properties: Any):
    print("Received message: ", topic, payload.decode(), qos, properties)

@fast_mqtt.subscribe("my/mqtt/topic/#", qos=2)
async def message_to_topic_with_high_qos(
    client: MQTTClient, topic: str, payload: bytes, qos: int, properties: Any
):
    print(
        "Received message to specific topic and QoS=2: ", topic, payload.decode(), qos, properties
    )

@fast_mqtt.on_disconnect()
def disconnect(client: MQTTClient, packet, exc=None):
    print("Disconnected")

@fast_mqtt.on_subscribe()
def subscribe(client: MQTTClient, mid: int, qos: int, properties: Any):
    print("subscribed", client, mid, qos, properties)

@app.get("/test")  # route path is illustrative
async def func():
    fast_mqtt.publish("/mqtt", "Hello from Fastapi")  # publishing mqtt topic
    return {"result": True, "message": "Published"}
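The subscriptions above use MQTT topic wildcards: `+` matches exactly one topic level and `#` matches the remaining levels. The following is a minimal, standalone sketch of that matching rule, not the matching code used by the broker or by gmqtt. The function name `topic_matches` is hypothetical, and the sketch ignores the special handling of topics that start with `$`.

```python
def topic_matches(topic_filter: str, topic: str) -> bool:
    """Return True if `topic` matches the MQTT `topic_filter` (simplified)."""
    filter_levels = topic_filter.split("/")
    topic_levels = topic.split("/")

    for i, level in enumerate(filter_levels):
        if level == "#":
            # Multi-level wildcard: matches the parent level and everything below it.
            return True
        if i >= len(topic_levels):
            # The filter has more levels than the topic.
            return False
        if level != "+" and level != topic_levels[i]:
            # A literal level must match exactly; "+" accepts any single level.
            return False
    # Without "#", both must have the same number of levels.
    return len(filter_levels) == len(topic_levels)


if __name__ == "__main__":
    print(topic_matches("mqtt/+/temperature", "mqtt/kitchen/temperature"))
    print(topic_matches("my/mqtt/topic/#", "my/mqtt/topic/a/b"))
```

With these rules, the `home_message` handler would be expected to receive `mqtt/kitchen/temperature` but not `mqtt/kitchen/floor/temperature`, since `+` spans only one level.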
Publish method:
@app.get("/test")  # route path is illustrative
async def func():
    fast_mqtt.publish("/mqtt", "Hello from Fastapi")  # publishing mqtt topic
    return {"result": True, "message": "Published"}
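The publish call above sends a plain string, while message handlers receive the payload as `bytes`. For structured data, one common option is to serialize to JSON on the publishing side and parse defensively on the receiving side. The helpers below are a hedged sketch using only the standard library; `encode_payload` and `decode_payload` are hypothetical names and are not part of fastapi-mqtt or gmqtt.

```python
import json
from typing import Optional


def encode_payload(data: dict) -> bytes:
    """Serialize a dict to compact UTF-8 encoded JSON."""
    return json.dumps(data, separators=(",", ":")).encode("utf-8")


def decode_payload(payload: bytes) -> Optional[dict]:
    """Parse a JSON object from a raw payload; return None if it is not one."""
    try:
        decoded = json.loads(payload.decode("utf-8"))
    except (UnicodeDecodeError, json.JSONDecodeError):
        # Not valid UTF-8 or not valid JSON.
        return None
    # Only JSON objects are accepted; lists, numbers, etc. are rejected.
    return decoded if isinstance(decoded, dict) else None


if __name__ == "__main__":
    raw = encode_payload({"temperature": 21.5})
    print(raw, decode_payload(raw))
```

Assuming the publish method accepts a bytes payload, the result of `encode_payload(...)` could be passed as its second argument, and `decode_payload(payload)` could be called at the top of a message handler.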
Subscribe method:
@fast_mqtt.on_connect()
def connect(client, flags, rc, properties):
    client.subscribe("/mqtt")  # subscribing mqtt topic
    print("Connected: ", client, flags, rc, properties)
Changing connection params
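mqtt_config = MQTTConfig(
    host="localhost", port=1883, username="username", password="strong_password"  # placeholder values
)
fast_mqtt = FastMQTT(config=mqtt_config)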
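Connection parameters are often kept out of source code. The sketch below shows one way to collect them from environment variables with the standard library. The function name `mqtt_params_from_env` and the variable names `MQTT_HOST`, `MQTT_PORT`, `MQTT_USERNAME`, and `MQTT_PASSWORD` are assumptions made for illustration, not names defined by fastapi-mqtt.

```python
import os
from typing import Any, Dict, Mapping


def mqtt_params_from_env(env: Mapping[str, str] = os.environ) -> Dict[str, Any]:
    """Build a dict of connection parameters from environment variables."""
    params: Dict[str, Any] = {
        # Fall back to a local broker on the standard unencrypted MQTT port.
        "host": env.get("MQTT_HOST", "localhost"),
        "port": int(env.get("MQTT_PORT", "1883")),
    }
    username = env.get("MQTT_USERNAME")
    password = env.get("MQTT_PASSWORD")
    # Include credentials only when both are provided.
    if username and password:
        params["username"] = username
        params["password"] = password
    return params


if __name__ == "__main__":
    print(mqtt_params_from_env())
```

Assuming `MQTTConfig` accepts `host`, `port`, `username`, and `password` keyword arguments, the result could be passed as `MQTTConfig(**mqtt_params_from_env())`.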
Set the TEST_BROKER_HOST environment variable to choose the external MQTT broker to connect to (defaults to '').
# (optional) Run a local mosquitto MQTT broker with docker
docker run -d --name mosquitto -p 9001:9001 -p 1883:1883 eclipse-mosquitto:1.6.15
# Set host for test broker when running pytest
TEST_BROKER_HOST=localhost pytest
# Run the example apps against local broker, with uvicorn
TEST_BROKER_HOST=localhost uvicorn --port 8000 --reload
TEST_BROKER_HOST=localhost uvicorn --port 8000 --reload
Feel free to open an issue or send a pull request.
Thanks to all contributors. Contributions of any kind are welcome!
Before you start, please read CONTRIBUTING.