Yes, this functionality seems to be broken. Let me elaborate on that:
1) In the manual it is suggested that kafkacat should publish to the mordor topic.
2) The Logstash Kafka input is topics => ["winlogbeat","winevent","SYSMON_JOIN","filebeat"], so the mordor topic is not present.
3) There is a mordor logstash pipeline input here (port 3515): https://github.com/Cyb3rWard0g/HELK/tree/master/docker/helk-logstash/mordor_pipeline which then sends to the Kafka topic winevent.
So we should try that: send the JSON file to port 3515 and see what happens.
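For reference, publishing the newline-delimited JSON straight to one of the existing topics with kafkacat would look roughly like this (the broker address, port and file name are my assumptions for a default HELK install, not taken from the manual):
kafkacat -P -b helk-kafka-broker:9092 -t winevent -l mordor_events.json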
I agree, though, that there must be a clear path via Kafka; let me think about how we can resolve that.
By the way, I am also wondering whether they have used NXLog (https://nxlog.co/products/nxlog-community-edition) for the Windows log collection; this could have some implications on the field structure.
I also found some more confusing instructions in Mordor-Elastic. There is a logstash output option:
!python3 Mordor-Elastic.py --no-index-creation --output logstash --url logstash-ip:3515 events.tar.gz
Inside the code:
r = requests.post(logstash_url, json=event, verify=verify_certs)
which performs an HTTP request, but we need to perform a plain TCP request as per my previous notes.
I believe the logstash output is currently broken (the elasticsearch one, on the other hand, I was able to run successfully). I will attempt to write a new script.
I found a way, I will post the instructions here shortly!
Okay, I declared victory too early. I did manage to send the mordor JSON files to the logstash pipeline that is then fed into Kafka. The code looks like this:
import socket

HOST = "helk-logstash"
PORT = 3515

def get_constants(prefix):
    """Create a dictionary mapping socket module
    constants to their names.
    """
    return {
        getattr(socket, n): n
        for n in dir(socket)
        if n.startswith(prefix)
    }

families = get_constants('AF_')
types = get_constants('SOCK_')
protocols = get_constants('IPPROTO_')

# Create a TCP/IP socket
sock = socket.create_connection((HOST, PORT))
print('Family  :', families[sock.family])
print('Type    :', types[sock.type])
print('Protocol:', protocols[sock.proto])
print()

try:
    # Read the newline-delimited JSON events and send them all at once
    with open('test_log.json', 'r') as file:
        json_lines = file.read()
    tot_lines = json_lines.count("\n") + 1
    print('Loading ... %d events' % tot_lines)
    sock.sendall(json_lines.encode())
except socket.timeout as e:
    print(e)
except socket.error as e:
    # Something else happened: handle the error, exit, etc.
    print(e)
finally:
    print('closing socket')
    sock.close()
However, the events get dropped, because the mordor JSON files are just EVTX converted to JSON, while that pipeline is meant to process NXLog-formatted events.
Currently I do not see a way to send the JSON events to the correct processing pipeline, simply because there is no pipeline that can process them.
What I ended up doing is loading the documents directly into an ES index with this command:
!python3 Mordor-Elastic.py --no-index-creation --output elasticsearch --url helk-elasticsearch events.tar.gz
which saves the documents inside the winlogbeat-mordor index.
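Under the hood that is essentially a bulk load of the newline-delimited events. A minimal sketch of the same operation (the helper function and file name below are mine, not from Mordor-Elastic.py; host and index are taken from the command above):

import json
from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk

es = Elasticsearch("http://helk-elasticsearch:9200")

def actions(path, index="winlogbeat-mordor"):
    # Turn each JSON line into one bulk index action
    with open(path) as f:
        for line in f:
            if line.strip():
                yield {"_index": index, "_source": json.loads(line)}

bulk(es, actions("events.json"))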
Yes, I think the functionality may be an issue. @Cyb3rWard0g any idea? Sorry, I hadn't really messed w/ all the mordor pipeline since it got split into logstash "pipelines". But let me know how I can help.
@priamai @tschohanna if you output to port 8531 TCP, does that change anything? https://github.com/Cyb3rWard0g/HELK/blob/master/docker/helk-logstash/pipeline/0005-nxlog-winevent-syslog-tcp-input.conf
nxlog or not - HELK handles winlogbeat and nxlog.
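For a quick test without the Python script, something like this should also work (assuming the events are newline-delimited JSON and the port is reachable from where you run it):
nc helk-logstash 8531 < mordor_events.json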
Testing your suggestion now. Nevertheless, the major problem we are facing is the lack of the original .EVTX files, which would allow a full replay via winlogbeat and trigger the full Logstash+Kafka pipeline. Yes, the size will be bigger, but it will allow full end-to-end testing.
@neu5ron yes, I have pushed the events into the 8531 logstash input, but they end up in the indexme index. I also visually inspected the logstash pipeline via Kibana.
By the way, the pipeline is a massive if-then-else switch, and it is very hard to debug where the parse failure happened. I am also not familiar with the NXLog format: are there some extra fields which we need to artificially add to the JSON files?
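If some NXLog metadata does turn out to be expected, patching it into the events before sending them would be easy. NXLog normally stamps core fields such as SourceModuleName and SourceModuleType onto every record, so a hypothetical pre-processing step could look like this (whether these are the fields the HELK filters actually key on is unverified):

import json

def add_nxlog_fields(line):
    # Hypothetical: mimic the core fields NXLog adds to each record
    event = json.loads(line)
    event.setdefault("SourceModuleName", "eventlog")
    event.setdefault("SourceModuleType", "im_msvistalog")
    return json.dumps(event)

with open("mordor_events.json") as src, open("patched_events.json", "w") as dst:
    for line in src:
        if line.strip():
            dst.write(add_nxlog_fields(line) + "\n")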
OK, this should be easy to fix.
Well, it's parsing, so yeah, it's a lot.. but each and every filter an event hits is added as a tag, so you can see exactly where everything hits. If you have suggestions I would appreciate them.
Good point on the tags, this is what I see:
"etl_pipeline": [
"all-filter-0098",
"all-add_processed_timestamp",
"fingerprint-0099-002",
"json-0301-001"
],
Therefore I am guessing this is the last stage the events pass through: https://github.com/Cyb3rWard0g/HELK/blob/master/docker/helk-logstash/pipeline/0301-nxlog-winevent-to-json-filter.conf
For the logstash configuration I found this structure easier to maintain:
https://github.com/enotspe/fortinet-2-elasticsearch/blob/master/logstash/10-input_syslog_fortinet
They define an input-filter-output pattern in each configuration file with the pipeline construct (see the sketch below). Converting the existing configuration to that format would be quite a big task, for sure.
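A generic sketch of that pipeline-to-pipeline wiring, just to illustrate the construct (pipeline ids, file names and the Kafka settings are made up, not copied from either repository):

# pipelines.yml
- pipeline.id: syslog-input
  path.config: "pipeline/10-input_syslog.conf"
- pipeline.id: winevent-processing
  path.config: "pipeline/20-filter_output_winevent.conf"

# pipeline/10-input_syslog.conf
input { tcp { port => 3515 } }
output { pipeline { send_to => ["winevent-processing"] } }

# pipeline/20-filter_output_winevent.conf
input { pipeline { address => "winevent-processing" } }
filter { json { source => "message" } }
output { kafka { bootstrap_servers => "helk-kafka-broker:9092" topic_id => "winevent" } }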
@priamai try now w/ the updated branch. There was an incorrect removal of type that then knocked most of the flow off.
Thanks for the suggestion; those sorts of flows for LS work for smaller pipelines. Honestly, the best way is probably multi-pipeline, but that doesn't work well at scale, NOR does it work great when tons of pipelines share the same code (like geo enrich). It reduces performance a lot. Not really a perfect answer either way, it's all a trade-off, but I am open to other suggestions.
re-open if still an issue, thanks!
Describe the problem
When trying to ingest data from the mordor dataset into HELK with kafkacat, all the data goes into the indexme- index pattern and not into the actual logs- index pattern.
Provide the output of the following commands
What version of HELK are you using
What steps did you take trying to fix the issue
I tried to ingest the data with the winevent topic, because the winlogbeat topic did not work, but the results were the same. I do not have much knowledge about Kafka and Logstash, where I assume the issue is, but I tried to analyse the Logstash configurations and found nothing I could do to fix the issue.
How could we replicate the issue
Install HELK with option 3 of the install script. Download the mordor dataset. Install kafkacat. Try to ingest the data in the following two ways: