import logging
from eks_hk.nsmanager.application.models.deployments.nsDeployment import NSDeployment
class TickEngine:
    """
    TickEngine class.

    Purpose:
        Manages and executes scheduled operations based on a ticking mechanism.
        This engine evaluates and executes operations at specified intervals,
        ensuring that actions are triggered at the correct times.

    Main API:
        - __init__(): Initializes the engine, setting it to active.
        - do_schedule(nsdeployment_, current_time_): Schedules and executes operations
          based on the current time. Returns a tuple indicating success and whether a data update occurred.
    """
    LOGGER = logging.getLogger(__name__)

    def __init__(self):
        self.ticker_active = True
    # Note: each operation's execute() returns a tuple of booleans (Succeeded, Executed, Data were updated);
    # do_schedule() itself returns (success, update_required).
    def do_schedule(self, nsdeployment_: NSDeployment, current_time_):
        """
        Schedules and executes operations from the nsdeployment_ object at the given current_time_.

        Parameters:
            - nsdeployment_ : An object containing operations that may need to be executed.
            - current_time_ : The current time used to evaluate which operations should be executed.

        Returns:
            - Tuple (success, update_required) where:
                - success (bool): True if all operations were evaluated without an exception.
                - update_required (bool): Indicates whether any data was updated during the operations,
                  which may require serializing data to AWS.
        """
        try:
            # update_required is used by the housekeeper controller to decide whether
            # serializing to the parameter store is needed.
            update_required = False
            for operation in nsdeployment_.operations:
                succeeded, executed, data_updated = operation.execute(current_time_, nsdeployment_)
                if data_updated is True:
                    update_required = True
            return True, update_required
        except Exception as e:
            TickEngine.LOGGER.error(f"Exception: {e}")
            return False, False
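For orientation, a minimal sketch (not part of the source) of how a controller loop might drive TickEngine.do_schedule; the deployment object, the serializer callback, and the once-per-tick cadence are assumptions.

import datetime
import pytz

def tick_once(tick_engine, nsdeployment, serialize_to_parameter_store):
    # Hypothetical controller step: evaluate all operations once and persist only when
    # an operation reported a data update (the update_required flag returned above).
    now_est = datetime.datetime.now(pytz.utc).astimezone(pytz.timezone("America/New_York"))
    success, update_required = tick_engine.do_schedule(nsdeployment, now_est)
    if success and update_required:
        serialize_to_parameter_store(nsdeployment)  # assumed callback, not shown in this issue
    return success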
import datetime
import logging
from typing import Optional

import pytz
import yaml

from eks_hk.nsmanager.application.models.operations.datum import Datum
from eks_hk.nsmanager.application.utils.string_utils import StringUtils
from eks_hk.nsmanager.application.utils.time_utils import TimeUtils
class NSData:
    """
    Manages the data related to a specific network deployment and provides mechanisms to serialize
    deployment data. The purpose of the class is to encapsulate and manage data related to a specific
    deployment.

    Main API:
        set_datum(self, data_id_, value): Sets or updates the value of a specific datum.
    """
    LOGGER = logging.getLogger(__name__)

    def __init__(self, name, type, id, version, url, env, sealId, operations, operation_data,
                 time_created=datetime.datetime.now(pytz.utc).astimezone(pytz.timezone("America/New_York"))):
        self.name = name
        self.type = type
        self.id = id
        self.version = version
        self.url = url
        self.env = env
        self.sealId = sealId
        self.operations = operations
        self.operation_data = operation_data
        self.source = None
        self.parameter_store_key = None
        self._transaction_id = None
        self._transaction_id_prefix = "default"
        self.time_created = time_created

    def initialize(self, _string_utils: StringUtils):
        self._string_utils = _string_utils

    def __str__(self):
        try:
            return f"{self.id}\n{self.parameter_store_key}\n {self.to_yaml(False, False)}"
        except Exception as e:
            NSData.LOGGER.error(f"Exception: {e}")
            return 'could not serialize NSData'
    def to_dict(self):
        try:
            operations_dicts = []
            for operation in self.operations:
                operations_dicts.append(operation.to_dict())
            operation_data_dicts = []
            for data in self.operation_data:
                operation_data_dicts.append(data.to_dict())
            return {"name": self.name, "type": self.type, "id": self.id, "version": self.version, "url": self.url,
                    "env": self.env, "sealId": self.sealId, "operations": operations_dicts,
                    "operation_data": operation_data_dicts}
        except Exception as e:
            NSData.LOGGER.error(f"Exception: {e}")
            return None

    def to_yaml(self, skipoperations: bool, skipdata: bool) -> str:
        try:
            dict_dump = self.to_dict()
            if skipoperations is True:
                dict_dump.pop("operations")
            if skipdata is True:
                dict_dump.pop("operation_data")
            return yaml.dump(dict_dump, sort_keys=False, default_flow_style=False)
        except Exception as e:
            NSData.LOGGER.error(f"Exception: {e}")
            return ""

    def to_str(self, skipoperations: bool, skipdata: bool) -> str:
        try:
            return self.to_yaml(skipoperations, skipdata)
        except Exception as e:
            NSData.LOGGER.error(f"Exception: {e}")
            return ""
    def get_datum(self, data_id_: str) -> Optional[Datum]:
        if self.operation_data is None:
            return None
        return next((item for item in self.operation_data if item.data_id == data_id_), None)

    def remove_datum(self, data_id_: str) -> bool:
        if self.operation_data is None:
            return False
        item_to_remove = next((item for item in self.operation_data if item.data_id == data_id_), None)
        if item_to_remove is None:
            return False
        self.operation_data.remove(item_to_remove)
        return True

    def set_datum(self, data_id_: str, value=None) -> Optional[Datum]:
        try:
            datum = self.get_datum(data_id_)
            if datum is not None:
                if value is not None:
                    datum.set_value(value)
                return datum
            else:
                new_datum = Datum(data_id=data_id_, value=value or 0, recorded_time=TimeUtils.get_est_now())
                self.operation_data.append(new_datum)
                return new_datum
        except Exception as e:
            NSData.LOGGER.error(f"Exception: {e}")
            return None
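A brief usage sketch for the datum API above; the deployment fields and the data_id are made-up values.

# Hypothetical usage of NSData.set_datum / get_datum (all values below are invented).
data = NSData(name="my-deployment", type="deployment", id="d1", version="1.0.0",
              url="", env="dev", sealId="12345", operations=[], operation_data=[])
data.set_datum("last_target_scale", 5)       # creates a Datum stamped with the current EST time
data.set_datum("last_target_scale", 7)       # updates the value and refreshes recorded_time
datum = data.get_datum("last_target_scale")  # returns the Datum, or None if absent
print(datum.get_value() if datum else None)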
import datetime
import logging
from typing import Optional

import pytz
import yaml

from eks_hk.nsmanager.application.models.deployments.nsData import NSData
from eks_hk.nsmanager.application.models.operations.datum import Datum
from eks_hk.nsmanager.application.models.operations.operation import Operation
from eks_hk.nsmanager.application.utils.string_utils import StringUtils
class NSDeployment:
    """
    Manages Kubernetes deployment configurations. This class is designed to handle the lifecycle of
    Kubernetes deployment configurations, mainly watermarking, scaling down and scaling up.

    Main API:
        get_operation_data(self): Retrieves the operation data encapsulated within the deployment.
    """
    LOGGER = logging.getLogger(__name__)

    def __init__(self, name, type, id, version, url, env, sealId, operations, operation_data,
                 skip_kubernetes_scale=False, skip_parameterstore_write=False, active=True,
                 time_created_=datetime.datetime.now(pytz.utc).astimezone(pytz.timezone("America/New_York"))):
        self.name = name  # which is rather eks_deployment_name; for now we're not touching serialized fields at the Deployment level
        self.type = type
        self.id = id
        self.version = version
        self.url = url
        self.env = env
        self.sealId = sealId
        self.operations = operations
        self.skip_kubernetes_scale = skip_kubernetes_scale
        self.skip_parameterstore_write = skip_parameterstore_write
        self.active = active
        self.source = None
        self.parameter_store_key = None
        self._transaction_id = None
        self._transaction_id_prefix = "default"
        self._nsdata = NSData(name, type, id, version, url, env, sealId, operations, operation_data)
        self.time_created = time_created_

    def initialize(self, string_utils_: StringUtils, kubernetes_service_, tick_engine_):
        try:
            self._string_utils = string_utils_
            self._kubernetes_service = kubernetes_service_
            self._tick_engine = tick_engine_
        except Exception as e:
            NSDeployment.LOGGER.error(f"Exception: {e}")
            return None

    def __str__(self):
        return f"{self.id}\n{self.parameter_store_key}\n {self.to_yaml(False, False)}"
    def apply_operations(self, housekeeper_another_instance_):
        try:
            self.operations = self._string_utils.apply_list_by_id(self.operations, housekeeper_another_instance_.operations)
            # self._nsdata.operation_data = self._string_utils.apply_list_by_id(self._nsdata.operation_data, housekeeper_another_instance_.operation_data)
        except Exception as e:
            NSDeployment.LOGGER.error(f"Exception: {e}")
            return None
    def to_dict(self):
        try:
            operations_dicts = []
            for operation in self.operations:
                operations_dicts.append(operation.to_dict())
            operation_data_dicts = []
            for data in self._nsdata.operation_data:
                operation_data_dicts.append(data.to_dict())
            # self.skip_kubernetes_scale = skip_kubernetes_scale
            # self.skip_parameterstore_write = skip_parameterstore_write
            # self.active = active
            return {"name": self.name, "type": self.type, "id": self.id, "version": self.version, "url": self.url,
                    "env": self.env, "sealId": self.sealId, "skip_kubernetes_scale": self.skip_kubernetes_scale,
                    "skip_parameterstore_write": self.skip_parameterstore_write, "active": self.active,
                    "operations": operations_dicts, "operation_data": operation_data_dicts}
        except Exception as e:
            NSDeployment.LOGGER.error(f"Exception: {e}")
            return None

    def to_yaml(self, skipoperations: bool, skipdata: bool) -> str:
        try:
            dict_dump = self.to_dict()
            if skipoperations is True:
                dict_dump.pop("operations")
            if skipdata is True:
                dict_dump.pop("operation_data")
            return yaml.dump(dict_dump, sort_keys=False, default_flow_style=False)
        except Exception as e:
            NSDeployment.LOGGER.error(f"Exception: {e}")
            return ""

    def to_str(self, skipoperations: bool, skipdata: bool) -> str:
        try:
            return self.to_yaml(skipoperations, skipdata)
        except Exception as e:
            NSDeployment.LOGGER.error(f"Exception: {e}")
            return ""
    def get_datum(self, data_id_: str) -> Optional[Datum]:
        try:
            return self._nsdata.get_datum(data_id_=data_id_)
        except Exception as e:
            NSDeployment.LOGGER.error(f"Exception: {e}")
            return None

    def remove_datum(self, data_id_: str) -> bool:
        try:
            return self._nsdata.remove_datum(data_id_=data_id_)
        except Exception as e:
            NSDeployment.LOGGER.error(f"Exception: {e}")
            return False

    def set_datum(self, data_id_: str, value=None) -> Optional[Datum]:
        try:
            return self._nsdata.set_datum(data_id_=data_id_, value=value)
        except Exception as e:
            NSDeployment.LOGGER.error(f"Exception: {e}")
            return None

    def get_operation(self, id_: str):
        try:
            return next((item for item in self.operations if item.id == id_), None)
        except Exception as e:
            NSDeployment.LOGGER.error(f"Exception: {e}")
            return None

    def set_operation(self, id_: str, type=None, execution_time=None, target_percentage=None) -> Optional[Operation]:
        try:
            operation = self.get_operation(id_)
            if operation is not None:
                if type is not None:
                    operation.type = type
                if execution_time is not None:
                    operation.execution_time = execution_time
                if target_percentage is not None:
                    operation.target_percentage = target_percentage
                return operation
            else:
                new_operation = Operation(id=id_, type=type or "", execution_time=execution_time or 0,
                                          target_percentage=target_percentage or 0)
                self.operations.append(new_operation)
                return new_operation
        except Exception as e:
            NSDeployment.LOGGER.error(f"Exception: {e}")
            return None
    def set_transaction_id_prefix(self, transaction_id_prefix_: str) -> str:
        try:
            self._transaction_id_prefix = transaction_id_prefix_
            return self._transaction_id_prefix
        except Exception as e:
            NSDeployment.LOGGER.error(f"Exception: {e}")
            return ''

    def normalize_transaction_id(self, tid_str_: str) -> str:
        try:
            ret = tid_str_
            for i in range(9):
                ret = ret.replace(f"p1{i}_p1{i}", f"p1{i}")
            ret = ret.replace("realtime", "rt")
            ret = ret.replace("transcribe-engine", "tr-en")
            ret = ret.replace("synthetic", "syn")
            return ret
        except Exception as e:
            NSDeployment.LOGGER.error(f"Exception: {e}")
            return ''

    def create_transaction_id(self, transaction_id_prefix_: str) -> str:
        try:
            region = self._kubernetes_service.get_region()
            cluster = self._kubernetes_service.get_cluster_name()
            namespace = self._kubernetes_service.get_namespace()
            host_name = self._kubernetes_service.get_hostname()
            self._transaction_id_prefix = transaction_id_prefix_
            self._transaction_id = self.normalize_transaction_id(
                f"{transaction_id_prefix_}{region}_{cluster}_{namespace}_{self.name}")
            return self._transaction_id
        except Exception as e:
            NSDeployment.LOGGER.error(f"Exception: {e}")
            return ''

    def create_default_transaction_id(self) -> str:
        try:
            region = self._kubernetes_service.get_region()
            cluster = self._kubernetes_service.get_cluster_name()
            namespace = self._kubernetes_service.get_namespace()
            # self._transaction_id = f"{self.transaction_id_prefix}_{region}_{cluster}_{namespace}_{host_name}_{self.name}_{self.housekeeper_controller._tick_counter}"
            self._transaction_id = self.normalize_transaction_id(
                f"{self.get_transaction_id_prefix()}{region}_{cluster}_{namespace}_{self.name}")
            return self._transaction_id
        except Exception as e:
            NSDeployment.LOGGER.error(f"Exception: {e}")
            return ''

    def get_transaction_id(self) -> str:
        if not self._transaction_id:
            self.create_default_transaction_id()
        return self._transaction_id

    def tick(self, current_time_) -> (bool, bool):
        try:
            return self._tick_engine.do_schedule(self, current_time_)
        except Exception as e:
            NSDeployment.LOGGER.error(f"Exception: {e}")
            return False, False

    def get_transaction_id_prefix(self) -> str:
        return self._transaction_id_prefix

    def get_operation_data(self) -> NSData:
        return self._nsdata
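A short sketch of the transaction-id helpers and tick() for a loaded deployment; `deployment` stands for an NSDeployment already wired up via initialize(), and the prefix value is hypothetical.

# Hypothetical sketch: `deployment` is an NSDeployment returned by NSDeploymentFactory/NSDeploymentManager,
# so _kubernetes_service and _tick_engine have been injected by initialize().
deployment.set_transaction_id_prefix("hk_")
tid = deployment.create_transaction_id(deployment.get_transaction_id_prefix())
print(tid)  # e.g. "hk_<region>_<cluster>_<namespace>_<name>" after normalize_transaction_id()
success, update_required = deployment.tick(TimeUtils.get_est_now())  # TimeUtils from the project's time utils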
import logging
from eks_hk.nsmanager.application.utils.time_utils import TimeUtils
""" The Datum class encapsulates data records with identifiers, values, and timestamps. It supports:
data_id
, value
, and recorded_time
.set_value
.to_dict
and from_dict
.get_value
.
Designed for robust data handling and integrity maintenance with error logging.
"""class Datum: LOGGER = logging.getLogger(name)
def __init__(self, data_id, value, recorded_time):
self.data_id = data_id
self._value = value
self.recorded_time = recorded_time
def to_dict(self):
try:
return {"data_id": self.data_id, "value": self._value, "recorded_time": self.recorded_time}
except Exception as e:
            Datum.LOGGER.error(f"Exception: {e}")
return None
def set_value(self, value):
self._value = value
current_time = TimeUtils.get_est_now() # datetime.datetime.now(ZoneInfo("US/Eastern"))
self.recorded_time = current_time
@classmethod
def from_dict(cls, data_dict):
try:
return cls(**data_dict)
except Exception as e:
Datum.LOGGER.error(f"Exception: {e}")
return None
def get_value(self):
return self._value
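A small round-trip sketch for Datum serialization; the id and value are hypothetical.

# Hypothetical round trip: to_dict() emits the same keys that __init__ accepts,
# so from_dict() can rebuild the datum.
d = Datum(data_id="last_scale", value=3, recorded_time=TimeUtils.get_est_now())
as_dict = d.to_dict()                # {"data_id": "last_scale", "value": 3, "recorded_time": ...}
restored = Datum.from_dict(as_dict)  # cls(**data_dict)
assert restored.get_value() == d.get_value()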
import logging
from eks_hk.nsmanager.application.models.operations.scaleOperation import ScaleOperation
""" A subclass of ScaleOperation designed for immediate and unconditional scaling of Kubernetes pods. It is uuseful for debugging or testing scenarios where immediate pod scaling is required.
class ImmediateScaleOperation(ScaleOperation): LOGGER = logging.getLogger(name)
def __init__(self, id, type, scale_to):
super().__init__(id, type, None, None, None)
self.scale_to = scale_to
def is_relevant(self, current_time_):
return True
    # Returns a tuple of booleans (Succeeded, Executed, Data were updated)
def execute(self, current_time_, housekeeper_deployment_):
try:
if self.is_relevant(current_time_):
res = False
current_num_of_pods = -1
replicas = self.scale_to
if not housekeeper_deployment_.skip_kubernetes_scale:
ImmediateScaleOperation.LOGGER.info(
f"Before scale_deployment {housekeeper_deployment_.name} to {replicas}")
res, current_num_of_pods = self.scale_deployment(housekeeper_deployment_, replicas)
else:
ImmediateScaleOperation.LOGGER.info(
f"Deployment {housekeeper_deployment_.name} not scaling to {replicas} since skip_kubernetes_scale==True")
                if res:
                    if current_num_of_pods != replicas:
                        self.report_execute(housekeeper_deployment_=housekeeper_deployment_,
                                            current_num_of_pods_=current_num_of_pods, target_num_of_pods=replicas,
                                            message_=f"Immediate scale: scaling deployment {housekeeper_deployment_.name} to {self.scale_to} pods.")
                    else:
                        self.report_execute(housekeeper_deployment_=housekeeper_deployment_,
                                            current_num_of_pods_=current_num_of_pods, target_num_of_pods=replicas,
                                            message_=f"Immediate scale: not scaling (no need to scale) deployment {housekeeper_deployment_.name} to {self.scale_to} pods.")
                else:
                    self.report_execute(housekeeper_deployment_=housekeeper_deployment_,
                                        current_num_of_pods_=current_num_of_pods, target_num_of_pods=replicas,
                                        message_="Immediate scaling error.")
                return True, True, False
else:
return True, False, False
except Exception as e:
ImmediateScaleOperation.LOGGER.error(f"Exception: {e}")
return False, False, False
def to_dict(self):
try:
return {"id": self.id, "type": self.type, "scale_to": self.scale_to}
except Exception as e:
ImmediateScaleOperation.LOGGER.error(f"Exception: {e}")
return None
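For reference, a sketch of how this operation is built from a configuration item, mirroring the ImmediateScaleOperation branch in NSDeploymentFactory._load; the id and scale_to values are invented.

# Hypothetical configuration item of type "ImmediateScaleOperation" (values are made up).
item = {"id": "scale_now_1", "type": "ImmediateScaleOperation", "scale_to": 4}
op = ImmediateScaleOperation(item.get("id"), item.get("type"), item.get("scale_to"))
# op.initialize(...) would normally inject the SSM client, StringUtils, AWSService and KubernetesService;
# op.execute(current_time_, deployment) then returns the (succeeded, executed, data_updated) triple
# consumed by TickEngine.do_schedule().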
import logging
import yaml
from kubernetes import client

from eks_hk.nsmanager.application.services.platform.aws_service import AWSService
from eks_hk.nsmanager.application.services.platform.kubernetes_service import KubernetesService
from eks_hk.nsmanager.application.utils.string_utils import StringUtils
from eks_hk.nsmanager.application.utils.time_utils import TimeUtils, reset_to_full_hour, \
    reset_to_previous_middle_of_hour, reset_to_half_hour, extract_minute_from_string, \
    extract_minute_from_string_w_every

"""
The Operation class manages operations with specific timing and target specifications. It supports:
    - to_dict and from_dict
    - execute and is_relevant
    - get_n_pods and scale_deployment
Designed for structured execution of timed operations within a Kubernetes and AWS environment,
with extensive logging and error handling.
"""


class Operation:
    LOGGER = logging.getLogger(__name__)
def __init__(self, id, type, execution_time, target_percentage, time_span=None):
self.id = id
self.type = type
self.execution_time = execution_time
self.target_percentage = target_percentage
        # Default operation lifespan in seconds; the constructor argument takes precedence when provided.
        self.time_span = time_span if time_span is not None else 180
def initialize(self, ssm_client_, string_utils_: StringUtils, aws_service_: AWSService,
kubernetes_service_: KubernetesService):
self._ssm_client = ssm_client_
self._string_utils = string_utils_
self._aws_service = aws_service_
self._kubernetes_service = kubernetes_service_
def to_dict(self):
try:
return {
"id": self.id,
"type": self.type,
"execution_time": self.execution_time,
}
except Exception as e:
Operation.LOGGER.error(f"Exception: {e}")
return None
@classmethod
def from_dict(cls, data_dict):
try:
return cls(**data_dict)
except Exception as e:
Operation.LOGGER.error(f"Exception: {e}")
return None
    # Base method. If relevant (meaning current_time_ overlaps with the operation's lifespan), then log.
    # Returns a tuple of booleans (Succeeded, Executed, Data were updated)
def execute(self, current_time_, housekeeper_deployment_) -> (bool, bool, bool):
try:
if self.is_relevant(current_time_):
return True, True, False
else:
return True, False, False
except Exception as e:
Operation.LOGGER.error(f"Exception: {e}")
return False, False, False
    # Returns True if the given operation is relevant, i.e. current_time_ falls within the operation's lifespan.
def is_relevant(self, current_time_) -> bool:
try:
if self.execution_time.lower().startswith("now"):
return True
if self.execution_time.lower() == "Hour".lower():
adjusted_execution_time = str(reset_to_full_hour(current_time_))
elif self.execution_time.lower() == "MiddleOfHour".lower():
adjusted_execution_time = str(reset_to_previous_middle_of_hour(current_time_))
elif self.execution_time.lower() == "HalfHour".lower():
adjusted_execution_time = str(reset_to_half_hour(current_time_))
elif not self.execution_time.lower().startswith("every_") and self.execution_time.lower().endswith("_min"):
                minute = extract_minute_from_string(self.execution_time.lower())
                adj_t = current_time_.replace(minute=minute)
adjusted_execution_time = str(adj_t)
elif self.execution_time.lower().startswith("every_") and self.execution_time.lower().endswith("_min"):
every_x_min = extract_minute_from_string_w_every(self.execution_time.lower())
minutes = current_time_.minute
multiple = round(minutes / every_x_min) * every_x_min
if multiple >= 60:
multiple -= every_x_min
adj_t = current_time_.replace(minute=multiple)
adjusted_execution_time = str(adj_t)
else:
adjusted_execution_time = TimeUtils.adjust_pattern_tine_to_current_time(current_time_,
self.execution_time)
if not adjusted_execution_time:
return False
            # The operation is still relevant if (current_time_ - adjusted_execution_time) < self.time_span.
time_diff = TimeUtils.time_diff(current_time_, adjusted_execution_time)
# if time_diff.total_seconds() > 0 and time_diff.total_seconds() <= self.time_span:
if 0 < time_diff <= self.time_span:
return True
else:
return False
except Exception as e:
Operation.LOGGER.error(f"Exception: {e}")
return False
def __str__(self):
dict_dump = self.to_dict()
return yaml.dump(dict_dump, sort_keys=False, default_flow_style=False)
# Retrieves the number of pods for a given deployment.
def get_n_pods(self, housekeeper_deployment_) -> int:
namespace = self._kubernetes_service.get_namespace()
deployment_name = housekeeper_deployment_.name
try:
deployment = self._kubernetes_service.read_namespaced_deployment(deployment_name, namespace)
Operation.LOGGER.info(
f"Deployement {housekeeper_deployment_.name} currently has {deployment.spec.replicas} replicas")
return deployment.spec.replicas
except client.exceptions.ApiException as e:
Operation.LOGGER.error("Kubernetes API error: " + str(e))
return 0
except Exception as e:
Operation.LOGGER.error(f"Error: {str(e)}")
return -1
def scale_deployment(self, housekeeper_deployment_, replicas) -> (bool, int):
try:
namespace = self._kubernetes_service.get_namespace()
deployment_name = housekeeper_deployment_.name
Operation.LOGGER.info(
f"scale_deployment namespace={namespace} deplayment_name={deployment_name}, replicas={replicas}")
deployment = self._kubernetes_service.read_namespaced_deployment(deployment_name, namespace)
current_num_of_pods = deployment.spec.replicas
Operation.LOGGER.info(f"Deployment {deployment_name} currently has {current_num_of_pods} replicas")
if current_num_of_pods != replicas:
deployment.spec.replicas = replicas
self._kubernetes_service.replace_namespaced_deployment(deployment_name_=deployment_name,
namespace_=namespace, body_=deployment)
Operation.LOGGER.info("Deployment scaled.")
return True, current_num_of_pods
else:
Operation.LOGGER.info("No need to scale")
return True, current_num_of_pods
except Exception as e:
Operation.LOGGER.error("Error: " + str(e))
            return False, -1
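A worked example of the "every_X_min" rounding used in is_relevant above; the minute values are hypothetical and only the arithmetic is reproduced.

# Hypothetical values: current minute 47, pattern "every_15_min".
minutes, every_x_min = 47, 15
multiple = round(minutes / every_x_min) * every_x_min  # round(3.13) * 15 = 45
if multiple >= 60:
    multiple -= every_x_min
print(multiple)  # 45 -> adjusted execution time HH:45; the operation stays relevant while
                 # 0 < (current_time_ - adjusted time) <= time_span (180 seconds by default)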
import logging
from typing import Union

from eks_hk.nsmanager.application.engine.tickEngine import TickEngine
from eks_hk.nsmanager.application.models.deployments.nsDeployment import NSDeployment
from eks_hk.nsmanager.application.services.deployments.nsDeploymentFactory import NSDeploymentFactory
from eks_hk.nsmanager.application.services.platform.aws_service import AWSService
from eks_hk.nsmanager.application.services.platform.kubernetes_service import KubernetesService
from eks_hk.nsmanager.application.utils.log_event import log_housekeeper_event
from eks_hk.nsmanager.application.utils.string_utils import StringUtils


class NSDeploymentManager:
    """
    Manages NSDeployments, which are specific deployment configurations within a Kubernetes and AWS environment.
    This manager class integrates services to facilitate the creation, retrieval, and management of NSDeployment
    instances. It handles operations such as loading deployments from the AWS Parameter Store and maintaining a
    map of Kubernetes deployment names to NSDeployment instances.

    Methods:
        __init__: Initializes the NSDeploymentManager with the necessary service clients and utilities.
        get_deployment_instances_list: Returns a list of tuples containing the Kubernetes deployment names and
            their corresponding NSDeployment instances.
        load_deployment: Attempts to load a deployment by its name, resolving its configuration via the AWS
            Parameter Store, and updates the internal map with the deployment instance. If loading fails, it
            logs the error and the operation's result and returns None, indicating that the deployment could
            not be loaded or resolved.
    """
LOGGER = logging.getLogger(__name__)
def __init__(self, ssm_client_, ns_deployment_factory_: NSDeploymentFactory, string_utils_: StringUtils,
aws_service_: AWSService, kubernetes_service_: KubernetesService, tick_engine_: TickEngine):
self._map_kubedeploymentnames_to_nsdeploymentinstances_dict = {}
self._ns_deployment_factory = ns_deployment_factory_
self._ssm_client = ssm_client_
self._string_utils = string_utils_
self._aws_service = aws_service_
self._kubernetes_service = kubernetes_service_
self._tick_engine = tick_engine_
def get_deployment_instances_list(self):
return list(self._map_kubedeploymentnames_to_nsdeploymentinstances_dict.items())
def load_deployment(self, deployment_name_) -> Union[None, NSDeployment]:
try:
prefix = self._aws_service.get_parameter_store_prefix()
region = self._kubernetes_service.get_region()
cluster = self._kubernetes_service.get_cluster_name()
namespace = self._kubernetes_service.get_namespace()
            # TODO: move these values into a config file.
VERSION = "1.0.0"
NSDeploymentManager.LOGGER.info(
f"load_deployment {prefix}/{region}/{VERSION}/{cluster}/{namespace}/{deployment_name_}")
parameter_store_key = f"{prefix}/{region}/{VERSION}/{cluster}/{namespace}/{deployment_name_}"
deployment = self._ns_deployment_factory.make_nsdeployment(parameter_store_key,
ssm_client_=self._ssm_client,
string_utils_=self._string_utils,
kubernetes_service_=self._kubernetes_service,
aws_service_=self._aws_service,
tick_engine_=self._tick_engine)
if deployment is None:
log_housekeeper_event(event="load_deployment", operation="", eventRetCode_=0,
deployment_=deployment_name_, target_percentage_="", current_num_of_pods_="",
target_num_of_pods_="", execution_time_=0, message_="Could not load")
return None
log_housekeeper_event(event="load_deployment", operation="", eventRetCode_=0, deployment_=deployment_name_,
target_percentage_="", current_num_of_pods_="", target_num_of_pods_="",
execution_time_=0,
message_=f"Loading deployment {deployment_name_} in {namespace} {cluster} {region} ")
deployment.deployment_name = deployment_name_
deployment.housekeeper_deployment = deployment
NSDeploymentManager.LOGGER.info("{}".format("- - " * 10))
NSDeploymentManager.LOGGER.info(f"Deployment from {parameter_store_key} was resolved as:\n")
NSDeploymentManager.LOGGER.info(deployment)
NSDeploymentManager.LOGGER.info("{}".format("- - " * 10))
self._map_kubedeploymentnames_to_nsdeploymentinstances_dict[deployment_name_] = deployment
return deployment
except Exception as e:
NSDeploymentManager.LOGGER.error(f"Exception: {e}")
self._map_kubedeploymentnames_to_nsdeploymentinstances_dict[deployment_name_] = None
return None
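A sketch of the parameter store key that load_deployment composes before delegating to the factory; every concrete value below is hypothetical.

# Hypothetical key composition (the real values come from AWSService and KubernetesService).
prefix, region, VERSION, cluster, namespace = "/my-prefix", "us-east-1", "1.0.0", "my-cluster", "my-ns"
deployment_name = "transcribe-engine"
parameter_store_key = f"{prefix}/{region}/{VERSION}/{cluster}/{namespace}/{deployment_name}"
# -> "/my-prefix/us-east-1/1.0.0/my-cluster/my-ns/transcribe-engine"
# manager.load_deployment(deployment_name) resolves this key via NSDeploymentFactory.make_nsdeployment()
# and caches the result in its kube-deployment-name -> NSDeployment map.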
import inspect
import logging
from typing import Optional, Union

import yaml
from botocore.exceptions import ClientError

from eks_hk.nsmanager.application.engine.tickEngine import TickEngine
from eks_hk.nsmanager.application.models.deployments.nsDeployment import NSDeployment
from eks_hk.nsmanager.application.models.operations.datum import Datum
from eks_hk.nsmanager.application.models.operations.defineTransactionIdPrefixOperation import \
    DefineTransactionIdPrefixOperation
from eks_hk.nsmanager.application.models.operations.immediateScaleOperation import ImmediateScaleOperation
from eks_hk.nsmanager.application.models.operations.operation import Operation
from eks_hk.nsmanager.application.models.operations.scaleOperation import ScaleOperation
from eks_hk.nsmanager.application.models.operations.transactionBeginOperation import TransactionBeginOperation
from eks_hk.nsmanager.application.models.operations.transactionEndOperation import TransactionEndOperation
from eks_hk.nsmanager.application.models.operations.watcherOperation import WatcherOperation
from eks_hk.nsmanager.application.models.operations.watermarkOperation import WatermarkOperation
from eks_hk.nsmanager.application.services.platform.aws_service import AWSService
from eks_hk.nsmanager.application.services.platform.kubernetes_service import KubernetesService
from eks_hk.nsmanager.application.utils.string_utils import StringUtils


class NSDeploymentFactory:
    """
    The NSDeploymentFactory class provides methods for creating NSDeployment instances based on configuration
    data stored in the AWS SSM Parameter Store. It encapsulates the logic for loading deployment configurations
    from either JSON or YAML format, handling exceptions, and logging pertinent information throughout the
    process. The class supports dynamic deployment creation.
    """
LOGGER = logging.getLogger(__name__)
@staticmethod
def make_new() -> "NSDeploymentFactory":
        ns_deployment_factory = None
        try:
            ns_deployment_factory = NSDeploymentFactory()
        except Exception as e:
            NSDeploymentFactory.LOGGER.error(f"Exception when creating NSDeploymentFactory instance: {e}")
        return ns_deployment_factory
def __init__(self):
pass
# @staticmethod should _load_from_parameterstore etc become static?
def make_nsdeployment(self, key_: str, ssm_client_, string_utils_: StringUtils, aws_service_: AWSService,
kubernetes_service_: KubernetesService, tick_engine_: TickEngine) -> Union[
NSDeployment, None]:
try:
deployment = self._load_from_parameterstore(parameter_store_key_=key_, ssm_client_=ssm_client_,
string_utils_=string_utils_, aws_service_=aws_service_,
kubernetes_service_=kubernetes_service_)
deployment.initialize(string_utils_=string_utils_, kubernetes_service_=kubernetes_service_,
tick_engine_=tick_engine_)
deployments_to_apply = [] # .append
self.collect_applies(deployment, deployments_to_apply, ssm_client_=ssm_client_, string_utils_=string_utils_,
aws_service_=aws_service_, kubernetes_service_=kubernetes_service_)
for d in deployments_to_apply:
if d is not deployment:
deployment.apply_operations(d)
return deployment
except Exception as e:
NSDeploymentFactory.LOGGER.error(f"Exception: {e}")
return None
@staticmethod
def _load_from_parameterstore(parameter_store_key_: str, ssm_client_, string_utils_: StringUtils,
aws_service_: AWSService, kubernetes_service_: KubernetesService):
try:
housekeeper_deployment = NSDeploymentFactory._load_from_ps(parameter_store_key_, ssm_client_=ssm_client_,
string_utils_=string_utils_,
aws_service_=aws_service_,
kubernetes_service_=kubernetes_service_)
if housekeeper_deployment is not None:
housekeeper_deployment_operation_data = NSDeploymentFactory._load_from_ps(
parameter_store_key_ + "_data", ssm_client_=ssm_client_, string_utils_=string_utils_,
aws_service_=aws_service_, kubernetes_service_=kubernetes_service_)
if housekeeper_deployment_operation_data is not None:
housekeeper_deployment.operation_data = housekeeper_deployment_operation_data.get_operation_data()
return housekeeper_deployment
else:
return None
except Exception as e:
NSDeploymentFactory.LOGGER.error(f"Exception: {e}")
return None
@staticmethod
def _load_from_ps(parameter_store_key, ssm_client_, string_utils_: StringUtils, aws_service_: AWSService,
kubernetes_service_: KubernetesService) -> Optional[NSDeployment]:
"""Load deployment configuration from Parameter Store using the injected SSM client."""
try:
response = ssm_client_.get_parameter(Name=parameter_store_key, WithDecryption=True)
parameter_value = response["Parameter"]["Value"]
ret = NSDeploymentFactory.load_from_string(parameter_value, ssm_client_=ssm_client_,
string_utils_=string_utils_, aws_service_=aws_service_,
kubernetes_service_=kubernetes_service_)
if not ret:
NSDeploymentFactory.LOGGER.info(f"Could not load or parse {parameter_store_key}")
return None
NSDeploymentFactory.LOGGER.info(f"{parameter_store_key} read.")
ret.source = parameter_value
ret.parameter_store_key = parameter_store_key
return ret
except ClientError as e:
if e.response["Error"]["Code"] == "ParameterNotFound":
NSDeploymentFactory.LOGGER.info(f"Parameter {parameter_store_key} not found.")
else:
NSDeploymentFactory.LOGGER.error(f"Error when retrieving {parameter_store_key}: {e}")
return None
except Exception as e:
NSDeploymentFactory.LOGGER.error(f"General exception: {e}")
return None
@staticmethod
def collect_applies(current_deployment_, deployments_to_apply_, ssm_client_, string_utils_: StringUtils,
aws_service_: AWSService, kubernetes_service_: KubernetesService):
try:
if not current_deployment_:
return
strings = current_deployment_.source.splitlines()
deployments_to_apply_.append(current_deployment_)
for s in strings:
if s.startswith("#apply "):
import_key = s[len("#apply "):]
print(f"******** applying {import_key}")
d = NSDeploymentFactory._load_from_parameterstore(import_key, ssm_client_=ssm_client_,
string_utils_=string_utils_,
aws_service_=aws_service_,
kubernetes_service_=kubernetes_service_)
# deployments_to_apply.append(d)
NSDeploymentFactory.collect_applies(d, deployments_to_apply_, ssm_client_=ssm_client_,
string_utils_=string_utils_, aws_service_=aws_service_,
kubernetes_service_=kubernetes_service_)
except Exception as e:
NSDeploymentFactory.LOGGER.error(f"Exception: {e}")
@staticmethod
def load_from_string(input_str_, ssm_client_, string_utils_: StringUtils, aws_service_: AWSService,
kubernetes_service_: KubernetesService) -> Optional[NSDeployment]:
if string_utils_.is_yaml(input_str_) is True:
depl = NSDeploymentFactory.load_from_yaml(input_str_, ssm_client_=ssm_client_, string_utils_=string_utils_,
aws_service_=aws_service_, kubernetes_service_=kubernetes_service_)
if depl is not None:
depl._is_yaml = True
return depl
else:
NSDeploymentFactory.LOGGER.error("Input string is neither valid JSON nor YAML.")
return None
@staticmethod
def _load_from_yaml(yaml_str_, ssm_client_, string_utils_: StringUtils, aws_service_: AWSService,
kubernetes_service_: KubernetesService):
try:
data = yaml.safe_load(yaml_str_)
return NSDeploymentFactory._load(data, ssm_client_=ssm_client_, string_utils_=string_utils_,
aws_service_=aws_service_, kubernetes_service_=kubernetes_service_)
except Exception as e:
NSDeploymentFactory.LOGGER.error(f"Exception: {e}")
return None
@staticmethod
def load_from_yaml(yaml_str_, ssm_client_, string_utils_: StringUtils, aws_service_: AWSService,
kubernetes_service_: KubernetesService) -> Optional[NSDeployment]:
"""Loads a deployment from a YAML string."""
try:
data = yaml.safe_load(yaml_str_)
return NSDeploymentFactory._load(data, ssm_client_=ssm_client_, string_utils_=string_utils_,
aws_service_=aws_service_, kubernetes_service_=kubernetes_service_)
except Exception as e:
NSDeploymentFactory.LOGGER.error(f"Exception in load_from_yaml: {e}")
return None
@staticmethod
def _load(data_, ssm_client_, string_utils_: StringUtils, aws_service_: AWSService,
kubernetes_service_: KubernetesService) -> Optional[NSDeployment]:
"""Internal method to reconstruct HouseKeeperDeployment from data."""
try:
operations = []
if data_.get("operations"):
for item in data_["operations"]:
if item["type"] == "ScaleOperation":
operation = ScaleOperation(item.get("id"), item.get("type"), item.get("execution_time"),
item.get("target_percentage"), item.get("watermark_id"))
elif item["type"] == "WatcherOperation":
operation = WatcherOperation(item.get("id"), item.get("type"), item.get("execution_time"),
item.get("last_target_scale_id"))
elif item["type"] == "ImmediateScaleOperation":
operation = ImmediateScaleOperation(item.get("id"), item.get("type"), item.get("scale_to"))
elif item["type"] == "WatermarkOperation":
operation = WatermarkOperation(item.get("id"), item.get("type"), item.get("execution_time"),
item.get("target_percentage"), item.get("watermark_id"))
elif item["type"] == "TransactionBeginOperation":
operation = TransactionBeginOperation(item.get("id"), item.get("type"),
item.get("execution_time"), item.get("target_percentage"))
elif item["type"] == "DefineTransactionIdPrefixOperation":
operation = DefineTransactionIdPrefixOperation(item.get("id"), item.get("type"),
item.get("execution_time"),
item.get("target_percentage"),
item.get("transaction_id_prefix"))
elif item["type"] == "TransactionEndOperation":
operation = TransactionEndOperation(item.get("id"), item.get("type"),
item.get("execution_time"), item.get("target_percentage"))
else:
operation = Operation(item.get("id"), item.get("type"), item.get("execution_time"),
item.get("target_percentage"))
operation.initialize(ssm_client_=ssm_client_, string_utils_=string_utils_,
aws_service_=aws_service_, kubernetes_service_=kubernetes_service_)
operations.append(operation)
operation_data = []
if data_.get("operation_data") is not None:
try:
operation_data = [Datum.from_dict(datum) for datum in data_["operation_data"]]
except Exception as e:
operation_data = []
return NSDeployment(name=data_.get("name"), type=data_.get("type"), id=data_.get("id"),
version=data_.get("version"), url=data_.get("url"), env=data_.get("env"),
sealId=data_.get("sealId"), operations=operations, operation_data=operation_data,
skip_kubernetes_scale=data_.get("skip_kubernetes_scale"),
skip_parameterstore_write=data_.get("skip_parameterstore_write"),
active=data_.get("active"))
except Exception as e:
NSDeploymentFactory.LOGGER.error(f"Exception in {inspect.currentframe().f_code.co_name}: {e}")
return None
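A hedged example of the kind of YAML document _load expects, passed through load_from_string; the field values are hypothetical, and the leading "#apply <key>" comment illustrates the convention that collect_applies scans for in the stored source.

# Hypothetical parameter-store value showing the structure _load() consumes; all names, ids and
# times below are invented.
EXAMPLE_YAML = """\
#apply /my-prefix/us-east-1/1.0.0/shared/common_operations
name: transcribe-engine
type: deployment
id: d1
version: 1.0.0
url: ''
env: dev
sealId: '12345'
skip_kubernetes_scale: true
skip_parameterstore_write: true
active: true
operations:
  - {id: op1, type: ScaleOperation, execution_time: every_15_min, target_percentage: 50, watermark_id: wm1}
  - {id: op2, type: ImmediateScaleOperation, scale_to: 4}
operation_data:
  - {data_id: wm1, value: 10, recorded_time: '2024-01-01 09:00:00'}
"""
# deployment = NSDeploymentFactory.load_from_string(EXAMPLE_YAML, ssm_client_=ssm_client,
#                                                   string_utils_=string_utils, aws_service_=aws_service,
#                                                   kubernetes_service_=kubernetes_service)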
import logging
import time

from eks_hk.nsmanager.application.services.deployments.nsDeploymentManager import NSDeploymentManager
from eks_hk.nsmanager.application.services.platform.aws_service import AWSService
from eks_hk.nsmanager.application.services.platform.kubernetes_service import KubernetesService
from eks_hk.nsmanager.application.utils.time_utils import TimeUtils

"""
The NSController class manages the lifecycle and operations of NSDeployment instances. The class supports operations such as:
"""


class NSController:
    LOGGER = logging.getLogger(__name__)