MISP / MISP-STIX-Converter

A utility repo to assist with converting between MISP and STIX formats
GNU Lesser General Public License v3.0

Better STIX Parsing #23

Open alatif113 opened 7 years ago

alatif113 commented 7 years ago

Currently every observable is parsed out of a given STIX input and added as an attribute to a MISP event. In many cases this can give undesired results. A better solution would be as follows:

  1. Create an observable map (python dictionary) with the observable id as a key and the observable object as the value, for every observable within the STIX package.
  2. Create a TTP map (python dictionary) with the TTP id as a key and the TTP object as the value, for every TTP within the STIX package.
  3. If there are indicators within the STIX package, parse each indicator. Use the observable map to retrieve the observables that are part of that indicator (typically they'll be represented only by their ID within the indicator). Use the TTP map to retrieve the TTPs that are part of that indicator (typically they'll likewise be represented only by their ID within the indicator).
  4. If no indicators exist (some simple feeds will not use them), you can fall back to parsing the observables directly. Just loop through the observable map you created before.
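The steps above can be sketched without any STIX library. This is only an illustration under loud assumptions: plain dicts stand in for STIX objects, and the names `package` and `resolve` as well as the `id`/`idref` keys are hypothetical, not the python-stix API.

```python
# Plain dicts stand in for STIX objects; this is NOT the python-stix API.
package = {
    "observables": [
        {"id": "obs-1", "type": "domain", "value": "evil.example"},
        {"id": "obs-2", "type": "md5", "value": "d41d8cd98f00b204e9800998ecf8427e"},
        {"id": "obs-3", "type": "ip-dst", "value": "198.51.100.7"},
    ],
    "ttps": [{"id": "ttp-1", "title": "Phishing"}],
    "indicators": [
        {"observables": [{"idref": "obs-1"}, {"idref": "obs-2"}],
         "ttps": [{"idref": "ttp-1"}]},
    ],
}


def resolve(package):
    """Return one {'ttps': [...], 'attributes': [...]} dict per indicator."""
    # Map steps: index observables and TTP titles by their id.
    observable_map = {o["id"]: o for o in package.get("observables", [])}
    ttp_map = {t["id"]: t["title"] for t in package.get("ttps", [])}

    indicators = package.get("indicators", [])
    if not indicators:
        # Fallback step: no indicators, so use every top-level observable.
        attrs = [{"type": o["type"], "value": o["value"]}
                 for o in observable_map.values()]
        return [{"ttps": [], "attributes": attrs}]

    results = []
    for ind in indicators:
        # Indicator step: follow idrefs; references to unknown ids are skipped.
        attrs = []
        for ref in ind.get("observables", []):
            obs = observable_map.get(ref["idref"])
            if obs is not None:
                attrs.append({"type": obs["type"], "value": obs["value"]})
        ttps = [ttp_map[ref["idref"]] for ref in ind.get("ttps", [])
                if ref["idref"] in ttp_map]
        results.append({"ttps": ttps, "attributes": attrs})
    return results
```

Calling `resolve(package)` yields a single entry carrying the domain and the MD5 together with the "Phishing" TTP. The observable `obs-3`, which no indicator references, is left out; avoiding that kind of unwanted attribute is the point of the proposal.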

Here's some sample code from my implementation (it was a rush job, so there is no logging or error checking in most of it). Simply import the stix_to_misp.py file and call the build_event method to create a MISP event. I use my own taxonomy for confidence and TTPs, but those can be changed as needed:

stix_parser.py

from cybox.objects import email_message_object, file_object, address_object, socket_address_object
from cybox.objects import domain_name_object, hostname_object, uri_object

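def build_observable_map(pkg):
    # Index every top-level observable by its id so idrefs can be resolved later.
    # (Renamed from "map" to avoid shadowing the Python builtin.)
    observable_map = {}
    if pkg.observables:
        for o in pkg.observables.observables:
            try:
                observable_map[o.id_] = o
            except AttributeError:
                continue
    return observable_map

def build_ttp_map(pkg):
    # Index every TTP title by the TTP id.
    ttp_map = {}
    if pkg.ttps:
        for t in pkg.ttps.ttps:
            try:
                ttp_map[t.id_] = t.title
            except AttributeError:
                continue
    return ttp_map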

def process_indicators(pkg):
    observable_map = build_observable_map(pkg)
    ttp_map = build_ttp_map(pkg)
    indicators = []
    if pkg.indicators:
        for i in pkg.indicators:
            indicator = dict()
            indicator["itypes"] = get_indicator_types(i)
            indicator["confidence"] = get_confidence(i)
            indicator["ttps"] = get_ttps(i, ttp_map)
            indicator["attributes"] = get_indicator_attributes(i, observable_map)

            indicators.append(indicator)

    return indicators

def process_observables(pkg):
    observable_map = build_observable_map(pkg)
    if pkg.observables:
        return get_observable_attributes(pkg.observables.observables, observable_map)
    else:
        return []

def get_indicator_types(indicator):
    itypes = []
    for i in indicator.indicator_types:
        itypes.append(i.value)

    return itypes

def get_confidence(indicator):
    if indicator.confidence:
        return indicator.confidence.value.value
    else:
        return "Unknown"

def get_observable_attributes(observables, observable_map):
    attributes = []
    for o in observables:
        if hasattr(o, "idref") and o.idref:
            try:
                observable = observable_map[o.idref]
            except KeyError:
                continue

            extract_observable(observable, attributes)
        else:
            extract_observable(o, attributes)

    return attributes

def get_indicator_attributes(indicator, observable_map):
    observables = []
    if indicator.observable:
        observables += process_observable(indicator.observable)
    if indicator.observables:
        for o in indicator.observables:
            observables += process_observable(o)

    # Return outside the "if" so a lone indicator.observable is not lost
    # and the function never falls through to an implicit None.
    return get_observable_attributes(observables, observable_map)

def get_ttps(indicator, ttp_map):
    ttps = []
    if indicator.indicated_ttps:
        for ttp in indicator.indicated_ttps:
            if ttp.item.idref:
                try:
                    t = ttp_map[ttp.item.idref]
                except KeyError:
                    continue

                ttps.append(t)
            else:
                ttps.append(ttp.item.title)
    return ttps

def process_observable(observable):
    observables = []

    if observable.observable_composition:
        # Only OR compositions are flattened; any other operator (e.g. AND) is skipped.
        if observable.observable_composition.operator == "OR":
            for o in observable.observable_composition.observables:
                observables += process_observable(o)
    else:
        observables.append(observable)

    return observables

def add_attribute(obj, attributes):
    if not any(a["value"] == obj["value"] for a in attributes):
        attributes.append(obj)

# Dedicated to File object
def add_file_attr(obj, attributes):

    if obj.file_name:
        add_attribute({"type": "filename", "value": str(obj.file_name)}, attributes)

    if obj.md5:
        if len(obj.md5) == 32:
            add_attribute({"type": "md5", "value": str(obj.md5)}, attributes)

    if obj.sha1:
        if len(obj.sha1) == 40:
            add_attribute({"type": "sha1", "value": str(obj.sha1)}, attributes)

    if obj.sha256:
        if len(obj.sha256) == 64:
            add_attribute({"type": "sha256", "value": str(obj.sha256)}, attributes)

    if obj.sha512:
        if len(obj.sha512) == 128:
            add_attribute({"type": "sha512", "value": str(obj.sha512)}, attributes)

# Dedicated to Address Object (DB)
def add_addr_attr(obj, attributes):
    if obj.is_source:
        add_attribute({"type": "ip-src", "value": str(obj.address_value)}, attributes)

    elif obj.is_destination:
        add_attribute({"type": "ip-dst", "value": str(obj.address_value)}, attributes)

    else:
        # Direction unknown: default to ip-dst, first checking for an IP range
        if getattr(obj, "condition", None) == "InclusiveBetween":
            add_attribute({"type": "ip-dst", "value": str(obj.address_value[0])}, attributes)
            add_attribute({"type": "ip-dst", "value": str(obj.address_value[1])}, attributes)
        else:
            # No condition, or a non-range condition: treat as a single address
            add_attribute({"type": "ip-dst", "value": str(obj.address_value)}, attributes)

# Dedicated to EmailMessage (DB)
def add_email_attr(obj, attributes):
    if obj.header:
        # We have a header, can check for to/from etc etc
        if obj.header.from_:
            add_attribute({"type": "email-src", "value": str(obj.header.from_.address_value)}, attributes)
        if obj.header.to:
            for mail in obj.header.to:
                add_attribute({"type": "email-dst", "value": str(mail.address_value)}, attributes)
        if obj.header.subject:
            add_attribute({"type": "email-subject", "value": str(obj.header.subject)}, attributes)

# Dedicated to Domain name (DB)
def add_domain_attr(obj, attributes):
    add_attribute({"type": "domain", "value": str(obj.value)}, attributes)

# Dedicated to Hostname (DB)
def add_hostname_attr(obj, attributes):
    add_attribute({"type": "hostname", "value": str(obj.hostname_value)}, attributes)

# Dedicated to URI (DB)
def add_uri_attr(obj, attributes):
    add_attribute({"type": "url", "value": str(obj.value)}, attributes)

def extract_observable(observable, attributes):

    if hasattr(observable, "object_") and observable.object_:
        prop = observable.object_.properties

        if type(prop) == address_object.Address:
            # Now script uses buildAddressAttribute (DB)
            add_addr_attr(prop, attributes)

        elif type(prop) == domain_name_object.DomainName:
            # Now script uses buildDomainNameAttribute (DB)
            add_domain_attr(prop, attributes)

        elif type(prop) == hostname_object.Hostname:
            # Now script uses buildHostnameAttribute
            add_hostname_attr(prop, attributes)

        elif type(prop) == socket_address_object.SocketAddress:
            if prop.ip_address:
                add_addr_attr(prop.ip_address, attributes)
            if prop.hostname:
                add_hostname_attr(prop.hostname, attributes)

        elif type(prop) == uri_object.URI:
            # Now script uses buildURIAttribute (DB)
            add_uri_attr(prop, attributes)

        elif type(prop) == file_object.File:
            # Now script uses buildFileAttribute (DB)
            add_file_attr(prop, attributes)

        elif type(prop) == email_message_object.EmailMessage:
            # Now script uses buildEmailMessageAttribute (DB)
            add_email_attr(prop, attributes)
        else:
            pass
    else:
        pass

stix_to_misp.py

from stix_parser import process_indicators, process_observables
from stix.core import STIXPackage
from tempfile import SpooledTemporaryFile
import pymisp

def load_stix(stix):
    # Just save the pain and load it if the first character is a <

    if isinstance(stix, STIXPackage):
        # Oh cool we're ok
        # Who tried to load this? Honestly.
        return stix

    elif hasattr(stix, 'read'):
        try:
            stix_package = STIXPackage.from_xml(stix)
        except Exception as ex:
            print("Could not load file")
            return

        return stix_package

    elif isinstance(stix, str):
        # It's text, we'll need to use a temporary file
        f = SpooledTemporaryFile(max_size=10 * 1024)
        f.write(stix.encode("utf-8"))
        f.seek(0)

        return load_stix(f)

def build_event(content, **kwargs):
    pkg = load_stix(content)
    if pkg is None:
        # load_stix returns None when the input could not be parsed
        return None
    event = pymisp.MISPEvent()

    if pkg.stix_header and pkg.stix_header.title:
        event.info = pkg.stix_header.title
    else:
        event.info = "NO_TITLE"

    event.distribution = kwargs.get("distribution", 0)
    event.threat_level_id = kwargs.get("threat_level_id", 3)
    event.analysis = kwargs.get("analysis", 0)

    if pkg.indicators:
        indicators = process_indicators(pkg)
        for i in indicators:
            build_attribute(i, event)
    elif pkg.observables:
        indicator = dict()
        indicator["attributes"] = process_observables(pkg)
        build_attribute(indicator, event)
    else:
        return None

    unique_attr = []
    for attr in event.attributes:
        if not any(attr.value == u.value for u in unique_attr):
            unique_attr.append(attr)

    event.attributes = unique_attr

    return event

def build_attribute(indicator, event):

    tags = []

    if "confidence" in indicator and indicator["confidence"]:
        tags.append({"name": "Confidence:{}".format(indicator["confidence"])})
    else:
        tags.append({"name": "Confidence:Unknown"})
    if "ttps" in indicator and indicator["ttps"]:
        for ttp in indicator["ttps"]:
            tags.append({"name": "TTP:{}".format(ttp)})
    if "itypes" in indicator and indicator["itypes"]:
        for itype in indicator["itypes"]:
            tags.append({"name": "Detail:{}".format(itype)})
    if "attributes" in indicator and indicator["attributes"]:
        for attr in indicator["attributes"]:
            event.add_attribute(attr["type"], attr["value"], Tag=tags)

alatif113 commented 7 years ago

@FloatingGhost I added sample code but the formatting is not cooperating. Can you help me fix it please?

iglocska commented 7 years ago

Sounds like some very sane ideas!

FloatingGhost commented 7 years ago

o christ on a quadbike that's a lot of edited code

I might take a look when I'm feeling more masochistic than usual

alatif113 commented 7 years ago

@FloatingGhost lol most of it is just the parsing of the different observable types taken directly from the existing code.

FloatingGhost commented 7 years ago

just

JUST

Nothing with STIX is ever "JUST"

It's always rooted to the hellish floor of the so-called standard and is never as simple as it seems. Inevitably there'll be a tendril of this eldritch horror that extends beyond the realm of human comprehension and into STIX world, in which it terminates in the STIXPackage of unimaginable terror

iglocska commented 7 years ago

@alatif113, could you do a pull request for the changes? It does sound like a clean approach indeed.

alatif113 commented 7 years ago

@iglocska I'm not very git savvy. Don't really know how to do that.

iglocska commented 7 years ago

It should be simple enough. Fork the project by clicking the "Fork" button at the top. This will create a copy of the repository under your user name on GitHub, i.e. https://github.com/alatif113/MISP-STIX-Converter

You already have the code-base sitting locally on your machine, which is what you have modified. Create a commit with all of your changes by simply doing the following:

git add /path/to/your/changed/file

Repeat it for all of the files that you have modified (to see a list of all files that you've changed just type git status from within the MISP-STIX-Converter directory)

Once you are done it's time to commit the changes:

git commit -m "My STIX parsing improvements"

Once done, add your own github repository as a remote

git remote add myfork https://github.com/alatif113/MISP-STIX-Converter.git

Then push your committed changes to your fork:

git push myfork master

Once this is done, just go to

https://github.com/alatif113/MISP-STIX-Converter

and open up a pull request by clicking the "New pull request" button (upper left side, next to the branch name)

This should be it!

alatif113 commented 7 years ago

@iglocska Ahh, I see what you mean. I didn't edit any files directly (I didn't need the MISP to STIX part), but rather created my own 2 files (in the code above) solely for parsing and importing STIX to MISP, heavily using code that already existed within the project to parse the actual observable types.

It's just a proof of concept and is missing trivial things such as error checking and logging.

iglocska commented 7 years ago

Ah ok, I see. Any chance you could move that to MISP-STIX-Converter and integrate it directly? Or is it too different from how the converter works?

alatif113 commented 7 years ago

@iglocska I think replacing the buildEvent function within the buildMISPAttribute file with the build_event function within the stix_to_misp.py file above should do it, barring the fact there would now be a lot of unused old functions.

There would also need to be some agreement on the taxonomy for tags. I use Confidence:<value> and TTP:<value> for mine, but I don't know if there is already some standard that exists out there.
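One possible direction, offered only as a sketch: MISP taxonomies use machine tags of the form namespace:predicate="value", so the flat Confidence:<value> and TTP:<value> tags could be emitted in that shape instead. The namespace `stix-import` and the helper names `machine_tag` and `tags_for` below are hypothetical and do not correspond to any published taxonomy or existing function.

```python
def machine_tag(namespace, predicate, value):
    """Format a machine tag as namespace:predicate="value"."""
    return '{}:{}="{}"'.format(namespace, predicate, value)


def tags_for(indicator, namespace="stix-import"):
    """Build tag dicts for an indicator dict with 'confidence' and 'ttps' keys.

    The namespace default is a made-up placeholder, not an agreed taxonomy.
    """
    confidence = indicator.get("confidence") or "Unknown"
    names = [machine_tag(namespace, "confidence", confidence)]
    names += [machine_tag(namespace, "ttp", t) for t in indicator.get("ttps", [])]
    return [{"name": n} for n in names]


example = tags_for({"confidence": "High", "ttps": ["Phishing"]})
```

The returned list has the same {"name": ...} shape as the tags built in build_attribute above, so switching naming schemes would only touch the tag-building step.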

Unfortunately, I don't have the time to actually go through and do that (not at the moment at least). I just wanted to bring this to the devs' attention, with the sample code I used for my use case.

iglocska commented 7 years ago

Thanks a lot for the input, we'll keep this issue open until we can get around to implementing it. It indeed looks very promising!

alatif113 commented 7 years ago

@iglocska No problem! Got the idea from how many of the SIEMs and commercial threat platforms parse STIX files.

FloatingGhost commented 7 years ago

Well if they parsed MISP everyone's life would be a lot easier :<

STIX needs to die.

iglocska commented 7 years ago

It looks like it's here to stay though, so we should make sure that the parser makes as much sense as possible - we'll definitely take a look at this too at some point, @FloatingGhost, to preserve some of your sanity ;))

ag-michael commented 4 years ago

Any updates on this? @iglocska, it seems you approve of the general idea and there is demand for this feature. Any chance of accepting PRs related to this soon?

ag-michael commented 4 years ago

@alatif113 I have a pending PR, https://github.com/MISP/MISP-STIX-Converter/pull/40, that addresses some of what you're asking for. Care to take a look and comment? This is an important subject for me as well.

adulau commented 4 years ago

So we don't really maintain this as there is a full-blown STIX 1.x and 2.x import/export in MISP.

As the original maintainer is not maintaining this external package anymore, I can merge those. Just let me know if it works for you and I'll merge it.

ag-michael commented 4 years ago

@adulau It would be great if you could merge it. But I didn't know about the MISP feature for STIX import. Do you know why MISP-Taxii-Server isn't using that? That's the only way to feed MISP with TAXII that I've found. Can you point me in the direction of the docs that show how to import STIX directly into MISP?

The STIX section here: https://pymisp.readthedocs.io/en/latest/tools.html uses pymisp.tools which in turn uses this project.

Edit:

I figured out the upload_stix() API. It works, but it doesn't parse confidence, title, information source, etc. I don't want to waste any effort, so can you tell me whether MISP-Taxii-Server is maintained (I have a pending PR there too)? If it is, I'd like to create a PR/FR to have additional metadata parsed by MISP. In the meantime, it would be great if you could review the existing PR for this project.