Hey @randolpht, can you provide a few of the datasets you were testing when that error showed up? That will help me do some testing with a few things I am changing on the HELK side. I will also link this to a HELK issue, since it is a HELK Kafka broker issue and not an issue with the dataset itself.
I tested every single file and did not get that error message. Can you please provide more details about your setup and environment?
Let me know. I cannot replicate your issue. I decompressed each file and sent it over to a fresh HELK instance. I was not ingesting logs at the same time I was using kafkacat; maybe that differs from your testing steps. Please provide more details. Thank you @randolpht 👍
@randolpht, let's say you have the following:
kafkacat -b 10.0.10.102:9092 -t winlogbeat -P -l empire_userland_schtasks_2019-03-19024742.json
Now just add -d msg,broker,topic before the -l argument to set kafkacat to debug mode. Like this:
kafkacat -b 10.0.10.102:9092 -t winlogbeat -P -d msg,broker,topic -l empire_userland_schtasks_2019-03-19024742.json
Please provide that debug output for the files that are failing for you. Thank you! 👍
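Also, if you want to double-check on your end whether any messages actually made it to the topic, a quick consumer-mode run works too (a sketch using the -C, -o, and -e options from the help below; adjust the broker address to yours):
kafkacat -b 10.0.10.102:9092 -t winlogbeat -C -o beginning -e   # consume everything from the start, then exit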
I apologize ahead of time as I am not an ELK guy :p
trevor@ubuntu:~$ kafkacat -b 127.0.0.1:9092 -t winlogbeat -P -d msg,broker,topic mordor/small_datasets/windows/discovery/account_discovery_T1087/domain_users/empire_net_user_domain_2019-03-19021158.json
%7|1553814554.657|BROKER|rdkafka#producer-0| 127.0.0.1:9092/bootstrap: Added new broker with NodeId -1
%7|1553814554.657|TOPIC|rdkafka#producer-0| New local topic: winlogbeat
%7|1553814554.657|BRKMAIN|rdkafka#producer-0| 127.0.0.1:9092/bootstrap: Enter main broker thread
%7|1553814554.657|CONNECT|rdkafka#producer-0| 127.0.0.1:9092/bootstrap: broker in state INIT connecting
%7|1553814554.658|CONNECTED|rdkafka#producer-0| 127.0.0.1:9092/bootstrap: connected to localhost:9092
%7|1553814554.658|STATE|rdkafka#producer-0| 127.0.0.1:9092/bootstrap: Broker changed state INIT -> UP
%7|1553814554.658|SEND|rdkafka#producer-0| 127.0.0.1:9092/bootstrap: Sent 1 bufs
%7|1553814554.658|SEND|rdkafka#producer-0| 127.0.0.1:9092/bootstrap: Sent 0 bufs
%7|1553814554.660|BROKER|rdkafka#producer-0| 192.168.220.128:9092/1: Added new broker with NodeId 1
%7|1553814554.660|STATE|rdkafka#producer-0| Topic winlogbeat changed state unknown -> exists
%7|1553814554.660|PARTCNT|rdkafka#producer-0| Topic winlogbeat partition count changed from 0 to 1
%7|1553814554.660|BRKDELGT|rdkafka#producer-0| Broker 192.168.220.128:9092/1 is now leader for topic winlogbeat [0] with 0 messages (0 bytes) queued
%7|1553814554.660|PARTCNT|rdkafka#producer-0| Partitioning 1 unassigned messages in topic winlogbeat to 1 partitions
%7|1553814554.660|UAS|rdkafka#producer-0| 1/1 messages were partitioned in topic winlogbeat
%7|1553814554.660|BRKMAIN|rdkafka#producer-0| 192.168.220.128:9092/1: Enter main broker thread
%7|1553814554.660|CONNECT|rdkafka#producer-0| 192.168.220.128:9092/1: broker in state INIT connecting
%7|1553814554.671|CONNECTED|rdkafka#producer-0| 192.168.220.128:9092/1: connected to 192.168.220.128:9092
%7|1553814554.671|STATE|rdkafka#producer-0| 192.168.220.128:9092/1: Broker changed state INIT -> UP
%7|1553814554.673|PRODUCE|rdkafka#producer-0| 192.168.220.128:9092/1: produce messageset with 1 messages (3554789 bytes)
%7|1553814554.676|PARTCNT|rdkafka#producer-0| No change in partition count for topic winlogbeat
%7|1553814554.676|TOPICUPD|rdkafka#producer-0| No leader change for topic winlogbeat [0] with leader 1
%7|1553814554.678|SEND|rdkafka#producer-0| 192.168.220.128:9092/1: Sent 1 bufs
%7|1553814554.678|SEND|rdkafka#producer-0| 192.168.220.128:9092/1: Sent 0 bufs
%7|1553814554.679|MSGSET|rdkafka#producer-0| 192.168.220.128:9092/1: MessageSet with 1 message(s) delivered
%7|1553814554.679|MSGSET|rdkafka#producer-0| 192.168.220.128:9092/1: MessageSet with 1 message(s) encountered error: Broker: Message size too large
% Delivery failed for message: Broker: Message size too large
%7|1553814554.679|BROKERFAIL|rdkafka#producer-0| 127.0.0.1:9092/bootstrap: failed: err: Local: Broker handle destroyed: (errno: Interrupted system call)
%7|1553814554.679|STATE|rdkafka#producer-0| 127.0.0.1:9092/bootstrap: Broker changed state UP -> DOWN
%7|1553814554.679|BROKERFAIL|rdkafka#producer-0| 192.168.220.128:9092/1: failed: err: Local: Broker handle destroyed: (errno: Interrupted system call)
%7|1553814554.679|STATE|rdkafka#producer-0| 192.168.220.128:9092/1: Broker changed state UP -> DOWN
%7|1553814554.679|BRKTP|rdkafka#producer-0| 192.168.220.128:9092/1: Undelegating winlogbeat [0]
%7|1553814554.679|BRKDELGT|rdkafka#producer-0| Broker 192.168.220.128:9092/1 no longer leader for topic winlogbeat [0]
%7|1553814554.679|BRKDELGT|rdkafka#producer-0| No broker is leader for topic winlogbeat [0]
On another side note, I have issues when I try to use the -l flag. Looking at the help pages for kafkacat, I do not see a -l flag.
What version of kafkacat are you running? This is mine:
Usage: kafkacat <options> [file1 file2 .. | topic1 topic2 ..]]
kafkacat - Apache Kafka producer and consumer tool
https://github.com/edenhill/kafkacat
Copyright (c) 2014-2015, Magnus Edenhill
Version 1.3.1 (JSON) (librdkafka 0.11.6 builtin.features=gzip,snappy,ssl,sasl,regex,lz4,sasl_gssapi,sasl_plain,sasl_scram,plugins)
General options:
-C | -P | -L | -Q Mode: Consume, Produce, Metadata List, Query mode
-G <group-id> Mode: High-level KafkaConsumer (Kafka 0.9 balanced consumer groups)
Expects a list of topics to subscribe to
-t <topic> Topic to consume from, produce to, or list
-p <partition> Partition
-b <brokers,..> Bootstrap broker(s) (host[:port])
-D <delim> Message delimiter character:
a-z.. | \r | \n | \t | \xNN
Default: \n
-E Do not exit on non fatal error
-K <delim> Key delimiter (same format as -D)
-c <cnt> Limit message count
-X list List available librdkafka configuration properties
-X prop=val Set librdkafka configuration property.
Properties prefixed with "topic." are
applied as topic properties.
-X dump Dump configuration and exit.
-d <dbg1,...> Enable librdkafka debugging:
all,generic,broker,topic,metadata,queue,msg,protocol,cgrp,security,fetch,feature
-q Be quiet (verbosity set to 0)
-v Increase verbosity
-V Print version
-h Print usage help
Producer options:
-z snappy|gzip Message compression. Default: none
-p -1 Use random partitioner
-D <delim> Delimiter to split input into messages
-K <delim> Delimiter to split input key and message
-l Send messages from a file separated by
delimiter, as with stdin.
(only one file allowed)
-T Output sent messages to stdout, acting like tee.
-c <cnt> Exit after producing this number of messages
-Z Send empty messages as NULL messages
file1 file2.. Read messages from files.
With -l, only one file permitted.
Otherwise, the entire file contents will
be sent as one single message.
Consumer options:
-o <offset> Offset to start consuming from:
beginning | end | stored |
<value> (absolute offset) |
-<value> (relative offset from end)
-e Exit successfully when last message received
-f <fmt..> Output formatting string, see below.
Takes precedence over -D and -K.
-J Output with JSON envelope
-D <delim> Delimiter to separate messages on output
-K <delim> Print message keys prefixing the message
with specified delimiter.
-O Print message offset using -K delimiter
-c <cnt> Exit after consuming this number of messages
-Z Print NULL messages and keys as "NULL"(instead of empty)
-u Unbuffered output
Metadata options (-L):
-t <topic> Topic to query (optional)
Query options (-Q):
-t <t>:<p>:<ts> Get offset for topic <t>,
partition <p>, timestamp <ts>.
Timestamp is the number of milliseconds
since epoch UTC.
Requires broker >= 0.10.0.0 and librdkafka >= 0.9.3.
Multiple -t .. are allowed but a partition
must only occur once.
Format string tokens:
%s Message payload
%S Message payload length (or -1 for NULL)
%R Message payload length (or -1 for NULL) serialized
as a binary big endian 32-bit signed integer
%k Message key
%K Message key length (or -1 for NULL)
%T Message timestamp (milliseconds since epoch UTC)
%t Topic
%p Partition
%o Message offset
\n \r \t Newlines, tab
\xXX \xNNN Any ASCII character
Example:
-f 'Topic %t [%p] at offset %o: key %k: %s\n'
Consumer mode (writes messages to stdout):
kafkacat -b <broker> -t <topic> -p <partition>
or:
kafkacat -C -b ...
High-level KafkaConsumer mode:
kafkacat -b <broker> -G <group-id> topic1 top2 ^aregex\d+
Producer mode (reads messages from stdin):
... | kafkacat -b <broker> -t <topic> -p <partition>
or:
kafkacat -P -b ...
Metadata listing:
kafkacat -L -b <broker> [-t <topic>]
Query offset by timestamp:
kafkacat -Q -b broker -t <topic>:<partition>:<timestamp>
So you are not using the -l flag then? I see you are sending the whole file, right? I believe that is what it is. Let me try without the -l flag.
Yup, that's what it is.
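For anyone following along, the difference comes down to this (a sketch; <broker> and dataset.json are placeholders):
# Without -l, kafkacat sends the ENTIRE file as one single Kafka message,
# which can easily exceed the broker's maximum allowed message size:
kafkacat -b <broker>:9092 -t winlogbeat -P dataset.json
# With -l, each newline-delimited line of the file becomes its own message,
# so every Windows event stays well under that limit:
kafkacat -b <broker>:9092 -t winlogbeat -P -l dataset.json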
robertos-mbp:registry_run_keys_startup_folder_T1060 wardog$ kafkacat -b 10.0.10.102:9092 -t winlogbeat -P -d all empire_userland_schtasks_2019-03-19024742.json
%7|1553815032.355|WAKEUPFD|rdkafka#producer-1| [thrd:app]: 10.0.10.102:9092/bootstrap: Enabled low-latency partition queue wake-ups
%7|1553815032.355|BRKMAIN|rdkafka#producer-1| [thrd::0/internal]: :0/internal: Enter main broker thread
%7|1553815032.355|WAKEUPFD|rdkafka#producer-1| [thrd:app]: 10.0.10.102:9092/bootstrap: Enabled low-latency ops queue wake-ups
%7|1553815032.355|STATE|rdkafka#producer-1| [thrd::0/internal]: :0/internal: Broker changed state INIT -> UP
%7|1553815032.355|BROADCAST|rdkafka#producer-1| [thrd::0/internal]: Broadcasting state change
%7|1553815032.355|BROKER|rdkafka#producer-1| [thrd:app]: 10.0.10.102:9092/bootstrap: Added new broker with NodeId -1
%7|1553815032.355|INIT|rdkafka#producer-1| [thrd:app]: librdkafka v0.11.6 (0xb06ff) rdkafka#producer-1 initialized (builtin.features 0xffff, debug 0xffff)
%7|1553815032.355|BRKMAIN|rdkafka#producer-1| [thrd:10.0.10.102:9092/bootstrap]: 10.0.10.102:9092/bootstrap: Enter main broker thread
%7|1553815032.355|CONNECT|rdkafka#producer-1| [thrd:10.0.10.102:9092/bootstrap]: 10.0.10.102:9092/bootstrap: broker in state INIT connecting
%7|1553815032.355|TOPIC|rdkafka#producer-1| [thrd:app]: New local topic: winlogbeat
%7|1553815032.355|TOPPARNEW|rdkafka#producer-1| [thrd:app]: NEW winlogbeat [-1] 0x7fcc80700c90 (at rd_kafka_topic_new0:362)
%7|1553815032.355|METADATA|rdkafka#producer-1| [thrd:app]: Skipping metadata refresh of 1 topic(s): no usable brokers
% ERROR: Failed to produce message (4916235 bytes): Broker: Message size too large
robertos-mbp:registry_run_keys_startup_folder_T1060 wardog$
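The other direction, raising the size limits instead of using -l, should also work in theory, but I would not recommend it for the mordor datasets. A sketch, with an illustrative 10 MB value; the broker and topic settings named in the comment would have to allow it too:
# Raise librdkafka's producer-side limit via -X (see -X list in the help):
kafkacat -b 10.0.10.102:9092 -t winlogbeat -P -X message.max.bytes=10000000 empire_userland_schtasks_2019-03-19024742.json
# The broker's message.max.bytes and the topic's max.message.bytes
# would also need to be raised for a message that large to be accepted.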
How did you install kafkacat?
apt-get install kafkacat
Below is my version
Usage: kafkacat <options> [file1 file2 ..]
General options:
-C | -P | -L Mode: Consume, Produce or metadata List
-t <topic> Topic to consume from, produce to, or list
Producer options:
-z snappy|gzip Message compression. Default: none
-p -1 Use random partitioner
-D <delim> Delimiter to split input into messages
Consumer options:
-o <offset> Offset to start consuming from
That looks like an old version to me, since mine is Version 1.3.1 (JSON) (librdkafka 0.11.6). I wonder if you can install the latest version, or compile it. Let me know.
I can also see what the problem is from the version perspective. What Debian-based distro are you using?
So Ubuntu Xenial has an old version:
https://packages.ubuntu.com/xenial/kafkacat
Bionic has the latest one.
@randolpht I found what you can do on a Xenial system (or older releases) where the default kafkacat package is very old:
Remove the current kafkacat:
sudo apt -y remove kafkacat
Download the latest Debian release from GitHub:
wget https://github.com/edenhill/kafkacat/archive/debian/1.3.1-1.tar.gz
Decompress it:
tar -xzvf 1.3.1-1.tar.gz
Change into the extracted directory:
cd kafkacat-debian-1.3.1-1/
I needed to install the following before running ./bootstrap.sh:
sudo apt-get install librdkafka-dev libyajl-dev build-essential
Now you can run ./bootstrap.sh
Once it is done, make sure you copy the new kafkacat binary from your current folder to /usr/local/bin/:
sudo cp kafkacat /usr/local/bin/
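A quick sanity check that the new binary is the one being picked up (assuming /usr/local/bin comes before /usr/bin in your PATH):
which kafkacat   # should print /usr/local/bin/kafkacat
kafkacat -V      # should report version 1.3.1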
Then run kafkacat -h and you should get the following:
helk@ubuntu:~/kafkacat-debian-1.3.1-1$ kafkacat -h
Usage: kafkacat <options> [file1 file2 .. | topic1 topic2 ..]]
kafkacat - Apache Kafka producer and consumer tool
https://github.com/edenhill/kafkacat
Copyright (c) 2014-2015, Magnus Edenhill
Version 1.3.1 (librdkafka 1.0.0 builtin.features=snappy,sasl,regex,lz4,sasl_plain,plugins)
General options:
-C | -P | -L | -Q Mode: Consume, Produce, Metadata List, Query mode
-G <group-id> Mode: High-level KafkaConsumer (Kafka 0.9 balanced consumer groups)
Expects a list of topics to subscribe to
-t <topic> Topic to consume from, produce to, or list
-p <partition> Partition
-b <brokers,..> Bootstrap broker(s) (host[:port])
-D <delim> Message delimiter character:
a-z.. | \r | \n | \t | \xNN
Default: \n
-E Do not exit on non fatal error
-K <delim> Key delimiter (same format as -D)
-c <cnt> Limit message count
-X list List available librdkafka configuration properties
-X prop=val Set librdkafka configuration property.
Properties prefixed with "topic." are
applied as topic properties.
-X dump Dump configuration and exit.
-d <dbg1,...> Enable librdkafka debugging:
all,generic,broker,topic,metadata,feature,queue,msg,protocol,cgrp,security,fetch,interceptor,plugin,consumer,admin,eos
-q Be quiet (verbosity set to 0)
-v Increase verbosity
-V Print version
-h Print usage help
Producer options:
-z snappy|gzip Message compression. Default: none
-p -1 Use random partitioner
-D <delim> Delimiter to split input into messages
-K <delim> Delimiter to split input key and message
-l Send messages from a file separated by
delimiter, as with stdin.
(only one file allowed)
-T Output sent messages to stdout, acting like tee.
-c <cnt> Exit after producing this number of messages
-Z Send empty messages as NULL messages
file1 file2.. Read messages from files.
With -l, only one file permitted.
Otherwise, the entire file contents will
be sent as one single message.
Consumer options:
-o <offset> Offset to start consuming from:
beginning | end | stored |
<value> (absolute offset) |
-<value> (relative offset from end)
-e Exit successfully when last message received
-f <fmt..> Output formatting string, see below.
Takes precedence over -D and -K.
-D <delim> Delimiter to separate messages on output
-K <delim> Print message keys prefixing the message
with specified delimiter.
-O Print message offset using -K delimiter
-c <cnt> Exit after consuming this number of messages
-Z Print NULL messages and keys as "NULL"(instead of empty)
-u Unbuffered output
Metadata options (-L):
-t <topic> Topic to query (optional)
Query options (-Q):
-t <t>:<p>:<ts> Get offset for topic <t>,
partition <p>, timestamp <ts>.
Timestamp is the number of milliseconds
since epoch UTC.
Requires broker >= 0.10.0.0 and librdkafka >= 0.9.3.
Multiple -t .. are allowed but a partition
must only occur once.
Format string tokens:
%s Message payload
%S Message payload length (or -1 for NULL)
%R Message payload length (or -1 for NULL) serialized
as a binary big endian 32-bit signed integer
%k Message key
%K Message key length (or -1 for NULL)
%T Message timestamp (milliseconds since epoch UTC)
%t Topic
%p Partition
%o Message offset
\n \r \t Newlines, tab
\xXX \xNNN Any ASCII character
Example:
-f 'Topic %t [%p] at offset %o: key %k: %s\n'
Consumer mode (writes messages to stdout):
kafkacat -b <broker> -t <topic> -p <partition>
or:
kafkacat -C -b ...
High-level KafkaConsumer mode:
kafkacat -b <broker> -G <group-id> topic1 top2 ^aregex\d+
Producer mode (reads messages from stdin):
... | kafkacat -b <broker> -t <topic> -p <partition>
or:
kafkacat -P -b ...
Metadata listing:
kafkacat -L -b <broker> [-t <topic>]
Query offset by timestamp:
kafkacat -Q -b broker -t <topic>:<partition>:<timestamp>
Now in my Xenial system, where I previously had the old default version, I can do the following:
helk@ubuntu:~$ kafkacat -b 192.168.64.138:9092 -t winlogbeat -P -l empire_userland_registry_2019-03-19023812.json
helk@ubuntu:~$
No error messages about -l anymore 👍
If for some reason your librdkafka shows an older version, you can update it with the Confluent deb repo like this:
sudo apt-get install software-properties-common
sudo add-apt-repository "deb [arch=amd64] https://packages.confluent.io/deb/5.1 stable main"
sudo apt update
sudo apt upgrade
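To confirm what version apt resolved after the upgrade, something like this should work (a sketch; the runtime package name librdkafka1 is an assumption and may vary by release):
apt-cache policy librdkafka1 librdkafka-dev   # shows installed vs. candidate versions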
That should update it to the latest one, as far as I know. Please try the solution above and see if the -l flag shows up.
I can also recommend installing kafkacat on Debian-based systems such as Ubuntu, but on Bionic (18.04) or later, to avoid these issues; that is a kafkacat packaging thing. I have mine on my Mac and installed it via Homebrew per the kafkacat install instructions. That gave me the latest version.
Just to recap:
- With the old kafkacat that ships by default on Xenial, the -l flag was not available.
- With the latest kafkacat compiled from source, the -l flag is available and ready to be used to send the contents of a mordor file to a HELK Kafka broker.
@randolpht please let me know if that fixes your issue.
@Cyb3rWard0g Thanks for the assistance. I went the route of just updating to the newest version, Bionic, since this was just a throw-away VM, and I am having no issues anymore. I appreciate the time you put into tracking down the issue, and hopefully it helps others who run into issues with older builds.
Thank you very much for validating that it was the Ubuntu version :). I updated the docs because of this. Thank you for reporting it and sharing your experience! I hope you enjoy the project! 👍
I updated this section 👍 https://mordor.readthedocs.io/en/latest/consume_mordor.html#consume-logs Thank you again!
Seeing the error "% ERROR: Failed to produce message (62261687 bytes): Broker: Message size too large" for various datasets while trying to send the data to HELK.