kbredemeier / off_broadway_mqtt

MQTT Producer for https://github.com/plataformatec/broadway
Apache License 2.0

Error when using ActiveMQ Artemis (MQTT) protocol #2

Open sleipnir opened 3 years ago

sleipnir commented 3 years ago

Hello, I am trying to use this library with ActiveMQ Artemis. This broker supports several protocols, including [MQTT 3.1.1](https://activemq.apache.org/components/artemis/documentation/latest/mqtt.html). However, when I try to connect I get the error below:

2020-10-16 23:01:29.861 [nonode@nohost]:[pid=<0.299.0> file=gen_server.erl ]:[error]:GenServer {Tortoise.Registry, {Tortoise.Connection, "elixir_client_id_3"}} terminating
** (ArgumentError) argument error
    (kernel 7.0) gen_tcp.erl:170: :gen_tcp.connect/4
    (tortoise 0.9.4) lib/tortoise/connection.ex:583: Tortoise.Connection.do_connect/2
    (tortoise 0.9.4) lib/tortoise/connection.ex:385: Tortoise.Connection.handle_info/2
    (stdlib 3.13) gen_server.erl:680: :gen_server.try_dispatch/4
    (stdlib 3.13) gen_server.erl:756: :gen_server.handle_msg/6
    (stdlib 3.13) proc_lib.erl:226: :proc_lib.init_p_do_apply/3
Last message: :connect
State: %Tortoise.Connection{backoff: %Tortoise.Connection.Backoff{max_interval: 30000, min_interval: 100, value: nil}, client_id: "elixir_client_id_3", connect: %Tortoise.Package.Connect{__META__: %Tortoise.Package.Meta{flags: 0, opcode: 1}, clean_session: true, client_id: "elixir_client_id_3", keep_alive: 60, password: nil, protocol: "MQTT", protocol_version: 4, user_name: nil, will: nil}, keep_alive: nil, opts: [client_id: "elixir_client_id_3", handler: {OffBroadway.MQTT.Handler, [client_id: "elixir_client_id_3", queue: {:via, Registry, {OffBroadway.MQTT.QueueRegistry, "tags.00666"}}, sub_ack: nil, meta: %{client_id: "elixir_client_id_3", host: "localhost", password: "******", port: 1883, qos: 0, topic_filter: "tags.00666", username: "admin"}, config: %OffBroadway.MQTT.Config{acknowledger: OffBroadway.MQTT.Acknowledger, client: OffBroadway.MQTT.Client, client_id_prefix: "elixir_client_id", dequeue_interval: 5000, handler: OffBroadway.MQTT.Handler, producer: OffBroadway.MQTT.Producer, queue: OffBroadway.MQTT.Queue, queue_registry: OffBroadway.MQTT.QueueRegistry, queue_supervisor: OffBroadway.MQTT.QueueSupervisor, server: {:tcp, [host: "localhost", port: 1883, username: "admin", password: "admin"]}, telemetry_prefix: :off_broadway_mqtt}]}], server: %Tortoise.Transport{host: 'localhost', opts: [:binary, {:packet, :raw}, {:active, false}, {:username, "admin"}, {:password, "admin"}], port: 1883, type: Tortoise.Transport.Tcp}, status: :down, subscriptions: %Tortoise.Package.Subscribe{__META__: %Tortoise.Package.Meta{flags: 2, opcode: 8}, identifier: nil, topics: [{"tags.00666", 0}]}}
2020-10-16 23:01:29.880 [nonode@nohost]:[pid=<0.309.0> file=gen_server.erl ]:[error]:GenServer {Tortoise.Registry, {Tortoise.Connection, "elixir_client_id_3"}} terminating
** (ArgumentError) argument error
    (kernel 7.0) gen_tcp.erl:170: :gen_tcp.connect/4
    (tortoise 0.9.4) lib/tortoise/connection.ex:583: Tortoise.Connection.do_connect/2
    (tortoise 0.9.4) lib/tortoise/connection.ex:385: Tortoise.Connection.handle_info/2
    (stdlib 3.13) gen_server.erl:680: :gen_server.try_dispatch/4
    (stdlib 3.13) gen_server.erl:756: :gen_server.handle_msg/6
    (stdlib 3.13) proc_lib.erl:226: :proc_lib.init_p_do_apply/3
Last message: :connect
State: %Tortoise.Connection{backoff: %Tortoise.Connection.Backoff{max_interval: 30000, min_interval: 100, value: nil}, client_id: "elixir_client_id_3", connect: %Tortoise.Package.Connect{__META__: %Tortoise.Package.Meta{flags: 0, opcode: 1}, clean_session: true, client_id: "elixir_client_id_3", keep_alive: 60, password: nil, protocol: "MQTT", protocol_version: 4, user_name: nil, will: nil}, keep_alive: nil, opts: [client_id: "elixir_client_id_3", handler: {OffBroadway.MQTT.Handler, [client_id: "elixir_client_id_3", queue: {:via, Registry, {OffBroadway.MQTT.QueueRegistry, "tags.00666"}}, sub_ack: nil, meta: %{client_id: "elixir_client_id_3", host: "localhost", password: "******", port: 1883, qos: 0, topic_filter: "tags.00666", username: "admin"}, config: %OffBroadway.MQTT.Config{acknowledger: OffBroadway.MQTT.Acknowledger, client: OffBroadway.MQTT.Client, client_id_prefix: "elixir_client_id", dequeue_interval: 5000, handler: OffBroadway.MQTT.Handler, producer: OffBroadway.MQTT.Producer, queue: OffBroadway.MQTT.Queue, queue_registry: OffBroadway.MQTT.QueueRegistry, queue_supervisor: OffBroadway.MQTT.QueueSupervisor, server: {:tcp, [host: "localhost", port: 1883, username: "admin", password: "admin"]}, telemetry_prefix: :off_broadway_mqtt}]}], server: %Tortoise.Transport{host: 'localhost', opts: [:binary, {:packet, :raw}, {:active, false}, {:username, "admin"}, {:password, "admin"}], port: 1883, type: Tortoise.Transport.Tcp}, status: :down, subscriptions: %Tortoise.Package.Subscribe{__META__: %Tortoise.Package.Meta{flags: 2, opcode: 8}, identifier: nil, topics: [{"tags.00666", 0}]}}
{:ok, #PID<0.294.0>}
iex(4)> 2020-10-16 23:01:29.882 [nonode@nohost]:[pid=<0.310.0> file=gen_server.erl ]:[error]:GenServer {Tortoise.Registry, {Tortoise.Connection, "elixir_client_id_3"}} terminating
** (ArgumentError) argument error
    (kernel 7.0) gen_tcp.erl:170: :gen_tcp.connect/4
    (tortoise 0.9.4) lib/tortoise/connection.ex:583: Tortoise.Connection.do_connect/2
    (tortoise 0.9.4) lib/tortoise/connection.ex:385: Tortoise.Connection.handle_info/2
    (stdlib 3.13) gen_server.erl:680: :gen_server.try_dispatch/4
    (stdlib 3.13) gen_server.erl:756: :gen_server.handle_msg/6
    (stdlib 3.13) proc_lib.erl:226: :proc_lib.init_p_do_apply/3
Last message: :connect
State: %Tortoise.Connection{backoff: %Tortoise.Connection.Backoff{max_interval: 30000, min_interval: 100, value: nil}, client_id: "elixir_client_id_3", connect: %Tortoise.Package.Connect{__META__: %Tortoise.Package.Meta{flags: 0, opcode: 1}, clean_session: true, client_id: "elixir_client_id_3", keep_alive: 60, password: nil, protocol: "MQTT", protocol_version: 4, user_name: nil, will: nil}, keep_alive: nil, opts: [client_id: "elixir_client_id_3", handler: {OffBroadway.MQTT.Handler, [client_id: "elixir_client_id_3", queue: {:via, Registry, {OffBroadway.MQTT.QueueRegistry, "tags.00666"}}, sub_ack: nil, meta: %{client_id: "elixir_client_id_3", host: "localhost", password: "******", port: 1883, qos: 0, topic_filter: "tags.00666", username: "admin"}, config: %OffBroadway.MQTT.Config{acknowledger: OffBroadway.MQTT.Acknowledger, client: OffBroadway.MQTT.Client, client_id_prefix: "elixir_client_id", dequeue_interval: 5000, handler: OffBroadway.MQTT.Handler, producer: OffBroadway.MQTT.Producer, queue: OffBroadway.MQTT.Queue, queue_registry: OffBroadway.MQTT.QueueRegistry, queue_supervisor: OffBroadway.MQTT.QueueSupervisor, server: {:tcp, [host: "localhost", port: 1883, username: "admin", password: "admin"]}, telemetry_prefix: :off_broadway_mqtt}]}], server: %Tortoise.Transport{host: 'localhost', opts: [:binary, {:packet, :raw}, {:active, false}, {:username, "admin"}, {:password, "admin"}], port: 1883, type: Tortoise.Transport.Tcp}, status: :down, subscriptions: %Tortoise.Package.Subscribe{__META__: %Tortoise.Package.Meta{flags: 2, opcode: 8}, identifier: nil, topics: [{"tags.00666", 0}]}}
2020-10-16 23:01:29.883 [nonode@nohost]:[pid=<0.313.0> file=gen_server.erl ]:[error]:GenServer {Tortoise.Registry, {Tortoise.Connection, "elixir_client_id_3"}} terminating
** (ArgumentError) argument error
    (kernel 7.0) gen_tcp.erl:170: :gen_tcp.connect/4
    (tortoise 0.9.4) lib/tortoise/connection.ex:583: Tortoise.Connection.do_connect/2
    (tortoise 0.9.4) lib/tortoise/connection.ex:385: Tortoise.Connection.handle_info/2
    (stdlib 3.13) gen_server.erl:680: :gen_server.try_dispatch/4
    (stdlib 3.13) gen_server.erl:756: :gen_server.handle_msg/6
    (stdlib 3.13) proc_lib.erl:226: :proc_lib.init_p_do_apply/3
Last message: :connect
State: %Tortoise.Connection{backoff: %Tortoise.Connection.Backoff{max_interval: 30000, min_interval: 100, value: nil}, client_id: "elixir_client_id_3", connect: %Tortoise.Package.Connect{__META__: %Tortoise.Package.Meta{flags: 0, opcode: 1}, clean_session: true, client_id: "elixir_client_id_3", keep_alive: 60, password: nil, protocol: "MQTT", protocol_version: 4, user_name: nil, will: nil}, keep_alive: nil, opts: [client_id: "elixir_client_id_3", handler: {OffBroadway.MQTT.Handler, [client_id: "elixir_client_id_3", queue: {:via, Registry, {OffBroadway.MQTT.QueueRegistry, "tags.00666"}}, sub_ack: nil, meta: %{client_id: "elixir_client_id_3", host: "localhost", password: "******", port: 1883, qos: 0, topic_filter: "tags.00666", username: "admin"}, config: %OffBroadway.MQTT.Config{acknowledger: OffBroadway.MQTT.Acknowledger, client: OffBroadway.MQTT.Client, client_id_prefix: "elixir_client_id", dequeue_interval: 5000, handler: OffBroadway.MQTT.Handler, producer: OffBroadway.MQTT.Producer, queue: OffBroadway.MQTT.Queue, queue_registry: OffBroadway.MQTT.QueueRegistry, queue_supervisor: OffBroadway.MQTT.QueueSupervisor, server: {:tcp, [host: "localhost", port: 1883, username: "admin", password: "admin"]}, telemetry_prefix: :off_broadway_mqtt}]}], server: %Tortoise.Transport{host: 'localhost', opts: [:binary, {:packet, :raw}, {:active, false}, {:username, "admin"}, {:password, "admin"}], port: 1883, type: Tortoise.Transport.Tcp}, status: :down, subscriptions: %Tortoise.Package.Subscribe{__META__: %Tortoise.Package.Meta{flags: 2, opcode: 8}, identifier: nil, topics: [{"tags.00666", 0}]}}

When I use Eclipse Mosquitto it works; however, when I use ActiveMQ Artemis with MQTT enabled it doesn't.

Below are excerpts of the code used:

defmodule Mercury.Flow do
  use OffBroadway.MQTT

  defmodule FlowException do
    defexception ack: :ignore, message: nil

    def message(e) do
      "message is probably coming from a nincompoop: " <> e.message
    end
  end

  def start_link(config, topic) do
    Broadway.start_link(__MODULE__,
      name: __MODULE__,
      producers: [
        default: [
          module: {Producer, [config, subscription: {topic, 0}]},
          stages: 1
        ]
      ],
      processors: [default: [stages: 1]],
      batchers: [
        default: [stages: 1, batch_size: 10]
      ]
    )
  end

  @impl true
  def handle_message(_processor_name, message, _context) do
    message
    |> Message.update_data(&process_data/1)
  rescue
    e ->
      Message.failed(message, e)
  end

  defp process_data(%OffBroadway.MQTT.Data{acc: msg} = data) do
    msg
    |> String.downcase()
    |> String.contains?("great again")
    |> case do
      true -> raise FlowException, "contains \"great again\""
      false -> data
    end
  end

  @impl true
  def handle_batch(_, messages, _batch_info, _context) do
    # ...
    messages
  end
end

config =
  OffBroadway.MQTT.Config.new(
    client_id_prefix: "elixir_client_id",
    server_opts: [
      host: "localhost",
      port: "1883",
      transport: :tcp,
      username: "admin",
      password: "admin"
    ]
  )

Mercury.Flow.start_link(config, "tags.00666")

Do you have any idea what may be happening?

sleipnir commented 3 years ago

I found the reason. Authentication does not seem to work. When running ActiveMQ with authentication disabled, the connection worked as expected.

docker run -it --rm -e DISABLE_SECURITY=true -p 8161:8161 -p 61616:61616 -p 61613:61613 -p 1883:1883 vromero/activemq-artemis:latest-alpine
[root@sleipnir mercury]# iex -S mix
Erlang/OTP 23 [erts-11.0.2] [source] [64-bit] [smp:8:8] [ds:8:8:10] [async-threads:1] [hipe]

Interactive Elixir (1.11.0-dev) - press Ctrl+C to exit (type h() ENTER for help)
iex(1)> config =
...(1)>   OffBroadway.MQTT.Config.new(
...(1)>     client_id_prefix: "elixir_client_id",
...(1)>     server_opts: [
...(1)>       host: "localhost",
...(1)>       port: "1883",
...(1)>       transport: :tcp
...(1)>     ]
...(1)>   )
%OffBroadway.MQTT.Config{
  acknowledger: OffBroadway.MQTT.Acknowledger,
  client: OffBroadway.MQTT.Client,
  client_id_prefix: "elixir_client_id",
  dequeue_interval: 5000,
  handler: OffBroadway.MQTT.Handler,
  producer: OffBroadway.MQTT.Producer,
  queue: OffBroadway.MQTT.Queue,
  queue_registry: OffBroadway.MQTT.QueueRegistry,
  queue_supervisor: OffBroadway.MQTT.QueueSupervisor,
  server: {:tcp, [host: "localhost", port: 1883]},
  telemetry_prefix: :off_broadway_mqtt
}
iex(2)> Mercury.Flow.start_link(config, "tags.00666")
{:ok, #PID<0.291.0>}
iex(3)> 2020-10-16 23:35:29.397 [nonode@nohost]:[pid=<0.311.0> file=lib/off_broadway_mqtt/handler.ex ]:[debug]:initializing client
2020-10-16 23:35:29.397 [nonode@nohost]:[pid=<0.311.0> file=lib/off_broadway_mqtt/handler.ex ]:[debug]:client attempting to connect

sleipnir commented 3 years ago

Hello

sleipnir commented 3 years ago

ping

sleipnir commented 3 years ago

After investigating the Tortoise library code, I noticed that:

  1. This library does not correctly initialize the connection when authentication is required.
  2. I couldn't find a call to Tortoise.Connection.subscribe anywhere in this library. During my tests, the subscription was only created on the ActiveMQ server after an explicit call to this function. I still need to investigate further to understand why.
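
Point 1 is consistent with the state dumped in the crash log above: the credentials show up among the socket options of Tortoise.Transport ({:username, "admin"}, {:password, "admin"}), while the Tortoise.Package.Connect struct has user_name: nil and password: nil. A minimal sketch of one possible way to separate the two is shown below. The module CredentialSplit and its functions are hypothetical, not part of off_broadway_mqtt or Tortoise; the :user_name and :password keys are assumed to be the connection options Tortoise reads, based on the field names in the logged Connect struct.

```elixir
defmodule CredentialSplit do
  @moduledoc """
  Hypothetical helper, not part of off_broadway_mqtt or Tortoise.
  Separates MQTT credentials from plain socket options.
  """

  # Keys that describe the MQTT login rather than the TCP socket.
  @credential_keys [:username, :password]

  @doc "Returns {credentials, socket_opts} for the given server options."
  def split(server_opts) when is_list(server_opts) do
    Keyword.split(server_opts, @credential_keys)
  end

  @doc """
  Builds a keyword list shaped like Tortoise connection options
  (assumption: :user_name and :password are the keys Tortoise expects).
  """
  def connection_opts(client_id, server_opts) do
    {creds, socket_opts} = split(server_opts)

    [
      client_id: client_id,
      # Only host/port style options reach the transport.
      server: {Tortoise.Transport.Tcp, socket_opts},
      # Credentials travel in the MQTT CONNECT packet instead.
      user_name: Keyword.get(creds, :username),
      password: Keyword.get(creds, :password)
    ]
  end
end

opts =
  CredentialSplit.connection_opts("elixir_client_id_3",
    host: "localhost",
    port: 1883,
    username: "admin",
    password: "admin"
  )

IO.inspect(opts)
```

With the options from this issue, the transport tuple would carry only host and port, and the credentials would sit at the top level of the connection options. Whether this matches how the library should be fixed is an open question; it only illustrates the separation the log suggests is missing.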