conduitframework / conduit_amqp

MIT License

ConduitAMQP

An AMQP adapter for Conduit.

Installation

This package can be installed as:

  1. Add conduit_amqp to your list of dependencies in mix.exs:

    ```elixir
    def deps do
      [{:conduit_amqp, "~> 0.6.3"}]
    end
    ```
  2. Ensure conduit_amqp is started before your application:

      def application do
        [applications: [:conduit_amqp]]
      end

Configuring the Adapter

# config/config.exs

config :my_app, MyApp.Broker,
  adapter: ConduitAMQP,
  url: "amqp://my_app:secret@my-rabbit-host.com"

# Stop lager redirecting :error_logger messages
config :lager, :error_logger_redirect, false

# Stop lager removing Logger's :error_logger handler
config :lager, :error_logger_whitelist, [Logger.ErrorHandler]

For the full set of options, see ConduitAMQP.
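If you would rather not hard-code credentials, the URL can be read from the environment. The following is a minimal sketch, assuming your project uses `config/runtime.exs` (Elixir 1.11 or later) and that the broker reads its configuration at startup; the variable name `AMQP_URL` and the fallback URL are placeholders chosen for illustration, not names ConduitAMQP defines.

```elixir
# config/runtime.exs
import Config

config :my_app, MyApp.Broker,
  adapter: ConduitAMQP,
  # AMQP_URL is an assumed variable name; the fallback points at a local broker
  # with RabbitMQ's default guest account.
  url: System.get_env("AMQP_URL", "amqp://guest:guest@localhost:5672")
```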

Configuring Exchanges

You can define exchanges with the exchange macro in the configure block of your Broker. The exchange macro accepts the name of the exchange and options for the exchange.

Options

See exchange.declare for more details.

Example

defmodule MyApp.Broker do
  use Conduit.Broker, otp_app: :my_app

  configure do
    exchange "my.topic", type: :topic, durable: true
  end
end

Configuring Queues

You can define queues with the queue macro in the configure block of your Broker. The queue macro accepts the name of the queue and options for the queue.

Options

See queue.declare for more details.

Example

defmodule MyApp.Broker do
  use Conduit.Broker, otp_app: :my_app

  configure do
    queue "my.queue", from: ["#.created.user"], exchange: "amq.topic", durable: true
  end
end
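The `from:` option above holds topic binding patterns. On an AMQP topic exchange, `*` matches exactly one dot-separated word and `#` matches zero or more words. The sketch below mimics that rule in plain Elixir so you can check which routing keys a pattern such as `"#.created.user"` would accept. The module `TopicMatchSketch` is hypothetical and is not part of ConduitAMQP; the real matching happens inside the broker.

```elixir
defmodule TopicMatchSketch do
  @moduledoc "Illustrative only: mimics AMQP topic-exchange matching."

  # Returns true when `routing_key` satisfies the binding `pattern`.
  def matches?(pattern, routing_key) do
    do_match(String.split(pattern, "."), String.split(routing_key, "."))
  end

  # Pattern and key are both exhausted: the key matches.
  defp do_match([], []), do: true

  # "#" matches zero words (drop it) or one more word (consume a word, keep "#").
  defp do_match(["#" | rest], words) do
    do_match(rest, words) or (words != [] and do_match(["#" | rest], tl(words)))
  end

  # "*" matches exactly one word.
  defp do_match(["*" | rest], [_ | words]), do: do_match(rest, words)

  # A literal word must be identical.
  defp do_match([word | rest], [word | words]), do: do_match(rest, words)

  # Anything else is a mismatch.
  defp do_match(_, _), do: false
end

IO.inspect(TopicMatchSketch.matches?("#.created.user", "app.created.user"))
IO.inspect(TopicMatchSketch.matches?("#.created.user", "app.deleted.user"))
```

With the queue definition above, a message published to `amq.topic` with the routing key `app.created.user` would be routed to `my.queue`, while `app.deleted.user` would not.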

Configuring a Subscriber

Inside an incoming block for a broker, you can define subscriptions to queues. Conduit will route messages on those queues to your subscribers.

defmodule MyApp.Broker do
  use Conduit.Broker, otp_app: :my_app

  incoming MyApp do
    subscribe :my_subscriber, MySubscriber, from: "my.queue"
    subscribe :my_other_subscriber, MyOtherSubscriber,
      from: "my.other.queue",
      prefetch_size: 20
  end
end

Options

Note: It's highly recommended to set :prefetch_size or :prefetch_count to a non-zero value to limit the memory consumed when a queue is backed up.

See basic.qos and basic.consume for more details on options.
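For reference, a subscriber module might look like the sketch below. It assumes the `Conduit.Subscriber` behaviour from Conduit, where `process/2` receives the message and returns it to acknowledge it; `MyApp.MySubscriber` is the module that `subscribe :my_subscriber, MySubscriber` resolves to inside `incoming MyApp`. The body of `process/2` is illustrative only.

```elixir
defmodule MyApp.MySubscriber do
  use Conduit.Subscriber

  # Called for each message delivered from "my.queue".
  def process(message, _opts) do
    # Replace this with real handling; here the body is only printed.
    IO.inspect(message.body)

    # Returning the message signals that processing succeeded.
    message
  end
end
```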

Configuring a Publisher

Inside an outgoing block for a broker, you can define publications to exchanges. Conduit will deliver messages using the options specified. You can override these options by passing different options to your broker's publish/3.

defmodule MyApp.Broker do
  use Conduit.Broker, otp_app: :my_app

  outgoing do
    publish :destination_route,
      to: "my.routing_key",
      exchange: "amq.topic"
    publish :other_destination_route,
      to: "my.other.routing_key",
      exchange: "amq.topic"
  end
end

Options

See basic.publish for more details.

Example usage

import Conduit.Message
alias Conduit.Message
alias MyApp.Broker
%Message{}
|> put_body(%{"my" => "message"})
|> Broker.publish(:destination_route)
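Overriding options at publish time could look like the following sketch. It assumes the message-first argument order used in the example above and that the options keyword list is the third argument to `publish/3`; the routing key shown is only an example value.

```elixir
import Conduit.Message
alias Conduit.Message
alias MyApp.Broker

# The third argument overrides the :to option configured for :destination_route.
%Message{}
|> put_body(%{"my" => "message"})
|> Broker.publish(:destination_route, to: "my.other.routing_key")
```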

Architecture

[Figure: ConduitAMQP architecture]

When ConduitAMQP is used as the adapter for Conduit, Conduit starts ConduitAMQP as a child supervisor. ConduitAMQP in turn starts:

  1. ConduitAMQP.ConnPool - Creates and supervises a pool of AMQP connections.
  2. ConduitAMQP.PubSub - Creates and supervises ConduitAMQP.PubPool and ConduitAMQP.SubPool.
  3. ConduitAMQP.Subscribers - A supervisor for subscribers that process messages.
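The processes above are started when your broker starts, so the broker itself has to sit in your application's supervision tree. A minimal sketch, assuming your Conduit version lets the broker be listed as a `{MyApp.Broker, []}` child (older releases may need a different child specification); `MyApp.Application` and `MyApp.Supervisor` are placeholder names.

```elixir
defmodule MyApp.Application do
  use Application

  @impl true
  def start(_type, _args) do
    children = [
      # Assumption: the broker module provides a child spec.
      # Starting it brings up the ConduitAMQP supervisor described above.
      {MyApp.Broker, []}
    ]

    Supervisor.start_link(children, strategy: :one_for_one, name: MyApp.Supervisor)
  end
end
```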