angolo40 / mikrocata2selks

Mikrotik + Selks (Suricata) + Telegram + TZSP on Debian 12
GNU General Public License v3.0
51 stars 12 forks source link

Mikrotik Firewall adding stops daily at same time #9

Closed mk31999 closed 5 months ago

mk31999 commented 5 months ago

Hi There, thanks for all your work with this. Works brilliantly

Having an issue with the Firewall address adding (auto blocking) stops at roughly the same time everyday. All services are still running, selks still showing alerts, however not being communicated through to Mikrotik

Can confirm API user still showing logged in in Mikrotik log.

Only solution I can find is restarting Debian. Then starts working again instantly.

Things I noticed

My mikrocataTZSP0.py config mikrocataTZSP0.txt

angolo40 commented 5 months ago

Hi, assuming that packets are coming correcty to interface tzsp0 (check with tcpdump -i tzsp0) i think that the mikrocataTZSP0.service crashes for some reason or is not able to intercept correct line to insert in the firewall. Try to stop mikrocata service with: systemctl stop mikrocataTZSP0.service and then run: python3 /usr/local/bin/mikrocataTZSP0.py keep it running until some error, maybe could tell us more about the problem.

mk31999 commented 5 months ago

Hi, Thanks for your reply

Running this now. Will report back once I have an output

Cheers

mk31999 commented 5 months ago

Hi, looks like something definitely crashes. Weird that its at the same time everyday.

Let me know what you think. Selks is all still running. tcpdump -i tzsp0 shows packets still.

python3 /usr/local/bin/mikrocataTZSP0.py pass severity: 3 pass severity: 3 pass severity: 3 pass severity: 3 pass severity: 3 pass severity: 3 pass severity: 3 pass severity: 3 pass severity: 3 Traceback (most recent call last): File "/usr/local/bin/mikrocataTZSP0.py", line 412, in main() File "/usr/local/bin/mikrocataTZSP0.py", line 395, in main notifier.loop() File "/usr/lib/python3/dist-packages/pyinotify.py", line 1376, in loop self.process_events() File "/usr/lib/python3/dist-packages/pyinotify.py", line 1275, in process_events self._default_proc_fun(revent) File "/usr/lib/python3/dist-packages/pyinotify.py", line 910, in call return _ProcessEvent.call(self, event) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/usr/lib/python3/dist-packages/pyinotify.py", line 630, in call return meth(event) ^^^^^^^^^^^ File "/usr/local/bin/mikrocataTZSP0.py", line 82, in process_IN_MODIFY check_truncated(FILEPATH) File "/usr/local/bin/mikrocataTZSP0.py", line 89, in check_truncated if last_pos > os.path.getsize(fpath): ^^^^^^^^^^^^^^^^^^^^^^ File "", line 50, in getsize FileNotFoundError: [Errno 2] No such file or directory: '/root/SELKS/docker/containers-data/suricata/logs/eve.json'

angolo40 commented 5 months ago

Interesting, it seems that the issue is when attempting to access to Suricata eve.json that gets rotated or temporarily removed, causing daily errors at the same time. The script fails to check for file existence before reading, leading to errors when the file is absent due to rotation or deletion processes. I can try tweaking the function to fix this, but I can't test it myself since I'm not encountering this error.

Edit the check_truncated(fpath) function in /usr/local/bin/mikrocataTZSP0.py

def check_truncated(fpath):
  global last_pos

  if not os.path.exists(fpath):
      print(f"[Mikrocata] File: {fpath} not found during check_truncated. Assuming it might be recreated soon.")
      last_pos = 0
      return

  if last_pos > os.path.getsize(fpath):
      print("[Mikrocata] File seems to be truncated. Resetting last_pos.")
      last_pos = 0

Let me know.

mk31999 commented 5 months ago

Thanks Angolo,

I have amended the script and rebooted debian, Will update in a few hours.

Thanks for your help

mk31999 commented 5 months ago

Hi, Confirmed this has changed the output however same issue is still occurring

See below, have attached my updated mikrocataTZSP0.py file mikrocataTZSP0.txt

[Mikrocata] File: /root/SELKS/docker/containers-data/suricata/logs/eve.json not found. Retrying in 10 seconds.. [Mikrocata] File: /root/SELKS/docker/containers-data/suricata/logs/eve.json not found. Retrying in 10 seconds.. [Mikrocata] File: /root/SELKS/docker/containers-data/suricata/logs/eve.json not found. Retrying in 10 seconds.. [Mikrocata] File: /root/SELKS/docker/containers-data/suricata/logs/eve.json not found. Retrying in 10 seconds.. [Mikrocata] File seems to be truncated. Resetting last_pos.

I will rebuild the VM with all default settings and see if I am getting the same error

angolo40 commented 5 months ago

It seems that at a certain time of the day, the eve.json file gets moved, and Suricata is unable to recreate it. This issue is definitely related to SELKS, specifically to the Suricata container. With the modification of the check_truncated(fpath) function, we've ensured that the service remains active even in the absence of the eve.json file, which is what you're experiencing.

What happens after the last line of your log? Does the service crash, or does it continue to search for the eve.json file?

[Mikrocata] File: /root/SELKS/docker/containers-data/suricata/logs/eve.json not found. Retrying in 10 seconds..
[Mikrocata] File: /root/SELKS/docker/containers-data/suricata/logs/eve.json not found. Retrying in 10 seconds..
[Mikrocata] File seems to be truncated. Resetting last_pos.

If it doesn't crash, please try restarting just the Suricata container to see if that resolves the issue instead to reboot the whole machine. You can restart the container with the command: docker restart suricata

Check for /root/SELKS/docker/containers-data/suricata/logs/eve.json and if the file is recreated and everything starts working again, it suggests that for some reason, Suricata is unable to recreate the file quickly or due to some other issue. This could be related to the performance of the machine.

mk31999 commented 5 months ago

After a full vm rebuild with all default settings (besides IP setting etc) The issue is still occurring. So unlikely to be part of the config that i've changed. VM here is running on latest ESXI, 16gb ram, 100gb storage, 12 core cpu. So should have more then enough performance to handle,

As a test I have disabled VM snapshot backups incase somehow that affects things (exactly 12 hours later)

In reply to your question, nothing more comes after the output [Mikrocata] File seems to be truncated. Resetting last_pos. Mikrocata stops communicating to the Mikrotik, however selks is still receiving alerts as normal.

I will test Suricata and look into what happens to the eve.json at 3pm today, will also try rebooting all docker containers.

Will update shortly

Thanks

mk31999 commented 5 months ago

Updating this,

Restarting all docker containers does not fix the issue.

It seems eve.json

  1. Has the wrong timestamp,
  2. Which is truncated at timestamped 2am (my 3pm) (which is crashing mikrocata)
  3. Is 7,000,000 lines (probably normal)

For whatever reason, mikrocata cannot locate eve.json after this process. Looking at tail of truncated file, vs head of eve.json the difference of the timestamp is below tail eve.json1 {"timestamp":"2024-02-20T02:01:19.249783+0000

head eve.json {"timestamp":"2024-02-20T02:01:20.178815+0000"

There seems to be a delay of 1 sec which is enough to break things. However even after restarting the mikrocata service it is still unable to process new eve.json

Let me know what you think

mk31999 commented 5 months ago

Actually looks like restarting the mikrocataTZSP0.service does now fix the issue.

angolo40 commented 5 months ago

Unfortunately, I was unable to replicate the error you described. As a step in troubleshooting, I set up a new Debian 12 machine and went through the reinstallation process, but the issue did not occur for me.

Based on the SELKS documentation regarding Suricata's log rotation ( https://github.com/StamusNetworks/SELKS/wiki/Logrotate ), I suggest trying to manually force a log rotation to see if that helps isolate the issue. This approach might offer more immediate insights compared to waiting for the automatic daily rotation. You can do this by accessing the Suricata container and executing the log rotation command for eve.json.

Here’s how:

docker exec -it suricata bash
logrotate --force /etc/logrotate.d/suricata

This command grants you access to the Suricata container, allowing you to manually initiate the log rotation process for eve.json.

I'm thinking to improve the python script to manage this kind of situation Could you also confirm that restarting (only and nothing else more) the mikrocataTZSP0.service resolves the issue?

Let me know 👍

angolo40 commented 5 months ago

Ok, i'm able to reproduce the error...

Please change the class EventHandler(pyinotify.ProcessEvent) like this:

class EventHandler(pyinotify.ProcessEvent):
    def process_IN_MODIFY(self, event):
        if event.pathname == FILEPATH:
            try:
                add_to_tik(read_json(FILEPATH))
            except ConnectionError:
                connect_to_tik()

    def process_IN_CREATE(self, event):
        if event.pathname == FILEPATH:
            print(f"[Mikrocata] New eve.json detected. Resetting last_pos.")
            global last_pos
            last_pos = 0
            self.process_IN_MODIFY(event)

    def process_IN_DELETE(self, event):
        if event.pathname == FILEPATH:
            print(f"[Mikrocata] eve.json deleted. Monitoring for new file.")

and also main():

def main():
    seek_to_end(FILEPATH)
    connect_to_tik()
    read_ignore_list(IGNORE_LIST_LOCATION)
    os.makedirs(os.path.dirname(SAVE_LISTS_LOCATION), exist_ok=True)
    os.makedirs(os.path.dirname(UPTIME_BOOKMARK), exist_ok=True)

    directory_to_monitor = os.path.dirname(FILEPATH)

    wm = pyinotify.WatchManager()
    handler = EventHandler()
    notifier = pyinotify.Notifier(wm, handler)
    wm.add_watch(directory_to_monitor, pyinotify.IN_CREATE | pyinotify.IN_MODIFY | pyinotify.IN_DELETE, rec=False)

    while True:
        try:
            notifier.loop()

        except (librouteros.exceptions.ConnectionClosed, socket.timeout) as e:
            print(f"[Mikrocata] (4) {str(e)}")
            connect_to_tik()
            continue

        except librouteros.exceptions.TrapError as e:
            print(f"[Mikrocata] (8) librouteros.TrapError: {str(e)}")
            continue

        except KeyError as e:
            print(f"[Mikrocata] (8) KeyError: {str(e)}")
            continue

the new .py should look like this:

#!/usr/bin/env python3

import ssl
import os
import socket
import re
from time import sleep
from datetime import datetime as dt
import pyinotify
import ujson
import json
import librouteros
from librouteros import connect
from librouteros.query import Key
import requests

# ------------------------------------------------------------------------------
################# START EDIT SETTINGS

#Set Mikrotik login information
USERNAME = "mikrocata2selks"
PASSWORD = "XXXXXXXXXXx"
ROUTER_IP = "10.0.10.1"
TIMEOUT = "1d"
PORT = 8729  # api-ssl port
BLOCK_LIST_NAME = "Suricata"

#Set Telegram information
enable_telegram = False
TELEGRAM_TOKEN = "TOKEN"
TELEGRAM_CHATID = "CHATID"

# You can add your WAN IP, so it doesn't get mistakenly blocked (don't leave empty string)
WAN_IP = "192.168.1.26"
LOCAL_IP_PREFIX = "10.0."
WHITELIST_IPS = (WAN_IP, LOCAL_IP_PREFIX, "127.0.0.1", "1.1.1.1", "8.8.8.8")
COMMENT_TIME_FORMAT = "%-d %b %Y %H:%M:%S.%f"  # See datetime strftime formats.

#Set comma separated value of suricata alerts severity which will be blocked in Mikrotik. All severity values are ("1","2","3")
SEVERITY=("1","2")

################# END EDIT SETTINGS
# ------------------------------------------------------------------------------
LISTEN_INTERFACE=("tzsp0")

# Suricata log file
SELKS_CONTAINER_DATA_SURICATA_LOG="/root/SELKS/docker/containers-data/suricata/logs/"
FILEPATH = os.path.abspath(SELKS_CONTAINER_DATA_SURICATA_LOG + "eve.json")

# Save Mikrotik address lists to a file and reload them on Mikrotik reboot.
# You can add additional list(s), e.g. [BLOCK_LIST_NAME, "blocklist1", "list2"]
SAVE_LISTS = [BLOCK_LIST_NAME]

# (!) Make sure you have privileges (!)
SAVE_LISTS_LOCATION = os.path.abspath("/var/lib/mikrocata/savelists-tzsp0.json")

# Location for Mikrotik's uptime. (needed for re-adding lists after reboot)
UPTIME_BOOKMARK = os.path.abspath("/var/lib/mikrocata/uptime-tzsp0.bookmark")

# Ignored rules file location - check ignore.conf for syntax.
IGNORE_LIST_LOCATION = os.path.abspath("/var/lib/mikrocata/ignore-tzsp0.conf")

# Add all alerts from alerts.json on start?
# Setting this to True will start reading alerts.json from beginning
# and will add whole file to firewall when pyinotify is triggered.
# Just for testing purposes, i.e. not good for systemd service.
ADD_ON_START = False

# global vars
last_pos = 0
api = None
ignore_list = []

class EventHandler(pyinotify.ProcessEvent):
    def process_IN_MODIFY(self, event):
        if event.pathname == FILEPATH:
            try:
                add_to_tik(read_json(FILEPATH))
            except ConnectionError:
                connect_to_tik()

    def process_IN_CREATE(self, event):
        if event.pathname == FILEPATH:
            print(f"[Mikrocata] New eve.json detected. Resetting last_pos.")
            global last_pos
            last_pos = 0
            self.process_IN_MODIFY(event)

    def process_IN_DELETE(self, event):
        if event.pathname == FILEPATH:
            print(f"[Mikrocata] eve.json deleted. Monitoring for new file.")

def check_truncated(fpath):
  global last_pos
  print(os.path.getsize(fpath))
  if not os.path.exists(fpath):
      print(f"[Mikrocata] File: {fpath} not found during check_truncated. Assuming it might be recreated soon.")
      last_pos = 0
      return

  if last_pos > os.path.getsize(fpath):
      print("[Mikrocata] File seems to be truncated. Resetting last_pos.")
      last_pos = 0

def seek_to_end(fpath):
    global last_pos

    if not ADD_ON_START:
        while True:
            try:
                last_pos = os.path.getsize(fpath)
                return

            except FileNotFoundError:
                print(f"[Mikrocata] File: {fpath} not found. Retrying in 10 seconds..")
                sleep(10)
                continue

def read_json(fpath):
    global last_pos
    while True:
        try:
            with open(fpath, "r") as f:
                f.seek(last_pos)
                alerts = []
                for line in f.readlines():
                    try:
                        alert = json.loads(line)
                        if alert.get('event_type') == 'alert':
                            alerts.append(json.loads(line))
                        else:
                            last_pos = f.tell()
                            continue
                    except:
                        continue
                last_pos = f.tell()
                return alerts
        except FileNotFoundError:
            print(f"[Mikrocata] File: {fpath} not found. Retrying in 10 seconds..")
            sleep(10)
            continue

def add_to_tik(alerts):
    global last_pos
    global api

    _address = Key("address")
    _id = Key(".id")
    _list = Key("list")

    address_list = api.path("/ip/firewall/address-list")
    resources = api.path("system/resource")

    # Remove duplicate src_ips.
    for event in {item['src_ip']: item for item in alerts}.values():

        if str(event["alert"]["severity"]) not in SEVERITY:
            print("pass severity: " + str(event["alert"]["severity"]))
            break

        if str(event["in_iface"]) not in LISTEN_INTERFACE:
            break

        if not in_ignore_list(ignore_list, event):
            timestamp = dt.strptime(event["timestamp"],
                                    "%Y-%m-%dT%H:%M:%S.%f%z").strftime(
                                        COMMENT_TIME_FORMAT)

            if event["src_ip"].startswith(WHITELIST_IPS):
                if event["dest_ip"].startswith(WHITELIST_IPS):
                    continue

                wanted_ip, wanted_port = event["dest_ip"], event.get("src_port")
                src_ip, src_port = event["src_ip"], event.get("dest_port")

            else:
                wanted_ip, wanted_port = event["src_ip"], event.get("dest_port")
                src_ip, src_port = event["dest_ip"], event.get("src_port")

            try:
                cmnt=f"""[{event['alert']['gid']}:{
                                 event['alert']['signature_id']}] {
                                 event['alert']['signature']} ::: Port: {
                                 wanted_port}/{
                                 event['proto']} ::: timestamp: {
                                 timestamp}"""

                address_list.add(list=BLOCK_LIST_NAME,
                                 address=wanted_ip,
                                 comment=cmnt,
                                 timeout=TIMEOUT)

                print(f"[Mikrocata] new ip added: {cmnt}")
                if enable_telegram == True:
                    print(requests.get(sendTelegram("From: " + wanted_ip + "\nTo: " + src_ip + ":" + wanted_port + "\nRule: " + cmnt)).json())

            except librouteros.exceptions.TrapError as e:
                if "failure: already have such entry" in str(e):
                    for row in address_list.select(_id, _list, _address).where(
                            _address == wanted_ip,
                            _list == BLOCK_LIST_NAME):
                        address_list.remove(row[".id"])

                    address_list.add(list=BLOCK_LIST_NAME,
                                     address=wanted_ip,
                                     comment=f"""[{event['alert']['gid']}:{
                                     event['alert']['signature_id']}] {
                                     event['alert']['signature']} ::: Port: {
                                     wanted_port}/{
                                     event['proto']} ::: timestamp: {
                                     timestamp}""",
                                     timeout=TIMEOUT)

                else:
                    raise

            except socket.timeout:
                connect_to_tik()

    # If router has been rebooted add saved list(s), then save lists to a file.
    if check_tik_uptime(resources):
        add_saved_lists(address_list)

    save_lists(address_list)

def check_tik_uptime(resources):

    for row in resources:
        uptime = row["uptime"]

    if "w" in uptime:
        weeks = int(re.search(r"(\A|\D)(\d*)w", uptime).group(2))
    else:
        weeks = 0

    if "d" in uptime:
        days = int(re.search(r"(\A|\D)(\d*)d", uptime).group(2))
    else:
        days = 0

    if "h" in uptime:
        hours = int(re.search(r"(\A|\D)(\d*)h", uptime).group(2))
    else:
        hours = 0

    if "m" in uptime:
        minutes = int(re.search(r"(\A|\D)(\d*)m", uptime).group(2))
    else:
        minutes = 0

    if "s" in uptime:
        seconds = int(re.search(r"(\A|\D)(\d*)s", uptime).group(2))
    else:
        seconds = 0

    total_seconds = (weeks*7*24 + days*24 + hours)*3600 + minutes*60 + seconds

    if total_seconds < 900:
        total_seconds = 900

    with open(UPTIME_BOOKMARK, "r") as f:
        try:
            bookmark = int(f.read())
        except ValueError:
            bookmark = 0

    with open(UPTIME_BOOKMARK, "w+") as f:
        f.write(str(total_seconds))

    if total_seconds < bookmark:
        return True

    return False

def connect_to_tik():
    global api
    ctx = ssl.create_default_context()
    ctx.check_hostname = False
    ctx.set_ciphers('ADH:@SECLEVEL=0')

    while True:
        try:
            api = connect(username=USERNAME, password=PASSWORD, host=ROUTER_IP,
                          ssl_wrapper=ctx.wrap_socket, port=PORT)
            print(f"[Mikrocata] Connected")
            break

        except librouteros.exceptions.TrapError as e:
            if "invalid user name or password" in str(e):
                print("[Mikrocata] Invalid username or password.")
                sleep(10)
                continue

            raise

        except socket.timeout as e:
            print(f"[Mikrocata] Socket timeout: {str(e)}.")
            sleep(30)
            continue

        except ConnectionRefusedError:
            print("[Mikrocata] Connection refused. (api-ssl disabled in router?)")
            sleep(10)
            continue

        except OSError as e:
            if e.errno == 113:
                print("[Mikrocata] No route to host. Retrying in 10 seconds..")
                sleep(10)
                continue

            if e.errno == 101:
                print("[Mikrocata] Network is unreachable. Retrying in 10 seconds..")
                sleep(10)
                continue

            raise

def sendTelegram(message):

    sleep(2)

    telegram_url = "https://api.telegram.org/bot" + TELEGRAM_TOKEN + "/sendMessage?chat_id=" + TELEGRAM_CHATID + "&text=" + message + "&disable_web_page_preview=true&parse_mode=html"

    return telegram_url

def save_lists(address_list):
    _address = Key("address")
    _list = Key("list")
    _timeout = Key("timeout")
    _comment = Key("comment")

    with open(SAVE_LISTS_LOCATION, "w") as f:
        for save_list in SAVE_LISTS:
            for row in address_list.select(_list, _address, _timeout,
                                           _comment).where(_list == save_list):
                f.write(ujson.dumps(row) + "\n")

def add_saved_lists(address_list):
    with open(SAVE_LISTS_LOCATION, "r") as f:
        addresses = [ujson.loads(line) for line in f.readlines()]

    for row in addresses:
        cmnt = row.get("comment")
        if cmnt is None:
            cmnt = ""
        try:
            address_list.add(list=row["list"], address=row["address"],
                             comment=cmnt, timeout=row["timeout"])

        except librouteros.exceptions.TrapError as e:
            if "failure: already have such entry" in str(e):
                continue

            raise

def read_ignore_list(fpath):
    global ignore_list

    try:
        with open(fpath, "r") as f:

            for line in f:
                line = line.partition("#")[0].strip()

                if line.strip():
                    ignore_list.append(line)

    except FileNotFoundError:
        print(f"[Mikrocata] File: {IGNORE_LIST_LOCATION} not found. Continuing..")

def in_ignore_list(ignr_list, event):
    for entry in ignr_list:
        if entry.isdigit() and int(entry) == int(event['alert']['signature_id']):
            sleep(1)
            return True

        if entry.startswith("re:"):
            entry = entry.partition("re:")[2].strip()

            if re.search(entry, event['alert']['signature']):
                sleep(1)
                return True

    return False

def main():
    seek_to_end(FILEPATH)
    connect_to_tik()
    read_ignore_list(IGNORE_LIST_LOCATION)
    os.makedirs(os.path.dirname(SAVE_LISTS_LOCATION), exist_ok=True)
    os.makedirs(os.path.dirname(UPTIME_BOOKMARK), exist_ok=True)

    directory_to_monitor = os.path.dirname(FILEPATH)

    wm = pyinotify.WatchManager()
    handler = EventHandler()
    notifier = pyinotify.Notifier(wm, handler)
    wm.add_watch(directory_to_monitor, pyinotify.IN_CREATE | pyinotify.IN_MODIFY | pyinotify.IN_DELETE, rec=False)

    while True:
        try:
            notifier.loop()

        except (librouteros.exceptions.ConnectionClosed, socket.timeout) as e:
            print(f"[Mikrocata] (4) {str(e)}")
            connect_to_tik()
            continue

        except librouteros.exceptions.TrapError as e:
            print(f"[Mikrocata] (8) librouteros.TrapError: {str(e)}")
            continue

        except KeyError as e:
            print(f"[Mikrocata] (8) KeyError: {str(e)}")
            continue

if __name__ == "__main__":
    main()

Let me know

mk31999 commented 5 months ago

Thanks, I have amended .py with these changes,

Will update later today

Thanks for your help

mk31999 commented 5 months ago

Looks like thats done it, confirming everything has been running happy for over 24 hours.

What do you think caused this? A suricata/selks update? Or something specific to my config?

Thanks again for your help, constantly being scanned by botnets so this is definitely helping block out the known ones.

angolo40 commented 5 months ago

Hello,

Glad to hear it's working now :)

The problem was with how pyinotify was handling the eve.json file. For some reason, it wasn't picking up changes after the logs had been rotated. Honestly, I'm not sure why it wasn't working as expected because, in theory, the script should have been fine. I ended up having to find another way to monitor these daily events.

PLS don't forget to give the project a star :+1:

Cheers