PaloAltoNetworks / cobra-tool

Cloud Offensive Breach and Risk Assessment (COBRA) Tool
Apache License 2.0
68 stars 30 forks source link

Refactor scenarios #19

Open mdorn opened 1 month ago

mdorn commented 1 month ago

Is your feature request related to a problem?

Currently to create a new scenario, you need to duplicate an existing scenario folder and replace the relevant code with logic relating to your new scenario. Hundreds of lines of code remain the exact same.

Describe the solution you'd like

I would like to see the code refactored so that, for example, a contributor could simply subclass a base class and/or compose a configuration file that reuses code from a single module wherever possible. This will make it far easier and cleaner for contributors to add scenarios. It will also make test automation much easier, as the project matures.

Describe alternatives you've considered

Right now the only way i've really considered is to create a base class to inherit from (I'll add some more detail in the comments).

mdorn commented 1 month ago

I've started working on a base class that new scenarios could inherit from. So far it looks as below (note the idea is that none of the code seen here would have to be duplicated in new scenarios, as it currently does). Currently more or less in Proof of Concept stage, but I'm happy to continue in this direction and provide a working implementation, but wanted to get any feedback first.

#! /usr/bin/env python
# -*- coding: utf-8 -*-
"""This module provides a base class for COBRA scenarios."""
import json
import os
import subprocess
import webbrowser
from pathlib import Path
from termcolor import colored
from core.helpers import loading_animation, slugify

class BaseScenario(object):
    """Abstract base class for scenario to be inherited by child classes"""
    def __init__(self):
        self.title = 'Title of scenario'
        self.description = 'Scenario description.'
        self.slug = slugify(self.title)  # e.g. title-of-scenario
        self.path = ''
        self.iac_output_data = None

    def setup(self):
        """Deploy resources needed for the scenario."""
        self._deploy_infra()
        # NOTE: other setup steps, including the deployment of additional
        # resources, can be coded in the subclass

    def execute_attack(self):
        """Run the attack scenario on the deployed infra/resources."""
        pass

    def teardown(self):
        """Destroy scenario resources and clean up."""
        self._destroy_infra()
        # NOTE: other setup steps, including the deletion of additional
        # resources, can be coded in the subclass

    def generate_report(self):
        """Generate report."""
        # FIXME: standardize location of single template
        html_template = ''
        with open('cobra-report-{}.html'.format(self.slug), 'w+') as file:
            file.write(html_template)
        webbrowser.open_new_tab(
            'file://{}/cobra-report-{}.html'.format(str(Path.cwd()), self.slug)
        )

    def _deploy_infra(self):
        """Deploy required IaC infrastructure."""
        print("-"*30)
        print(colored("Rolling out Infra", color="red"))
        loading_animation()
        print("-"*30)

        file_path = "./core/aws-scenario-0-output.json"
        if os.path.exists(file_path):
            os.remove(file_path)
            print("File '{}' found and deleted.".format(file_path))
        else:
            print("File '{}' not found.".format(file_path))  # TODO: unneeded?

        # FIXME: derive the paths below and stack name from scenario metadata
        subprocess.call("cd ./scenarios/scenario_0/infra/ && pulumi up -s aws-scenario-0 -y", shell=True)
        subprocess.call("cd ./scenarios/scenario_0/infra/ && pulumi stack -s aws-scenario-0 output --json >> ../../../core/aws-scenario-0-output.json", shell=True)
        with open("./core/aws-scenario-0-output.json", "r") as file:
            self.iac_output_data = json.load(file)

        subprocess.call(
            'cd {}/infra/ && pulumi destroy'.format(self.path), shell=True
        )

    def _destroy_infra(self):
        """Destroy the IaC stack."""
        subprocess.call("cd ./scenarios/scenario_0/infra/ && pulumi destroy", shell=True)
mdorn commented 1 month ago

See PR #20