Open afaisman opened 6 months ago
import inspect import json import logging
import boto3 import yaml from botocore.exceptions import ClientError
from eks_hk.awm.app.models.datum import Datum from eks_hk.awm.app.models.defineTransactionIdPrefixOperation import DefineTransactionIdPrefixOperation from eks_hk.awm.app.models.immediateScaleOperation import ImmediateScaleOperation from eks_hk.awm.app.models.nsMgrDeployment import NSMgrDeployment from eks_hk.awm.app.models.operation import Operation from eks_hk.awm.app.models.scaleOperation import ScaleOperation from eks_hk.awm.app.models.transactionBeginOperation import TransactionBeginOperation from eks_hk.awm.app.models.transactionEndOperation import TransactionEndOperation from eks_hk.awm.app.models.watcherOperation import WatcherOperation from eks_hk.awm.app.models.watermarkOperation import WatermarkOperation from eks_hk.awm.app.utils import Utils from eks_hk.awm.app.utils.globals import VERSION from eks_hk.awm.app.utils.log_event import log_housekeeper_event
class NSMgrDeploymentManager:
    """Loads NSMgrDeployment objects and keeps them indexed by deployment name."""

    def __init__(self, ssm_client, logger=None):
        """Store the collaborators and start with an empty deployment map.

        Args:
            ssm_client: Client whose ``get_parameter`` is used to read
                Parameter Store values.
            logger: Logger to use; defaults to this module's logger.
        """
        self.ssm_client = ssm_client
        # ``__name__`` (not a bare ``name``) so the logger is named after the module.
        self.logger = logger if logger else logging.getLogger(__name__)
        # deployment name -> loaded deployment (None when loading raised)
        self._map_depl_to_housekeeperinst = {}
def load_deployment(self, housekeeper_controller_, deployment_name, tick_counter=-1):
    """Load one deployment (with its ``#apply`` imports) and register it by name.

    The Parameter Store key is built from the configured prefix, the
    controller's region/cluster/namespace, VERSION and ``deployment_name``.

    Args:
        housekeeper_controller_: Supplies region, cluster and namespace and is
            attached to the loaded deployment.
        deployment_name: Name of the deployment; last segment of the key.
        tick_counter: Not used by this method.

    Returns:
        The loaded deployment, or None when it could not be loaded or an
        exception occurred (in the latter case the map entry is set to None).
    """
    try:
        prefix = Utils.get_parameter_store_prefix()
        region = housekeeper_controller_.get_region_name()
        cluster = housekeeper_controller_.get_cluster_name()
        namespace = housekeeper_controller_.get_namespace()
        parameter_store_key = f"{prefix}/{region}/{VERSION}/{cluster}/{namespace}/{deployment_name}"
        NSMgrDeployment.LOGGER.info(f"load_deployment {parameter_store_key}")

        # Fields shared by both housekeeper events emitted below.
        event_fields = dict(event="load_deployment", operation="", eventRetCode=0, region=region,
                            cluster=cluster, namespace=namespace, deployment=deployment_name,
                            target_percentage="", current_num_of_pods="", target_num_of_pods="",
                            execution_time=0)

        loaded = self.load_deployment_with_applies(parameter_store_key)
        if not loaded:
            log_housekeeper_event(message="Could not load", **event_fields)
            return None

        log_housekeeper_event(
            message=f"Loading deployment {deployment_name} in {namespace} {cluster} {region} ",
            **event_fields)
        loaded.deployment_name = deployment_name
        loaded._housekeeper_controller = housekeeper_controller_
        loaded.housekeeper_deployment = loaded

        separator = "- - " * 10
        NSMgrDeployment.LOGGER.info(separator)
        NSMgrDeployment.LOGGER.info(f"Deployment from {parameter_store_key} was resolved as:\n")
        NSMgrDeployment.LOGGER.info(loaded)
        NSMgrDeployment.LOGGER.info(separator)

        self._map_depl_to_housekeeperinst[deployment_name] = loaded
        return loaded
    except Exception as exc:
        NSMgrDeployment.LOGGER.error(f"Exception: {exc}")
        self._map_depl_to_housekeeperinst[deployment_name] = None
        return None
def _load(self, data_):
    """Rebuild an NSMgrDeployment from an already-parsed mapping.

    Args:
        data_: Mapping parsed from JSON/YAML. ``operations`` and
            ``operation_data`` are optional; every operation entry must carry
            a ``type`` key.

    Returns:
        The reconstructed NSMgrDeployment, or None if anything fails (the
        failure is logged).
    """
    try:
        common = ("id", "type", "execution_time", "target_percentage")
        # (type name, class, item keys passed positionally to the constructor)
        known_operations = (
            ("ScaleOperation", ScaleOperation, common + ("watermark_id",)),
            ("WatcherOperation", WatcherOperation,
             ("id", "type", "execution_time", "last_target_scale_id")),
            ("ImmediateScaleOperation", ImmediateScaleOperation, ("id", "type", "scale_to")),
            ("WatermarkOperation", WatermarkOperation, common + ("watermark_id",)),
            ("TransactionBeginOperation", TransactionBeginOperation, common),
            ("DefineTransactionIdPrefixOperation", DefineTransactionIdPrefixOperation,
             common + ("transaction_id_prefix",)),
            ("TransactionEndOperation", TransactionEndOperation, common),
        )

        operations = []
        if data_.get("operations"):
            for item in data_["operations"]:
                kind = item["type"]
                # Unknown types fall back to the generic Operation.
                op_class, keys = next(
                    ((cls, fields) for name, cls, fields in known_operations if kind == name),
                    (Operation, common))
                operations.append(op_class(*[item.get(key) for key in keys]))

        operation_data = []
        if data_.get("operation_data"):
            try:
                operation_data = [Datum.from_dict(datum) for datum in data_["operation_data"]]
            except Exception as e:
                # Best effort: keep loading without data, but leave a trace
                # instead of silently discarding the failure.
                NSMgrDeployment.LOGGER.error(
                    f"Could not parse operation_data, continuing with an empty list: {e}")
                operation_data = []

        return NSMgrDeployment(name=data_.get("name"), type=data_.get("type"), id=data_.get("id"),
                               version=data_.get("version"),
                               url=data_.get("url"), env=data_.get("env"), sealId=data_.get("sealId"),
                               operations=operations,
                               operation_data=operation_data,
                               skip_kubernetes_scale=data_.get("skip_kubernetes_scale"),
                               skip_parameterstore_write=data_.get("skip_parameterstore_write"),
                               active=data_.get("active"))
    except Exception as e:
        NSMgrDeployment.LOGGER.error(f"Exception in {inspect.currentframe().f_code.co_name}: {e}")
        return None
def load_from_json(self, json_str):
    """Parse ``json_str`` as JSON and build a deployment from the result.

    Returns None (after logging) if parsing or building raises.
    """
    try:
        return self._load(json.loads(json_str))
    except Exception as exc:
        self.logger.error(f"Exception in load_from_json: {exc}")
        return None
def load_from_yaml(self, yaml_str):
    """Parse ``yaml_str`` with ``yaml.safe_load`` and build a deployment from it.

    Returns None (after logging) if parsing or building raises.
    """
    try:
        return self._load(yaml.safe_load(yaml_str))
    except Exception as exc:
        self.logger.error(f"Exception in load_from_yaml: {exc}")
        return None
def load_from_parameterstore(self, parameter_store_key):
    """Fetch ``parameter_store_key`` (decrypted) via the SSM client and parse its value.

    Returns whatever ``load_from_string`` returns, or None when the lookup
    raises; the error is logged in that case.
    """
    try:
        parameter = self.ssm_client.get_parameter(
            Name=parameter_store_key, WithDecryption=True)['Parameter']
        return self.load_from_string(parameter['Value'])
    except ClientError as exc:
        self.logger.error(f"Error retrieving {parameter_store_key}: {exc}")
        return None
    except Exception as exc:
        self.logger.error(f"General exception in load_from_parameterstore: {exc}")
        return None
def load_from_string(self, input_str):
    """Parse ``input_str`` as JSON if it is JSON, otherwise as YAML if it is YAML.

    Returns the loader's result, or None (after logging an error) when the
    text is neither format.
    """
    if Utils.is_json(input_str):
        loader = self.load_from_json
    elif Utils.is_yaml(input_str):
        loader = self.load_from_yaml
    else:
        self.logger.error("Input string is neither valid JSON nor YAML.")
        return None
    return loader(input_str)
@staticmethod
def create_default():
    """Build a manager backed by a boto3 SSM client for the region from ``Utils.get_region()``."""
    default_client = boto3.client('ssm', region_name=Utils.get_region())
    return NSMgrDeploymentManager(ssm_client=default_client)
def get_map_depl_to_housekeeperinst(self):
    """Return the internal deployment-name -> deployment map (the live dict, not a copy)."""
    deployments_by_name = self._map_depl_to_housekeeperinst
    return deployments_by_name
def __load(self, data_):
    """Rebuild an NSMgrDeployment from a parsed mapping.

    Same logic as ``_load``; no caller of this variant is visible in this
    excerpt. Returns None (after logging) on any failure. A failure while
    parsing ``operation_data`` only resets that list to empty.
    """
    try:
        common = ("id", "type", "execution_time", "target_percentage")
        # (type name, class, item keys passed positionally to the constructor)
        known_operations = (
            ("ScaleOperation", ScaleOperation, common + ("watermark_id",)),
            ("WatcherOperation", WatcherOperation,
             ("id", "type", "execution_time", "last_target_scale_id")),
            ("ImmediateScaleOperation", ImmediateScaleOperation, ("id", "type", "scale_to")),
            ("WatermarkOperation", WatermarkOperation, common + ("watermark_id",)),
            ("TransactionBeginOperation", TransactionBeginOperation, common),
            ("DefineTransactionIdPrefixOperation", DefineTransactionIdPrefixOperation,
             common + ("transaction_id_prefix",)),
            ("TransactionEndOperation", TransactionEndOperation, common),
        )

        built = []
        if data_.get("operations"):
            for entry in data_["operations"]:
                kind = entry["type"]
                op_class, keys = next(
                    ((cls, fields) for name, cls, fields in known_operations if kind == name),
                    (Operation, common))
                built.append(op_class(*[entry.get(key) for key in keys]))

        data_items = []
        if data_.get("operation_data"):
            try:
                data_items = [Datum.from_dict(raw) for raw in data_["operation_data"]]
            except Exception:
                data_items = []

        return NSMgrDeployment(name=data_.get("name"), type=data_.get("type"), id=data_.get("id"),
                               version=data_.get("version"),
                               url=data_.get("url"), env=data_.get("env"), sealId=data_.get("sealId"),
                               operations=built,
                               operation_data=data_items,
                               skip_kubernetes_scale=data_.get("skip_kubernetes_scale"),
                               skip_parameterstore_write=data_.get("skip_parameterstore_write"),
                               active=data_.get("active"))
    except Exception as exc:
        NSMgrDeployment.LOGGER.error(f"Exception in {inspect.currentframe().f_code.co_name}: {exc}")
        return None
def _load_from_json(self, json_str_):
    """Parse ``json_str_`` as JSON and hand the result to ``_load``; None on any error."""
    try:
        return self._load(json.loads(json_str_))
    except Exception as exc:
        NSMgrDeployment.LOGGER.error(f"Exception: {exc}")
        return None
def _load_from_yaml(self, yaml_str_):
    """Parse ``yaml_str_`` with ``yaml.safe_load`` and hand the result to ``_load``; None on any error."""
    try:
        return self._load(yaml.safe_load(yaml_str_))
    except Exception as exc:
        NSMgrDeployment.LOGGER.error(f"Exception: {exc}")
        return None
def _load_from_ps(self, parameter_store_key):
    """Read one Parameter Store entry and parse it into a deployment.

    On success the raw text is kept on the deployment as ``source`` and the
    key as ``parameter_store_key``. Returns None when the parameter is
    missing, cannot be parsed, or the lookup raises.
    """
    try:
        raw_value = self.ssm_client.get_parameter(
            Name=parameter_store_key, WithDecryption=True)['Parameter']['Value']
        deployment = self.load_from_string(raw_value)
        if deployment is not None:
            self.logger.info(f"{parameter_store_key} read.")
            deployment.source = raw_value
            deployment.parameter_store_key = parameter_store_key
            return deployment
        self.logger.info(f"Could not load or parse {parameter_store_key}")
        return None
    except ClientError as exc:
        # A missing parameter is an expected situation, so it is only logged at info level.
        if exc.response['Error']['Code'] == "ParameterNotFound":
            self.logger.info(f"Parameter {parameter_store_key} not found.")
        else:
            self.logger.error(f"Error when retrieving {parameter_store_key}: {exc}")
        return None
    except Exception as exc:
        self.logger.error(f"General exception: {exc}")
        return None
def _load_from_parameterstore(self, parameter_store_key_: str):
    """Load a deployment and, if present, overlay the data stored under ``<key>_data``.

    Returns the deployment, or None when the main entry cannot be loaded or
    an exception occurs.
    """
    try:
        deployment = self._load_from_ps(parameter_store_key_)
        if not deployment:
            return None
        # Operation data lives in a sibling parameter with a "_data" suffix.
        data_holder = self._load_from_ps(parameter_store_key_ + "_data")
        if data_holder:
            deployment.operation_data = data_holder.operation_data
        return deployment
    except Exception as exc:
        NSMgrDeployment.LOGGER.error(f"Exception: {exc}")
        return None
def collect_applies(self, current_deployment, deployments_to_apply, _active_keys=None):
    """Append ``current_deployment`` and everything it pulls in via ``#apply`` lines.

    Every line of the deployment's ``source`` that starts with ``"#apply "``
    names another Parameter Store key; that deployment is loaded and
    processed recursively, depth first.

    Args:
        current_deployment: Deployment to start from; falsy values are ignored.
        deployments_to_apply: List that receives the deployments, in visit order.
        _active_keys: Internal. Keys on the current recursion path, used to
            break ``#apply`` cycles. Callers should leave it unset.
    """
    try:
        if not current_deployment:
            return
        source_lines = current_deployment.source.splitlines()
        deployments_to_apply.append(current_deployment)

        own_key = getattr(current_deployment, "parameter_store_key", None)
        path = (_active_keys or frozenset()) | {own_key}

        for line in source_lines:
            if not line.startswith("#apply "):
                continue
            # Surrounding whitespace would otherwise become part of the key.
            import_key = line[len("#apply "):].strip()
            if not import_key:
                continue
            if import_key in path:
                # A -> B -> A would otherwise recurse until the stack overflows.
                self.logger.warning(f"Skipping cyclic #apply of {import_key}")
                continue
            self.logger.info(f"******** applying {import_key}")
            imported = self._load_from_parameterstore(import_key)
            self.collect_applies(imported, deployments_to_apply, path)
    except Exception as e:
        NSMgrDeployment.LOGGER.error(f"Exception: {e}")
def load_deployment_with_applies(self, key_):
    """Load the deployment at ``key_`` and merge in every deployment it applies.

    Returns the root deployment (None if it could not be loaded or an
    exception occurred).
    """
    try:
        root = self._load_from_parameterstore(key_)
        collected = []
        self.collect_applies(root, collected)
        # The root itself is part of the collected list; only the others are merged in.
        for extra in (candidate for candidate in collected if candidate is not root):
            root.apply_operations_and_data(extra)
        return root
    except Exception as exc:
        NSMgrDeployment.LOGGER.error(f"Exception: {exc}")
        return None
def _load_from_string(self, input_str_):
    """Load a deployment from text and record which format it was parsed from.

    JSON is tried first, then YAML. A successfully loaded deployment gets its
    ``_is_json`` / ``_is_yaml`` flags set. Returns None when the text is
    neither format or an exception occurs.
    """
    try:
        if Utils.is_json(input_str_):
            loader, from_json = self._load_from_json, True
        elif Utils.is_yaml(input_str_):
            loader, from_json = self._load_from_yaml, False
        else:
            return None
        deployment = loader(input_str_)
        if deployment:
            deployment._is_yaml = not from_json
            deployment._is_json = from_json
        return deployment
    except Exception as exc:
        NSMgrDeployment.LOGGER.error(f"Exception: {exc}")
        return None
import inspect import json import logging from typing import Optional
import boto3 import yaml from botocore.exceptions import ClientError from s2t.infra.auto.services.aws.parameterStore.parameterStoreClientProxy import ParameterStoreClientProxy
from eks_hk.awm.app.models.datum import Datum from eks_hk.awm.app.models.defineTransactionIdPrefixOperation import DefineTransactionIdPrefixOperation from eks_hk.awm.app.models.immediateScaleOperation import ImmediateScaleOperation from eks_hk.awm.app.models.nsMgrDeployment import NSMgrDeployment from eks_hk.awm.app.models.operation import Operation from eks_hk.awm.app.models.scaleOperation import ScaleOperation from eks_hk.awm.app.models.transactionBeginOperation import TransactionBeginOperation from eks_hk.awm.app.models.transactionEndOperation import TransactionEndOperation from eks_hk.awm.app.models.watcherOperation import WatcherOperation from eks_hk.awm.app.models.watermarkOperation import WatermarkOperation from eks_hk.awm.app.utils import Utils from eks_hk.awm.app.utils.globals import VERSION from eks_hk.awm.app.utils.log_event import log_housekeeper_event
class NSMgrDeploymentManager:
    """Loads NSMgrDeployment objects and keeps them indexed by deployment name."""

    def __init__(self, ssm_client, logger=None):
        """Store the collaborators and start with an empty deployment map.

        Args:
            ssm_client: Client whose ``get_parameter`` is used to read
                Parameter Store values.
            logger: Logger to use; defaults to this module's logger.
        """
        self.ssm_client = ssm_client
        # ``__name__`` (not a bare ``name``) so the logger is named after the module.
        self.logger = logger if logger else logging.getLogger(__name__)
        # deployment name -> loaded deployment (None when loading raised)
        self._map_depl_to_housekeeperinst = {}
def load_deployment(self, housekeeper_controller_, deployment_name, tick_counter=-1) -> Optional[NSMgrDeployment]:
    """Load one deployment (with its ``#apply`` imports) and register it by name.

    The Parameter Store key is built from the configured prefix, the
    controller's region/cluster/namespace, VERSION and ``deployment_name``.

    Args:
        housekeeper_controller_: Supplies region, cluster and namespace and is
            attached to the loaded deployment.
        deployment_name: Name of the deployment; last segment of the key.
        tick_counter: Not used by this method.

    Returns:
        The loaded deployment, or None when it could not be loaded or an
        exception occurred (in the latter case the map entry is set to None).
    """
    try:
        prefix = Utils.get_parameter_store_prefix()
        region = housekeeper_controller_.get_region_name()
        cluster = housekeeper_controller_.get_cluster_name()
        namespace = housekeeper_controller_.get_namespace()
        parameter_store_key = f"{prefix}/{region}/{VERSION}/{cluster}/{namespace}/{deployment_name}"
        self.logger.info(f"load_deployment {parameter_store_key}")

        # Fields shared by both housekeeper events emitted below.
        event_fields = dict(event="load_deployment", operation="", eventRetCode=0, region=region,
                            cluster=cluster, namespace=namespace, deployment=deployment_name,
                            target_percentage="", current_num_of_pods="", target_num_of_pods="",
                            execution_time=0)

        loaded = self.load_deployment_with_applies(parameter_store_key)
        if not loaded:
            log_housekeeper_event(message="Could not load", **event_fields)
            return None

        log_housekeeper_event(
            message=f"Loading deployment {deployment_name} in {namespace} {cluster} {region} ",
            **event_fields)
        loaded.deployment_name = deployment_name
        loaded._housekeeper_controller = housekeeper_controller_
        loaded.housekeeper_deployment = loaded

        separator = "- - " * 10
        self.logger.info(separator)
        self.logger.info(f"Deployment from {parameter_store_key} was resolved as:\n")
        self.logger.info(loaded)
        self.logger.info(separator)

        self._map_depl_to_housekeeperinst[deployment_name] = loaded
        return loaded
    except Exception as exc:
        self.logger.error(f"Exception: {exc}")
        self._map_depl_to_housekeeperinst[deployment_name] = None
        return None
def _load(self, data_) -> Optional[NSMgrDeployment]:
    """Rebuild an NSMgrDeployment from an already-parsed mapping.

    Args:
        data_: Mapping parsed from JSON/YAML. ``operations`` and
            ``operation_data`` are optional; every operation entry must carry
            a ``type`` key.

    Returns:
        The reconstructed NSMgrDeployment, or None if anything fails (the
        failure is logged).
    """
    try:
        common = ("id", "type", "execution_time", "target_percentage")
        # (type name, class, item keys passed positionally to the constructor)
        known_operations = (
            ("ScaleOperation", ScaleOperation, common + ("watermark_id",)),
            ("WatcherOperation", WatcherOperation,
             ("id", "type", "execution_time", "last_target_scale_id")),
            ("ImmediateScaleOperation", ImmediateScaleOperation, ("id", "type", "scale_to")),
            ("WatermarkOperation", WatermarkOperation, common + ("watermark_id",)),
            ("TransactionBeginOperation", TransactionBeginOperation, common),
            ("DefineTransactionIdPrefixOperation", DefineTransactionIdPrefixOperation,
             common + ("transaction_id_prefix",)),
            ("TransactionEndOperation", TransactionEndOperation, common),
        )

        operations = []
        if data_.get("operations"):
            for item in data_["operations"]:
                kind = item["type"]
                # Unknown types fall back to the generic Operation.
                op_class, keys = next(
                    ((cls, fields) for name, cls, fields in known_operations if kind == name),
                    (Operation, common))
                operations.append(op_class(*[item.get(key) for key in keys]))

        operation_data = []
        if data_.get("operation_data"):
            try:
                operation_data = [Datum.from_dict(datum) for datum in data_["operation_data"]]
            except Exception as e:
                # Best effort: keep loading without data, but leave a trace
                # instead of silently discarding the failure.
                self.logger.error(
                    f"Could not parse operation_data, continuing with an empty list: {e}")
                operation_data = []

        return NSMgrDeployment(name=data_.get("name"), type=data_.get("type"), id=data_.get("id"),
                               version=data_.get("version"),
                               url=data_.get("url"), env=data_.get("env"), sealId=data_.get("sealId"),
                               operations=operations,
                               operation_data=operation_data,
                               skip_kubernetes_scale=data_.get("skip_kubernetes_scale"),
                               skip_parameterstore_write=data_.get("skip_parameterstore_write"),
                               active=data_.get("active"))
    except Exception as e:
        self.logger.error(f"Exception in {inspect.currentframe().f_code.co_name}: {e}")
        return None
def load_from_json(self, json_str) -> Optional[NSMgrDeployment]:
    """Parse ``json_str`` as JSON and build a deployment from the result.

    Returns None (after logging) if parsing or building raises.
    """
    try:
        return self._load(json.loads(json_str))
    except Exception as exc:
        self.logger.error(f"Exception in load_from_json: {exc}")
        return None
def load_from_yaml(self, yaml_str) -> Optional[NSMgrDeployment]:
    """Parse ``yaml_str`` with ``yaml.safe_load`` and build a deployment from it.

    Returns None (after logging) if parsing or building raises.
    """
    try:
        return self._load(yaml.safe_load(yaml_str))
    except Exception as exc:
        self.logger.error(f"Exception in load_from_yaml: {exc}")
        return None
def load_from_parameterstore(self, parameter_store_key):
    """Fetch ``parameter_store_key`` (decrypted) via the SSM client and parse its value.

    Returns whatever ``load_from_string`` returns, or None when the lookup
    raises; the error is logged in that case.
    """
    try:
        parameter = self.ssm_client.get_parameter(
            Name=parameter_store_key, WithDecryption=True)['Parameter']
        return self.load_from_string(parameter['Value'])
    except ClientError as exc:
        self.logger.error(f"Error retrieving {parameter_store_key}: {exc}")
        return None
    except Exception as exc:
        self.logger.error(f"General exception in load_from_parameterstore: {exc}")
        return None
def load_from_string(self, input_str) -> Optional[NSMgrDeployment]:
    """Load a deployment from text and record which format it was parsed from.

    JSON is tried first, then YAML. A successfully loaded deployment gets its
    ``_is_json`` / ``_is_yaml`` flags set. Returns None (after logging an
    error) when the text is neither format.
    """
    if Utils.is_json(input_str):
        loader, from_json = self.load_from_json, True
    elif Utils.is_yaml(input_str):
        loader, from_json = self.load_from_yaml, False
    else:
        self.logger.error("Input string is neither valid JSON nor YAML.")
        return None
    deployment = loader(input_str)
    if deployment:
        deployment._is_yaml = not from_json
        deployment._is_json = from_json
    return deployment
@staticmethod
def create_default():
    """Build a manager backed by a boto3 SSM client for the region from ``Utils.get_region()``."""
    default_client = boto3.client('ssm', region_name=Utils.get_region())
    return NSMgrDeploymentManager(ssm_client=default_client)
def get_map_depl_to_housekeeperinst(self):
    """Return the internal deployment-name -> deployment map (the live dict, not a copy)."""
    deployments_by_name = self._map_depl_to_housekeeperinst
    return deployments_by_name
def get_items(self):
    """Return a live ``items()`` view of the deployment-name -> deployment map."""
    deployments_by_name = self._map_depl_to_housekeeperinst
    return deployments_by_name.items()
def list_deployments(self):
    """Return a snapshot list of ``(deployment_name, deployment)`` pairs."""
    return [*self._map_depl_to_housekeeperinst.items()]
def __load(self, data_):
    """Rebuild an NSMgrDeployment from a parsed mapping.

    Same logic as ``_load``; no caller of this variant is visible in this
    excerpt. Returns None (after logging) on any failure. A failure while
    parsing ``operation_data`` only resets that list to empty.
    """
    try:
        common = ("id", "type", "execution_time", "target_percentage")
        # (type name, class, item keys passed positionally to the constructor)
        known_operations = (
            ("ScaleOperation", ScaleOperation, common + ("watermark_id",)),
            ("WatcherOperation", WatcherOperation,
             ("id", "type", "execution_time", "last_target_scale_id")),
            ("ImmediateScaleOperation", ImmediateScaleOperation, ("id", "type", "scale_to")),
            ("WatermarkOperation", WatermarkOperation, common + ("watermark_id",)),
            ("TransactionBeginOperation", TransactionBeginOperation, common),
            ("DefineTransactionIdPrefixOperation", DefineTransactionIdPrefixOperation,
             common + ("transaction_id_prefix",)),
            ("TransactionEndOperation", TransactionEndOperation, common),
        )

        built = []
        if data_.get("operations"):
            for entry in data_["operations"]:
                kind = entry["type"]
                op_class, keys = next(
                    ((cls, fields) for name, cls, fields in known_operations if kind == name),
                    (Operation, common))
                built.append(op_class(*[entry.get(key) for key in keys]))

        data_items = []
        if data_.get("operation_data"):
            try:
                data_items = [Datum.from_dict(raw) for raw in data_["operation_data"]]
            except Exception:
                data_items = []

        return NSMgrDeployment(name=data_.get("name"), type=data_.get("type"), id=data_.get("id"),
                               version=data_.get("version"),
                               url=data_.get("url"), env=data_.get("env"), sealId=data_.get("sealId"),
                               operations=built,
                               operation_data=data_items,
                               skip_kubernetes_scale=data_.get("skip_kubernetes_scale"),
                               skip_parameterstore_write=data_.get("skip_parameterstore_write"),
                               active=data_.get("active"))
    except Exception as exc:
        self.logger.error(f"Exception in {inspect.currentframe().f_code.co_name}: {exc}")
        return None
def _load_from_json(self, json_str_):
    """Parse ``json_str_`` as JSON and hand the result to ``_load``; None on any error."""
    try:
        return self._load(json.loads(json_str_))
    except Exception as exc:
        self.logger.error(f"Exception: {exc}")
        return None
def _load_from_yaml(self, yaml_str_):
    """Parse ``yaml_str_`` with ``yaml.safe_load`` and hand the result to ``_load``; None on any error."""
    try:
        return self._load(yaml.safe_load(yaml_str_))
    except Exception as exc:
        self.logger.error(f"Exception: {exc}")
        return None
def _load_from_ps(self, parameter_store_key) -> Optional[NSMgrDeployment]:
    """Read one Parameter Store entry and parse it into a deployment.

    On success the raw text is kept on the deployment as ``source`` and the
    key as ``parameter_store_key``. Returns None when the parameter is
    missing, cannot be parsed, or the lookup raises.
    """
    try:
        raw_value = self.ssm_client.get_parameter(
            Name=parameter_store_key, WithDecryption=True)['Parameter']['Value']
        deployment = self.load_from_string(raw_value)
        if deployment is not None:
            self.logger.info(f"{parameter_store_key} read.")
            deployment.source = raw_value
            deployment.parameter_store_key = parameter_store_key
            return deployment
        self.logger.info(f"Could not load or parse {parameter_store_key}")
        return None
    except ClientError as exc:
        # A missing parameter is an expected situation, so it is only logged at info level.
        if exc.response['Error']['Code'] == "ParameterNotFound":
            self.logger.info(f"Parameter {parameter_store_key} not found.")
        else:
            self.logger.error(f"Error when retrieving {parameter_store_key}: {exc}")
        return None
    except Exception as exc:
        self.logger.error(f"General exception: {exc}")
        return None
def _load_from_parameterstore(self, parameter_store_key_: str):
    """Load a deployment and, if present, overlay the data stored under ``<key>_data``.

    Returns the deployment, or None when the main entry cannot be loaded or
    an exception occurs.
    """
    try:
        deployment = self._load_from_ps(parameter_store_key_)
        if not deployment:
            return None
        # Operation data lives in a sibling parameter with a "_data" suffix.
        data_holder = self._load_from_ps(parameter_store_key_ + "_data")
        if data_holder:
            deployment.operation_data = data_holder.operation_data
        return deployment
    except Exception as exc:
        self.logger.error(f"Exception: {exc}")
        return None
def collect_applies(self, current_deployment, deployments_to_apply, _active_keys=None):
    """Append ``current_deployment`` and everything it pulls in via ``#apply`` lines.

    Every line of the deployment's ``source`` that starts with ``"#apply "``
    names another Parameter Store key; that deployment is loaded and
    processed recursively, depth first.

    Args:
        current_deployment: Deployment to start from; falsy values are ignored.
        deployments_to_apply: List that receives the deployments, in visit order.
        _active_keys: Internal. Keys on the current recursion path, used to
            break ``#apply`` cycles. Callers should leave it unset.
    """
    try:
        if not current_deployment:
            return
        source_lines = current_deployment.source.splitlines()
        deployments_to_apply.append(current_deployment)

        own_key = getattr(current_deployment, "parameter_store_key", None)
        path = (_active_keys or frozenset()) | {own_key}

        for line in source_lines:
            if not line.startswith("#apply "):
                continue
            # Surrounding whitespace would otherwise become part of the key.
            import_key = line[len("#apply "):].strip()
            if not import_key:
                continue
            if import_key in path:
                # A -> B -> A would otherwise recurse until the stack overflows.
                self.logger.warning(f"Skipping cyclic #apply of {import_key}")
                continue
            self.logger.info(f"******** applying {import_key}")
            imported = self._load_from_parameterstore(import_key)
            self.collect_applies(imported, deployments_to_apply, path)
    except Exception as e:
        self.logger.error(f"Exception: {e}")
def load_deployment_with_applies(self, key_):
    """Load the deployment at ``key_`` and merge in every deployment it applies.

    Returns the root deployment (None if it could not be loaded or an
    exception occurred).
    """
    try:
        root = self._load_from_parameterstore(key_)
        collected = []
        self.collect_applies(root, collected)
        # The root itself is part of the collected list; only the others are merged in.
        for extra in (candidate for candidate in collected if candidate is not root):
            root.apply_operations_and_data(extra)
        return root
    except Exception as exc:
        self.logger.error(f"Exception: {exc}")
        return None
def save_to_parameterstore(self, housekeeper_deployment):
    """Persist a deployment's operation data under ``<deployment.url>_data``.

    Only the data part is written (serialized with ``skip_operations=True,
    skip_data=False``); the deployment definition itself is not saved.
    Does nothing when ``housekeeper_deployment`` is None.
    """
    if housekeeper_deployment is None:
        return
    data_key = housekeeper_deployment.url + "_data"
    serialized_data = housekeeper_deployment.to_str(skip_operations=True, skip_data=False)
    self._save_to_ps(data_key, serialized_data)
def _save_to_ps(self, parameter_store_key_, str_serialized_):
    """Write ``str_serialized_`` to Parameter Store under ``parameter_store_key_``.

    Uses the SSM client injected into the manager, the same one the load
    path reads with, rather than building a new boto3 client per call.

    Returns:
        The proxy's ``put_parameter`` response, or None when a ClientError
        is raised (the error is logged).
    """
    try:
        parameter_store_proxy = ParameterStoreClientProxy(client_=self.ssm_client)
        response = parameter_store_proxy.put_parameter(parameter_store_key_, "", str_serialized_, dry_run_=False)
        self.logger.info(f"Parameter {parameter_store_key_} set.")
        return response
    except ClientError as e:
        self.logger.error(f"Exception: {e}")
        return None
{ "channelName": "AWM-ServiceConnect-Zoom1-89749", "channelType": "AUXILIARY", "channelId": "id", "channelVersion": "1.0.1", "channelUrl": "/application/s2t/esp/rt/channelConfig/us-east-2/1.0.1/AWM-ServiceConnect-Zoom1-89749", "subscriberConfig": { "id": "Standard 1.0", "subscriberTelephoneNumbersSet": [ ], "subscriberTelephoneNumberURLsSet": [] }, "requestConfig": { "id": "Standard 1.0", "persistAudioToS3": false, "persistTranscriptionsToS3": true, "persistToS3BucketName": "app-id-104120-dep-id-104121-uu-id-og7nwvnq9ykf", "enableTranscribe": true, "enablePartialResults": false, "persistToDb": false, "persistToWebsocket": false, "publishToConsole": true, "publishToClients": true, "publishToClientsServerType": "GKS", "publishToClientsServerName": "NAU1720", "publishToClientsTopic": "s2t_cda_p2_realtime_awm_serviceconnect_1-na1720",
"publishTelemetryToS3": true,
"publishTelemetryToS3BucketARN": "104120-110663-pxvwf6-tagpg8igq3jfpuu6pspbf5x15mp4huse2b-s3alias",
"enableThreadModel2": true,
"enableTranscribeProxy": false,
"awsCurrentRegion": "US_EAST_2",
"startSelectorType": "NOW"
},
"realTimeTranscriptionConfig": {
"id": "Standard 1.0",
"contentIdentificationType": "",
"contentRedactionType": "",
"enableChannelIdentification": true,
"enablePartialResultsStabilization": false,
"identifyLanguage": false,
"languageOptions": "",
"languageCode": "EN_US",
"languageModelName": "",
"partialResultsStability": "",
"piiEntityTypes": "",
"preferredLanguage": "",
"mediaSamplingRate": 32000,
"mediaEncoding": "PCM",
"numberOfChannels": 2,
"showSpeakerLabel": false,
"vocabularyName": "s2t-cda-uat-p3-realtime-customvocab-na-1",
"vocabularyFilterName": ""
}
}
-- channelConfig definition
-- One row per channel configuration; id is an auto-incrementing integer primary key.
-- clientId, envId and regionId are required foreign keys into clients(id), envs(id) and regions(id).
-- NOTE(review): channelConfig_UN (UNIQUE on id) repeats what PRIMARY KEY already enforces.
-- NOTE(review): createDateTime is TEXT while updateDateTime is INTEGER; confirm the mixed types are intended.
CREATE TABLE channelConfig ( id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT, name TEXT(256) NOT NULL, regionId INTEGER NOT NULL, envId INTEGER NOT NULL, clientId INTEGER NOT NULL, createDateTime TEXT, updateDateTime INTEGER, description TEXT(256), CONSTRAINT channelConfig_UN UNIQUE (id), CONSTRAINT channelConfig_FK_1 FOREIGN KEY (clientId) REFERENCES clients(id), CONSTRAINT channelConfig_FK FOREIGN KEY (envId) REFERENCES envs(id), CONSTRAINT channelConfig_FK_2 FOREIGN KEY (regionId) REFERENCES regions(id) );
import inspect import json import logging from typing import Optional, Union
import yaml from botocore.exceptions import ClientError
from eks_hk.awm.app.models.deployments.nsDeployment import NSDeployment from eks_hk.awm.app.models.operations.datum import Datum from eks_hk.awm.app.models.operations.defineTransactionIdPrefixOperation import DefineTransactionIdPrefixOperation from eks_hk.awm.app.models.operations.immediateScaleOperation import ImmediateScaleOperation from eks_hk.awm.app.models.operations.operation import Operation from eks_hk.awm.app.models.operations.scaleOperation import ScaleOperation from eks_hk.awm.app.models.operations.transactionBeginOperation import TransactionBeginOperation from eks_hk.awm.app.models.operations.transactionEndOperation import TransactionEndOperation from eks_hk.awm.app.models.operations.watcherOperation import WatcherOperation from eks_hk.awm.app.models.operations.watermarkOperation import WatermarkOperation from eks_hk.awm.app.services.platform.aws_service import AWSService from eks_hk.awm.app.services.platform.kubernetes_service import KubernetesService from eks_hk.awm.app.utils.string_utils import StringUtils
class NSDeploymentFactory:
    """Builds NSDeployment objects from Parameter Store entries."""

    # Shared class-level logger; ``__name__`` (not a bare ``name``) names it after the module.
    LOGGER = logging.getLogger(__name__)
@staticmethod
def make_new(ssm_client_) -> "NSDeploymentFactory":
    """Create a factory bound to ``ssm_client_``.

    Returns:
        The new NSDeploymentFactory, or None if construction raises (the
        error is logged).
    """
    try:
        return NSDeploymentFactory(ssm_client_)
    except Exception as e:
        # The message used to name HousekeeperController, which is not what is built here.
        NSDeploymentFactory.LOGGER.error(f"Exception when creating NSDeploymentFactory instance: {e}")
        return None
def __init__(self, ssm_client_):
    """Remember the SSM client used for Parameter Store reads."""
    setattr(self, "_ssm_client", ssm_client_)
def makeNSDeployment(self, key_: str,
                     ssm_client_,
                     stringUtils_: StringUtils,
                     awsService_: AWSService,
                     kubernetesService_: KubernetesService
                     ) -> Union[NSDeployment, None]:
    """Load the deployment at ``key_``, initialize it and merge in its ``#apply`` imports.

    Args:
        key_: Parameter Store key of the deployment.
        ssm_client_, stringUtils_, awsService_, kubernetesService_: Services
            forwarded to the loaders and to ``deployment.initialize``.

    Returns:
        The initialized deployment, or None when it cannot be loaded or an
        exception occurs.
    """
    try:
        services = dict(ssm_client_=ssm_client_, stringUtils_=stringUtils_,
                        awsService_=awsService_, kubernetesService_=kubernetesService_)
        deployment = self._load_from_parameterstore(parameter_store_key_=key_, **services)
        if deployment is None:
            # The loader has already logged why; without this guard the failure
            # surfaced as an AttributeError on None in the except branch below.
            return None
        deployment.initialize(**services)
        deployments_to_apply = []
        self.collect_applies(deployment, deployments_to_apply, **services)
        for d in deployments_to_apply:
            if d is not deployment:
                deployment.apply_operations_and_data(d)
        return deployment
    except Exception as e:
        NSDeploymentFactory.LOGGER.error(f"Exception: {e}")
        return None
def _load_from_parameterstore(self, parameter_store_key_: str,
                              ssm_client_,
                              stringUtils_: StringUtils,
                              awsService_: AWSService,
                              kubernetesService_: KubernetesService):
    """Load a deployment and, if present, overlay the data stored under ``<key>_data``.

    The service arguments are passed through unchanged to ``_load_from_ps``.
    Returns the deployment, or None when the main entry cannot be loaded or
    an exception occurs.
    """
    try:
        services = dict(ssm_client_=ssm_client_, stringUtils_=stringUtils_,
                        awsService_=awsService_, kubernetesService_=kubernetesService_)
        deployment = self._load_from_ps(parameter_store_key_, **services)
        if not deployment:
            return None
        # Operation data lives in a sibling parameter with a "_data" suffix.
        data_holder = self._load_from_ps(parameter_store_key_ + "_data", **services)
        if data_holder:
            deployment.operation_data = data_holder.operation_data
        return deployment
    except Exception as exc:
        NSDeploymentFactory.LOGGER.error(f"Exception: {exc}")
        return None
def _load_from_ps(self, parameter_store_key,
                  ssm_client_,
                  stringUtils_: StringUtils,
                  awsService_: AWSService,
                  kubernetesService_: KubernetesService) -> Optional[NSDeployment]:
    """Read one Parameter Store entry and parse it into a deployment.

    The read goes through the factory's own ``_ssm_client``; ``ssm_client_``
    is only forwarded to ``load_from_string``. On success the raw text is
    kept on the deployment as ``source`` and the key as
    ``parameter_store_key``. Returns None when the parameter is missing,
    cannot be parsed, or the lookup raises.
    """
    log = NSDeploymentFactory.LOGGER
    try:
        raw_value = self._ssm_client.get_parameter(
            Name=parameter_store_key, WithDecryption=True)['Parameter']['Value']
        deployment = self.load_from_string(raw_value, ssm_client_=ssm_client_,
                                           stringUtils_=stringUtils_, awsService_=awsService_,
                                           kubernetesService_=kubernetesService_)
        if deployment:
            log.info(f"{parameter_store_key} read.")
            deployment.source = raw_value
            deployment.parameter_store_key = parameter_store_key
            return deployment
        log.info(f"Could not load or parse {parameter_store_key}")
        return None
    except ClientError as exc:
        # A missing parameter is an expected situation, so it is only logged at info level.
        if exc.response['Error']['Code'] == "ParameterNotFound":
            log.info(f"Parameter {parameter_store_key} not found.")
        else:
            log.error(f"Error when retrieving {parameter_store_key}: {exc}")
        return None
    except Exception as exc:
        log.error(f"General exception: {exc}")
        return None
def collect_applies(self, current_deployment_, deployments_to_apply_,
                    ssm_client_,
                    stringUtils_: StringUtils,
                    awsService_: AWSService,
                    kubernetesService_: KubernetesService):
    """Append a deployment and everything it ``#apply``-s to a list, depth first.

    ``current_deployment_`` is appended to ``deployments_to_apply_`` first.
    Then every line of its ``source`` that starts with ``"#apply "`` is treated
    as a Parameter Store key; that deployment is loaded and processed the same
    way. A ``None`` deployment (e.g. a key that could not be loaded) is skipped.

    Args:
        current_deployment_: Deployment whose ``source`` text is scanned, or ``None``.
        deployments_to_apply_: List that is extended in place.
        ssm_client_: Forwarded to the loader and to the recursive call.
        stringUtils_: Forwarded to the loader and to the recursive call.
        awsService_: Forwarded to the loader and to the recursive call.
        kubernetesService_: Forwarded to the loader and to the recursive call.

    Returns:
        None. Errors are logged and swallowed.
    """
    try:
        if not current_deployment_:
            return
        deployments_to_apply_.append(current_deployment_)
        marker = "#apply "
        for line in current_deployment_.source.splitlines():
            if not line.startswith(marker):
                continue
            # Surrounding whitespace is never part of a parameter name.
            import_key = line[len(marker):].strip()
            if not import_key:
                continue
            NSDeploymentFactory.LOGGER.info(f"******** applying {import_key}")
            # Same loader the sibling loading code uses; the former call went to
            # `_load_from_parameterstore`, a name not defined alongside these methods.
            applied = self._load_from_ps(import_key,
                                         ssm_client_=ssm_client_,
                                         stringUtils_=stringUtils_,
                                         awsService_=awsService_,
                                         kubernetesService_=kubernetesService_)
            # NOTE(review): there is no cycle detection; a definition that
            # applies itself recurses until Python's recursion limit is hit.
            self.collect_applies(applied, deployments_to_apply_,
                                 ssm_client_=ssm_client_,
                                 stringUtils_=stringUtils_,
                                 awsService_=awsService_,
                                 kubernetesService_=kubernetesService_)
    except Exception as e:
        NSDeploymentFactory.LOGGER.error(f"Exception: {e}")
def load_from_string(self, input_str_,
                     ssm_client_,
                     stringUtils_: StringUtils,
                     awsService_: AWSService,
                     kubernetesService_: KubernetesService) -> Optional[NSDeployment]:
    """Parse a deployment definition, picking the parser from the text itself.

    JSON is tried first (``stringUtils_.is_json``), then YAML
    (``stringUtils_.is_yaml``). A successfully parsed result is tagged with
    ``_is_yaml`` / ``_is_json`` to record which parser produced it.

    Returns:
        The parsed deployment; whatever the chosen parser returned (possibly
        ``None``); or ``None`` when the text is neither JSON nor YAML.
    """
    deps = {
        "ssm_client_": ssm_client_,
        "stringUtils_": stringUtils_,
        "awsService_": awsService_,
        "kubernetesService_": kubernetesService_,
    }

    if stringUtils_.is_json(input_str_):
        deployment = self.load_from_json(input_str_, **deps)
        if deployment:
            deployment._is_yaml = False
            deployment._is_json = True
        return deployment

    if stringUtils_.is_yaml(input_str_):
        deployment = self.load_from_yaml(input_str_, **deps)
        if deployment:
            deployment._is_yaml = True
            deployment._is_json = False
        return deployment

    NSDeploymentFactory.LOGGER.error("Input string is neither valid JSON nor YAML.")
    return None
def _load_from_json(self, json_str_,
                    ssm_client_,
                    stringUtils_: StringUtils,
                    awsService_: AWSService,
                    kubernetesService_: KubernetesService
                    ):
    """Decode a JSON document and build a deployment from it via ``_load``.

    Returns the result of ``_load``, or ``None`` if decoding or building
    raises (the exception is logged).
    """
    try:
        parsed = json.loads(json_str_)
        deployment = self._load(parsed,
                                ssm_client_=ssm_client_,
                                stringUtils_=stringUtils_,
                                awsService_=awsService_,
                                kubernetesService_=kubernetesService_)
    except Exception as exc:
        NSDeploymentFactory.LOGGER.error(f"Exception: {exc}")
        return None
    return deployment
def _load_from_yaml(self, yaml_str_,
                    ssm_client_,
                    stringUtils_: StringUtils,
                    awsService_: AWSService,
                    kubernetesService_: KubernetesService
                    ):
    """Parse a YAML document with ``yaml.safe_load`` and build a deployment via ``_load``.

    Returns the result of ``_load``, or ``None`` if parsing or building
    raises (the exception is logged).
    """
    try:
        parsed = yaml.safe_load(yaml_str_)
        deployment = self._load(parsed,
                                ssm_client_=ssm_client_,
                                stringUtils_=stringUtils_,
                                awsService_=awsService_,
                                kubernetesService_=kubernetesService_)
    except Exception as exc:
        NSDeploymentFactory.LOGGER.error(f"Exception: {exc}")
        return None
    return deployment
def load_from_json(self, json_str_,
                   ssm_client_,
                   stringUtils_: StringUtils,
                   awsService_: AWSService,
                   kubernetesService_: KubernetesService) -> Optional[NSDeployment]:
    """Build a deployment from a JSON string.

    The text is decoded with ``json.loads`` and passed to ``_load`` together
    with the collaborators. Any exception is logged and turned into ``None``.
    """
    try:
        parsed = json.loads(json_str_)
        deployment = self._load(parsed,
                                ssm_client_=ssm_client_,
                                stringUtils_=stringUtils_,
                                awsService_=awsService_,
                                kubernetesService_=kubernetesService_)
    except Exception as exc:
        NSDeploymentFactory.LOGGER.error(f"Exception in load_from_json: {exc}")
        return None
    return deployment
def load_from_yaml(self, yaml_str_,
                   ssm_client_,
                   stringUtils_: StringUtils,
                   awsService_: AWSService,
                   kubernetesService_: KubernetesService) -> Optional[NSDeployment]:
    """Build a deployment from a YAML string.

    The text is parsed with ``yaml.safe_load`` and passed to ``_load``
    together with the collaborators. Any exception is logged and turned into
    ``None``.
    """
    try:
        parsed = yaml.safe_load(yaml_str_)
        deployment = self._load(parsed,
                                ssm_client_=ssm_client_,
                                stringUtils_=stringUtils_,
                                awsService_=awsService_,
                                kubernetesService_=kubernetesService_)
    except Exception as exc:
        NSDeploymentFactory.LOGGER.error(f"Exception in load_from_yaml: {exc}")
        return None
    return deployment
def _load(self, data_,
          ssm_client_,
          stringUtils_: StringUtils,
          awsService_: AWSService,
          kubernetesService_: KubernetesService
          ) -> Optional[NSDeployment]:
    """Rebuild an ``NSDeployment`` from an already-parsed JSON/YAML mapping.

    Each entry of ``data_["operations"]`` is turned into the operation class
    named by its ``"type"`` field (unknown types become a plain ``Operation``)
    and then initialised with the collaborators passed in. Entries of
    ``data_["operation_data"]`` are converted with ``Datum.from_dict``; if that
    conversion fails the operation data is dropped and the failure is logged.

    Args:
        data_: Mapping produced by ``json.loads`` / ``yaml.safe_load``.
        ssm_client_: Passed to every operation's ``initialize``.
        stringUtils_: Passed to every operation's ``initialize``.
        awsService_: Passed to every operation's ``initialize``.
        kubernetesService_: Passed to every operation's ``initialize``.

    Returns:
        The reconstructed deployment, or ``None`` if anything raises
        (for example an operation entry without a ``"type"`` key).
    """
    try:
        operations = []
        if data_.get("operations"):
            for item in data_["operations"]:
                # Deliberately a hard lookup: an entry without "type" aborts the load.
                op_type = item["type"]
                if op_type == "ScaleOperation":
                    operation = ScaleOperation(item.get("id"), item.get("type"),
                                               item.get("execution_time"),
                                               item.get("target_percentage"),
                                               item.get("watermark_id"))
                elif op_type == "WatcherOperation":
                    operation = WatcherOperation(item.get("id"), item.get("type"),
                                                 item.get("execution_time"),
                                                 item.get("last_target_scale_id"))
                elif op_type == "ImmediateScaleOperation":
                    operation = ImmediateScaleOperation(item.get("id"), item.get("type"),
                                                        item.get("scale_to"))
                elif op_type == "WatermarkOperation":
                    operation = WatermarkOperation(item.get("id"), item.get("type"),
                                                   item.get("execution_time"),
                                                   item.get("target_percentage"),
                                                   item.get("watermark_id"))
                elif op_type == "TransactionBeginOperation":
                    operation = TransactionBeginOperation(item.get("id"), item.get("type"),
                                                          item.get("execution_time"),
                                                          item.get("target_percentage"))
                elif op_type == "DefineTransactionIdPrefixOperation":
                    operation = DefineTransactionIdPrefixOperation(item.get("id"), item.get("type"),
                                                                   item.get("execution_time"),
                                                                   item.get("target_percentage"),
                                                                   item.get("transaction_id_prefix"))
                elif op_type == "TransactionEndOperation":
                    operation = TransactionEndOperation(item.get("id"), item.get("type"),
                                                        item.get("execution_time"),
                                                        item.get("target_percentage"))
                else:
                    # Unrecognised type: keep it as a generic operation.
                    operation = Operation(item.get("id"), item.get("type"),
                                          item.get("execution_time"),
                                          item.get("target_percentage"))
                operation.initialize(ssm_client_=ssm_client_,
                                     stringUtils_=stringUtils_,
                                     awsService_=awsService_,
                                     kubernetesService_=kubernetesService_)
                operations.append(operation)

        operation_data = []
        if data_.get("operation_data"):
            try:
                operation_data = [Datum.from_dict(datum) for datum in data_["operation_data"]]
            except Exception as e:
                # Still best-effort (the deployment loads without its data), but
                # the failure is no longer silent.
                NSDeploymentFactory.LOGGER.warning(
                    f"Could not parse operation_data, continuing without it: {e}")
                operation_data = []

        return NSDeployment(name=data_.get("name"), type=data_.get("type"), id=data_.get("id"),
                            version=data_.get("version"),
                            url=data_.get("url"), env=data_.get("env"), sealId=data_.get("sealId"),
                            operations=operations,
                            operation_data=operation_data,
                            skip_kubernetes_scale=data_.get("skip_kubernetes_scale"),
                            skip_parameterstore_write=data_.get("skip_parameterstore_write"),
                            active=data_.get("active"))
    except Exception as e:
        NSDeploymentFactory.LOGGER.error(f"Exception in {inspect.currentframe().f_code.co_name}: {e}")
        return None
import logging from typing import Optional
from eks_hk.awm.app.models.deployments.nsDeployment import NSDeployment from eks_hk.awm.app.services.deployments.nsDeploymentFactory import NSDeploymentFactory from eks_hk.awm.app.services.platform.aws_service import AWSService from eks_hk.awm.app.services.platform.kubernetes_service import KubernetesService from eks_hk.awm.app.utils.log_event import log_housekeeper_event from eks_hk.awm.app.utils.string_utils import StringUtils
class NSDeploymentManager:
    """Loads deployments through an ``NSDeploymentFactory`` and remembers them by name.

    The manager keeps a mapping from Kubernetes deployment name to the loaded
    ``NSDeployment`` (or ``None`` when loading that name raised).
    """

    # Module-scoped logger; ``__name__`` (not a bare ``name``) is the module's name.
    LOGGER = logging.getLogger(__name__)

    def __init__(self, ssm_client_, nsDeploymentFactory_: NSDeploymentFactory,
                 stringUtils_: StringUtils, awsService_: AWSService,
                 kubernetesService_: KubernetesService
                 ):
        """Store the collaborators and start with an empty deployment map."""
        self._map_kubedeploymentnames_to_nsdeploymentinstances = {}
        self._nsDeploymentFactory = nsDeploymentFactory_
        self._ssm_client = ssm_client_
        self._stringUtils = stringUtils_
        self._awsService = awsService_
        self._kubernetesService = kubernetesService_

    def get_deployment_instances_list(self):
        """Return the known deployments as a list of ``(name, deployment)`` pairs."""
        return list(self._map_kubedeploymentnames_to_nsdeploymentinstances.items())

    def load_deployment(self, deployment_name_) -> Optional[NSDeployment]:
        """Load one deployment definition and register it under its name.

        The Parameter Store key is
        ``{prefix}/{region}/{VERSION}/{cluster}/{namespace}/{deployment_name_}``,
        with the parts taken from the AWS and Kubernetes services.

        Args:
            deployment_name_: Name of the Kubernetes deployment to load.

        Returns:
            The loaded deployment, or ``None`` when the factory returned nothing
            or an exception occurred. On an exception the name is registered
            with ``None``; when the factory simply returns nothing the map is
            left untouched.
        """
        try:
            prefix = self._awsService.get_parameter_store_prefix()
            region = self._kubernetesService.get_region()
            cluster = self._kubernetesService.get_cluster_name()
            namespace = self._kubernetesService.get_namespace()
            VERSION = "1.0.0"
            # Build the key once and reuse it for both the log line and the lookup.
            parameter_store_key = f"{prefix}/{region}/{VERSION}/{cluster}/{namespace}/{deployment_name_}"
            NSDeploymentManager.LOGGER.info(f"load_deployment {parameter_store_key}")

            ret = self._nsDeploymentFactory.makeNSDeployment(parameter_store_key,
                                                             ssm_client_=self._ssm_client,
                                                             stringUtils_=self._stringUtils,
                                                             kubernetesService_=self._kubernetesService,
                                                             awsService_=self._awsService)
            if not ret:
                log_housekeeper_event(event="load_deployment", operation="", eventRetCode_=0,
                                      deployment_=deployment_name_,
                                      target_percentage_="", current_num_of_pods_="", target_num_of_pods_="",
                                      execution_time_=0, message_="Could not load")
                return None

            log_housekeeper_event(event="load_deployment", operation="", eventRetCode_=0,
                                  deployment_=deployment_name_, target_percentage_="",
                                  current_num_of_pods_="", target_num_of_pods_="", execution_time_=0,
                                  message_=f"Loading deployment {deployment_name_} in {namespace} {cluster} {region} ")
            ret.deployment_name = deployment_name_
            # NOTE(review): the deployment is made to reference itself here;
            # presumably kept for callers that still read `.housekeeper_deployment`.
            ret.housekeeper_deployment = ret

            separator = "- - " * 10
            NSDeploymentManager.LOGGER.info(separator)
            NSDeploymentManager.LOGGER.info(f"Deployment from {parameter_store_key} was resolved as:\n")
            NSDeploymentManager.LOGGER.info(ret)
            NSDeploymentManager.LOGGER.info(separator)

            self._map_kubedeploymentnames_to_nsdeploymentinstances[deployment_name_] = ret
            return ret
        except Exception as e:
            NSDeploymentManager.LOGGER.error(f"Exception: {e}")
            self._map_kubedeploymentnames_to_nsdeploymentinstances[deployment_name_] = None
            return None
ERROR:main:Failed to upsert data for AWM-AdvisorCallTranscript-Zoom1-109725: ON CONFLICT clause does not match any PRIMARY KEY or UNIQUE constraint
Assume an SQLite database with an empty table defined by the DDL below.
sql_upsert = """
INSERT INTO channelConfig (name, regionId, envId, clientId, description)
VALUES (?, ?, ?, ?, ?)
ON CONFLICT(name) DO UPDATE SET
    regionId = excluded.regionId,
    envId = excluded.envId,
    clientId = excluded.clientId,
    description = excluded.description;
"""
The input variables are:
clientId = {int} 1
description = {str} 'Configuration for AWM-AdvisorCallTranscript-Zoom1-109725'
envId = {int} 1
name = {str} 'AWM-AdvisorCallTranscript-Zoom1-109725'
regionId = {int} 1
The Python code is:
def upsert_data(name, regionId, envId, clientId, description):
    """ Upsert data into the SQLite database. """
    try:
        cursor.execute(sql_upsert, (name, regionId, envId, clientId, description))
        conn.commit()
        logger.info(f"Upserted data for {name}")
    except Exception as e:
        logger.error(f"Failed to upsert data for {name}: {e}")
DDL: -- channelConfig definition
CREATE TABLE channelConfig (
    id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT,
    name TEXT(256) NOT NULL,
    regionId INTEGER NOT NULL,
    envId INTEGER NOT NULL,
    clientId INTEGER NOT NULL,
    createDateTime TEXT,
    updateDateTime INTEGER,
    description TEXT(256),
    CONSTRAINT channelConfig_UN UNIQUE (id),
    CONSTRAINT channelConfig_FK_1 FOREIGN KEY (clientId) REFERENCES clients(id),
    CONSTRAINT channelConfig_FK FOREIGN KEY (envId) REFERENCES envs(id),
    CONSTRAINT channelConfig_FK_2 FOREIGN KEY (regionId) REFERENCES regions(id)
);
The error is: OperationalError('ON CONFLICT clause does not match any PRIMARY KEY or UNIQUE constraint')
import logging from threading import Thread
import boto3 from flask import Flask, request, jsonify from flask_cors import CORS from s2t.infra.auto.services.aws.parameterStore.parameterStoreClientProxy import ParameterStoreClientProxy from s2t.infra.auto.services.log.logController import LogController
from eks_hk.awm.app.services.nsMgrController import NSMgrController from eks_hk.awm.app.utils import Utils
import pydevd_pycharm
# Runs at import time: hooks this process up to a PyCharm (pydevd) debug
# session on port 12345, redirecting stdout/stderr to the debug server.
# NOTE(review): this looks like a development-only hook — confirm it is
# removed or guarded before the module is deployed.
pydevd_pycharm.settrace('0.0.0.0', port=12345, stdoutToServer=True, stderrToServer=True, suspend=False)
# ==============================================================================
# NSMgrLauncher
# - invoked using two parameters
# -- env = [ DEV | UAT | PRD ]
# -- created date = 2022-10-01
# - loads properties file based on passed in env
# - invokes the artifactHelper, passing in full argument list
# ==============================================================================
class NSMgrLauncher:
    """Namespace for launcher-wide constants and the logger used by the web handlers."""

    # Location and section of the properties file.
    PROPERTIES_PATH = "eks_hk.awm.configs"
    PROPERTIES_ROOT_NAME = "awm_sa_pipeline.properties"
    PROPERTIES_SECTION_NAME = "sales_assist"

    # Module-scoped logger; ``__name__`` (not a bare ``name``) is the module's name.
    LOGGER = logging.getLogger(__name__)
# Flask application for the HTTP endpoints below. Flask expects the import
# name of the module, i.e. ``__name__`` (a bare ``name`` is undefined here).
app = Flask(__name__)
# Enable cross-origin requests for every route of the app.
CORS(app)

# Module-level status flags.
# NOTE(review): nothing visible in this module reads or updates them — confirm
# whether the probes are meant to consult these flags.
service_is_ready = False
service_is_alive = False
def background_task():
    """Create the ``NSMgrController`` and log if it could not be created."""
    controller = NSMgrController.make()
    if controller is None:
        NSMgrLauncher.LOGGER.info("Failed to create NSMgrController instance")
        return
    # Use the housekeeperController instance
    # NOTE(review): the statements that used the controller are missing from
    # this copy of the file (the original `else:` had no body, which does not
    # even parse). Restore them here from the real source.
@app.route("/readiness")
def readiness_probe():
    """Readiness endpoint: always answers HTTP 200 with a fixed message."""
    body, status = "Service is ready", 200
    return body, status
@app.route("/liveness")
def liveness_probe():
    """Liveness endpoint: always answers HTTP 200 with a fixed message."""
    body, status = "Service is alive", 200
    return body, status
def run_app():
    """Serve the Flask app on every network interface, port 5000."""
    listen_on = {"host": "0.0.0.0", "port": 5000}
    app.run(**listen_on)
@app.route("/get-markup-data")
def get_markup_data():
    """Return every parameter from ``Utils.get_parameters_by_path`` as JSON.

    Each element has the keys ``markup_string`` (the parameter value), ``key``
    (the parameter name) and ``format`` (``"json"`` when ``Utils.is_json``
    accepts the value, otherwise ``"yaml"``). The list is sorted by ``key``.

    Returns:
        The JSON list, or a JSON error message with status 500 on any failure.
    """
    try:
        markup_data = [
            {
                "markup_string": value,
                "key": name,
                "format": "json" if Utils.is_json(value) else "yaml",
            }
            for name, value in Utils.get_parameters_by_path()
        ]
        sorted_markup_data = sorted(markup_data, key=lambda entry: entry.get("key"))
        return jsonify(sorted_markup_data)
    except Exception as e:
        # f-string: the message previously logged the literal text "{e}".
        NSMgrLauncher.LOGGER.error(f"Exception: {e}")
        return jsonify({"message": "Error getting markup data"}), 500
@app.route("/update-markup-data", methods=["POST"])
def update_markup_data():
    """Write an edited markup document back to Parameter Store.

    Expects a JSON body with ``key`` (the parameter name) and
    ``markup_string`` (the new content). The value is written through a
    ``ParameterStoreClientProxy`` wrapped around an SSM client for
    ``Utils.get_region()``.

    Returns:
        A JSON confirmation, or a JSON error message with status 500 on any
        failure (including a missing or non-JSON request body).
    """
    try:
        data = request.get_json()
        parameter_store_key = data.get("key")
        edited_markup = data.get("markup_string")
        NSMgrLauncher.LOGGER.info(f"New markup content: {edited_markup}")
        ssm_client = boto3.client("ssm", region_name=Utils.get_region())
        # One name for the proxy: it used to be assigned as `parameter_storeproxy`
        # but called as `parameter_store_proxy`, so every request failed.
        parameter_store_proxy = ParameterStoreClientProxy(client=ssm_client)
        parameter_store_proxy.put_parameter(parameter_store_key, "", edited_markup, dryrun=False)
        return jsonify({"message": "markup data updated."})
    except Exception as e:
        # f-string: the message previously logged the literal text "{e}".
        NSMgrLauncher.LOGGER.error(f"Exception: {e}")
        return jsonify({"message": "Error updating markup data"}), 500
if name == "main":
logging.basicConfig(level=logging.DEBUG)