This plugin provides both a Source and a Matcher which use RabbitMQ as their transport.
fluent-amqp-plugin | fluent | ruby |
---|---|---|
>= 0.10.0 | >= 0.14.8 | >= 2.1 |
< 0.10.0 | > 0.10.0, < 2 * | >= 1.9 |
You can use the hosts parameter to provide an array of RabbitMQ hosts which
are in your cluster. This allows for highly available configurations: if a
node in your cluster becomes inaccessible, the plugin will attempt to reconnect
to another host in the array.
WARNING: Due to limitations in the library used for connecting to RabbitMQ, each node of the cluster must use the same port, vhost and other configuration.
<source>
type amqp
hosts ["amqp01.example.com","amqp02.example.com"]
port 5672
vhost /
user guest
pass guest
queue logs
format json
</source>
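The failover behaviour described above can be pictured with a small Ruby sketch. This is not the plugin's code (reconnection is handled by the underlying client library); `connect_to_first_available` and the block that stands in for a connection attempt are hypothetical names used only for illustration. Note that one port and vhost are shared by every host, mirroring the limitation in the warning.

```ruby
# Illustrative only: try each host in order and return the first that accepts
# a connection. The block stands in for a real connection attempt.
def connect_to_first_available(hosts, port:, vhost:)
  errors = {}
  hosts.each do |host|
    begin
      # Every host is tried with the same port and vhost.
      return yield(host, port, vhost)
    rescue StandardError => e
      errors[host] = e.message
    end
  end
  raise "no reachable host: #{errors.inspect}"
end

# Simulate a cluster in which the first node is down.
down = ["amqp01.example.com"]
connection = connect_to_first_available(
  ["amqp01.example.com", "amqp02.example.com"], port: 5672, vhost: "/"
) do |host, port, vhost|
  raise IOError, "connection refused" if down.include?(host)
  { host: host, port: port, vhost: vhost }
end

puts connection[:host]
```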
If you would like to filter events from certain sources, you can make use of the
key, tag_key and tag_header configuration options.
The RabbitMQ routing key that is set for the message on the broker determines what you may be able to filter against when consuming messages.
For example, if you want a 'catch-all' consumer that gets all messages from a
direct exchange, you should set tag_key true on both source and matcher. This
will then recreate the original event's tag ready for processing by the
consumer's matchers.
If you want to have selective control over the messages that are consumed, you
can set tag_key true on the matcher, but key some.tag on the source. Only
messages with the given tag will be consumed; however, it's recommended that you
understand the difference between the exchange types, and how multiple
consumers may impact message delivery.
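To make the selective case concrete, here is a toy in-memory model of a direct exchange in Ruby. It is neither RabbitMQ nor plugin code; `DirectExchange` is a hypothetical class, and the sketch assumes the matcher publishes each event with its tag as the routing key.

```ruby
# Toy model of a direct exchange: a message is delivered to every queue whose
# binding key equals the message's routing key.
class DirectExchange
  def initialize
    @bindings = Hash.new { |hash, key| hash[key] = [] }
  end

  def bind(queue, binding_key)
    @bindings[binding_key] << queue
  end

  def publish(routing_key, payload)
    # fetch avoids creating empty bindings for unknown keys.
    @bindings.fetch(routing_key, []).each { |queue| queue << [routing_key, payload] }
  end
end

exchange = DirectExchange.new
selective = [] # plays the role of a source configured with: key some.tag
exchange.bind(selective, "some.tag")

# Assumption: with tag_key true on the matcher, the event tag is the routing key.
exchange.publish("some.tag", { "message" => "kept" })
exchange.publish("other.tag", { "message" => "not delivered to this queue" })

selective.each { |tag, record| puts "#{tag}: #{record['message']}" }
```

Only the event tagged some.tag reaches the bound queue; the other event is dropped by the exchange unless another queue is bound with a matching key.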
The following parameters are common to both matcher and source plugins, and can be used as required.
param | type | default | description |
---|---|---|---|
:host | :string | nil | Required (if hosts unset) Hostname of RabbitMQ server |
:hosts | :array | nil | Required (if host unset) An array of hostnames of RabbitMQ servers in a common cluster (takes precedence over host ) |
:user | :string | "guest" | Username to connect |
:pass | :string | "guest" | Password to authenticate with (Secret) |
:vhost | :string | "/" | RabbitMQ Virtual Host |
:port | :integer | 5672 | RabbitMQ listening port |
:durable | :bool | false | Should the queue or exchange be durable? |
:passive | :bool | false | If true, will fail if queue or exchange does not exist |
:auto_delete | :bool | false | Should the queue be deleted when all consumers have closed? |
:heartbeat | :integer | 60 | Frequency of heartbeats to ensure quiet connections are kept open |
:ssl | :bool | false | Is SSL enabled for this connection to RabbitMQ |
:verify_ssl | :bool | false | Verify the SSL certificate presented by RabbitMQ |
:tls | :bool | false | Should TLS be used for authentication |
:tls_cert | :string | nil | Path (or content) of TLS Certificate |
:tls_key | :string | nil | Path (or content) of TLS Key |
:tls_ca_certificates | :array | nil | Array of paths to CA certificates |
:tls_verify_peer | :bool | true | Verify the server's TLS certificate |
:tag_key | :bool | false | Should the routing key be used for the event tag |
:tag_header | :string | nil | What header should be used for the event tag |
:time_header | :string | nil | What header should be used for the event's timestamp |
Using amqp as a source allows you to read messages from RabbitMQ and handle them in the same manner as a locally generated event.
It can be used in isolation, reading (well-formed) events generated by other applications and published onto a queue, or together with the amqp matcher, which can replace the use of the fluent forwarders.
Note: The following are in addition to the common parameters shown above.
param | type | default | description |
---|---|---|---|
:tag | :string | "hunter.amqp" | Accepted events are tagged with this string (See also tag_key) |
:queue | :string | nil | What queue contains the events to read |
:exclusive | :bool | false | Should we have exclusive use of the queue? See notes on Multiple Workers below. |
:payload_format | :string | "json" | Deprecated - Use format |
:bind_exchange | :bool | false | Should the queue automatically bind to the exchange |
:exchange | :string | nil | What exchange should the queue bind to? |
:exchange_type | :string | "direct" | Type of exchange ( direct, fanout, topic, headers, x-consistent-hash, x-modulus-hash ) |
:routing_key | :string | nil | What routing key should be used when binding the queue to the exchange? |
:include_headers | :bool | false | If true, include Message headers in the parsed payload with key "headers" (if there is already a field name headers in the parsed payload, then it will be overwritten) |
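The overwrite behaviour of include_headers can be sketched in a few lines of Ruby. This is an illustration of the rule stated in the table, not the plugin's implementation; `with_headers` is a hypothetical helper.

```ruby
require "json"

# Illustrative only: merge message headers into the parsed payload under the
# "headers" key, replacing any field of that name already in the payload.
def with_headers(parsed_payload, message_headers, include_headers:)
  return parsed_payload unless include_headers
  parsed_payload.merge("headers" => message_headers)
end

payload = JSON.parse('{"message": "hello", "headers": "from the payload"}')
headers = { "LogLevel" => "INFO" }

puts with_headers(payload, headers, include_headers: true).to_json
```

If your payloads already carry a field named headers, it is lost once include_headers is enabled, so rename that field at the producer if you need both.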
<source>
type amqp
host amqp.example.com
port 5672
vhost /
user guest
pass guest
queue logs
format json
</source>
param | type | default | description |
---|---|---|---|
:exchange | :string | "" | Name of the exchange to send events to |
:exchange_type | :string | "direct" | Type of exchange ( direct, fanout, topic, headers, x-consistent-hash, x-modulus-hash ) |
:persistent | :bool | false | Are messages kept on the exchange even if RabbitMQ shuts down |
:key | :string | nil | Routing key to attach to events (only applies when exchange_type is topic ). See also tag_key |
:content_type | :string | "application/octet" | Content-type header to send with message |
:content_encoding | :string | nil | Content-Encoding header to send, e.g. base64 or rot13 |
It is possible to specify message headers based on the content of the incoming message, or as a fixed default value as shown below;
<matcher ...>
...
<header>
name LogLevel
source level
default "INFO"
</header>
<header>
name SourceHost
default my.example.com
</header>
<header>
name CorrelationID
source x-request-id
</header>
<header>
name NestedExample
source a.nested.value
</header>
<header>
name AnotherNestedExample
source ["a", "nested", "value"]
</header>
...
</matcher>
The header elements may be set multiple times for multiple additional headers to be included on any given message.
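As a rough mental model of the header blocks above, the Ruby sketch below resolves each header from an event record. It is an assumption-laden illustration rather than the plugin's code: `resolve_header` is a hypothetical helper, the sample record is invented, and the sketch assumes a dotted source is split into nested keys and that default is used whenever the source is absent.

```ruby
# Illustrative only: resolve a header value from an event record.
# `source` may be a dotted path ("a.nested.value") or an array of keys.
def resolve_header(record, source: nil, default: nil)
  return default if source.nil?
  path = source.is_a?(Array) ? source : source.split(".")
  value = path.reduce(record) do |node, key|
    node.is_a?(Hash) ? node[key] : nil
  end
  value.nil? ? default : value
end

# Invented sample record for the sketch.
record = {
  "level" => "WARN",
  "x-request-id" => "abc-123",
  "a" => { "nested" => { "value" => 42 } }
}

headers = {
  "LogLevel"             => resolve_header(record, source: "level", default: "INFO"),
  "SourceHost"           => resolve_header(record, default: "my.example.com"),
  "CorrelationID"        => resolve_header(record, source: "x-request-id"),
  "NestedExample"        => resolve_header(record, source: "a.nested.value"),
  "AnotherNestedExample" => resolve_header(record, source: ["a", "nested", "value"])
}

headers.each { |name, value| puts "#{name}=#{value}" }
```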
<match **.**>
type amqp
key my_routing_key
exchange amq.direct
host amqp.example.com
port 5672
vhost /
user guest
pass guest
content_type application/json
</match>
One particular use case of the AMQP plugin is as an alternative to the built-in fluent forwarders.
You can set up each client to output events to a RabbitMQ exchange which is then consumed by one or more input agents.
The example configuration below shows how to set up a direct exchange, with multiple consumers each receiving events.
<match **>
type amqp
exchange amq.direct
host amqp.example.com
port 5672
vhost /
user guest
pass guest
format json
tag_key true
</match>
<source>
type amqp
host amqp.example.com
port 5672
vhost /
user guest
pass guest
queue my_queue
format json
tag_key true
</source>
The example below shows how you can configure TLS authentication using signed encryption keys which will be validated by your appropriately configured RabbitMQ installation.
For more information on setting up TLS encryption, see the Bunny TLS documentation.
Note: The 'source' configuration accepts the same arguments.
<match **.**>
type amqp
key my_routing_key
exchange amq.direct
host amqp.example.com
port 5671 # Note that your port may change for TLS auth
vhost /
tls true
tls_key "/etc/fluent/ssl/client.key.pem"
tls_cert "/etc/fluent/ssl/client.crt.pem"
tls_ca_certificates ["/etc/fluent/ssl/server.cacrt.pem", "/another/ca/cert.file"]
tls_verify_peer true
auth_mechanism EXTERNAL
</match>
This plugin supports multiple workers for both source and matcher configurations.
Note that when using exclusive queues with multiple workers the queues will be renamed based on the worker id.
For example, if your queue is configured as fluent.queue, with 4 workers and exclusive: true, the plugin
will create four named queues.
Be aware that the first queue will keep the same name as given to maintain compatibility.
A docker container is included in this project to help with testing and debugging.
You can build the docker containers ready for use with the following;
docker-compose build
Start the cluster of three containers with;
docker-compose up
And finally, submit test events, one per second, to the built-in tcp.socket source with;
while [ true ] ; do echo "{ \"test\": \"$(date)\" }" | nc ${DOCKER_IP} 20001; sleep 1; done
You may find that RabbitMQ doesn't behave nicely when delivering lots of events to a single queue, as the process thread gets overloaded and starts to send flow control events back to publishers. If you're in this situation, try the rabbitmq-sharding plugin, which is included in RabbitMQ 3.6+ and allows queues to be dynamically generated per node.
To use this;
rabbitmq-plugins enable rabbitmq_sharding
Configure the exchange_type as x-modulus-hash or x-consistent-hash
rabbitmqctl set_policy images-shard "^fluent.modhash$" '{"shards-per-node": 2, "routing-key": "1234"}'
Warning: You will need to run at least N consumers for the N shards created as the plugin does not try to route all shards onto consumers dynamically.
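To see why every shard needs a consumer, consider this toy Ruby model of modulus-hash sharding. It is only a sketch: `shard_for` is a hypothetical helper, `Zlib.crc32` is a stand-in hash chosen for the example (the sharding plugin uses its own hash function, so real shard assignments will differ), and the consumer names are invented.

```ruby
require "zlib"

# Toy model: pick a shard by hashing the routing key modulo the shard count.
# Zlib.crc32 is a stand-in hash chosen for this sketch.
def shard_for(routing_key, shard_count)
  Zlib.crc32(routing_key) % shard_count
end

shard_count = 2
consumers_by_shard = { 0 => "consumer-a" } # shard 1 has no consumer attached

counts = Hash.new(0)
100.times { |i| counts[shard_for("event-#{i}", shard_count)] += 1 }

counts.sort.each do |shard, total|
  owner = consumers_by_shard.fetch(shard, "nobody (messages accumulate)")
  puts "shard #{shard}: #{total} messages, consumed by #{owner}"
end
```

Messages that hash to a shard without a consumer simply wait in that shard's queue, which is why running fewer consumers than shards leaves part of the stream unprocessed.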
VERSION file
bundle exec rake release
git push upstream --tags
Copyright (c) 2011 Hiromi Ishii. Copyright (c) 2013- github/giraffi. See LICENSE.txt for further details.