microsoft / msticpy

Microsoft Threat Intelligence Security Tools
Other
1.77k stars 319 forks source link

Add support for package-based msticpy extensions and plugins #671

Open rcobb-scwx opened 1 year ago

rcobb-scwx commented 1 year ago

Current State

msticpy supports a variety of extensibility options, including custom pivot functions, data providers, TI providers, and context providers through a class-based plugin system.

Problem to Solve

msticpy does not support extensions with additional Python dependencies.

Proposal

Decisions to Make

If all of the above is acceptable, the important decisions are:

I will open a draft PR shortly containing these changes for review.

ianhelle commented 1 year ago

I'm good with extensions. Re: where in the project structure? We could create a dev folder. I think it would be cool to extend (at some point) the cookiecutter projects to create templates for specific data providers and TI providers... maybe other things in the future.

ianhelle commented 1 year ago

Adding "queries" folder to imported queries for custom drivers.

My thoughts: implement the following either in the drivers `__init__` or in another module and import them into `__init__` (oh, that might not be possible due to recursive imports):

import sys
from pathlib import Path
# True when running on Python 3.9 or later. The TODO in
# get_driver_queries_folder intends to use importlib.resources.files on
# those versions, so `resources` is only imported when the flag is set.
_USE_IMPORTLIB_RES_FILES = sys.version_info >= (3, 9)
if _USE_IMPORTLIB_RES_FILES:
    from importlib import resources

# I think this won't work if the driver is in a zip rather than FS
# maybe we can use pkgutil.get_data to get the file contents?
def get_driver_queries_folder(driver_class: type):
    """
    Return the "queries" folder located beside the driver's module, if any.

    Parameters
    ----------
    driver_class : type
        The driver class whose defining module is inspected.

    Returns
    -------
    Optional[Path]
        Path to the sibling "queries" directory, or None when the module
        is not loaded, has no file location, or has no such directory.

    """
    # try to get queries using py3.9 importlib.resources.files
    # TODO

    # if this fails, fall back to pathlib

    # Use .get() so that a class whose __module__ is not (or no longer) in
    # sys.modules yields None instead of raising KeyError.
    driver = sys.modules.get(driver_class.__module__)
    # Built-in and some extension modules have no __file__ attribute at all,
    # and namespace packages have __file__ set to None - treat both as
    # "no location on disk".
    driver_file = getattr(driver, "__file__", None)
    if not driver_file:
        return None
    queries_path = Path(driver_file).parent / "queries"
    return queries_path if queries_path.is_dir() else None

from msticpy.data import drivers

def is_custom_driver(driver_class: type):
    """Tell whether `driver_class` is one of the registered custom providers."""
    # CUSTOM_PROVIDERS maps a key to a driver class; only the classes matter.
    registered_classes = drivers.CUSTOM_PROVIDERS.values()
    return driver_class in registered_classes

def is_builtin_driver(driver_class: type):
    """Tell whether `driver_class` shares its name with a builtin driver class."""
    # Each _ENVIRONMENT_DRIVERS value is a 2-item pair; the second item is
    # the driver class name, which is all that is compared here.
    builtin_names = set()
    for _first, class_name in drivers._ENVIRONMENT_DRIVERS.values():
        builtin_names.add(class_name)
    return driver_class.__name__ in builtin_names

# testing code
# Uses a sample provider class from the project's test data as the subject.
from tests.testdata.plugins.data_prov import CustomDataProvA

# Show how the sample class is classified by each of the two checks.
print(is_custom_driver(CustomDataProvA), is_builtin_driver(CustomDataProvA))
# Look up a "queries" folder beside the module that defines the sample class.
# The return value is not captured here (bare expression statement).
get_driver_queries_folder(CustomDataProvA)

Then in msticpy.core.data_providers:

# line 129+:
# __init__ method
# Only read queries from folders when the driver has use_query_paths set.
if driver.use_query_paths:
    logger.info("Using query paths %s", query_paths)
    # The driver class is passed through so that _read_queries_from_paths can
    # check whether it is a custom driver and look for its "queries" folder.
    data_env_queries.update(
        self._read_queries_from_paths(query_paths=query_paths, driver_class=self._driver_class)
    )

Then in _read_queries_from_paths


# line 336
def _read_queries_from_paths(self, query_paths, driver_class: type) -> Dict[str, QueryStore]:
    """
    Fetch queries from YAML files in specified paths.

    Query folders are collected in this order: the "Default" folders from
    the "QueryDefinitions" settings (restricted to the current environment),
    the "queries" folder of a custom (plugin) driver, the "Custom" folders
    from settings, and finally the paths supplied by the caller.

    Parameters
    ----------
    query_paths : iterable of paths, optional
        Additional query folders supplied by the caller.
    driver_class : type
        Class of the driver in use; if it is a custom driver, its own
        "queries" folder (when present) is added to the search paths.

    Returns
    -------
    Dict[str, QueryStore]
        Query stores read from the collected folders, or a single empty
        store keyed by the environment name when no folder was found.

    """
    settings: Dict[str, Any] = get_config("QueryDefinitions", {})
    all_query_paths: List[Union[Path, str]] = []
    # "Default" may be missing or null in the settings; iterate an empty
    # list in that case instead of failing with a TypeError on None.
    for def_qry_path in settings.get("Default") or []:
        # only read queries from environment folder
        builtin_qry_paths = self._get_query_folder_for_env(
            def_qry_path, self.environment_name
        )
        all_query_paths.extend(
            str(qry_path) for qry_path in builtin_qry_paths if qry_path.is_dir()
        )
    # <<NEW>>
    # Add default queries from plugin drivers
    if drivers.is_custom_driver(driver_class):
        plugin_qry_path = drivers.get_driver_queries_folder(driver_class)
        if plugin_qry_path:
            all_query_paths.append(plugin_qry_path)
    # <<END NEW>>
    # Single lookup of "Custom"; a missing or null entry means no folders.
    for custom_path in settings.get("Custom") or []:
        custom_qry_path = _resolve_path(custom_path)
        if custom_qry_path:
            all_query_paths.append(custom_qry_path)
    for param_path in query_paths or []:
        param_qry_path = _resolve_path(param_path)
        if param_qry_path:
            all_query_paths.append(param_qry_path)
    if all_query_paths:
        logger.info("Reading queries from %s", all_query_paths)
        return QueryStore.import_files(
            source_path=all_query_paths,
            recursive=True,
            driver_query_filter=self._query_provider.query_attach_spec,
        )
    # if no queries - just return an empty store
    return {self.environment_name: QueryStore(self.environment_name)}