tenable / pyTenable

Python Library for interfacing into Tenable's platform APIs
https://pytenable.readthedocs.io
MIT License
343 stars 172 forks source link

Use Native Python Types #329

Open kkirsche opened 3 years ago

kkirsche commented 3 years ago

Is your feature request related to a problem? Please describe. In the Tenable.SC python module, the library returns strings for various data types complicating interactions. As a result, end-users need to be aware of all the various data types in the API to properly interact with and manipulate the data. For example, calculating differences in times between various scans, processing things like ID's as integers, etc.

Describe the solution you'd like Use native python types

Describe alternatives you've considered I have written my own custom integration layer to handle type conversions to native python types and key styles.

Additional context Example from my integration layer:

from __future__ import annotations

from datetime import datetime
from typing import Any, List, Optional, TypedDict, Callable
from pydantic.dataclasses import dataclass

def convert_str_to_boolean(v: str) -> bool:
    # tenable uses a mix of values for booleans, we need to clean these up for consistency
    normalized_v = v.lower()
    if normalized_v == "true" or normalized_v == "yes":
        return True
    if normalized_v == "false" or normalized_v == "no":
        return False
    raise ValueError(
        "Value was not 'true', 'yes', 'false', or 'no'. Cannot convert to boolean"
    )

# batch conversion of attributes
def convert_attr_list_to_types(
    __obj: object, attr_list: List[str], factory: Callable
) -> object:
    for attr in attr_list:
        if getattr(__obj, attr) is not None:
            setattr(__obj, attr, factory(getattr(__obj, attr)))
    return __obj

# what Tenable sends, with comments to clarify what the values really are
class PolicyDetailDict(TypedDict):
    auditFiles: List[Any]
    canManage: str  # 'true' | 'false'
    canUse: str  # 'true' | 'false'
    context: str
    createdTime: str
    creator: UserDict
    description: str
    groups: Optional[List[Any]]
    ownerGroup: Optional[StrIDNameAndDescriptionDict]
    targetGroup: Optional[IntIDNameAndDescriptionDict]
    owner: Optional[UserDict]
    generateXCCDFResults: str  # 'true' | 'false'
    id: str  # int
    modifiedTime: str  # int - epoch time
    name: str
    families: Optional[List[PolicyFamilyDict]]
    preferences: PolicyPreferencesDict
    policyTemplate: PolicyTemplateDict
    status: str
    tags: str

 # what we return to a client
class SerializedPolicyDetailDict(TypedDict):
    auditFiles: List[Any]
    canManage: bool
    canUse: bool
    context: str
    createdTime: int
    createdTimeDate: str
    creator: SerializedUserDict
    description: str
    groups: List[Any]
    ownerGroup: Optional[SerializedIDNameAndDescriptionDict]
    targetGroup: Optional[SerializedIDNameAndDescriptionDict]
    owner: Optional[SerializedUserDict]
    generateXCCDFResults: bool
    id: int
    modifiedTime: int
    modifiedTimeDate: str
    name: str
    families: Optional[List[SerializedPolicyFamilyDict]]
    preferences: SerializedPolicyPreferencesDict
    policyTemplate: SerializedPolicyTemplateDict
    status: int
    tags: str

# A native layer for use in applications, this is what I'd expect to see pyTenable return
@dataclass
class PolicyDetail:
    audit_files: List[Any]
    can_manage: bool
    can_use: bool
    context: str
    created_time: datetime
    creator: User
    description: str
    generate_xccdf_results: bool
    groups: List[Any]
    id: int
    modified_time: datetime
    name: str
    preferences: PolicyPreferences
    families: List[PolicyFamily]
    policy_template: PolicyTemplate
    owner_group: Optional[IDNameAndDescription]
    target_group: Optional[IDNameAndDescription]
    owner: Optional[User]
    status: int
    tags: str

    @classmethod
    def from_dict(cls, d: PolicyDetailDict) -> PolicyDetail:
        to_create = tenablesc_keys_to_snake_case(d)
        if "preferences" in to_create:
            to_create["preferences"] = PolicyPreferences.from_dict(
                to_create["preferences"]
            )

        for k in ["groups", "families"]:
            if k not in to_create:
                to_create[k] = []
        for k in ["owner", "owner_group", "target_group"]:
            if k not in to_create:
                to_create[k] = None

        return cls(**to_create)

    def __post_init__(self) -> None:
        convert_attr_list_to_types(
            __obj=self,
            attr_list=["can_manage", "can_use", "generate_xccdf_results"],
            factory=convert_str_to_boolean,
        )
        convert_attr_list_to_types(__obj=self, attr_list=["id", "status"], factory=int)
        convert_attr_list_to_types(
            __obj=self,
            attr_list=["modified_time", "created_time"],
            factory=lambda x: datetime.fromtimestamp(int(x)),
        )

    def serialize(self) -> SerializedPolicyDetailDict:
        return SerializedPolicyDetailDict(
            auditFiles=self.audit_files,
            canManage=self.can_manage,
            canUse=self.can_use,
            context=self.context,
            createdTime=convert_datetime_to_milliseconds(self.created_time),
            createdTimeDate=convert_datetime_to_string(self.created_time),
            creator=self.creator.serialize(),
            description=self.description,
            groups=self.groups,
            generateXCCDFResults=self.generate_xccdf_results,
            id=self.id,
            modifiedTime=convert_datetime_to_milliseconds(self.modified_time),
            modifiedTimeDate=convert_datetime_to_string(self.modified_time),
            name=self.name,
            preferences=self.preferences.serialize(),
            policyTemplate=self.policy_template.serialize(),
            status=self.status,
            tags=self.tags,
            owner=self.owner.serialize() if self.owner else self.owner,
            ownerGroup=self.owner_group.serialize()
            if self.owner_group
            else self.owner_group,
            targetGroup=self.target_group.serialize()
            if self.target_group
            else self.target_group,
            families=[family.serialize() for family in self.families],
        )
SteveMcGrath commented 3 years ago

While I completely agree that work like this needs to be done, it would likely need to be part of a much larger modernization refactoring effort. Currently the pyTenable Tenable.sc and Tenable.io packages still rely on a lot of Python 2.x compatible code that would need to be modernized along with this.

kkirsche commented 3 years ago

That's perfectly reasonable, and I appreciate the consideration. Please let me know if there is any support needed to accelerate that type of effort.

EDIT: This comment assumes that the goal would be to do so soon, due to Python 2 being end-of-life. If realistically that is not a goal of the project due to enterprise or other customers, I fully understand.

levwais commented 3 years ago

this will be part of a bigger refactoring to Python 3.x code standards that is on our roadmap.

SteveMcGrath commented 2 years ago

So in diving into this further, and when looking into what would be needed to support all of Tenable.sc 5.x, this may be a much bigger addition than expected. This would also mean the library would have to be revved for every version of Tenable.sc, or any other products when they get revved to add new fields. I'm not sure is native datatypes are the right approach.

I do understand that SC's responses are almost entirely string values, and it's a pain having to recast them. my concern is that by forcing folks to upgrade the library potentially every rev of any product may create other issues.

There may be some alternative validation and transformation libraries that can handle data in a less lossy way.

SteveMcGrath commented 2 years ago

so Marshmallow might be a more appropriate path forward here, as it can handle unknown fields in a more appropriate manner.

https://marshmallow.readthedocs.io/en/stable/

In any case, implementing response attribute recasting across the whole library will dramatically increase the time to refactor the code on the pathway to the larger v2 refactor.

SteveMcGrath commented 2 years ago

So please excuse the stream of posts here, but I'll be treating this issue as a bit of a scratch pad while I research this. In either case, marshmallow shows the most promise so far. I was already looking into using it to replace a lot of madness in the IO package around filtering.

Custom Fields: https://marshmallow.readthedocs.io/en/stable/custom_fields.html Examples: https://marshmallow.readthedocs.io/en/stable/examples.html#inflection-camel-casing-keys

Potential issues/thoughts:

SteveMcGrath commented 2 years ago

@kkirsche so the approach here will be to refactor parts of the library using marshmallow to handle the data typing and transformation. This will effectively replace a lot of the old constructor code. It'll be happening organically, and the first part of that was to replace the old, inflexible connection class with something that's better tested. #457 Details this initial work. We didn't want to have to touch everything multiple times, but this enables a pattern like so (below). It's a work in progress, just keeping you posted.

Example Tenable.io Access Groups v2 API Schema

import re
from marshmallow import (
    Schema,
    fields,
    pre_load,
    validate as v,
    validates_schema,
)
from tenable.base.schema.fields import LowerCase, UpperCase
from tenable.io.base.schemas.filters.base import BaseFilterSchema
from tenable.io.base.session import TenableIO

class RuleSchema(BaseFilterSchema):
    '''
    Marshmallow schema for processing the Access List rule definitions.
    '''
    type = fields.Str(required=True)
    operator = fields.Str(required=True)
    terms = fields.List(fields.Str(), required=True)
    _filters = None

    @classmethod
    def populate_filters(cls, tio: TenableIO):
        '''
        Pre-populates the asset filter definitions into the RuleSchema class.
        '''
        super().populate_filters(tio, 'filters/workbenches/assets')

    @validates_schema
    def filter_validation(self, data, **kwargs):  # noqa: PLW0613
        '''
        Handles validation of the filter provided against the asset filter
        definitions.
        '''
        self.validate_filter(data.get('type'),
                             data.get('operator'),
                             data.get('terms'))

    @pre_load(pass_many=False)
    def tuple_expansion(self, data, **kwargs):  # noqa: PLW0613
        '''
        Handles expanding a tuple definition into the dictionary equivalent.
        '''
        if isinstance(data, tuple):
            return {
                'type': data[0],
                'operator': data[1],
                'terms': data[2]
            }
        return data

class PrincipalSchema(Schema):
    principal_name = fields.Str()
    principal_id = fields.UUID()
    type = LowerCase(
        fields.Str(validate=v.OneOf(['all_users', 'user', 'group'])))
    permissions = fields.List(UpperCase(
        fields.Str(validate=v.OneOf(['CAN_VIEW', 'CAN_SCAN']))))

    @pre_load(pass_many=False)
    def pre_process(self, data, **kwargs):
        if isinstance(data, tuple):
            # unpack the tuple into the appropriate fields.  Supporting format
            #
            # ('TYPE', 'NAME', ['PERMS'])
            #
            # or
            #
            # ('TYPE', 'UUID', ['PERMS'])
            newdata = {
                'type': data[0],
                'permissions': data[2]
            }
            if re.match(r'[0-9a-f]{8}(?:-[0-9a-f]{4}){3}-[0-9a-f]{12}',
                        data[1],
                        re.I):
                newdata['principal_id'] = data[1]
            else:
                newdata['principal_name'] = data[1]
            return newdata
        return data

class AccessGroupsSchema(Schema):  # noqa: PLR0903
    '''
    '''
    name = fields.Str(required=True)
    access_group_type = UpperCase(fields.Str(validate=v.OneOf(['MANAGE_ASSETS',
                                                               'SCAN_TARGETS',
                                                               'ALL'
                                                               ])))
    all_users = fields.Bool()
    principals = fields.List(fields.Nested(PrincipalSchema))
    rules = fields.List(fields.Nested(RuleSchema))
    container_uuid = fields.UUID(load_only=True)
    created_by_uuid = fields.UUID(load_only=True)
    updated_by_uuid = fields.UUID(load_only=True)
    created_by_name = fields.Str(load_only=True)
    updated_by_name = fields.Str(load_only=True)
    created_at = fields.DateTime(load_only=True)
    updated_at = fields.DateTime(load_only=True)
    id = fields.UUID(load_only=True)
    status = fields.Str(load_only=True)
    all_assets = fields.Bool(load_only=True)
    processing_percent_complete = fields.Int(load_only=True)