Open afaisman opened 6 months ago
import json import logging import yaml from eks_hk.awm.app.models.datum import Datum from eks_hk.awm.app.models.operation import Operation from eks_hk.awm.app.utils import Utils from eks_hk.awm.app.utils import setup_logger from eks_hk.awm.app.utils.parameter_store import _save_to_ps
class HouseKeeperDeployment:
    """Configuration and runtime state of one deployment managed by the housekeeper.

    Holds descriptive metadata, the operations executed on every ``tick`` and
    the operation data (datums) those operations read and update. The object
    serializes to JSON or YAML (per ``is_json``/``is_yaml``) and can write its
    operation data back to the parameter store.
    """

    # Logger keyed by the module name; ``getLogger(name)`` referenced an
    # undefined class-scope name and failed when the class was created.
    LOGGER = logging.getLogger(__name__)
    setup_logger(LOGGER)

    def __init__(self, name, type, id, version, url, env, sealId, operations, operation_data, is_json=False,
                 is_yaml=False, skip_kubernetes_scale=False, skip_parameterstore_write=False, active=True):
        """Create a deployment.

        Args:
            name, type, id, version, env, sealId: descriptive metadata written by ``to_dict``.
            url: also used as the parameter-store key by ``save_to_parameterstore``.
            operations: list of operation objects run by ``tick``.
            operation_data: list of datum objects looked up by their ``data_id``.
            is_json, is_yaml: select the format produced by ``to_str``.
            skip_kubernetes_scale, skip_parameterstore_write, active: flags stored
                on the instance and included in ``to_dict``.
        """
        # ``type`` and ``id`` shadow builtins but are part of the keyword
        # interface used by the loader, so they are kept.
        self.name = name
        self.type = type
        self.id = id
        self.version = version
        self.url = url
        self.env = env
        self.sealId = sealId
        self.operations = operations
        self.operation_data = operation_data
        self._is_json = is_json
        self._is_yaml = is_yaml
        self.skip_kubernetes_scale = skip_kubernetes_scale
        self.skip_parameterstore_write = skip_parameterstore_write
        self.time_created = Utils.get_est_now()
        self.active = active
        # Raw document text and key; filled in when loaded from the parameter store.
        self.source = None
        self.parameter_store_key = None
        # Supplies region/cluster/namespace for transaction ids; assigned by the loader.
        self._housekeeper_controller = None
        self._transaction_id = None
        self._transaction_id_prefix = 'default'

    def __str__(self):
        """Return the id, the parameter-store key and a full YAML dump of this deployment."""
        return f"{self.id}\n{self.parameter_store_key}\n {self.to_yaml(False, False)}"

    def apply_operations_and_data(self, housekeeper_another_instance_):
        """Merge another deployment's operations and operation data into this one.

        Both lists are merged with ``Utils.apply_list_by_id``.
        """
        self.operations = Utils.apply_list_by_id(self.operations, housekeeper_another_instance_.operations)
        self.operation_data = Utils.apply_list_by_id(self.operation_data, housekeeper_another_instance_.operation_data)

    def to_dict(self):
        """Convert the deployment to a plain dict suitable for JSON/YAML dumping.

        Returns:
            A dict with the metadata and flags plus ``operations`` and
            ``operation_data`` (each element converted with its ``to_dict()``),
            or ``None`` if the conversion raised (the error is logged).
        """
        try:
            operations_dicts = [operation.to_dict() for operation in self.operations]
            operation_data_dicts = [datum.to_dict() for datum in self.operation_data]
            return {"name": self.name, "type": self.type, "id": self.id, "version": self.version, "url": self.url,
                    "env": self.env, "sealId": self.sealId, "skip_kubernetes_scale": self.skip_kubernetes_scale,
                    "skip_parameterstore_write": self.skip_parameterstore_write, "active": self.active,
                    "operations": operations_dicts, "operation_data": operation_data_dicts}
        except Exception as e:
            HouseKeeperDeployment.LOGGER.error(f"Exception: {e}")
            return None

    def save_to_parameterstore(self):
        """Persist this deployment's operation data to the parameter store.

        Only the operation data is written, under the key ``<url>_data``, in the
        format selected by ``to_str``. The operations themselves are not written.
        Nothing is written if serialization produced an empty string.
        """
        parameter_store_key_ = self.url
        str_serialized_data = self.to_str(skip_operations=True, skip_data=False)
        if not str_serialized_data:
            # to_str() returns "" on failure or when no format is set; writing that
            # would overwrite the stored operation data with an empty value.
            HouseKeeperDeployment.LOGGER.error(
                f"Not saving {parameter_store_key_}_data: serialization produced an empty string")
            return
        _save_to_ps(parameter_store_key_ + "_data", str_serialized_data)

    def _serializable_dict(self, skip_operations, skip_data):
        """Return ``to_dict()`` without the skipped sections, or None if ``to_dict`` failed."""
        dict_dump = self.to_dict()
        if dict_dump is None:
            return None
        if skip_operations:
            dict_dump.pop("operations")
        if skip_data:
            dict_dump.pop("operation_data")
        return dict_dump

    def to_json(self, skip_operations, skip_data):
        """Serialize to an indented JSON string; return "" on failure."""
        try:
            dict_dump = self._serializable_dict(skip_operations, skip_data)
            if dict_dump is None:
                return ""
            return json.dumps(dict_dump, indent=4)
        except Exception as e:
            HouseKeeperDeployment.LOGGER.error(f"Exception: {e}")
            return ""

    def to_yaml(self, skip_operations, skip_data):
        """Serialize to block-style YAML, preserving key order; return "" on failure."""
        try:
            dict_dump = self._serializable_dict(skip_operations, skip_data)
            if dict_dump is None:
                return ""
            return yaml.dump(dict_dump, sort_keys=False, default_flow_style=False)
        except Exception as e:
            HouseKeeperDeployment.LOGGER.error(f"Exception: {e}")
            return ""

    def to_str(self, skip_operations, skip_data):
        """Serialize in the deployment's own format (JSON preferred over YAML).

        Returns "" and logs an error if neither format flag is set or serialization fails.
        """
        try:
            if self._is_json:
                return self.to_json(skip_operations, skip_data)
            elif self._is_yaml:
                return self.to_yaml(skip_operations, skip_data)
            else:
                HouseKeeperDeployment.LOGGER.error("Neither JSON nor YAML format is specified for serialization")
                return ""
        except Exception as e:
            HouseKeeperDeployment.LOGGER.error(f"Exception: {e}")
            return ""

    def get_datum(self, data_id_):
        """Return the first datum whose ``data_id`` equals ``data_id_``, or None."""
        if self.operation_data is None:
            return None
        return next((item for item in self.operation_data if item.data_id == data_id_), None)

    def remove_datum(self, data_id):
        """Remove the first datum with the given ``data_id``; return True if one was removed."""
        if self.operation_data is None:
            return False
        item_to_remove = next((item for item in self.operation_data if item.data_id == data_id), None)
        if item_to_remove is None:
            return False
        self.operation_data.remove(item_to_remove)
        return True

    def set_datum(self, data_id_, value=None):
        """Update or create the datum ``data_id_``.

        An existing datum gets ``value`` only when ``value`` is not None. A new
        datum is created with ``value or 0`` and the current EST time.

        Returns:
            The existing or newly created datum, or None if an error was logged.
        """
        try:
            datum = self.get_datum(data_id_)
            if datum is not None:
                if value is not None:
                    datum.set_value(value)
                return datum
            if self.operation_data is None:
                # Without this, append() on None failed and the datum was silently lost.
                self.operation_data = []
            new_datum = Datum(data_id=data_id_, value=value or 0, recorded_time=Utils.get_est_now())
            self.operation_data.append(new_datum)
            return new_datum
        except Exception as e:
            HouseKeeperDeployment.LOGGER.error(f"Exception: {e}")
            return None

    def get_operation(self, id_):
        """Return the first operation whose ``id`` equals ``id_``, or None."""
        if self.operations is None:
            return None
        try:
            return next((item for item in self.operations if item.id == id_), None)
        except Exception as e:
            HouseKeeperDeployment.LOGGER.error(f"Exception: {e}")
            return None

    def set_operation(self, id_, type=None, execution_time=None, target_percentage=None):
        """Update or create the operation ``id_``.

        On an existing operation only the non-None arguments are applied. A new
        base ``Operation`` gets "" / 0 / 0 for missing type / time / percentage.

        Returns:
            The existing or new operation, or None if an error was logged.
        """
        try:
            operation = self.get_operation(id_)
            if operation is not None:
                if type is not None:
                    operation.type = type
                if execution_time is not None:
                    operation.execution_time = execution_time
                if target_percentage is not None:
                    operation.target_percentage = target_percentage
                return operation
            if self.operations is None:
                self.operations = []
            new_operation = Operation(id=id_, type=type or "", execution_time=execution_time or 0,
                                      target_percentage=target_percentage or 0)
            self.operations.append(new_operation)
            return new_operation
        except Exception as e:
            HouseKeeperDeployment.LOGGER.error(f"Exception: {e}")
            return None

    def set_transaction_id_prefix(self, transaction_id_prefix):
        """Set and return the prefix used by ``create_default_transaction_id``."""
        self._transaction_id_prefix = transaction_id_prefix
        return self._transaction_id_prefix

    def normalize_transaction_id(self, tid_str):
        """Shorten a transaction id string.

        Collapses doubled ``p1N_p1N`` segments to ``p1N`` and abbreviates
        "realtime" -> "rt", "transcribe-engine" -> "tr-en", "synthetic" -> "syn".
        """
        ret = tid_str
        # NOTE(review): range(9) covers p10..p18 only; confirm p19 is meant to be excluded.
        for i in range(9):
            ret = ret.replace(f'p1{i}_p1{i}', f'p1{i}')
        ret = ret.replace('realtime', 'rt')
        ret = ret.replace('transcribe-engine', 'tr-en')
        ret = ret.replace('synthetic', 'syn')
        return ret

    def create_transaction_id(self, transaction_id_prefix):
        """Build, store and return ``<prefix><region>_<cluster>_<namespace>_<name>`` (normalized).

        Also stores ``transaction_id_prefix`` as the current prefix. Requires the
        housekeeper controller to be assigned.
        """
        region = self._housekeeper_controller.get_region_name()
        cluster = self._housekeeper_controller.get_cluster_name()
        namespace = self._housekeeper_controller.get_namespace()
        self._transaction_id_prefix = transaction_id_prefix
        self._transaction_id = self.normalize_transaction_id(
            f"{transaction_id_prefix}{region}_{cluster}_{namespace}_{self.name}")
        return self._transaction_id

    def create_default_transaction_id(self):
        """Build, store and return a transaction id using the current prefix."""
        return self.create_transaction_id(self.get_transaction_id_prefix())

    def get_transaction_id(self):
        """Return the transaction id, creating the default one on first use."""
        if not self._transaction_id:
            self.create_default_transaction_id()
        return self._transaction_id

    def tick(self, current_time_):
        """Execute every operation once for ``current_time_``.

        Each ``operation.execute(current_time_, self)`` returns
        ``(succeeded, executed, data_updated)``.

        Returns:
            ``(True, data_updated)``. ``data_updated`` is True if any operation
            reported updated data; the housekeeper controller uses it to decide
            whether the parameter store must be written. Returns
            ``(False, False)`` if an operation raised.
        """
        try:
            any_data_updated = False
            for operation in self.operations:
                _succeeded, _executed, data_updated = operation.execute(current_time_, self)
                if data_updated:
                    any_data_updated = True
            return True, any_data_updated
        except Exception as e:
            # NOTE(review): data updated by operations that ran before the failure is
            # reported as not updated; confirm that skipping persistence here is intended.
            HouseKeeperDeployment.LOGGER.error(f"Exception: {e}")
            return False, False

    # Getters for private state.
    def get_is_json(self):
        """Return True if this deployment serializes as JSON."""
        return self._is_json

    def get_is_yaml(self):
        """Return True if this deployment serializes as YAML."""
        return self._is_yaml

    def get_housekeeper_controller(self):
        """Return the controller assigned by the loader (None until assigned)."""
        return self._housekeeper_controller

    def get_transaction_id_prefix(self):
        """Return the current transaction id prefix ('default' initially)."""
        return self._transaction_id_prefix
import inspect import json
import boto3 import yaml from botocore.exceptions import ClientError
from eks_hk.awm.app.models.datum import Datum from eks_hk.awm.app.models.defineTransactionIdPrefixOperation import DefineTransactionIdPrefixOperation from eks_hk.awm.app.models.housekeeperDeployment import HouseKeeperDeployment from eks_hk.awm.app.models.immediateScaleOperation import ImmediateScaleOperation from eks_hk.awm.app.models.operation import Operation from eks_hk.awm.app.models.scaleOperation import ScaleOperation from eks_hk.awm.app.models.transactionBeginOperation import TransactionBeginOperation from eks_hk.awm.app.models.transactionEndOperation import TransactionEndOperation from eks_hk.awm.app.models.watcherOperation import WatcherOperation from eks_hk.awm.app.models.watermarkOperation import WatermarkOperation from eks_hk.awm.app.utils import Utils from eks_hk.awm.app.utils.globals import VERSION from eks_hk.awm.app.utils.log_event import log_housekeeper_event from eks_hk.awm.app.utils.parameter_store import get_parameter_and_resolve_imports
class HouseKeeperDeploymentManager:
    """Builds HouseKeeperDeployment objects from JSON/YAML text and the AWS Parameter Store.

    A parameter-store document may contain ``#apply <key>`` lines. The
    deployments stored under those keys are loaded recursively, and their
    operations and operation data are merged into the root deployment.
    """

    # Line prefix in a deployment document that pulls in another parameter-store key.
    _APPLY_DIRECTIVE = "#apply "

    def __init__(self):
        # Must be ``__init__`` (not ``init``) so the map exists on every instance.
        self._map_depl_to_housekeeperinst = {}

    def get_map_depl_to_housekeeperinst(self):
        """Return the deployment -> housekeeper-instance map (empty after construction)."""
        return self._map_depl_to_housekeeperinst

    @classmethod
    def _build_operation(cls, item):
        """Instantiate the operation class named by ``item["type"]``.

        Unknown types fall back to the base ``Operation``. A missing "type" key
        raises KeyError.
        """
        # type -> (class, item keys passed positionally after id and type)
        factories = {
            "ScaleOperation": (ScaleOperation, ("execution_time", "target_percentage", "watermark_id")),
            "WatcherOperation": (WatcherOperation, ("execution_time", "last_target_scale_id")),
            "ImmediateScaleOperation": (ImmediateScaleOperation, ("scale_to",)),
            "WatermarkOperation": (WatermarkOperation, ("execution_time", "target_percentage", "watermark_id")),
            "TransactionBeginOperation": (TransactionBeginOperation, ("execution_time", "target_percentage")),
            "DefineTransactionIdPrefixOperation": (DefineTransactionIdPrefixOperation,
                                                   ("execution_time", "target_percentage",
                                                    "transaction_id_prefix")),
            "TransactionEndOperation": (TransactionEndOperation, ("execution_time", "target_percentage")),
        }
        operation_class, arg_keys = factories.get(item["type"], (Operation, ("execution_time", "target_percentage")))
        return operation_class(item.get("id"), item.get("type"), *(item.get(key) for key in arg_keys))

    @classmethod
    def _load(cls, data_):
        """Build a HouseKeeperDeployment from a parsed JSON/YAML mapping.

        A malformed "operation_data" section is logged and replaced with an
        empty list. Any other failure is logged and returns None.
        """
        try:
            operations = [cls._build_operation(item) for item in data_.get("operations") or []]
            operation_data = []
            if data_.get("operation_data"):
                try:
                    operation_data = [Datum.from_dict(datum) for datum in data_["operation_data"]]
                except Exception as e:
                    # Best effort: keep loading the deployment, but report the bad data.
                    HouseKeeperDeployment.LOGGER.error(f"Could not parse operation_data, using empty list: {e}")
                    operation_data = []
            # NOTE(review): absent "active"/"skip_*" keys are passed as None rather than the
            # constructor defaults (active=True); confirm callers treat None as intended.
            return HouseKeeperDeployment(name=data_.get("name"), type=data_.get("type"), id=data_.get("id"),
                                         version=data_.get("version"), url=data_.get("url"), env=data_.get("env"),
                                         sealId=data_.get("sealId"), operations=operations,
                                         operation_data=operation_data,
                                         skip_kubernetes_scale=data_.get("skip_kubernetes_scale"),
                                         skip_parameterstore_write=data_.get("skip_parameterstore_write"),
                                         active=data_.get("active"))
        except Exception as e:
            HouseKeeperDeployment.LOGGER.error(f"Exception in {inspect.currentframe().f_code.co_name}: {e}")
            return None

    @classmethod
    def load_from_json(cls, json_str_):
        """Parse a JSON document into a deployment; return None (logged) on failure."""
        try:
            data = json.loads(json_str_)
            return cls._load(data)
        except Exception as e:
            HouseKeeperDeployment.LOGGER.error(f"Exception: {e}")
            return None

    @classmethod
    def load_from_yaml(cls, yaml_str_):
        """Parse a YAML document (``yaml.safe_load``) into a deployment; return None (logged) on failure."""
        try:
            data = yaml.safe_load(yaml_str_)
            return cls._load(data)
        except Exception as e:
            HouseKeeperDeployment.LOGGER.error(f"Exception: {e}")
            return None

    @classmethod
    def load_deployment(cls, housekeeper_controller_, deployment_name, tick_counter=-1):
        """Load ``deployment_name`` for the controller's region, cluster and namespace.

        The key is ``<prefix>/<region>/<VERSION>/<cluster>/<namespace>/<deployment_name>``.
        ``#apply`` references are resolved and merged. ``tick_counter`` is accepted
        for compatibility but is not used.

        Returns:
            The deployment bound to ``housekeeper_controller_``, or None.
        """
        try:
            prefix = Utils.get_parameter_store_prefix()
            region = housekeeper_controller_.get_region_name()
            cluster = housekeeper_controller_.get_cluster_name()
            namespace = housekeeper_controller_.get_namespace()
            parameter_store_key = f"{prefix}/{region}/{VERSION}/{cluster}/{namespace}/{deployment_name}"
            HouseKeeperDeployment.LOGGER.info(f"load_deployment {parameter_store_key}")
            ret = cls.load_deployment_with_applies(parameter_store_key)
            if not ret:
                # NOTE(review): the failure is reported with eventRetCode=0, same as success; confirm.
                log_housekeeper_event(event="load_deployment", operation="", eventRetCode=0, region=region,
                                      cluster=cluster, namespace=namespace, deployment=deployment_name,
                                      target_percentage="", current_num_of_pods="", target_num_of_pods="",
                                      execution_time=0, message="Could not load")
                return None
            log_housekeeper_event(event="load_deployment", operation="", eventRetCode=0, region=region, cluster=cluster,
                                  namespace=namespace, deployment=deployment_name, target_percentage="",
                                  current_num_of_pods="", target_num_of_pods="", execution_time=0,
                                  message=f"Loading deployment {deployment_name} in {namespace} {cluster} {region} ")
            ret.deployment_name = deployment_name
            ret._housekeeper_controller = housekeeper_controller_
            ret.housekeeper_deployment = ret
            HouseKeeperDeployment.LOGGER.info("{}".format("- - " * 10))
            HouseKeeperDeployment.LOGGER.info(f"Deployment from {parameter_store_key} was resolved as:\n")
            HouseKeeperDeployment.LOGGER.info(ret)
            HouseKeeperDeployment.LOGGER.info("{}".format("- - " * 10))
            return ret
        except Exception as e:
            HouseKeeperDeployment.LOGGER.error(f"Exception: {e}")
            return None

    @classmethod
    def _load_from_ps(cls, parameter_store_key_: str):
        """Read one parameter (imports resolved) and parse it into a deployment.

        Sets ``source`` and ``parameter_store_key`` on the result. Returns None
        if the parameter is missing, unparsable, or an error occurs (all logged).
        """
        try:
            ssm_client = boto3.client("ssm", region_name=Utils.get_region())
            parameter_str = get_parameter_and_resolve_imports(ssm_client, parameter_store_key_)
            if parameter_str is None:
                HouseKeeperDeployment.LOGGER.info(f"Parameter {parameter_store_key_} is None.")
                return None
            ret = cls.load_from_string(parameter_str)
            if ret is None:
                HouseKeeperDeployment.LOGGER.info(f"Could not load or parse {parameter_store_key_}")
                return None
            HouseKeeperDeployment.LOGGER.info(f"{parameter_store_key_} read.")
            ret.source = parameter_str
            ret.parameter_store_key = parameter_store_key_
            return ret
        except ClientError as e:
            if e.response["Error"]["Code"] == "ParameterNotFound":
                HouseKeeperDeployment.LOGGER.info(f"{parameter_store_key_} not found in Parameter Store.")
            else:
                HouseKeeperDeployment.LOGGER.error(f"Error when retrieving {parameter_store_key_}: {e}")
            return None
        except Exception as e:
            HouseKeeperDeployment.LOGGER.error(f"Exception: {e}")
            return None

    @classmethod
    def load_from_parameterstore(cls, parameter_store_key_: str):
        """Load a deployment, replacing its operation data with ``<key>_data`` when that exists."""
        try:
            housekeeper_deployment = cls._load_from_ps(parameter_store_key_)
            if not housekeeper_deployment:
                return None
            stored_data = cls._load_from_ps(parameter_store_key_ + "_data")
            if stored_data:
                housekeeper_deployment.operation_data = stored_data.operation_data
            return housekeeper_deployment
        except Exception as e:
            HouseKeeperDeployment.LOGGER.error(f"Exception: {e}")
            return None

    @classmethod
    def collect_applies(cls, current_deployment, deployments_to_apply, _ancestors=None):
        """Append ``current_deployment`` and, recursively, every deployment it ``#apply``s.

        Args:
            current_deployment: deployment whose ``source`` text is scanned; None is ignored.
            deployments_to_apply: list extended in place, in depth-first order.
            _ancestors: internal; keys on the current recursion path, used to stop
                ``#apply`` cycles that previously recursed until RecursionError.
        """
        try:
            if not current_deployment:
                return
            deployments_to_apply.append(current_deployment)
            ancestors = set(_ancestors or ())
            ancestors.add(current_deployment.parameter_store_key)
            for line in (current_deployment.source or "").splitlines():
                if not line.startswith(cls._APPLY_DIRECTIVE):
                    continue
                import_key = line[len(cls._APPLY_DIRECTIVE):].strip()
                if not import_key:
                    continue
                if import_key in ancestors:
                    HouseKeeperDeployment.LOGGER.warning(
                        f"Skipping cyclic #apply of {import_key} from {current_deployment.parameter_store_key}")
                    continue
                HouseKeeperDeployment.LOGGER.info(f"******** applying {import_key}")
                applied = cls.load_from_parameterstore(import_key)
                if applied is None:
                    HouseKeeperDeployment.LOGGER.warning(f"Could not load {import_key} referenced by #apply")
                    continue
                cls.collect_applies(applied, deployments_to_apply, ancestors)
        except Exception as e:
            HouseKeeperDeployment.LOGGER.error(f"Exception: {e}")

    @classmethod
    def load_deployment_with_applies(cls, key_):
        """Load the deployment at ``key_`` and merge in every ``#apply``-referenced deployment."""
        try:
            deployment = cls.load_from_parameterstore(key_)
            if deployment is None:
                return None
            deployments_to_apply = []
            cls.collect_applies(deployment, deployments_to_apply)
            for applied in deployments_to_apply:
                if applied is not deployment:
                    deployment.apply_operations_and_data(applied)
            return deployment
        except Exception as e:
            HouseKeeperDeployment.LOGGER.error(f"Exception: {e}")
            return None

    @classmethod
    def load_from_string(cls, input_str_):
        """Parse JSON or YAML text into a deployment, recording which format was used.

        JSON is tried first. Returns None if the text is neither format or parsing fails.
        """
        try:
            if Utils.is_json(input_str_):
                ret = cls.load_from_json(input_str_)
                if ret:
                    ret._is_yaml = False
                    ret._is_json = True
                return ret
            elif Utils.is_yaml(input_str_):
                ret = cls.load_from_yaml(input_str_)
                if ret:
                    ret._is_yaml = True
                    ret._is_json = False
                return ret
            else:
                return None
        except Exception as e:
            HouseKeeperDeployment.LOGGER.error(f"Exception: {e}")
            return None
import logging import time
import boto3 from kubernetes import config, client from kubernetes.client import ApiException
from eks_hk.awm.app.models.housekeeperDeploymentManager import HouseKeeperDeploymentManager from eks_hk.awm.app.utils import setup_logger, Utils
class HousekeeperController: LOGGER = logging.getLogger(name) setup_logger(LOGGER)