A Bro log writer that sends logging output to Kafka. This provides a convenient means for tools in the Hadoop ecosystem, such as Storm, Spark, and others, to process the data generated by Bro.
Install librdkafka, a native client library for Kafka. This plugin has been tested against the latest release of librdkafka, which at the time of this writing is v0.9.4.
To use this plugin within a kerberized Kafka environment, you will also need libsasl2 installed and will need to pass --enable-sasl to the configure script.
curl -L https://github.com/edenhill/librdkafka/archive/v0.9.4.tar.gz | tar xvz
cd librdkafka-0.9.4/
./configure --enable-sasl
make
sudo make install
Build the plugin using the following commands.
./configure --bro-dist=$BRO_SRC
make
sudo make install
Run the following command to ensure that the plugin was installed successfully.
$ bro -N Apache::Kafka
Apache::Kafka - Writes logs to Kafka (dynamic, version 0.2)
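For scripted installs, the same check can be automated. The following is a minimal sketch, assuming the listing format shown above; the function name kafka_plugin_listed is hypothetical and not part of the plugin.

```shell
# Succeeds if the plugin listing read from stdin contains the Apache::Kafka entry.
# Assumption: the entry starts at the beginning of a line, as in the output shown above.
kafka_plugin_listed() {
    grep -q '^Apache::Kafka - '
}
```

It could be used as `bro -N Apache::Kafka | kafka_plugin_listed && echo "plugin found"`, assuming bro is on your PATH.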
The following examples highlight different ways that the plugin can be used. To try an example, add its Bro script to your local.bro file (for example, /usr/share/bro/site/local.bro) as shown.
The goal in this example is to send all HTTP and DNS records to a Kafka topic named bro.
Any valid librdkafka setting can be added to the kafka_conf configuration table.
Setting topic_name sends all records to the same Kafka topic.
Setting logs_to_send ensures that only HTTP and DNS records are sent.
@load Apache/Kafka/logs-to-kafka.bro
redef Kafka::logs_to_send = set(HTTP::LOG, DNS::LOG);
redef Kafka::topic_name = "bro";
redef Kafka::kafka_conf = table(
["metadata.broker.list"] = "localhost:9092"
);
It is also possible to send each log stream to a uniquely named topic. The goal in this example is to send all HTTP records to a Kafka topic named http and all DNS records to a separate Kafka topic named dns.
The topic_name value must be set to an empty string.
The $path value of Bro's Log Writer mechanism is used to define the topic name.
Any valid librdkafka setting can be added to the $config configuration table.
@load Apache/Kafka/logs-to-kafka.bro
redef Kafka::topic_name = "";
redef Kafka::tag_json = T;
event bro_init()
{
    # handles HTTP
    local http_filter: Log::Filter = [
        $name = "kafka-http",
        $writer = Log::WRITER_KAFKAWRITER,
        $config = table(
            ["metadata.broker.list"] = "localhost:9092"
        ),
        $path = "http"
    ];
    Log::add_filter(HTTP::LOG, http_filter);

    # handles DNS
    local dns_filter: Log::Filter = [
        $name = "kafka-dns",
        $writer = Log::WRITER_KAFKAWRITER,
        $config = table(
            ["metadata.broker.list"] = "localhost:9092"
        ),
        $path = "dns"
    ];
    Log::add_filter(DNS::LOG, dns_filter);
}
You may want to configure Bro to keep log messages with certain characteristics from being sent to your Kafka topics. For instance, Metron currently doesn't support IPv6 source or destination IPs in the default enrichments, so it may be helpful to filter those log messages out before they reach Kafka (although there are multiple ways to approach this). This example does that, assuming a fairly standard Bro Kafka plugin configuration, such that:
All Bro logs are sent to the bro topic, by configuring Kafka::topic_name.
Each JSON message is tagged with the appropriate log type (such as http, dns, or conn), by setting tag_json to true.
@load Apache/Kafka/logs-to-kafka.bro
redef Kafka::topic_name = "bro";
redef Kafka::tag_json = T;
event bro_init() &priority=-5
{
    # handles HTTP
    Log::add_filter(HTTP::LOG, [
        $name = "kafka-http",
        $writer = Log::WRITER_KAFKAWRITER,
        $pred(rec: HTTP::Info) = { return ! (( |rec$id$orig_h| == 128 || |rec$id$resp_h| == 128 )); },
        $config = table(
            ["metadata.broker.list"] = "localhost:9092"
        )
    ]);

    # handles DNS
    Log::add_filter(DNS::LOG, [
        $name = "kafka-dns",
        $writer = Log::WRITER_KAFKAWRITER,
        $pred(rec: DNS::Info) = { return ! (( |rec$id$orig_h| == 128 || |rec$id$resp_h| == 128 )); },
        $config = table(
            ["metadata.broker.list"] = "localhost:9092"
        )
    ]);

    # handles Conn
    Log::add_filter(Conn::LOG, [
        $name = "kafka-conn",
        $writer = Log::WRITER_KAFKAWRITER,
        $pred(rec: Conn::Info) = { return ! (( |rec$id$orig_h| == 128 || |rec$id$resp_h| == 128 )); },
        $config = table(
            ["metadata.broker.list"] = "localhost:9092"
        )
    ]);
}
logs_to_send is mutually exclusive with $pred. For each log on which you want to set $pred, you must set up an individual Log::add_filter and refrain from including that log in logs_to_send.
As of the Bro 2.5 release you can also use the is_v6_subnet() Bro function in your predicate. However, the above example should work on Bro 2.4 and newer, which has been the focus of the Kafka plugin.
kafka_conf
The global configuration settings for Kafka. These values are passed through directly to librdkafka. Any valid librdkafka setting can be defined in this table. The full set of valid settings is listed in the librdkafka configuration documentation.
redef Kafka::kafka_conf = table(
["metadata.broker.list"] = "localhost:9092",
["client.id"] = "bro"
);
topic_name
The name of the Kafka topic to which all Bro logs will be sent.
redef Kafka::topic_name = "bro";
max_wait_on_shutdown
The maximum number of milliseconds that the plugin will wait for any backlog of queued messages to be sent to Kafka before forced shutdown.
redef Kafka::max_wait_on_shutdown = 3000;
tag_json
If true, each JSON-formatted message is tagged with a log stream identifier. For example, a Conn::LOG message will look like { 'conn' : { ... }}.
redef Kafka::tag_json = T;
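A downstream consumer can use the tag to tell log streams apart. The following is a rough sketch, assuming each message arrives as a single-line JSON object whose first key is the stream identifier, written with standard double quotes; the function name log_stream_of is hypothetical. A real JSON parser would be more robust than this pattern match.

```shell
# Prints the outermost key (the log stream tag) of each tagged message read from stdin.
# Assumption: one JSON object per line, with the tag as its first double-quoted key.
log_stream_of() {
    sed -n 's/^[[:space:]]*{[[:space:]]*"\([^"]*\)".*/\1/p'
}
```

For instance, `printf '%s\n' '{"conn": {"uid": "C1"}}' | log_stream_of` prints `conn`.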
debug
A comma-separated list of the librdkafka debug contexts that you want to enable. The available contexts are those accepted by librdkafka's debug configuration property.
This plugin supports producing messages to a kerberized Kafka. There are a couple of prerequisites and a couple of settings to configure.
If you are using SASL as a security protocol for Kafka, then you must have libsasl or libsasl2 installed. You can tell whether SASL is enabled by running the following from the directory in which you built librdkafka:
examples/rdkafka_example -X builtin.features
builtin.features = gzip,snappy,ssl,sasl,regex
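The check can also be scripted rather than read by eye. The following is a minimal sketch, assuming the output format shown above; the function name has_sasl_feature is hypothetical.

```shell
# Succeeds if the builtin.features line read from stdin lists "sasl".
# Assumption: the line has the form "builtin.features = a,b,c" as shown above.
has_sasl_feature() {
    # Keep only the value after "=", drop spaces, put one feature per line,
    # then look for an exact "sasl" entry.
    sed -n 's/^builtin\.features *= *//p' | tr -d ' ' | tr ',' '\n' | grep -qx 'sasl'
}
```

It could be used as `examples/rdkafka_example -X builtin.features | has_sasl_feature || echo "rebuild librdkafka with --enable-sasl"`.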
As stated above, you can configure the Kafka producer settings in ${BRO_HOME}/share/bro/site/local.bro. A few settings must be set; the example below shows them.
For an environment where the following is true:
The broker is node1:6667
Kafka is using SASL_PLAINTEXT as the security protocol
The keytab used is the metron keytab
The principal for metron is metron@EXAMPLE.COM
The Kafka topic bro has been given permission for the metron user to write:
# login using the metron user
kinit -kt /etc/security/keytabs/metron.headless.keytab metron@EXAMPLE.COM
${KAFKA_HOME}/kafka-broker/bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=node1:2181 --add --allow-principal User:metron --topic bro
The following is how ${BRO_HOME}/share/bro/site/local.bro looks:
@load Apache/Kafka/logs-to-kafka.bro
redef Kafka::logs_to_send = set(HTTP::LOG, DNS::LOG);
redef Kafka::topic_name = "bro";
redef Kafka::tag_json = T;
redef Kafka::kafka_conf = table(
    ["metadata.broker.list"] = "node1:6667",
    ["security.protocol"] = "SASL_PLAINTEXT",
    ["sasl.kerberos.keytab"] = "/etc/security/keytabs/metron.headless.keytab",
    ["sasl.kerberos.principal"] = "metron@EXAMPLE.COM"
);