renderedtext / ex-tackle

💯 percent reliable microservice communication
MIT License

Tackle

Tackles the problem of processing asynchronous jobs in a reliable manner by relying on RabbitMQ.

You should also take a look at Ruby Tackle.

Why should I use tackle?

Installation

Add the following to the list of your dependencies:

def deps do
  [
    {:tackle, github: "renderedtext/ex-tackle"}
  ]
end

Also, add it to the list of your applications:

def application do
  [applications: [:tackle]]
end

Publishing messages to an exchange

To publish a message to an exchange:

options = %{
  url: "amqp://rabbitmq:5672",
  exchange: "test-exchange",
  routing_key: "test-messages",
}

Tackle.publish("Hi!", options)

The previous example opens a new connection, creates the exchange if it doesn't exist, publishes the message, and closes the connection. This makes sure that everything is in place, but it performs poorly because every call pays the connection setup cost.

For fast publishing of multiple messages, open a channel manually, make sure that the exchange exists, and publish messages with Tackle.Exchange.publish. Example:

{:ok, c}  = Tackle.Connection.open(:publisher, "amqp://rabbitmq:5672")
channel   = Tackle.Channel.create(c)
exchange  = Tackle.Exchange.create(channel, "test-exchange")
route_key = "hello"

(1..1_000) |> Enum.each(fn _ ->
  Tackle.Exchange.publish(channel, exchange, "Hi!", route_key)
end)

Consuming messages from an exchange

First, declare a consumer module:

defmodule TestConsumer do
  use Tackle.Consumer,
    url: "amqp://rabbitmq:5672",
    exchange: "test-exchange",
    routing_key: "test-messages",
    service: "my-service"

  def handle_message(message) do
    IO.puts "A message arrived. Life is good!"

    IO.puts message
  end
end

And then start it to consume messages:

TestConsumer.start_link
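
In a long-running service you will usually want the consumer restarted if it crashes. A minimal sketch of starting it under a supervisor follows. The MyApp.Application and MyApp.Supervisor names are hypothetical, and the sketch assumes that TestConsumer.start_link/0 returns {:ok, pid}, which a supervisor requires:

```elixir
defmodule MyApp.Application do
  # Hypothetical application module; the name is an assumption.
  use Application

  # Child specs for the consumers this application should keep running.
  def children do
    [
      # Assumes TestConsumer.start_link/0 returns {:ok, pid}.
      %{id: TestConsumer, start: {TestConsumer, :start_link, []}}
    ]
  end

  @impl true
  def start(_type, _args) do
    # :one_for_one restarts only the child that crashed.
    Supervisor.start_link(children(), strategy: :one_for_one, name: MyApp.Supervisor)
  end
end
```

To use it, point the mod: entry of your application definition in mix.exs at this module.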

Rescuing dead messages

If your consumer is broken, in other words if it raises an exception while handling messages, your messages will end up in a dead messages queue.

To rescue those messages, you can use Tackle.republish:

dead_queue_name = "my-service.test-messages.dead"

options = %{
  url: "amqp://rabbitmq:5672",
  queue: dead_queue_name,
  exchange: "test-exchange",
  routing_key: "test-messages",
  count: 1
}

Tackle.republish(options)

The above will pull one message from the queue named by dead_queue_name and publish it to the test-exchange exchange with the test-messages routing key.

To republish multiple messages, set count to a larger number.

Opening multiple channels through the same connection

By default, each channel (consumer) opens a separate connection to the server.

If you want to reduce the number of open connections from one Elixir application to the RabbitMQ server, you can map multiple channels to a single connection.

Each connection can have a name, supplied through the optional connection_id parameter. All consumers that have the same connection name share a single connection.

If connection_id is not supplied, it is set to :default. The :default value is a special case: channels with connection_id set to :default do not share a connection. Each of them opens its own connection, one channel per connection.

To use this feature, set the connection_id parameter in the consumer specification:

defmodule Consumer do
  use Tackle.Consumer,
    url: "...",
    connection_id: :connection_identifier,
    ...