A MQTT connector for Broadway.
Add off_broadway_mqtt_connector
to the list of dependencies in mix.exs
:
def deps do
[
{:off_broadway_mqtt, "~> 0.1.0", hex: "off_broadway_mqtt_connector"}
]
end
Notice that the package has a different name than the application!
Add it as a producer to your Broadway
:
defmodule MyApp.NincompoopFilter do
use OffBroadway.MQTT
defmodule Nincompoop do
defexception ack: :ignore, message: nil
def message(e) do
"message is probably coming from a nincompoop: " <> e.message
end
end
def start_link(config, topic) do
Broadway.start_link(__MODULE__,
name: __MODULE__,
producers: [
default: [
module: {Producer, [config, subscription: {topic, 0}]},
stages: 1
]
],
processors: [default: [stages: 1]],
batchers: [
default: [stages: 1, batch_size: 10]
]
)
end
@impl true
def handle_message(_processor_name, message, _context) do
message
|> Message.update_data(&process_data/1)
rescue
e ->
Message.failed(message, e)
end
defp process_data(%OffBroadway.MQTT.Data{acc: msg} = data) do
msg
|> String.downcase()
|> String.contains?("great again")
|> case do
true -> raise Nincompoop, "contains \"great again\""
false -> data
end
end
@impl true
def handle_batch(_, messages, _batch_info, _context) do
# ...
messages
end
end
Start it by passing it at least a t:OffBroadway.MQTT.Config.t/0
and the
subscription
option with a topic to subscribe to and the desired QOS. For
further options refer to the OffBroadway.MQTT.Producer
docs.
Default values for the configuration can be given via the mix config:
use Mix.Config
config :off_broadway_mqtt,
client_id_prefix: "sensor_data_processor",
server_opts: [
host: "vernemq",
port: 8883,
transport: :ssl
],
handler: MyApp.BetterHandler
Then build a config and start your broadway:
# Builds a configuration with all configured default values
config = OffBroadway.MQTT.Config.new_from_app_config()
# Builds a configuration from the defaults and overrides values
config =
OffBroadway.MQTT.Config.new(
client_id_prefix: "myapp",
server_opts: [
host: "mosquitto",
# port is converted into a `integer` if it is not already one
port: "1883",
transport: :tcp,
username: "admin",
password: "admin"
]
)
# Start broadway
MyApp.NincompoopFilter.start(config, "test_topic")
Keep in mind that any option with nil or the empty string will be removed from the server options to prevent issues when configuring the application from environment variables.
Telemetry events are disabled by default. To enable them the following must be configured at compile time:
use Mix.Config
config :off_broadway_mqtt,
telemetry_enabled: true,
A prefix can be configured that is used to prefix any telemetry event.
use Mix.Config
config :off_broadway_mqtt,
telemetry_prefix: :my_broadway,
The prefix can also be passed at runtime with the
t:OffBroadway.MQTT.Config.t/0
to the producer.
The following events are emitted:
my_broadway.client.connection.up.count
my_broadway.client.connection.down.count
my_broadway.client.subscription.up.count
my_broadway.client.subscription.down.count
my_broadway.client.messages.count
my_broadway.queue.in.count
my_broadway.queue.in.size
my_broadway.queue.out.count
my_broadway.queue.out.size
my_broadway.acknowledger.success.count
my_broadway.acknowledger.failed.count
my_broadway.acknowledger.ignored.count
my_broadway.acknowledger.requeued.count
For development you probably need a running MQTT server in your develoment
environment. The provided docker-compose.yml
starts a vernemq
container for you.
Then run the following commands:
mix deps.get
mix test
The documentation can be generated with mix docs
. Code coverage report can be
generated with mix coveralls.html
.
This library heavily depends on Martin Gausby's tortoise for connecting to the MQTT broker.
Copyright 2019 Kristopher Bredemeier
Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.