femtomc / MQTT.jl

Updated MQTT interface for Julia.

MQTT.jl

MQTT Client Library

This library lets applications connect to an MQTT broker, publish messages, subscribe to topics, and receive published messages.

The library supports fully asynchronous operation and file persistence.

Installation

Pkg.clone("https://github.com/rweilbacher/MQTT.jl.git")

Testing

Pkg.test("MQTT")

Usage

Import the library with the using keyword:

using MQTT

Samples are available in the examples directory.

Getting started

To use this library you need to follow at least these steps:

  1. Define an on_msg callback function.
  2. Create an instance of the Client struct and pass it your on_msg function.
  3. Call the connect method with your Client instance.
  4. Exchange data with the broker through publish, subscribe and unsubscribe.
  5. Disconnect from the broker. (Not strictly necessary if you don't intend to resume the session, but it is considered good form and makes crashes less likely.)

Basic example

Refer to the corresponding method documentation to find more options.

using MQTT
broker = "test.mosquitto.org"

#Define the callback for receiving messages.
function on_msg(topic, payload)
    info("Received message topic: [", topic, "] payload: [", String(payload), "]")
end

#Instantiate a client.
client = Client(on_msg)
connect(client, broker)
#Set retain to true so we can receive a message from the broker once we subscribe
#to this topic.
publish(client, "jlExample", "Hello World!", retain=true)
#Subscribe to the topic we sent a retained message to.
subscribe(client, ("jlExample", QOS_1))
#Unsubscribe from the topic
unsubscribe(client, "jlExample")
#Disconnect from the broker. Not strictly needed as the broker will also
#disconnect us if the socket is closed. But this is considered good form
#and needed if you want to resume this session later.
disconnect(client)
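
The callback can be exercised without a broker by calling it by hand. The following is a minimal sketch, assuming the payload arrives as a Vector{UInt8} (which the String(payload) call above suggests); sketch_on_msg is a hypothetical name, and it returns the formatted line instead of logging it so that the result can be inspected.

```julia
# Hypothetical stand-in for an on_msg callback; not part of the library.
function sketch_on_msg(topic, payload)
    # Assumption: payload is a Vector{UInt8}. copy() keeps the caller's buffer
    # intact, because String(::Vector{UInt8}) may take ownership of the bytes.
    text = String(copy(payload))
    return "Received message topic: [$topic] payload: [$text]"
end

# Call the callback directly with raw bytes as a broker-free smoke test.
line = sketch_on_msg("jlExample", Vector{UInt8}("Hello World!"))
println(line)
```

Copying the payload before converting it is a defensive choice for recent Julia versions; whether the library reuses the buffer after the callback returns is not documented here.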

Client struct

The Client struct stores the state of an MQTT connection. Only the on_msg callback can be set through the constructor; all other callbacks have to be set manually after instantiating the Client struct.

Fields in the Client that are relevant for the library user:

Constructors

All constructors take the on_message callback as an argument.

Client(on_msg::Function)

Specify a custom ping_timeout

Client(on_msg::Function, ping_timeout::UInt64)

Message struct

The Message struct is the data structure for generic MQTT messages. It is mostly used internally, but it is exposed to the user in some cases to make arguments easier to read (for example, a "will" is passed to the connect method as a Message).

Constructors

This is a reduced constructor meant for messages that can't be duplicated or retained (like the "will"). This constructor should be used in most cases. The dup and retain flags are false by default.

function Message(qos::QOS, topic::String, payload...)

This is the full Message constructor. It has all possible fields.

function Message(dup::Bool, qos::QOS, retain::Bool, topic::String, payload...)

This constructor is mostly for internal use. It uses the UInt8 equivalent of the QOS enum for easier processing.

function Message(dup::Bool, qos::UInt8, retain::Bool, topic::String, payload...)
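
The way the three constructor forms above relate to each other can be sketched as follows. This is an illustration only, not the library's source: SketchQOS and SketchMessage are hypothetical stand-ins, and storing the payload as a tuple is an assumption made to keep the example short.

```julia
# Hypothetical stand-in for the QOS enum, backed by UInt8.
@enum SketchQOS::UInt8 SKETCH_QOS_0=0x00 SKETCH_QOS_1=0x01 SKETCH_QOS_2=0x02

# Hypothetical stand-in for the Message struct.
struct SketchMessage
    dup::Bool
    qos::UInt8
    retain::Bool
    topic::String
    payload::Tuple   # assumption: the sketch keeps the payload values as given
    # "Internal" form: takes the UInt8 equivalent of the QOS level.
    SketchMessage(dup::Bool, qos::UInt8, retain::Bool, topic::String, payload...) =
        new(dup, qos, retain, topic, payload)
end

# Full form: accepts the enum and converts it to its UInt8 equivalent.
SketchMessage(dup::Bool, qos::SketchQOS, retain::Bool, topic::String, payload...) =
    SketchMessage(dup, UInt8(qos), retain, topic, payload...)

# Reduced form: dup and retain default to false, as required for a "will".
SketchMessage(qos::SketchQOS, topic::String, payload...) =
    SketchMessage(false, qos, false, topic, payload...)

will = SketchMessage(SKETCH_QOS_2, "TestClient/will", "payload", 1)
full = SketchMessage(true, SKETCH_QOS_1, true, "a/b", "x")
```

Each narrower form delegates to the wider one, so the flag defaults live in a single place.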

Connect

MQTT v3.1.1 Doc

Connects the Client instance to the specified broker. There is a synchronous and an asynchronous version available. Both versions take the same arguments.

Arguments

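Required arguments: client::Client (the client instance to connect) and host::AbstractString (the broker to connect to).

Optional arguments: port::Integer=1883 (positional), and the keyword arguments keep_alive::Int64=0, client_id::String=randstring(8), user::User=User("", ""), will::Message (an empty message by default) and clean_session::Bool=true.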

Call example

The dup and retain flags of a will have to be false, so it is safest to use the reduced Message constructor (refer to the Message documentation above).

connect(c, "test.mosquitto.org", keep_alive=60, client_id="TestClient", user=User("name", "pw"), will=Message(QOS_2, "TestClient/will", "payload", more_payload_data))

Synchronous connect

This method waits until the client is connected to the broker. TODO add return documentation

function connect(client::Client, host::AbstractString, port::Integer=1883;
keep_alive::Int64=0,
client_id::String=randstring(8),
user::User=User("", ""),
will::Message=Message(false, 0x00, false, "", Array{UInt8}()),
clean_session::Bool=true)

Asynchronous connect

This method doesn't wait and returns a Future object. You may wait on this object with the fetch method. This future completes once the client is fully connected. TODO add future data documentation

function connect_async(client::Client, host::AbstractString, port::Integer=1883;
keep_alive::Int64=0,
client_id::String=randstring(8),
user::User=User("", ""),
will::Message=Message(false, 0x00, false, "", Array{UInt8}()),
clean_session::Bool=true)
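
The relationship between a Future-returning call and its blocking counterpart can be sketched with the standard library alone. The function names, the host name and the :connected placeholder value are all assumptions made for illustration; what the library's future actually contains is not documented here. On Julia 0.7 and later, Future lives in the Distributed standard library.

```julia
using Distributed  # provides Future on Julia 0.7 and later

# Hypothetical stand-in for connect_async: returns immediately with a Future
# that a background task completes later.
function sketch_connect_async(host::AbstractString)
    fut = Future()
    @async begin
        sleep(0.05)            # pretend to wait for the broker's CONNACK
        put!(fut, :connected)  # placeholder result, an assumption
    end
    return fut
end

# One plausible shape for the synchronous form: block on the same Future.
sketch_connect(host::AbstractString) = fetch(sketch_connect_async(host))

fut = sketch_connect_async("broker.example")
status_async = fetch(fut)                     # blocks until the task calls put!
status_sync = sketch_connect("broker.example")
```

fetch blocks the calling task until the future has a value, which is why waiting on the asynchronous result behaves like the synchronous call.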

Publish

MQTT v3.1.1 Doc

Publishes a message to the broker connected to the Client instance provided as a parameter. There is a synchronous and an asynchronous version available. Both versions take the same arguments.

Arguments

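Required arguments: client::Client (the connected client instance) and topic::String (the topic to publish to).

Optional arguments: payload... (zero or more payload values), and the keyword arguments dup::Bool=false, qos::UInt8=0x00 and retain::Bool=false.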

Call example

These are valid payload... examples.

publish(c, "hello/world")
publish(c, "hello/world", "Test", 6, 4.2)

This is a valid use of the optional arguments.

publish(c, "hello/world", "Test", 6, 4.2, qos=QOS_1, retain=true)

Synchronous publish

This method waits until the publish message has been processed completely and successfully. With QOS 2, for example, it waits until the PUBCOMP has been received. TODO add return documentation

function publish(client::Client, topic::String, payload...;
    dup::Bool=false,
    qos::UInt8=0x00,
    retain::Bool=false)
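
What "processed completely" means depends on the QOS level. The sketch below lists, per MQTT 3.1.1, the packets that follow a PUBLISH on the sender's side before the exchange is finished; sketch_ack_sequence is a hypothetical helper written for this illustration and is not part of the library.

```julia
# Hypothetical helper: packets exchanged after PUBLISH, from the sender's view.
function sketch_ack_sequence(qos::UInt8)
    qos == 0x00 && return Symbol[]                       # fire and forget
    qos == 0x01 && return [:PUBACK]                      # one acknowledgment
    qos == 0x02 && return [:PUBREC, :PUBREL, :PUBCOMP]   # two-step handshake
    throw(ArgumentError("invalid QoS level: $qos"))
end

for qos in (0x00, 0x01, 0x02)
    println("QoS ", Int(qos), ": ", sketch_ack_sequence(qos))
end
```

For QOS 2 the sender receives PUBREC, answers with PUBREL, and is done once PUBCOMP arrives, which matches the waiting behavior described above.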

Asynchronous publish

This method doesn't wait and returns a Future object. You may choose to wait on this object. The future completes once the publish message has been processed completely and successfully. With QOS 2, for example, it completes once the PUBCOMP has been received. TODO change future data documentation

function publish_async(client::Client, topic::String, payload...;
    dup::Bool=false,
    qos::UInt8=0x00,
    retain::Bool=false)

Subscribe

MQTT v3.1.1 Doc

Subscribes the Client instance, provided as a parameter, to the specified topics. There is a synchronous and an asynchronous version available. Both versions take the same arguments.

Arguments

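Required arguments: client (the connected client instance) and topics::Tuple{String, QOS}... (one or more (topic, QOS) tuples).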

Call example

This example subscribes to the topic "test" with QOS_2 and "test2" with QOS_0.

subscribe(c, ("test", QOS_2), ("test2", QOS_0))

Synchronous subscribe

This method waits until the subscribe message has been successfully sent and acknowledged. TODO add return documentation

function subscribe(client, topics::Tuple{String, QOS}...)

Asynchronous subscribe

This method doesn't wait and returns a Future object. You may choose to wait on this object. This future completes once the subscribe message has been successfully sent and acknowledged. TODO change future data documentation

function subscribe_async(client, topics::Tuple{String, QOS}...)
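
The varargs-of-tuples signature used by both subscribe variants can be tried out in isolation. This is a sketch under stated assumptions: TopicQOS and sketch_subscribe are hypothetical stand-ins, and the function only returns what it was asked to subscribe to instead of talking to a broker.

```julia
# Hypothetical stand-in for the QOS enum, backed by UInt8.
@enum TopicQOS::UInt8 TOPIC_QOS_0=0x00 TOPIC_QOS_1=0x01 TOPIC_QOS_2=0x02

# Hypothetical stand-in for subscribe: accepts any number of (topic, QOS) tuples.
function sketch_subscribe(topics::Tuple{String, TopicQOS}...)
    isempty(topics) && throw(ArgumentError("at least one topic is required"))
    # Pair each topic with the UInt8 equivalent of its requested QOS level.
    return [(topic, UInt8(qos)) for (topic, qos) in topics]
end

requested = sketch_subscribe(("test", TOPIC_QOS_2), ("test2", TOPIC_QOS_0))
println(requested)
```

Because each argument must be a Tuple{String, TopicQOS}, a call that passes a bare topic string without a QOS level fails at dispatch instead of inside the function.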

Unsubscribe

MQTT v3.1.1 Doc

This method unsubscribes the Client instance from the specified topics. There is a synchronous and an asynchronous version available. Both versions take the same arguments.

Arguments

Required arguments: client::Client (the connected client instance) and topics::String... (one or more topic names).

Call example

unsubscribe(c, "test1", "test2", "test3")

Synchronous unsubscribe

This method waits until the unsubscribe message has been sent and acknowledged. TODO add return documentation

function unsubscribe(client::Client, topics::String...)

Asynchronous unsubscribe

This method doesn't wait and returns a Future object. You may wait on this object with the fetch method. This future completes once the unsubscribe message has been sent and acknowledged. TODO add future data documentation

function unsubscribe_async(client::Client, topics::String...)

Disconnect

MQTT v3.1.1 Doc

Disconnects the Client instance gracefully, shuts down the background tasks and stores session state. There is only a synchronous version available.

Arguments

Required arguments: client::Client (the client instance to disconnect).

Call example

disconnect(c)

Synchronous disconnect

function disconnect(client::Client)

Internal workings

You don't need to read this section to use the library, but it may give additional insight into how everything works.

The Client instance has a Channel, called write_packets, that holds outbound messages that still need to be sent. Julia channels are essentially blocking queues, which is exactly the behavior needed here.
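
The blocking-queue behavior of a Channel can be sketched as follows. The names write_queue and sent are hypothetical, a plain vector stands in for the socket, and the two packets are the fixed PINGREQ and DISCONNECT encodings from MQTT 3.1.1; none of this is taken from the library's source.

```julia
# Hypothetical outbound queue, standing in for write_packets.
write_queue = Channel{Vector{UInt8}}(8)
sent = Vector{UInt8}[]          # stands in for the socket

# Writer task: iterating a Channel blocks while it is empty and ends once the
# channel has been closed and drained.
writer = @async for packet in write_queue
    push!(sent, packet)
end

put!(write_queue, UInt8[0xC0, 0x00])  # PINGREQ
put!(write_queue, UInt8[0xE0, 0x00])  # DISCONNECT
close(write_queue)                    # lets the writer loop finish
wait(writer)
println(length(sent), " packets written")
```

Producers only ever call put!, so they never touch the socket directly and packets leave in the order they were queued.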

To store messages that are awaiting acknowledgment, Client has a Dict that maps message ids to Future instances. Each future is completed once its message has been fully acknowledged, and it may then contain information relevant to that specific message.
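
The bookkeeping described above can be sketched with a Dict and the standard library's Future. The helper names and the :PUBACK placeholder are assumptions for illustration; the 16-bit key type reflects the size of MQTT packet identifiers.

```julia
using Distributed  # provides Future on Julia 0.7 and later

# Hypothetical in-flight table: message id => Future completed on acknowledgment.
in_flight = Dict{UInt16, Future}()

# Register a message as awaiting acknowledgment and hand back its Future.
function sketch_track!(in_flight, id::UInt16)
    fut = Future()
    in_flight[id] = fut
    return fut
end

# Called when the acknowledgment for `id` arrives: complete and forget the Future.
function sketch_acknowledge!(in_flight, id::UInt16, info)
    fut = pop!(in_flight, id)
    put!(fut, info)
    return nothing
end

pending = sketch_track!(in_flight, 0x0001)
sketch_acknowledge!(in_flight, 0x0001, :PUBACK)  # placeholder ack information
result = fetch(pending)
```

Removing the entry when the acknowledgment arrives keeps the table limited to messages that are still outstanding.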

Once the connect method is called on a Client, the relevant fields are initialized and Julia's own connect function is called to get a connected socket. Then two background tasks are started that continuously check for messages to send and receive. If keep_alive is non-zero, another task is started that sends the keep-alive pings and verifies that each PINGRESP arrives in time.
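
The conditional keep-alive task can be sketched as below. This is a simplified illustration, not the library's implementation: sketch_start_keep_alive is a hypothetical name, the loop is bounded by a pings parameter so that the example terminates, and the check that a PINGRESP arrives in time is left out.

```julia
# Hypothetical keep-alive starter; returns nothing when keep_alive is zero.
function sketch_start_keep_alive(queue::Channel, keep_alive::Real; pings::Int=3)
    keep_alive > 0 || return nothing          # zero disables the task
    task = @async for _ in 1:pings
        sleep(keep_alive)                     # wait one keep-alive interval
        put!(queue, UInt8[0xC0, 0x00])        # queue a PINGREQ packet
    end
    return task
end

queue = Channel{Vector{UInt8}}(8)
task = sketch_start_keep_alive(queue, 0.01)   # very short interval for the demo
wait(task)
pings_sent = [take!(queue) for _ in 1:3]
disabled = sketch_start_keep_alive(queue, 0)  # no task is started
```

Routing the pings through the same outbound queue as other packets means the keep-alive task never writes to the socket itself.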

TODO explain read and write loop a bit