TheDr1ver / ledhntr-suite-public

LEDHNTR, Plugins, LEDMGMT, and Dockerization configs
Apache License 2.0
0 stars 0 forks source link

[PLUGINS] - Change all parsing methods to use jsonpath-ng #3

Open TheDr1ver opened 2 months ago

TheDr1ver commented 2 months ago

The parsing methods on these things are a nightmare. Using jsonpath-ng makes it significantly easier to identify which key/value pairs should be transformed into LEDHNTR Things.

This is a decent start that needs to be integrated into hntr.py, then the parsers can all be replaced with a simple dictionary instead of 400+ lines of if/then/conversions.

import json
from jsonpath_ng import jsonpath, parse
from ledhntr.data_classes import Attribute, Entity, Relation

# This part would go in hntr.py

def process_parsing_rules(data, rules):
    """Turn a decoded JSON document into LEDHNTR Things using jsonpath rules.

    Args:
        data: The decoded JSON document to search.
        rules: Mapping with optional ``'attributes'``, ``'entities'`` and
            ``'relations'`` lists. Every rule has a ``'label'``. Attribute
            rules (and each entry of an entity/relation ``'has'`` list) also
            have a ``'jsonpath'`` expression. A relation rule may carry
            ``'players'``: a mapping of role name to a list of entity rules.

    Returns:
        A dict with the keys ``'attributes'``, ``'entities'`` and
        ``'relations'``, each holding the list of Things that were built.
        Things whose key attribute could not be filled are skipped.
    """
    def parse_attributes(data, rules):
        # One Attribute per jsonpath match, in rule order.
        return [
            Attribute(label=rule['label'], value=match.value)
            for rule in rules
            for match in parse(rule['jsonpath']).find(data)
        ]

    def generate_entity(data, rule):
        # Returns the Entity, or None when its key attribute has no value.
        ent = Entity(label=rule['label'], has=[])
        ent.has.extend(parse_attributes(data, rule.get('has', [])))
        if ent.keyattr == 'comboid':
            ent.has.append(ent.get_comboid())
        if not ent.keyval:
            print(f"Missing {ent.keyattr} from {ent}. Skipping creation.")
            return None
        return ent

    def parse_entities(data, rules):
        candidates = (generate_entity(data, rule) for rule in rules)
        return [ent for ent in candidates if ent]

    def attach_players(data, rel, player_rules_by_role):
        # NOTE(review): assumes Relation.players is a role -> list mapping,
        # as the original indexing implied; confirm in ledhntr.data_classes.
        for role, player_rules in player_rules_by_role.items():
            for player_rule in player_rules:
                player = generate_entity(data, player_rule)
                # generate_entity yields None for entities it had to skip.
                if player is None or not player.has:
                    continue
                rel.players.setdefault(role, []).append(player)

    def generate_relation(data, rule):
        # Returns the Relation, or None when it is empty or lacks its key.
        rel = Relation(label=rule['label'], has=[])
        rel.has.extend(parse_attributes(data, rule.get('has', [])))
        # Players are attached before validation so that a relation with
        # players but no attributes survives the emptiness check below.
        # NOTE(review): this also means get_comboid() now runs with players
        # already attached - confirm that is the intended input.
        attach_players(data, rel, rule.get('players') or {})

        # Validate once, after every sub-rule has been processed, so that a
        # single non-matching sub-rule does not reject the whole relation
        # and the comboid is appended at most once.
        if not (rel.has or rel.players):
            return None
        if rel.keyattr == 'comboid':
            rel.has.append(rel.get_comboid())
        if rel.keyattr and not rel.keyval:
            print(f"Missing {rel.keyattr} from {rel}. Skipping creation.")
            return None
        return rel

    def parse_relations(data, rules):
        candidates = (generate_relation(data, rule) for rule in rules)
        return [rel for rel in candidates if rel]

    return {
        'attributes': parse_attributes(data, rules.get('attributes', [])),
        'entities': parse_entities(data, rules.get('entities', [])),
        'relations': parse_relations(data, rules.get('relations', [])),
    }

# DEBUG Load the JSON file
# JSON interchange text is UTF-8; pin the encoding so decoding does not
# depend on the platform's locale default.
with open('./data/host-myblob.json', 'r', encoding='utf-8') as f:
    json_blob = json.load(f)

# This part would go in the parser section of the individual plugins.
# Each leaf rule pairs a jsonpath expression with the label given to the
# Attribute built from every match of that expression.
parsing_rules = {
    # Stand-alone attributes
    'attributes': [
        {'jsonpath': '$.ip_str', 'label': 'ip-address'},
        {'jsonpath': '$.last_update', 'label': 'last-update'},
        {'jsonpath': '$.tags[*]', 'label': 'tag'},
    ],
    # Entities and the attributes they own
    'entities': [
        {
            'label': 'hostname',
            'has': [
                {'jsonpath': '$.hostnames[*]', 'label': 'fqdn'},
                {'jsonpath': '$.ip_str', 'label': 'ip-address'},
            ],
        },
        {
            'label': 'domain',
            'has': [
                {'jsonpath': '$.domains[*]', 'label': 'domain-name'},
                {'jsonpath': '$.ip_str', 'label': 'ip-address'},
            ],
        },
    ],
    # Relations: their own attributes plus entity rules grouped by role
    'relations': [
        {
            'label': 'resolution',
            'has': [
                {'jsonpath': '$.hostnames[*]', 'label': 'fqdn'},
                {'jsonpath': '$.ip_str', 'label': 'ip-address'},
            ],
            'players': {
                'query': [
                    {
                        'label': 'hostname',
                        'has': [
                            {'jsonpath': '$.hostnames[*]', 'label': 'fqdn'},
                            {'jsonpath': '$.ip_str', 'label': 'ip-address'},
                        ],
                    },
                ],
                'answer': [
                    {
                        'label': 'ip',
                        'has': [
                            {'jsonpath': '$.ip_str', 'label': 'ip-address'},
                        ],
                    },
                ],
            },
        },
    ],
}

# Parse the JSON using the defined rules
parsed_result = process_parsing_rules(json_blob, parsing_rules)

# DEBUG Print the parsed result
from pprint import pprint, pformat
pprint(parsed_result)


def add_meta_attributes(parsed, meta):
    """Append the meta attributes to every parsed entity and relation.

    Args:
        parsed: Mapping of thing type (``'attributes'``, ``'entities'``,
            ``'relations'``) to the list of Things of that type.
        meta: List of Attributes added to the ``has`` of each entity and
            relation.

    Returns:
        A dict with the same three keys. Stand-alone attributes are passed
        through untouched; entities and relations are the same objects as in
        ``parsed``, extended in place with ``meta``.
    """
    final = {'attributes': [], 'entities': [], 'relations': []}
    for ttype, things in parsed.items():
        if ttype == 'attributes':
            # Bare attributes cannot own other attributes, so no meta here.
            final[ttype] = things
            continue
        for thing in things:
            thing.has += meta
            final[ttype].append(thing)
    return final


# Add meta attributes

ledsrc1 = Attribute(label='ledsrc', value='192.168.1.100')
ledsrc2 = Attribute(label='ledsrc', value='hunt-123')
dd = Attribute(label='date-discovered', value='2024-05-16T23:00Z')
meta = [ledsrc1, ledsrc2, dd]

# The original ended with a bare module-level `return`, which is a
# SyntaxError; the merge now lives in a function and its result is bound here.
final_things = add_meta_attributes(parsed_result, meta)