afaisman / Test

0 stars 0 forks source link

issue 8 #8

Open afaisman opened 2 months ago

afaisman commented 2 months ago

{ "channelName": "AWM-SalesAssist-89163", "channelType": "MAIN", "channelId": "id", "channelVersion": "1.0.1", "channelUrl": "/application/s2t/esp/rt/channelConfig/us-east-1/1.0.1/AWM-SalesAssist-89163", "subscriberConfig": { "id": "Standard 1.0", "subscriberTelephoneNumbersSet": [], "subscriberTelephoneNumberURLsSet": [] }, "requestConfig": { "id": "Standard 1.0", "persistAudioToS3": true, "persistTranscriptionsToS3": true, "persistToS3BucketName": "app-id-104120-dep-id-104121-uu-id-og7nwvnq9ykf", "enableTranscribe": true, "enablePartialResults": false, "persistToDb": false, "persistToWebsocket": false, "publishToConsole": true, "publishToClients": true, "publishToClientsServerType": "GKS", "publishToClientsServerName": "NAU1720", "publishToClientsTopic": "s2tcdadev1p1realtime_sales_assist_1-na1720",

    "publishTelemetryToS3": true,
    "publishTelemetryToS3BucketARN": "104120-110663-p5g0q6-ptss7bzu8eico8n4sq1fpnp9e6fucuse1b-s3alias",

    "enableThreadModel2": true,
    "enableTranscribeProxy": false,
    "awsCurrentRegion": "US_EAST_1",
    "startSelectorType": "NOW"
},
"realTimeTranscriptionConfig": {
    "id": "Standard 1.0",
    "contentIdentificationType": "",
    "contentRedactionType": "",
    "enableChannelIdentification": false,
    "enablePartialResultsStabilization": false,
    "identifyLanguage": false,
    "languageOptions": "",
    "languageCode": "EN_US",
    "languageModelName": "",
    "partialResultsStability": "",
    "piiEntityTypes": "",
    "preferredLanguage": "",
    "mediaSamplingRate": 8000,
    "mediaEncoding": "PCM",
    "numberOfChannels": 2,
    "showSpeakerLabel": false,
    "vocabularyName": "s2t-uat-rttr-cv-awm-salesassist-89163-v4-na-us-east-1-1",
    "vocabularyFilterName": ""
}

}

afaisman commented 2 months ago

import unittest from typing import Any

def set_value(json_obj: dict, path: str, value: Any) -> None:
    """Set *value* at the location described by a '/'-separated *path*.

    Purely-numeric segments are treated as list indices; intermediate
    containers (dicts, or lists when the next segment is numeric) are
    created on demand. Lists are grown as needed so the index exists.

    Raises ValueError when an existing intermediate node is neither a
    list nor a dict.
    """
    keys = path.split("/")
    current = json_obj
    for pos, key in enumerate(keys[:-1]):
        # Look at the NEXT segment by position, not by value: the old
        # keys.index(key) lookup broke on duplicate segments and raised
        # ValueError once the key had been converted to int.
        next_is_index = keys[pos + 1].isdigit()
        if key.isdigit():
            key = int(key)
        if isinstance(current, list) and isinstance(key, int):
            # Grow the list with empty dicts so the index exists.
            while len(current) <= key:
                current.append({})
            current = current[key]
        elif isinstance(current, dict):
            if key not in current:
                # Create a list when the next segment is numeric, else a dict.
                current[key] = [] if next_is_index else {}
            current = current[key]
        else:
            raise ValueError(f"Invalid path: {path}")

    last_key = keys[-1]
    if last_key.isdigit():
        last_key = int(last_key)
    if isinstance(current, list) and isinstance(last_key, int):
        # Pad with None so the target index is assignable.
        while len(current) <= last_key:
            current.append(None)
        current[last_key] = value
    elif isinstance(current, dict):
        current[last_key] = value
    else:
        raise ValueError(f"Invalid path: {path}")

def remove_value(json_obj: dict, path: str) -> None:
    """Remove the value at a '/'-separated *path*; missing paths are ignored."""
    keys = path.split("/")
    node = json_obj
    # Walk down to the parent of the final segment; bail out silently as
    # soon as any intermediate segment is missing or not a container.
    for raw in keys[:-1]:
        step = int(raw) if raw.isdigit() else raw
        if isinstance(node, list) and isinstance(step, int):
            if step >= len(node):
                return
            node = node[step]
        elif isinstance(node, dict):
            if step not in node:
                return
            node = node[step]
        else:
            return

    leaf = keys[-1]
    if leaf.isdigit():
        leaf = int(leaf)
    if isinstance(node, list) and isinstance(leaf, int):
        if leaf < len(node):
            node.pop(leaf)
    elif isinstance(node, dict) and leaf in node:
        node.pop(leaf)

Unit tests

class TestJSONPathAPI(unittest.TestCase):
    """Unit tests for the '/'-separated-path set_value/remove_value helpers."""

    def test_set_value(self):
        json_obj = {}
        set_value(json_obj, "key1/key2/key3", "value")
        self.assertEqual(json_obj, {"key1": {"key2": {"key3": "value"}}})

    def test_set_value_with_list(self):
        json_obj = {}
        set_value(json_obj, "key1/0/key3", "value")
        self.assertEqual(json_obj, {"key1": [{"key3": "value"}]})

    def test_set_value_with_existing_path(self):
        json_obj = {"key1": {"key2": {"key3": "old_value"}}}
        set_value(json_obj, "key1/key2/key3", "new_value")
        self.assertEqual(json_obj, {"key1": {"key2": {"key3": "new_value"}}})

    def test_remove_value(self):
        json_obj = {"key1": {"key2": {"key3": "value"}}}
        remove_value(json_obj, "key1/key2/key3")
        self.assertEqual(json_obj, {"key1": {"key2": {}}})

    def test_remove_value_with_list(self):
        json_obj = {"key1": [{"key3": "value"}]}
        remove_value(json_obj, "key1/0/key3")
        self.assertEqual(json_obj, {"key1": [{}]})

    def test_remove_value_nonexistent_path(self):
        json_obj = {"key1": {"key2": {"key3": "value"}}}
        remove_value(json_obj, "key1/key2/key4")
        self.assertEqual(json_obj, {"key1": {"key2": {"key3": "value"}}})

    def test_set_and_remove_nested(self):
        json_obj = {}
        set_value(json_obj, "key1/key2/key3/key4", "value")
        self.assertEqual(json_obj, {"key1": {"key2": {"key3": {"key4": "value"}}}})
        remove_value(json_obj, "key1/key2/key3/key4")
        self.assertEqual(json_obj, {"key1": {"key2": {"key3": {}}}})

    def test_set_with_list_index(self):
        json_obj = {"key1": ["old_value"]}
        set_value(json_obj, "key1/0", "new_value")
        self.assertEqual(json_obj, {"key1": ["new_value"]})

    def test_remove_list_index(self):
        json_obj = {"key1": ["value"]}
        remove_value(json_obj, "key1/0")
        self.assertEqual(json_obj, {"key1": []})

if name == 'main': unittest.main(argv=[''], exit=False)

afaisman commented 2 months ago

import datetime import logging from typing import Optional

import pytz import yaml

from eks_hk.nsmanager.application.models.deployments.nsData import NSData

from eks_hk.nsmanager.application.models.operations.datum import Datum from eks_hk.nsmanager.application.models.operations.operation import Operation from eks_hk.nsmanager.application.services.platform.kubernetes_service import KubernetesService from eks_hk.nsmanager.application.utils.string_utils import StringUtils

TODO: add a class-level description for NSDeployment.

class NSDeployment:
    """Deployment model for the namespace housekeeper (TODO: expand class-level description)."""

    # Fix: was logging.getLogger(name) — markdown ate the dunder underscores.
    LOGGER = logging.getLogger(__name__)

    # !!!!!!!! search for self._nsdata = NSData(name, type, id, version, url, env, sealId, operations, operation_data)
    # in https://bitbucketdc-cluster08.jpmchase.net/projects/SPEECH2TEXT/repos/aws-transcribe-eks-housekeeper/pull-requests/1/overview?commentId=7284725
def __init__(self, name, type, id, version, url, env, sealId, operations, operation_data, skip_kubernetes_scale=False, skip_parameterstore_write=False, active=True):
    """Capture deployment identity, behaviour flags and operation data."""
    # Identity / serialized fields ("name" is really the EKS deployment name;
    # serialized field names at Deployment level are left untouched for now).
    self.name = name
    self.type = type
    self.id = id
    self.version = version
    self.url = url
    self.env = env
    self.sealId = sealId
    self.operations = operations
    # Behaviour flags.
    self.skip_kubernetes_scale = skip_kubernetes_scale
    self.skip_parameterstore_write = skip_parameterstore_write
    self.active = active
    # Runtime state, filled in later by initialize()/transaction helpers.
    self.source = None
    self.parameter_store_key = None
    self._transaction_id = None
    self._transaction_id_prefix = "default"
    # operation_data lives on the nested NSData model, not on self.
    self._nsdata = NSData(name, type, id, version, url, env, sealId, operations, operation_data)
    self.time_created = datetime.datetime.now(pytz.utc).astimezone(pytz.timezone("America/New_York"))

def initialize(self, string_utils_: StringUtils, kubernetes_service_: KubernetesService, tick_engine_):
    """Inject the helper services; returns None (failures are only logged)."""
    try:
        self._tick_engine = tick_engine_
        self._kubernetes_service = kubernetes_service_
        self._string_utils = string_utils_
    except Exception as e:
        # Plain attribute assignment should not raise; kept for file-wide convention.
        NSDeployment.LOGGER.error(f"Exception: {e}")
        return None

def __str__(self):
    """Human-readable dump: id, parameter-store key, then the full YAML."""
    yaml_dump = self.to_yaml(False, False)
    return f"{self.id}\n{self.parameter_store_key}\n {yaml_dump}"

def apply_operations(self, housekeeper_another_instance_):
    """Merge another instance's operations into this one, matching by id."""
    try:
        merged = self._string_utils.apply_list_by_id(self.operations, housekeeper_another_instance_.operations)
        self.operations = merged
    except Exception as e:
        NSDeployment.LOGGER.error(f"Exception: {e}")
        return None

# Converts the deployment object to a dictionary.
def to_dict(self):
    """Serialize the deployment (with nested operations/operation data) to a dict; None on error."""
    try:
        return {
            "name": self.name,
            "type": self.type,
            "id": self.id,
            "version": self.version,
            "url": self.url,
            "env": self.env,
            "sealId": self.sealId,
            "skip_kubernetes_scale": self.skip_kubernetes_scale,
            "skip_parameterstore_write": self.skip_parameterstore_write,
            "active": self.active,
            "operations": [op.to_dict() for op in self.operations],
            "operation_data": [datum.to_dict() for datum in self._nsdata.operation_data],
        }
    except Exception as e:
        NSDeployment.LOGGER.error(f"Exception: {e}")
        return None

def to_yaml(self, skip_operations_: bool, skip_data_: bool) -> str:
    """Render the deployment as YAML, optionally dropping the operations and/or data sections."""
    try:
        payload = self.to_dict()
        for flag, section in ((skip_operations_, "operations"), (skip_data_, "operation_data")):
            if flag:
                payload.pop(section)
        return yaml.dump(payload, sort_keys=False, default_flow_style=False)
    except Exception as e:
        NSDeployment.LOGGER.error(f"Exception: {e}")
        return ""

# Converts the deployment object to a string representation.
def to_str(self, skip_operations_: bool, skip_data_: bool) -> str:
    """Alias of to_yaml(); '' on error."""
    try:
        result = self.to_yaml(skip_operations_, skip_data_)
    except Exception as e:
        NSDeployment.LOGGER.error(f"Exception: {e}")
        result = ""
    return result

# Retrieves a specific datum from the deployment.
def get_datum(self, data_id_: str) -> Optional[Datum]:
    """Delegate datum lookup to the nested NSData; None when missing or on error."""
    try:
        found = self._nsdata.get_datum(data_id_=data_id_)
    except Exception as e:
        NSDeployment.LOGGER.error(f"Exception: {e}")
        return None
    return found

def remove_datum(self, data_id_: str) -> bool:
    """Delegate datum removal to the nested NSData; False when absent or on error."""
    try:
        removed = self._nsdata.remove_datum(data_id_=data_id_)
    except Exception as e:
        NSDeployment.LOGGER.error(f"Exception: {e}")
        removed = False
    return removed

# Sets a specific datum for the deployment.
# returns the datum or None if the datum is not found
def set_datum(self, data_id_: str, value=None) -> Optional[Datum]:
    """Delegate datum creation/update to the nested NSData; None on error."""
    try:
        result = self._nsdata.set_datum(data_id_=data_id_, value=value)
    except Exception as e:
        NSDeployment.LOGGER.error(f"Exception: {e}")
        result = None
    return result

def get_operation(self, id_: str):
    """Return the first operation whose id matches id_, else None."""
    try:
        for candidate in self.operations:
            if candidate.id == id_:
                return candidate
        return None
    except Exception as e:
        NSDeployment.LOGGER.error(f"Exception: {e}")
        return None

# Sets or updates an operation in the deployment.
def set_operation(self, id_: str, type=None, execution_time=None, target_percentage=None) -> Optional[Operation]:
    """Update an existing operation in place, or append and return a new one; None on error."""
    try:
        existing = self.get_operation(id_)
        if not existing:
            created = Operation(id=id_, type=type or "", execution_time=execution_time or 0, target_percentage=target_percentage or 0)
            self.operations.append(created)
            return created
        # Only overwrite the fields that were actually supplied.
        if type is not None:
            existing.type = type
        if execution_time is not None:
            existing.execution_time = execution_time
        if target_percentage is not None:
            existing.target_percentage = target_percentage
        return existing
    except Exception as e:
        NSDeployment.LOGGER.error(f"Exception: {e}")
        return None

def set_transaction_id_prefix(self, transaction_id_prefix_: str) -> str:
    """Store and echo back the transaction-id prefix; '' on error."""
    try:
        self._transaction_id_prefix = transaction_id_prefix_
    except Exception as e:
        NSDeployment.LOGGER.error(f"Exception: {e}")
        return ''
    return self._transaction_id_prefix

def normalize_transaction_id(self, tid_str_: str) -> str:
    """Shorten well-known tokens in a transaction id; '' on error."""
    try:
        normalized = tid_str_
        # Collapse doubled p10..p18 markers first, then abbreviate long tokens.
        for i in range(9):
            normalized = normalized.replace(f"p1{i}_p1{i}", f"p1{i}")
        for long_form, short_form in (("realtime", "rt"), ("transcribe-engine", "tr-en"), ("synthetic", "syn")):
            normalized = normalized.replace(long_form, short_form)
        return normalized
    except Exception as e:
        NSDeployment.LOGGER.error(f"Exception: {e}")
        return ''

def create_transaction_id(self, transaction_id_prefix_: str) -> str:
    """Build, store and return a normalized transaction id from cluster metadata; '' on error."""
    try:
        region = self._kubernetes_service.get_region()
        cluster = self._kubernetes_service.get_cluster_name()
        namespace = self._kubernetes_service.get_namespace()
        host_name = self._kubernetes_service.get_hostname()
        self._transaction_id_prefix = transaction_id_prefix_
        # NOTE(review): no separator between prefix and region — looks deliberate, confirm.
        raw_id = f"{transaction_id_prefix_}{region}_{cluster}_{namespace}_{self.name}"
        self._transaction_id = self.normalize_transaction_id(raw_id)
        return self._transaction_id
    except Exception as e:
        NSDeployment.LOGGER.error(f"Exception: {e}")
        return ''

def create_default_transaction_id(self) -> str:
    """Like create_transaction_id(), but reuses the stored prefix; '' on error."""
    try:
        region = self._kubernetes_service.get_region()
        cluster = self._kubernetes_service.get_cluster_name()
        namespace = self._kubernetes_service.get_namespace()
        prefix = self.get_transaction_id_prefix()
        raw_id = f"{prefix}{region}_{cluster}_{namespace}_{self.name}"
        self._transaction_id = self.normalize_transaction_id(raw_id)
        return self._transaction_id
    except Exception as e:
        NSDeployment.LOGGER.error(f"Exception: {e}")
        return ''

def get_transaction_id(self) -> str:
    """Return the cached transaction id, creating a default one on first use."""
    if self._transaction_id:
        return self._transaction_id
    self.create_default_transaction_id()
    return self._transaction_id

# Note: execute() returns tuple of booleans (Succeded, Executed, Data were updated)
# Main method to update the state of the deployment based on current time.
def tick(self, current_time_) -> (bool, bool):
    """Run the scheduled operations via the tick engine; (False, False) on error."""
    try:
        result = self._tick_engine.do_schedule(self, current_time_)
    except Exception as e:
        NSDeployment.LOGGER.error(f"Exception: {e}")
        result = (False, False)
    return result

def get_transaction_id_prefix(self) -> str:
    """Accessor for the current transaction-id prefix."""
    prefix = self._transaction_id_prefix
    return prefix

def get_operation_data(self) -> NSData:
    """Expose the nested NSData holder."""
    nsdata = self._nsdata
    return nsdata
afaisman commented 2 months ago

import datetime import logging from typing import Optional import pytz import yaml from eks_hk.nsmanager.application.models.operations.datum import Datum from eks_hk.nsmanager.application.utils.string_utils import StringUtils from eks_hk.nsmanager.application.utils.time_utils import TimeUtils

class NSData: LOGGER = logging.getLogger(name)

def __init__(self, name, type, id, version, url, env, sealId, operations, operation_data,
             time_created=None):
    """Capture identity, operations and operation data.

    time_created defaults to "now" in America/New_York. Fix: the old default
    called datetime.now() IN THE SIGNATURE, so it was evaluated once at import
    time and every instance shared the module-load timestamp; it is now
    computed per call when the argument is omitted.
    """
    if time_created is None:
        time_created = datetime.datetime.now(pytz.utc).astimezone(pytz.timezone("America/New_York"))
    self.name = name
    self.type = type
    self.id = id
    self.version = version
    self.url = url
    self.env = env
    self.sealId = sealId
    self.operations = operations
    self.operation_data = operation_data
    # Runtime state, filled in later.
    self.source = None
    self.parameter_store_key = None
    self._transaction_id = None
    self._transaction_id_prefix = "default"
    self.time_created = time_created

def initialize(self, _string_utils: StringUtils):
    # Late injection of the string-utils helper; plain storage, no validation.
    self._string_utils = _string_utils

def __str__(self):
    """id, parameter-store key, then the full YAML dump; a fixed message on failure."""
    try:
        return f"{self.id}\n{self.parameter_store_key}\n {self.to_yaml(False, False)}"
    except Exception as e:
        NSData.LOGGER.error(f"Exception: {e}")
        # Fix: message used to read "count not serialize NSData".
        return 'could not serialize NSData'

# Converts the deployment object to a dictionary.
def to_dict(self):
    """Serialize to a plain dict (nested operations/data included); None on error."""
    try:
        return {
            "name": self.name,
            "type": self.type,
            "id": self.id,
            "version": self.version,
            "url": self.url,
            "env": self.env,
            "sealId": self.sealId,
            "operations": [op.to_dict() for op in self.operations],
            "operation_data": [datum.to_dict() for datum in self.operation_data],
        }
    except Exception as e:
        NSData.LOGGER.error(f"Exception: {e}")
        return None

def to_yaml(self, skip_operations_: bool, skip_data_: bool) -> str:
    """Render as YAML, optionally omitting the operations and/or operation_data sections."""
    try:
        payload = self.to_dict()
        for flag, section in ((skip_operations_, "operations"), (skip_data_, "operation_data")):
            if flag:
                payload.pop(section)
        return yaml.dump(payload, sort_keys=False, default_flow_style=False)
    except Exception as e:
        NSData.LOGGER.error(f"Exception: {e}")
        return ""

# Converts the deployment object to a string representation.
def to_str(self, skip_operations_: bool, skip_data_: bool) -> str:
    """Alias of to_yaml(); '' on error."""
    try:
        result = self.to_yaml(skip_operations_, skip_data_)
    except Exception as e:
        NSData.LOGGER.error(f"Exception: {e}")
        result = ""
    return result

# Retrieves a specific datum from the deployment.
def get_datum(self, data_id_: str) -> Optional[Datum]:
    """Linear search of operation_data by data_id; None when absent or unset."""
    if self.operation_data is None:
        return None
    for datum in self.operation_data:
        if datum.data_id == data_id_:
            return datum
    return None

def remove_datum(self, data_id_: str) -> bool:
    """Delete the first datum matching data_id_; report whether anything was removed."""
    if self.operation_data is None:
        return False
    for index, datum in enumerate(self.operation_data):
        if datum.data_id == data_id_:
            del self.operation_data[index]
            return True
    return False

# Sets a specific datum for the deployment.
def set_datum(self, data_id_: str, value=None) -> Optional[Datum]:
    """Update an existing datum's value, or append and return a new one; None on error."""
    try:
        existing = self.get_datum(data_id_)
        if existing:
            # Only overwrite when a value was actually supplied.
            if value is not None:
                existing.set_value(value)
            return existing
        created = Datum(data_id=data_id_, value=value or 0, recorded_time=TimeUtils.get_est_now())
        self.operation_data.append(created)
        return created
    except Exception as e:
        NSData.LOGGER.error(f"Exception: {e}")
        return None
afaisman commented 2 months ago

import logging

from eks_hk.nsmanager.application.models.operations.operation import Operation from eks_hk.nsmanager.application.utils.log_event import log_operation

Manages the scaling of Kubernetes pods within a deployment.

class ScaleOperation(Operation): LOGGER = logging.getLogger(name)

# id: Identifier for the scaling operation.
# execution_time: Segment of time (execution_time-execution_time + span)
# for the operation to take place.
# target_percentage: The desired target percentage to scale the pods to.
# watermark_id: The ID associated with the watermark for this operation.
def __init__(self, id, type, execution_time, target_percentage, watermark_id):
    super().__init__(id, type, execution_time, target_percentage)
    # NOTE(review): target_percentage is re-assigned here although it was already
    # passed to the base Operation constructor — presumably redundant; confirm.
    self.target_percentage = target_percentage
    # Id of the datum whose value is the 100% pod-count baseline.
    self.watermark_id = watermark_id

def scale_to_replicas_num(self, housekeeper_deployment_):
    """Compute the target replica count: watermark value * target_percentage / 100.

    Returns -1 (sentinel, logged) when the watermark datum is missing or on error.
    """
    try:
        watermarkdatum = housekeeper_deployment_.get_datum(data_id_=self.watermark_id)
        if watermarkdatum is None:
            # Fix: log message used to say "waterparkdatum".
            ScaleOperation.LOGGER.info(f"Could not find watermarkdatum for {housekeeper_deployment_.name}")
            return -1
        ratio = self.target_percentage / 100
        replicas = watermarkdatum.get_value() * ratio
        # int() truncates toward zero, e.g. 10 pods at 55% -> 5.
        return int(replicas)
    except Exception as e:
        ScaleOperation.LOGGER.error(f"Exception: {e}")
        return -1

# Executes the scale operation.
# If the current conditions deem the operation relevant, it will attempt to scale
# the deployment's pods to the target percentage.
# current_time_: The current time to check against the operation's schedule.
# housekeeper_deployment_: The housekeeper deployment object that is the target of the scaling.
# Returns tuple of booleans (Succeded, Executed, Data were updated)
def execute(self, current_time_, housekeeper_deployment_) -> (bool, bool, bool):
    try:
        if self.is_relevant(current_time_):
            data_updated = False
            # Clean up the "watcher" datum always.
            # Fix: get_datum/remove_datum take the keyword data_id_, not data_id —
            # the old calls raised TypeError and pushed every run into the except path.
            if housekeeper_deployment_.get_datum(data_id_="watcher"):
                housekeeper_deployment_.remove_datum(data_id_="watcher")
                data_updated = True

            res = False
            current_num_of_pods = -1
            replicas = self.scale_to_replicas_num(housekeeper_deployment_)
            if replicas == -1:
                ScaleOperation.LOGGER.info(f"Could not calculate the number of replicas to scale to, nothing to scale")
                return False, False, False

            if not housekeeper_deployment_.skip_kubernetes_scale:
                res, current_num_of_pods = self.scale_deployment(housekeeper_deployment_, replicas)
            else:
                ScaleOperation.LOGGER.info(f"Deployment {housekeeper_deployment_.name} not scaling to {replicas} since skip_kubernetes_scale==True")

            if res:
                # Remember the last scale target so the watcher operation can compare later.
                last_target_scale_id = "last_target_scale"
                last_target_scale_datum = housekeeper_deployment_.get_datum(data_id_=last_target_scale_id)
                if last_target_scale_datum is None:
                    last_target_scale_datum = housekeeper_deployment_.set_datum(data_id_=last_target_scale_id)
                last_target_scale_datum.set_value(replicas)

                self.report_execute(housekeeper_deployment_=housekeeper_deployment_, current_num_of_pods_=current_num_of_pods, target_num_of_pods=replicas, message_=f"Scale: scaling deployment {housekeeper_deployment_.name} to {replicas} pods.")
                data_updated = True
            else:
                self.report_execute(housekeeper_deployment_=housekeeper_deployment_, current_num_of_pods_=current_num_of_pods, target_num_of_pods=replicas, message_=f"Scale: not scaling (not need to scale?) deployment {housekeeper_deployment_.name} to {replicas} pods.")

            return True, True, data_updated
        else:
            return True, False, False
    except Exception as e:
        ScaleOperation.LOGGER.error(f"Exception: {e}")
        return False, False, False

def to_dict(self):
    """Serialize this operation for persistence; None on error."""
    try:
        payload = {"id": self.id, "type": self.type, "execution_time": self.execution_time}
        payload["target_percentage"] = self.target_percentage
        payload["watermark_id"] = self.watermark_id
        return payload
    except Exception as e:
        ScaleOperation.LOGGER.error(f"Exception: {e}")
        return None

def report_execute(self, housekeeper_deployment_, current_num_of_pods_, target_num_of_pods, message_="", eventRetCode_=0):
    """Emit a structured log record describing the scaling decision."""
    transaction_id = housekeeper_deployment_.get_transaction_id()

    # Fix: target_num_of_pods_ used to be fed current_num_of_pods_ (the
    # target_num_of_pods parameter was never used), so reports hid the real
    # target. transaction id now passed by keyword for consistency with the
    # other operations' report_execute implementations.
    log_operation(transaction_id_=transaction_id, operation_=self.type, eventRetCode_=eventRetCode_, deployment_=housekeeper_deployment_.name, target_percentage_=self.target_percentage, current_num_of_pods_=current_num_of_pods_, target_num_of_pods_=target_num_of_pods, execution_time_=self.execution_time, message_=message_)
afaisman commented 2 months ago

import logging

from eks_hk.nsmanager.application.models.operations.scaleOperation import ScaleOperation

Immediate scaling: scales a deployment to a fixed replica count right away. So far, this class is used only for debugging.

class ImmediateScaleOperation(ScaleOperation): LOGGER = logging.getLogger(name)

def __init__(self, id, type, scale_to):
    # execution_time / target_percentage / watermark_id are irrelevant for an
    # immediate scale, hence the None placeholders for the base constructor.
    super().__init__(id, type, None, None, None)
    # Absolute replica count to scale to (not a percentage).
    self.scale_to = scale_to

def is_relevant(self, current_time_):
    # Immediate scaling ignores the schedule: always due.
    return True

# Returns tuple of booleans (Succeded, Executed, Data were updated)
def execute(self, current_time_, housekeeper_deployment_):
    # Scales the deployment to self.scale_to right away; is_relevant() always
    # returns True for this class, so the guard is kept only for symmetry with
    # the other operations.
    try:
        if self.is_relevant(current_time_):
            res = False
            current_num_of_pods = -1
            replicas = self.scale_to
            if not housekeeper_deployment_.skip_kubernetes_scale:
                ImmediateScaleOperation.LOGGER.info(f"Before scale_deployment {housekeeper_deployment_.name} to {replicas}")
                res, current_num_of_pods = self.scale_deployment(housekeeper_deployment_, replicas)
            else:
                ImmediateScaleOperation.LOGGER.info(f"Deployment {housekeeper_deployment_.name} not scaling to {replicas} since skip_kubernetes_scale==True")

            # Report the outcome; only the message text differs between branches.
            if res:
                if current_num_of_pods != replicas:
                    self.report_execute(housekeeper_deployment_=housekeeper_deployment_, current_num_of_pods_=current_num_of_pods, target_num_of_pods=replicas, message_=f"Immediate scale: scaling deployment {housekeeper_deployment_.name} to {self.scale_to} pods.")
                else:
                    self.report_execute(housekeeper_deployment_=housekeeper_deployment_, current_num_of_pods_=current_num_of_pods, target_num_of_pods=replicas, message_=f"Immediate scale: not scaling (not need to scale?) deployment {housekeeper_deployment_.name} to {self.scale_to} pods.")
            else:
                self.report_execute(housekeeper_deployment_=housekeeper_deployment_, current_num_of_pods_=current_num_of_pods, target_num_of_pods=replicas, message_=f"Immediate Scaling error.")

            # Data are never updated by an immediate scale, hence the fixed False.
            return True, True, False
        else:
            return True, False, False
    except Exception as e:
        ImmediateScaleOperation.LOGGER.error(f"Exception: {e}")
        return False, False, False

def to_dict(self):
    """Serialize this operation; None on error."""
    try:
        return dict(id=self.id, type=self.type, scale_to=self.scale_to)
    except Exception as e:
        ImmediateScaleOperation.LOGGER.error(f"Exception: {e}")
        return None
afaisman commented 2 months ago

import logging

from eks_hk.nsmanager.application.models.operations.operation import Operation from eks_hk.nsmanager.application.utils.log_event import log_operation from eks_hk.nsmanager.application.utils.time_utils import TimeUtils

class DefineTransactionIdPrefixOperation(Operation): LOGGER = logging.getLogger(name)

def __init__(self, id, type, execution_time, target_percentage, transaction_id_prefix):
    super().__init__(id, type, execution_time, target_percentage)
    # Raw prefix; converted via TimeUtils.convert_prefix() when applied in execute().
    self.transaction_id_prefix = transaction_id_prefix

# Returns tuple of booleans (Succeded, Executed, Data were updated)
def execute(self, current_time_, housekeeper_deployment_) -> (bool, bool, bool):
    """Install the configured transaction-id prefix if the deployment still has the default."""
    try:
        if self.is_relevant(current_time_):
            if housekeeper_deployment_.get_transaction_id_prefix() == "default":
                prefix = TimeUtils.convert_prefix(self.transaction_id_prefix)
                housekeeper_deployment_.set_transaction_id_prefix(prefix)
                # Deliberately not reported: the deployment is brand new after
                # every reload of the schedule, so reporting would be noise.
            return True, True, False
        else:
            return True, False, False
    except Exception as e:
        # Fix: the logger object was mistakenly passed as the log message.
        DefineTransactionIdPrefixOperation.LOGGER.error(f"Exception: {e}")
        return False, False, False

def to_dict(self):
    """Serialize this operation; None on error."""
    try:
        return {"id": self.id, "type": self.type, "execution_time": self.execution_time, "transaction_id_prefix": self.transaction_id_prefix}
    except Exception as e:
        # Fix: the logger object was mistakenly passed as the log message.
        DefineTransactionIdPrefixOperation.LOGGER.error(f"Exception: {e}")
        return None

def report_execute(self, housekeeper_deployment_, message_="", eventRetCode_=0):
    """Emit a structured log record for this operation (no pod counts apply)."""
    tid = housekeeper_deployment_.get_transaction_id()
    log_operation(transaction_id_=tid, operation_=self.type, eventRetCode_=eventRetCode_, deployment_=housekeeper_deployment_.name, target_percentage_=self.target_percentage, current_num_of_pods_=None, target_num_of_pods_=None, execution_time_=self.execution_time, message_=message_)
afaisman commented 2 months ago

import logging

from eks_hk.nsmanager.application.models.operations.operation import Operation from eks_hk.nsmanager.application.utils.log_event import log_operation from eks_hk.nsmanager.application.utils.time_utils import TimeUtils

This operation updates a watermark datum associated with a deployment to reflect the current number of

pods in that deployment. It should be executed at a time relevant to the operation schedule.

Unlike the scale operations, the watcher does not scale anything itself: it only compares the last recorded scale target with the current pod count and records any discrepancy.

class WatcherOperation(Operation): LOGGER = logging.getLogger(name)

def __init__(self, id, type, execution_time, last_target_scale_id):
    # target_percentage is meaningless for a watcher, hence the fixed 0.
    super().__init__(id, type, execution_time, 0)
    # Id of the datum holding the last scale target to compare against.
    self.last_target_scale_id = last_target_scale_id

# Execute the Watcher operation.
# Checks if the operation is relevant at the current time and if so,
# compares the last_target_scale_id datum with the current number of pods.
# current_time_: The current time to check against the operation's schedule.
# Returns tuple of booleans (Succeded, Executed, Data were updated)
def execute(self, current_time_, housekeeper_deployment_) -> (bool, bool, bool):
    try:
        if self.is_relevant(current_time_):
            # If the operation is scheduled for the current time.
            if housekeeper_deployment_.name is None:
                WatcherOperation.LOGGER.info(f"housekeeper_deployment.name is None, nothing to do.")
                return False, False, False
            else:
                current_num_of_pods = self.get_n_pods(housekeeper_deployment_)
                lasttargetscale_datum = housekeeper_deployment_.get_datum(data_id_=self.last_target_scale_id)
                if lasttargetscale_datum:
                    # Only report once the recorded target is older than 300 s,
                    # giving the scale time to converge.
                    time_diff = TimeUtils.time_diff(current_time_, lasttargetscale_datum.recorded_time)
                    if time_diff > 300:
                        eventRetCode_ = 0
                        if current_num_of_pods != lasttargetscale_datum.get_value():
                            eventRetCode_ = 1
                            # Record the discrepancy by bumping the "watcher" datum.
                            watcherdatum = housekeeper_deployment_.get_datum(data_id_="watcher")
                            if watcherdatum is None:
                                watcherdatum = housekeeper_deployment_.set_datum(data_id_="watcher")
                            watcherdatum.set_value(watcherdatum.get_value() + 1)

                        self.report_execute(housekeeper_deployment_=housekeeper_deployment_, current_num_of_pods_=current_num_of_pods, target_num_of_pods=lasttargetscale_datum.get_value(), eventRetCode_=eventRetCode_, message_=f"watched deployment {housekeeper_deployment_.name}, {current_num_of_pods} pods at the moment, {lasttargetscale_datum.get_value()} target ")

                return True, True, True
        else:
            # NOTE(review): the other operations return (True, False, False) when
            # not relevant; this one returns all-False — confirm that is intended.
            return False, False, False
    except Exception as e:
        # Fix: the logger object was mistakenly passed as the log message.
        WatcherOperation.LOGGER.error(f"Exception: {e}")
        return False, False, False

def to_dict(self):
    """Serialize this operation; None on error."""
    try:
        # NOTE(review): serialized under "watermark_id" although the attribute is
        # last_target_scale_id — kept as-is since stored configs may depend on the key.
        return {"id": self.id, "type": self.type, "execution_time": self.execution_time, "watermark_id": self.last_target_scale_id}
    except Exception as e:
        # Fix: the logger object was mistakenly passed as the log message.
        WatcherOperation.LOGGER.error(f"Exception: {e}")
        return None

def report_execute(self, housekeeper_deployment_, current_num_of_pods_, target_num_of_pods, message_="", eventRetCode_=0):
    """Log the watch outcome and publish the current-vs-target pod metric.

    Args:
        housekeeper_deployment_: deployment wrapper providing name,
            deployment_name and get_transaction_id().
        current_num_of_pods_: pods observed right now.
        target_num_of_pods: pods the last scale operation asked for.
        message_: free-form text for the operation log.
        eventRetCode_: 0 when counts match, 1 when they diverge.
    """
    region = self._kubernetes_service.get_region()
    namespace = self._kubernetes_service.get_namespace()
    transaction_id = housekeeper_deployment_.get_transaction_id()

    # Bug fix: target_num_of_pods_ was previously passed current_num_of_pods_,
    # so the operation log always reported current == target.
    log_operation(
        transaction_id_=transaction_id,
        operation_=self.type,
        eventRetCode_=eventRetCode_,
        deployment_=housekeeper_deployment_.name,
        target_percentage_=self.target_percentage,
        current_num_of_pods_=current_num_of_pods_,
        target_num_of_pods_=target_num_of_pods,
        execution_time_=self.execution_time,
        message_=message_,
    )
    WatcherOperation.LOGGER.info(f"Putting metric data for Namespace: {namespace}, Cluster: {self._kubernetes_service.get_cluster_name()}, Region: {region}, Deployment: {housekeeper_deployment_.deployment_name}")
    try:
        # Best-effort metric publication; failures are logged, never raised.
        housekeeper_id = self._kubernetes_service.get_housekeeper_id()
        cluster = self._kubernetes_service.get_cluster_name()

        self._aws_service.put_metric_target_vs_curr_pods(deployment_name_=housekeeper_deployment_.deployment_name, transaction_id_=transaction_id, current_num_of_pods=current_num_of_pods_, target_num_of_pods=target_num_of_pods, region_=region, namespace_=namespace, housekeeper_id_=housekeeper_id, cluster_=cluster)
    except Exception as e:
        # Bug fix: LOGGER.error() no longer receives the logger as the message.
        WatcherOperation.LOGGER.error(f"Exception: {e}")
afaisman commented 2 months ago

import logging
import os

import boto3
from botocore.exceptions import ClientError

from s2t.infra.auto.services.aws.parameterStore.parameterStoreClientProxy import ParameterStoreClientProxy


class AWSService:
    """Thin wrapper around AWS SSM Parameter Store and CloudWatch calls."""

    # NOTE(review): the paste showed "getLogger(name)" — markdown stripped
    # the underscores; restored to the conventional module logger.
    LOGGER = logging.getLogger(__name__)
    LOGGER.setLevel(logging.INFO)

    # The paste also assigned a placeholder value
    # ('please_define_AWS_PARAMETER_STORE_PREFIX_in_Spinnaker') that was
    # immediately shadowed; only the effective default is kept.
    AWS_PARAMETER_STORE_PREFIX_DEFAULT = "/application/s2t/esp/houseKeeper"

def __init__(self):
    """Stateless service: configuration is resolved per call, so there is
    nothing to initialize here.
    """
    pass

def get_parameter_store_prefix(self):
    """Resolve the Parameter Store key prefix.

    The AWS_PARAMETER_STORE_PREFIX environment variable wins when set and
    non-empty; otherwise the class default applies. Returns None only if
    the lookup itself raises.
    """
    try:
        configured = os.environ.get("AWS_PARAMETER_STORE_PREFIX")
        return configured if configured else AWSService.AWS_PARAMETER_STORE_PREFIX_DEFAULT
    except Exception as err:
        AWSService.LOGGER.error(f"Exception: {err}")
        return None

def save_data_to_parameterstore(self, nsDeployment, region_: str):
    """Persist the deployment's data-only serialization to Parameter Store.

    The value is stored under "<deployment url>_data". No-op when
    nsDeployment is None; returns None on failure.
    """
    if nsDeployment is None:
        return
    try:
        serialized_data = nsDeployment.to_str(skip_operations_=True, skip_data_=False)
        data_key = nsDeployment.url + "_data"
        self.save_to_ps(parameter_store_key_=data_key, str_serialized_=serialized_data, region_=region_)
    except Exception as err:
        AWSService.LOGGER.error(f"Exception: {err}")
        return None

def put_metric_target_vs_curr_pods(self, deployment_name_: str, transaction_id_: str, current_num_of_pods: int, target_num_of_pods: int, region_: str, namespace_: str, cluster_: str, housekeeper_id_: str):
    """Publish the target-minus-current pod gap to CloudWatch.

    Emits the "PodCountDifference" metric under the "Housekeeper"
    namespace, dimensioned by cluster/region/namespace/deployment and the
    transaction/housekeeper ids. Failures are logged and swallowed.
    """
    try:
        dimensions = [
            {"Name": "Cluster", "Value": cluster_},
            {"Name": "Region", "Value": region_},
            {"Name": "Namespace", "Value": namespace_},
            {"Name": "Deployment", "Value": deployment_name_},
            {"Name": "transaction_id", "Value": transaction_id_},
            {"Name": "housekeeper_id", "Value": housekeeper_id_},
        ]
        metric = {
            "MetricName": "PodCountDifference",
            "Dimensions": dimensions,
            # The difference calculated: pods still to add (or shed, if negative).
            "Value": target_num_of_pods - current_num_of_pods,
            "Unit": "Count",
        }
        cloudwatch_client = boto3.client("cloudwatch", region_name=region_)
        cloudwatch_client.put_metric_data(Namespace="Housekeeper", MetricData=[metric])
    except Exception as err:
        AWSService.LOGGER.error(f"Exception: {err}")
        return None

def save_to_ps(self, parameter_store_key_, str_serialized_, region_: str):
    """Write one value to SSM Parameter Store through the proxy client.

    Returns the proxy's put_parameter response, or None on a ClientError.
    """
    try:
        ssm = boto3.client("ssm", region_name=region_)
        proxy = ParameterStoreClientProxy(client_=ssm)
        result = proxy.put_parameter(parameter_store_key_, "", str_serialized_, dry_run_=False)
        AWSService.LOGGER.info(f"Parameter {parameter_store_key_} set.")
        return result
    except ClientError as err:
        AWSService.LOGGER.error(f"Exception: {err}")
        return None
afaisman commented 2 months ago

import logging
import os
import socket

from kubernetes import config, client


class KubernetesService:
    """Helpers around the Kubernetes API and pod environment metadata."""

    # NOTE(review): restored from "getLogger(name)" — markdown stripped the
    # dunder underscores in the paste.
    LOGGER = logging.getLogger(__name__)

def __init__(self):
    """Set lazy defaults; real setup happens in init_cube_config()."""
    # Deferred on purpose: config loading is done explicitly via
    # init_cube_config() rather than at construction time.
    # self.init_cube_config()
    self._namespace = "default"
    self._apps_v1 = None
    self._cluster_name = None
    self._region_name = None
    self._aws_environment = None
    # Assume in-cluster until load_incluster_config() proves otherwise.
    self._incluster = True

def get_region_name(self):
    """Return the cached region name (None until get_region() populates it)."""
    return self._region_name

def init_cube_config(self):
    """Load Kubernetes client configuration and prime cached metadata.

    Tries in-cluster config first; on ConfigException falls back to the
    local kubeconfig and marks the service as running outside the cluster.
    Creates the AppsV1Api client used by the deployment helpers.
    """
    KubernetesService.LOGGER.info(f"init_cube_config")
    KubernetesService.LOGGER.info(f"namespace={self._namespace}")
    # Warm the namespace/region/environment caches up front.
    self.get_namespace()
    self.get_region()
    self.get_aws_environment()
    try:
        # In-cluster configuration
        config.load_incluster_config()
    except config.config_exception.ConfigException:
        # Outside-cluster configuration
        self._incluster = False
        config.load_kube_config()

    self._apps_v1 = client.AppsV1Api()
    KubernetesService.LOGGER.info(f"_incluster={self._incluster}")
    KubernetesService.LOGGER.info(f"_region_name={self._region_name}")

def get_namespace(self):
    """Return the Kubernetes namespace this pod runs in.

    Reads the in-cluster service-account namespace file once and caches
    the result; falls back to "default" outside a cluster.
    """
    if self._namespace != "default":
        return self._namespace
    try:
        with open("/var/run/secrets/kubernetes.io/serviceaccount/namespace", "r") as f:
            self._namespace = f.read()
    except IOError:
        KubernetesService.LOGGER.info("Fallback to the default namespace if outside the cluster")
        self._namespace = "default"
    # Bug fix: the success path previously fell through without a return,
    # so a successful in-cluster read yielded None to the caller.
    return self._namespace

def get_aws_environment(self):
    """Return the AWS environment name, cached from AWS_ENVIRONMENT.

    Falls back to "dev" only if the lookup itself raises.
    """
    try:
        if not self._aws_environment:
            self._aws_environment = os.environ.get("AWS_ENVIRONMENT")
        return self._aws_environment
    except Exception as err:
        KubernetesService.LOGGER.error(f"Exception: {err}")
        return "dev"

def get_region(self):
    """Return the AWS region, cached from the AWS_REGION env var."""
    try:
        if not self._region_name:
            self._region_name = os.environ.get("AWS_REGION")
        return self._region_name
    except Exception as err:
        KubernetesService.LOGGER.error(f"Exception: {err}")
        return None

def get_cluster_name(self):
    """Return the EKS cluster name, cached from EKS_CLUSTER_NAME."""
    try:
        if not self._cluster_name:
            self._cluster_name = os.environ.get("EKS_CLUSTER_NAME")
        return self._cluster_name
    except Exception as err:
        KubernetesService.LOGGER.error(f"Exception: {err}")
        return None

def get_hostname(self):
    """Best-effort hostname lookup.

    Returns the socket hostname, or the sentinel "probably_local_" when the
    lookup fails; None only if something outside the lookup raises.
    """
    try:
        hostname = "probably_local_"
        try:
            hostname = socket.gethostname()
        except OSError:
            # Bug fix: this was a bare `except:` that also swallowed
            # SystemExit/KeyboardInterrupt; socket errors are OSError.
            pass

        return hostname
    except Exception as e:
        KubernetesService.LOGGER.error(f"Exception: {e}")
        return None

def get_housekeeper_id(self):
    """Compose this housekeeper's identity string.

    Format: hkid_<region>_<cluster>_<namespace>_<hostname>.
    Returns None if any of the lookups raises.
    """
    try:
        region_part = self.get_region()
        cluster_part = self.get_cluster_name()
        namespace_part = self.get_namespace()
        host_part = self.get_hostname()
        return "hkid_{0}_{1}_{2}_{3}".format(region_part, cluster_part, namespace_part, host_part)
    except Exception as err:
        KubernetesService.LOGGER.error(f"Exception: {err}")
        return None

def get_apps_v1(self):
    """Return the AppsV1Api client (None until init_cube_config() runs)."""
    return self._apps_v1

def read_namespaced_deployment(self, deployment_name_: str, namespace_: str):
    """Fetch one Deployment from the API; None when the call fails."""
    try:
        apps_api = self.get_apps_v1()
        return apps_api.read_namespaced_deployment(deployment_name_, namespace_)
    except Exception as err:
        KubernetesService.LOGGER.error(f"Exception: {err}")
        return None

def replace_namespaced_deployment(self, deployment_name_: str, namespace_: str, body_: str):
    """Replace a Deployment object in-cluster; None when the call fails."""
    try:
        apps_api = self.get_apps_v1()
        return apps_api.replace_namespaced_deployment(name=deployment_name_, namespace=namespace_, body=body_)
    except Exception as err:
        KubernetesService.LOGGER.error(f"Exception: {err}")
        return None

def list_namespaced_deployment(self, namespace_: str):
    """List Deployments in the given namespace; None when the call fails."""
    try:
        KubernetesService.LOGGER.info(f"list_namespaced_deployment namespace_={namespace_}")
        KubernetesService.LOGGER.info(f"self.get_apps_v1()={self.get_apps_v1()}")
        apps_api = self.get_apps_v1()
        return apps_api.list_namespaced_deployment(namespace=namespace_)
    except Exception as err:
        KubernetesService.LOGGER.error(f"Exception: {err}")
        return None
afaisman commented 2 months ago

import logging


class EKSRequestHandler:
    """Serves readiness/liveness probe responses for the EKS deployment."""

    # NOTE(review): restored from "getLogger(name)" — markdown stripped the
    # dunder underscores in the paste.
    LOGGER = logging.getLogger(__name__)

def __init__(self):
    """Start healthy; the flag is flipped via sat_readiness_liveness()."""
    self._readiness_liveness = True

def liveliness(self):
    """Health-probe response body.

    Returns ("Service is doing well", 200) when the flag is set,
    ("Service is not doing well", 503) otherwise, and None only if
    evaluating the flag raises.
    """
    outcome = None
    try:
        if self._readiness_liveness:
            return "Service is doing well", 200
        return "Service is not doing well", 503
    except Exception as err:
        EKSRequestHandler.LOGGER.error("Exception e:{}".format(err))
    return outcome

def sat_readiness_liveness(self, readiness_liveness_: bool) -> None:
    """Set the health flag that liveliness() reports.

    NOTE(review): "sat" looks like a typo for "set"; the name is kept
    unchanged for caller compatibility.
    """
    self._readiness_liveness = readiness_liveness_
afaisma commented 1 month ago

Apps Layer:

No specific classes mentioned fit directly into the Apps layer based on our discussions. This layer generally handles application setup and configuration, which was not the focus of the described classes. Controllers Layer:

No specific classes mentioned fit into the Controllers layer either. This layer typically orchestrates the flow of data between models and views, and none of the classes discussed directly handle these responsibilities. Services Layer:

NSDeploymentManager: This class manages the lifecycle and operations of NSDeployment instances, handling interactions with backend services like AWS and Kubernetes, making it a service layer component. NSDeploymentFactory: Involved in creating NSDeployment instances based on configurations stored in AWS Parameter Store. This factory class encapsulates the logic necessary for creating complex objects and interacts with various services to fetch and process data. Engines Layer:

TickEngine: Manages and executes scheduled operations based on a ticking mechanism. This class is responsible for timing and executing tasks which fit well into the engines layer, where computational and decision-making logic is executed. ImmediateScaleOperation and other operation types like WatermarkOperation, ScaleOperation, WatcherOperation: These classes execute specific operational logic (scaling, monitoring, etc.) and make decisions based on the application state or external inputs, aligning them with the engines layer. Models Layer:

NSDeployment: Represents the structure and state of a network deployment configuration. It's a data model that holds information about deployments and is used across the application to maintain state and configuration data. Operation and its derivatives like ImmediateScaleOperation, WatermarkOperation, etc., could also be considered part of the models layer if viewed as entities that model business processes or operations within the system. However, due to their active role in decision-making and interaction with external systems, placing them in the Engines layer might be more appropriate, as discussed. Utils Layer:

StringUtils, TimeUtils: These classes provide utility functions (such as string manipulation and time calculation) that are used throughout the application to support various functionalities in a reusable manner. They don’t initiate actions but support other classes with common functionalities. By organizing classes in this manner, each part of the application architecture can focus on its primary responsibilities, enhancing maintainability, scalability, and clarity in the system design.