mandiant / capa

The FLARE team's open-source tool to identify capabilities in executable files.
Apache License 2.0
3.98k stars 499 forks source link

Extract indicators (HBI/NBI) around capability detections #1907

Open mr-tz opened 6 months ago

mr-tz commented 6 months ago

Capabilities often have associated host-based and network-based indicators (HBIs and NBIs). Especially these examples (by rule namespaces) come to mind:

We often encounter an HBI or NBI as a string used close around a capability, e.g. as argument to an API call.

It would be worth exploring if we can automatically:

I suspect this could work very well with in the dynamic analysis flavor, but also for static extraction on basic samples could work quite well.

aaronatp commented 6 months ago

I agree that it would be interesting to incorporate these things into capa! I'll have a closer look at some capa code and sandbox data and make a proposal for how we can implement some of these features!

mr-tz commented 5 months ago

The Practical Malware Analysis book lab 03-02.dll may be a good test case here.

aaronatp commented 5 months ago

Hi @mr-tz here are my current ideas about the web domain extractors. Hope to have it finished up soon - please let me know if you have any questions!

import dnspython
from pathlib import Path
from typing import Generator, Iterator, Dict
from capa.capa import ida, ghidra
from capa.features.address import Address
from capa.features.extractors.base_extractor import FeatureExtractor, FunctionHandle
from capa.features.extractors import pefile, elffile, viv, cape, dotnetfile
import viv_utils

from capa.helpers import get_auto_format
from capa.features.common import (
     FORMAT_IDA,
     FORMAT_GHIDRA,
     FORMAT_PE,
     FORMAT_ELF,
     FORMAT_VIV,
     FORMAT_CAPE,
     FORMAT_DOTNET,
)

import re

def default_extract_domain_names(file: Path) -> Iterator[str]:
    """yield web domain regex matches from list of strings""" 
    # should the following be turned into a constant?
    # See this Stackoverflow post that discusses the parts of this regex (http://stackoverflow.com/a/7933253/433790)
    domain_pattern = r"^(?!.{256})(?:[a-z0-9](?:[a-z0-9-]{0,61}[a-z0-9])?\.)+(?:[a-z]{1,63}|xn--[a-z0-9]{1,59})$"
    for string in get_strings(file):
        if re.search(domain_pattern, string):
            yield string

def get_strings(file: Path) -> Iterator[str]:  # we say ' buf = Path(path).read_bytes()' below - is path str?
    '''different extractors implement 'extract_file_strings' in slightly different ways'''
    format_ = get_auto_format(file)
    if format_ == FORMAT_IDA:
        strings, _ = ida.helpers.extract_file_strings()
    elif format_ == FORMAT_GHIDRA:
        strings, _ = ghidra.helpers.extract_file_strings()
    else:
        buf = file.read_bytes()
        if format_ == FORMAT_PE:
            strings, _ = pefile.extract_file_strings(buf)
        elif format_ == FORMAT_ELF:
            strings, _ = elffile.extract_file_strings(buf)
        elif format_ == FORMAT_VIV:
            strings, _ = viv.file.extract_file_strings(buf)
        elif format_ == FORMAT_CAPE:
            strings, _ = cape.file.extract_file_strings(buf)

    return strings

def verbose_extract_domain_names(extractor: FeatureExtractor, file: Path) -> Generator[str, None, None]:
    """yield web domain regex matches from list of strings""" 
    # should the following be turned into a constant?
    # See this Stackoverflow post that discusses the parts of this regex (http://stackoverflow.com/a/7933253/433790)
    domain_pattern = r"^(?!.{256})(?:[a-z0-9](?:[a-z0-9-]{0,61}[a-z0-9])?\.)+(?:[a-z]{1,63}|xn--[a-z0-9]{1,59})$"
    domain_counter_dict = {}
    for string in get_strings(file):
        if re.search(domain_pattern, string):
            try:
                domain_counter_dict[string] += 1
            except KeyError:
                domain_counter_dict[string] = 1

    for string, total_occurrances in domain_counter_dict:
        yield formatted_verbose(extractor, file, string, total_occurrances)

def formatted_verbose(extractor: FeatureExtractor, file: Path, string: str, total_occurrances: int) -> str:
    """
    example output:

    capa -v suspicious.exe
    -----------------------
    google.com
        |---- IP address:
        |        |----192.0.0.1
        |        |----192.0.0.2
        |----Protocols used to communicate with google.com: HTTP (1), HTTPS (2)
        |----3 occurrances
    """
    return (f"{string}\n"
            + f"    |---- {ip_address_statement(string)}\n"
            + f"    |---- {network_protocol_statement(extractor, file, string)}\n"
            + f"    |---- {total_occurrances} occurrances\n")

def ip_address_statement(string: str) -> str:
    resolver = dns.resolver.Resolver()
    answer = resolver.query(f"{string}", "A")
    if len(answer) == 1:
        return "IP address: ".join(ip_address for ip_address in answer)
    else:
        statement = "IP addresses:\n"
        counter = 0
        for ip_address in answer:
            statement.join(f"|    |----{ip_address}\n")
            counter += 1
            if counter = 5:
                statement.join(f"|    |----{total_ips(string) - 5} IP addresses not shown")
                return statement

        return statement

def network_protocol_statement(extractor: FeatureExtractor, file: Path, string: str) -> str:
    """get_protocols supports the following protocols: Ftp, Https, Http"""
    protocols = get_protocols(extractor, file, string)
    if len(protocols) = 1:
        return f"Protocol used to communicate with {string}: ".join(f"{protocol} ({count})" for protocol, count in protocols)
    else:
        statement = f"Protocols used to communicate with {string}:\n"
        for protocol, count in protocols:
            statement.join(f"|    |----{protocol} ({count})\n")

        return statement

def get_protocols(extractor: FeatureExtractor, file: Path, domain: str) -> Dict[str, int]:
    """
    for every occurrance of 'domain' in the extractor, we see which function (e.g., Windows API)
    it is a parameter of
    """
    domain_protocols = {}

    occurrances = domain_occurrances_in_file(file, domain)

    while occurrances > 0:
        try:
            caller_func = yielded_caller_func_static(extractor, domain, file, 0)
        except NotImplementedError:  # if StaticExtractor methods are not implemented, we call DynamicExtractor yielder
            caller_func = yielded_caller_func_dynamic(extractor, domain, file, 0)

        if "Ftp" in caller_func:
            increment_protocol(domain_protocols, "FTP")
        elif "Https" in caller_func:
            increment_protocol(domain_protocols, "HTTPS")
        elif "Http" in caller_func and 'Https' not in caller_func:
            increment_protocol(domain_protocols, "HTTP")
        # elif 'other protocol':
            # pass
        else:
            # Network protocol not found
            increment_protocol(domain_protocols, caller_func)

        occurrances = occurrances - 1

    return domain_protocols # dict of all the protocols used to interact with a domain and number of times each interacts

def domain_occurrances_in_file(file, domain) -> int:
    counter = 0
    for string in get_strings(file):
        if string == domain:
            counter += 1

    return counter

def increment_protocol(protocols_dict: dict, protocol: str) -> Dict[str, int]:
    try:
        protocols_dict[protocol] += 1
    except KeyError:
        protocols_dict[protocol] = 1
    return protocols_dict

def yielded_caller_func_static(extractor: FeatureExtractor, target_string: str, file: Path, start_position: Address):
    for func in extractor.get_functions():
        for feature, addr in func.extract_function_features():
            if addr < start_position:
                continue

            # would function names be stored in any of these locations? could seem to find the answer from code snippets on GitHub
            if feature.value == target_string:
                if any(['Http', 'Https', 'Ftp']) in get_function_name(func, file):
                    yield func.inner

                else:
                    try:
                        yield from yielded_caller_func_static(extractor, func, file, addr)
                    except StopIteration:
                        yield "Network protocol not found - please open an issue on GitHub!"

def get_function_name(func: FunctionHandle, file: Path) -> str:
    format_ = get_auto_format(file)
    if format_ == FORMAT_VIV:
        function_name = viv_utils.get_function_name(func.address)
    elif format_ == FORMAT_PE:
        function_name = pefile.get_function_name(func.address)
    elif format_ == FORMAT_DOTNET:
        function_name = dotnetfile.get_function_name(func.address)
    elif format_ == FORMAT_ELF:
        function_name = elffile.get_function_name(func.address)
    else:
        function_name = 'Problema'

    return function_name

def yielded_caller_func_dynamic(extractor: FeatureExtractor, target_string: str, file: Path, start_position: Address):
    """
    we look into an extractor to see what APIs operates on a web domain

    we loop through processes/threads/calls looking for web domains.
    if we find one, we see if the API (that operates on the web domain)
    contains a network protocol (e.g., "Http").
    most Windows API network management functions contain their protocol in
    their name (e.g., "HttpOpenRequestA").
    if the API does not contain the network protocol, this function yields from
    itself but looks for referenecs to the API name to see if a network management
    function operates on this API (e.g., like how "HttpOpenRequestA" operates
    on a handle returned by "InternetConnect", which contains a web domain).

    """
    for ph in extractor.get_processes():
        for th in extractor.get_threads(ph):
            for ch in extractor.get_calls(ph, th):
                for feature, addr in extractor.extract_call_features(ph, th, ch):
                    if addr < start_position:  # if 'yield from', ignores references to api_name that occur before web domain
                        continue

                    if feature.value == target_string:
                        api_name = extractor.extract_call_features(ph, th, ch)[0][0]
                        if any(['Http', 'Https', 'Ftp']) in api_name:
                            yield api_name

                        else:
                            try:
                                yield from yielded_caller_func_dynamic(extractor, api_name, file, addr)
                            except StopIteration:
                                yield "Network protocol not found - please open a GitHub issue!"
mr-tz commented 5 months ago

is there a commit/branch/PR I can comment on inline? It would also be helpful to see some example output.

aaronatp commented 5 months ago

@mr-tz hopefully by tomorrow! I opened a PR yesterday but deleted it and decided to restructure a couple parts. I'll make sure to include some example output!