synfinner / KEVin

The missing API for the CISA Known Exploited Vulnerabilities Catalog. This repository contains the source code for the service running at kevin.gtfkd.com.
https://kevin.gtfkd.com/
7 stars 0 forks source link

KEV Threat Actor Search #198

Closed synfinner closed 2 months ago

synfinner commented 2 months ago

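I'm looking at restructuring the AllKEV resource so that it supports fuzzy search for threat actors. We will also need to make sure the adversary fields in the threat data (`openThreatData.adversaries` and `openThreatData.communityAdversaries`) are indexed in the database.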

class AllKevVulnerabilitiesResource(BaseResource):
    """Resource that lists KEV vulnerabilities with search, filtering, sorting and pagination."""

    def get(self):
        """
        Retrieve all KEV vulnerabilities with optional filtering, sorting, and pagination.

        This method fetches vulnerabilities from the database, allowing for
        pagination, sorting, and filtering based on user-defined parameters.
        It returns a structured response containing the vulnerabilities and
        pagination information.

        Query Parameters:
        - page (int): The page number for pagination (default is 1; values
          below 1 are clamped to 1).
        - per_page (int): The number of results per page (default is 25,
          clamped to the range 1..100).
        - sort (str): The field to sort by (default is "dateAdded").
        - order (str): The sort order, either "asc" or "desc", matched
          case-insensitively (default is "desc"; anything else sorts ascending).
        - search (str): A search term to filter vulnerabilities (text search).
        - filter (str): Pass "ransomware" to include only vulnerabilities with
          known ransomware campaign use.
        - actor (str): A term matched case-insensitively as a literal substring
          against the adversary fields of the threat data.

        Returns:
        Response: A JSON response containing pagination info and a list of
                  vulnerabilities, a 400 error if page/per_page are not
                  integers, or a 500 error if an internal error occurs.
        """
        try:
            try:
                # Clamp page to >= 1: page=0 or a negative page would otherwise
                # produce a negative skip(), which the driver rejects and which
                # used to surface as a generic 500.
                page = max(1, int(request.args.get("page", 1)))
                per_page = max(1, min(100, int(request.args.get("per_page", 25))))
            except ValueError:
                return self.handle_error("Invalid page or per_page parameter. Must be integers.", 400)

            sort_param = sanitize_query(request.args.get("sort", "dateAdded"))
            order_param = sanitize_query(request.args.get("order", "desc"))
            search_query = sanitize_query(request.args.get("search", ''))
            filter_ransomware = sanitize_query(request.args.get("filter", ''))
            # Normalise once so "is an actor filter active?" has a single answer
            # for both query building and the caching decision below.
            actor_term = (sanitize_query(request.args.get("actor", '')) or '').strip()

            query = self._build_query(search_query, filter_ransomware, actor_term)

            descending = (order_param or '').lower() == "desc"
            sort_criteria = [(sort_param, DESCENDING if descending else ASCENDING)]

            if actor_term:
                # No caching if actor is specified
                total_vulns = self.count_documents(query)
                vulnerabilities = self.fetch_vulnerabilities(query, sort_criteria, page, per_page)
            else:
                # Use caching when actor is not specified. query_string=True asks
                # the cache to key on the request's query string (presumably
                # Flask-Caching -- confirm against the `cache` object's setup).
                @cache.cached(timeout=120, key_prefix='kev_all_listing', query_string=True)
                def cached_fetch():
                    return (
                        self.count_documents(query),
                        self.fetch_vulnerabilities(query, sort_criteria, page, per_page),
                    )

                total_vulns, vulnerabilities = cached_fetch()

            total_pages = math.ceil(total_vulns / per_page)

            return self.make_json_response({
                "page": page,
                "per_page": per_page,
                "total_vulns": total_vulns,
                "total_pages": total_pages,
                "vulnerabilities": [serialize_vulnerability(v) for v in vulnerabilities]
            })
        except Exception:
            # Top-level boundary: never leak internal details to the client.
            return self.handle_error("An internal server error occurred! ", 500)

    @staticmethod
    def _build_query(search_query, filter_ransomware, actor_term):
        """
        Build the MongoDB filter document for the listing.

        Parameters:
        - search_query (str): Text-search term; ignored when empty.
        - filter_ransomware (str): "ransomware" (any case) restricts results to
          entries whose knownRansomwareCampaignUse is "Known".
        - actor_term (str): Already-stripped actor term; ignored when empty.

        Returns:
        dict: The filter to pass to count_documents() / find().
        """
        import re  # local import: the file's top-level import block is not visible here

        query = {"$text": {"$search": search_query}} if search_query else {}
        if filter_ransomware.lower() == 'ransomware':
            query["knownRansomwareCampaignUse"] = "Known"
        if actor_term:
            # Escape the user-supplied term so it is matched as a literal,
            # case-insensitive substring. Unescaped metacharacters (e.g. an
            # unbalanced "(" or ".*") could otherwise change the match or make
            # the database reject the pattern.
            pattern = re.escape(actor_term)
            query["$or"] = [
                {"openThreatData.communityAdversaries": {"$regex": pattern, "$options": "i"}},
                {"openThreatData.adversaries": {"$regex": pattern, "$options": "i"}},
            ]
        return query

    def count_documents(self, query):
        """Count the total number of vulnerabilities matching the query.

        Database errors propagate to the caller unchanged.
        """
        return collection.count_documents(query)

    def fetch_vulnerabilities(self, query, sort_criteria, page, per_page):
        """Fetch one page of vulnerabilities from the database.

        Parameters:
        - query (dict): MongoDB filter document.
        - sort_criteria (list): List of (field, direction) pairs.
        - page (int): 1-based page number.
        - per_page (int): Number of documents per page.

        Returns:
        list: The matching documents for the requested page.

        Database errors propagate to the caller unchanged.
        """
        cursor = collection.find(query).sort(sort_criteria).skip((page - 1) * per_page).limit(per_page)
        return list(cursor)  # Materialise so the result can be cached/serialised