Open afaisman opened 3 months ago
``import datetime import logging from typing import Optional
import pytz import yaml
from eks_hk.nsmanager.models.deployments.datum import Datum from eks_hk.nsmanager.utils.string_utils import StringUtils from eks_hk.nsmanager.utils.time_utils import TimeUtils
class NSData:
    """Hold the data that belongs to one specific deployment and serialize it.

    Main API:
        set_datum(data_id_, value): set or update the value of a specific datum.
    """

    LOGGER = logging.getLogger(__name__)

    def __init__(self, name, type, id, version, url, env, sealId, operations, operation_data, time_created=None):
        """Store the deployment attributes.

        :param operations: iterable of objects exposing ``to_dict()``.
        :param operation_data: list of datum objects exposing ``data_id`` and ``to_dict()``.
        :param time_created: creation timestamp; when omitted, the current
            America/New_York time is taken at construction time.
        """
        self.name = name
        self.type = type
        self.id = id
        self.version = version
        self.url = url
        self.env = env
        self.sealId = sealId
        self.operations = operations
        self.operation_data = operation_data
        self.source = None
        self.parameter_store_key = None
        self._transaction_id = None
        self._transaction_id_prefix = "default"
        # A default written in the signature would be evaluated only once, when
        # the module is imported, so every instance would share that timestamp.
        if time_created is None:
            time_created = datetime.datetime.now(pytz.utc).astimezone(pytz.timezone("America/New_York"))
        self.time_created = time_created

    def initialize(self, _string_utils: StringUtils):
        """Attach the string helper used by this instance."""
        self._string_utils = _string_utils

    def __str__(self):
        """Return id, parameter store key and the full YAML dump, or a fallback message on error."""
        try:
            return f"{self.id}\n{self.parameter_store_key}\n {self.to_yaml(False, False)}"
        except Exception as e:
            NSData.LOGGER.error(f"Exception: {e}")
            return 'could not serialize NSData'

    def to_dict(self):
        """Return the serializable fields as a dict, or None if any element fails to convert."""
        try:
            operations_dicts = [operation.to_dict() for operation in self.operations]
            operation_data_dicts = [data.to_dict() for data in self.operation_data]
            return {"name": self.name, "type": self.type, "id": self.id, "version": self.version, "url": self.url,
                    "env": self.env, "sealId": self.sealId, "operations": operations_dicts,
                    "operation_data": operation_data_dicts}
        except Exception as e:
            NSData.LOGGER.error(f"Exception: {e}")
            return None

    def to_yaml(self, skipoperations: bool, skipdata: bool) -> str:
        """Dump ``to_dict()`` as YAML, optionally dropping operations and/or operation data.

        Returns an empty string when serialization fails (including when
        ``to_dict()`` returned None).
        """
        try:
            dict_dump = self.to_dict()
            if skipoperations is True:
                dict_dump.pop("operations")
            if skipdata is True:
                dict_dump.pop("operation_data")
            return yaml.dump(dict_dump, sort_keys=False, default_flow_style=False)
        except Exception as e:
            NSData.LOGGER.error(f"Exception: {e}")
            return ""

    def to_str(self, skipoperations: bool, skipdata: bool) -> str:
        """Alias of ``to_yaml`` with the same arguments; returns "" on error."""
        try:
            return self.to_yaml(skipoperations, skipdata)
        except Exception as e:
            NSData.LOGGER.error(f"Exception: {e}")
            return ""

    def get_datum(self, data_id_: str) -> Optional[Datum]:
        """Return the first datum whose ``data_id`` equals ``data_id_``, or None."""
        if self.operation_data is None:
            return None
        return next((item for item in self.operation_data if item.data_id == data_id_), None)

    def remove_datum(self, data_id_: str) -> bool:
        """Remove the first datum whose ``data_id`` equals ``data_id_``; return True if one was removed."""
        if self.operation_data is None:
            return False
        item_to_remove = next((item for item in self.operation_data if item.data_id == data_id_), None)
        if item_to_remove is None:
            return False
        self.operation_data.remove(item_to_remove)
        return True

    def set_datum(self, data_id_: str, value=None) -> Optional[Datum]:
        """Update an existing datum or append a new one.

        An existing datum is only modified when ``value`` is not None. A new
        datum is created with ``value or 0`` and the current Eastern time.
        Returns the datum, or None if an exception was logged.
        """
        try:
            datum = self.get_datum(data_id_)
            if datum is not None:
                if value is not None:
                    datum.set_value(value)
                return datum
            new_datum = Datum(data_id=data_id_, value=value or 0, recorded_time=TimeUtils.get_est_now())
            self.operation_data.append(new_datum)
            return new_datum
        except Exception as e:
            NSData.LOGGER.error(f"Exception: {e}")
            return None
`import logging
import yaml from kubernetes import client
from eks_hk.nsmanager.services.platform.aws_service import AWSService from eks_hk.nsmanager.services.platform.kubernetes_service import KubernetesService from eks_hk.nsmanager.utils.string_utils import StringUtils from eks_hk.nsmanager.utils.time_utils import TimeUtils, reset_to_full_hour, \ reset_to_previous_middle_of_hour, reset_to_half_hour, extract_minute_from_string, extract_minute_from_string_w_every
class Operation:
    """Base class for a timed operation with an execution schedule and a target.

    It supports:
      - ``to_dict`` and ``from_dict`` for (de)serialization,
      - ``execute`` and ``is_relevant`` for schedule evaluation,
      - ``get_n_pods`` and ``scale_deployment`` for reading and changing the
        replica count through the injected Kubernetes service.

    Designed for structured execution of timed operations within a Kubernetes
    and AWS environment, with logging and error handling on every entry point.
    """

    LOGGER = logging.getLogger(__name__)

    def __init__(self, id, type, execution_time, target_percentage, time_span=None):
        """Store the operation settings.

        :param execution_time: schedule string interpreted by ``is_relevant``.
        :param time_span: length, in seconds, of the window after the scheduled
            time during which the operation is relevant; defaults to 180.
        """
        self.id = id
        self.type = type
        self.execution_time = execution_time
        self.target_percentage = target_percentage
        # Honour the caller's value; previously the argument was silently ignored.
        self.time_span = 180 if time_span is None else time_span

    def initialize(self, ssm_client_, string_utils_: StringUtils, aws_service_: AWSService,
                   kubernetes_service_: KubernetesService):
        """Inject the service objects used by the operation."""
        self._ssm_client = ssm_client_
        self._string_utils = string_utils_
        self._aws_service = aws_service_
        self._kubernetes_service = kubernetes_service_

    def to_dict(self):
        """Return id, type and execution_time as a dict (None if an exception was logged).

        NOTE(review): target_percentage and time_span are not included, so
        ``from_dict(op.to_dict())`` cannot rebuild this base class - confirm
        whether that is intended.
        """
        try:
            return {
                "id": self.id,
                "type": self.type,
                "execution_time": self.execution_time,
            }
        except Exception as e:
            Operation.LOGGER.error(f"Exception: {e}")
            return None

    @classmethod
    def from_dict(cls, data_dict):
        """Build an instance from keyword data; return None when construction fails."""
        try:
            return cls(**data_dict)
        except Exception as e:
            Operation.LOGGER.error(f"Exception: {e}")
            return None

    def execute(self, current_time_, housekeeper_deployment_) -> (bool, bool, bool):
        """Base implementation: report whether the operation is relevant now.

        Returns a tuple of booleans (succeeded, executed, data were updated).
        """
        try:
            if self.is_relevant(current_time_):
                return True, True, False
            return True, False, False
        except Exception as e:
            Operation.LOGGER.error(f"Exception: {e}")
            return False, False, False

    def is_relevant(self, current_time_) -> bool:
        """Return True when ``current_time_`` falls inside the operation's live span.

        The live span is the interval (scheduled time, scheduled time + time_span].
        Schedules starting with "now" are always relevant. Any exception is
        logged and reported as not relevant.
        """
        try:
            schedule = self.execution_time.lower()
            if schedule.startswith("now"):
                return True

            if schedule == "hour":
                adjusted_execution_time = str(reset_to_full_hour(current_time_))
            elif schedule == "middleofhour":
                adjusted_execution_time = str(reset_to_previous_middle_of_hour(current_time_))
            elif schedule == "halfhour":
                adjusted_execution_time = str(reset_to_half_hour(current_time_))
            elif not schedule.startswith("every_") and schedule.endswith("_min"):
                # "<n>_min": minute n of the current hour.
                # NOTE(review): seconds are not reset here, so the difference
                # below is a whole number of minutes and is 0 (not relevant)
                # during minute n itself - confirm this is intended.
                scheduled_minute = extract_minute_from_string(schedule)
                adjusted_execution_time = str(current_time_.replace(minute=scheduled_minute))
            elif schedule.startswith("every_") and schedule.endswith("_min"):
                # "every_<n>_min": nearest multiple of n minutes within the hour.
                every_x_min = extract_minute_from_string_w_every(schedule)
                multiple = round(current_time_.minute / every_x_min) * every_x_min
                if multiple >= 60:
                    multiple -= every_x_min
                adjusted_execution_time = str(current_time_.replace(minute=multiple))
            else:
                adjusted_execution_time = TimeUtils.adjust_pattern_tine_to_current_time(current_time_,
                                                                                       self.execution_time)
            if not adjusted_execution_time:
                return False

            # Still relevant while 0 < (current_time_ - scheduled time) <= time_span seconds.
            time_diff = TimeUtils.time_diff(current_time_, adjusted_execution_time)
            return 0 < time_diff <= self.time_span
        except Exception as e:
            Operation.LOGGER.error(f"Exception: {e}")
            return False

    def __str__(self):
        """Return the YAML dump of ``to_dict()``."""
        dict_dump = self.to_dict()
        return yaml.dump(dict_dump, sort_keys=False, default_flow_style=False)

    def get_n_pods(self, housekeeper_deployment_) -> int:
        """Return the replica count of the deployment.

        Returns 0 on a Kubernetes API error and -1 on any other error.
        """
        namespace = self._kubernetes_service.get_namespace()
        deployment_name = housekeeper_deployment_.name
        try:
            deployment = self._kubernetes_service.read_namespaced_deployment(deployment_name, namespace)
            Operation.LOGGER.info(
                f"Deployement {housekeeper_deployment_.name} currently has {deployment.spec.replicas} replicas")
            return deployment.spec.replicas
        except client.exceptions.ApiException as e:
            Operation.LOGGER.error("Kubernetes API error: " + str(e))
            return 0
        except Exception as e:
            Operation.LOGGER.error(f"Error: {str(e)}")
            return -1

    def scale_deployment(self, housekeeper_deployment_, replicas) -> (bool, int):
        """Set the deployment's replica count to ``replicas`` if it differs.

        Returns ``(succeeded, replica count before the call)``. On error the
        result is ``(False, -1)`` so callers can always unpack two values.
        """
        try:
            namespace = self._kubernetes_service.get_namespace()
            deployment_name = housekeeper_deployment_.name
            Operation.LOGGER.info(
                f"scale_deployment namespace={namespace} deplayment_name={deployment_name}, replicas={replicas}")
            deployment = self._kubernetes_service.read_namespaced_deployment(deployment_name, namespace)
            current_num_of_pods = deployment.spec.replicas
            Operation.LOGGER.info(f"Deployment {deployment_name} currently has {current_num_of_pods} replicas")
            if current_num_of_pods != replicas:
                deployment.spec.replicas = replicas
                self._kubernetes_service.replace_namespaced_deployment(deployment_name_=deployment_name,
                                                                       namespace_=namespace, body_=deployment)
                Operation.LOGGER.info("Deployment scaled.")
            else:
                Operation.LOGGER.info("No need to scale")
            return True, current_num_of_pods
        except Exception as e:
            Operation.LOGGER.error("Error: " + str(e))
            return False, -1
`
`import calendar import datetime import json import logging from datetime import timedelta from functools import lru_cache
import dateparser import pytz import yaml from dateutil.tz import tzlocal
class TimeUtils:
    """Static helpers for week arithmetic, time parsing and differences,
    schedule-pattern substitution, id-keyed list merging and date prefixes."""

    LOGGER = logging.getLogger(__name__)

    # Day name -> index, with Sunday as day 0.
    days_of_week = {"Sunday": 0, "Monday": 1, "Tuesday": 2, "Wednesday": 3, "Thursday": 4, "Friday": 5, "Saturday": 6}

    @staticmethod
    def beginning_of_week_from_time(some_time):
        """Return a naive datetime for 00:00 of the Sunday on or before ``some_time``."""
        # datetime.weekday() is Monday=0 ... Sunday=6; shift so that Sunday=0.
        days_since_sunday = (some_time.weekday() + 1) % 7
        sunday = some_time - datetime.timedelta(days=days_since_sunday)
        return datetime.datetime(sunday.year, sunday.month, sunday.day, 0, 0)

    @staticmethod
    def normalize_hh_mm(time_str):
        """Convert a time string to a normalized 'HH:MM' format.

        Parameters:
        - time_str (str): the time, with or without a minutes part.
        Returns:
        - str: the time in 'HH:MM' format.
        Raises:
        - ValueError: if the string contains more than one colon.
        """
        parts = time_str.split(":")
        if len(parts) == 1:
            # Hour only: zero-pad it and add ":00".
            return f"{time_str.zfill(2)}:00"
        if len(parts) == 2:
            return ":".join(part.zfill(2) for part in parts)
        raise ValueError("Time string is not in a recognized format")

    @staticmethod
    def seconds_since_beginning_of_week(some_time):
        """Return the wall-clock seconds elapsed since 00:00 of the Sunday on or before ``some_time``."""
        week_start = TimeUtils.beginning_of_week_from_time(some_time)
        # week_start is naive; drop tzinfo so aware inputs do not raise TypeError.
        return (some_time.replace(tzinfo=None) - week_start).total_seconds()

    @staticmethod
    def is_json(myjson):
        """Return True if ``myjson`` parses as JSON, otherwise None."""
        try:
            json.loads(myjson)
        except Exception:
            return None
        return True

    @staticmethod
    def is_yaml(myyaml):
        """Return True only if ``myyaml`` loads to a dict or list.

        Scalars and empty documents give False; a load error is logged and
        gives None.
        """
        try:
            loaded_yaml = yaml.safe_load(myyaml)
            return isinstance(loaded_yaml, (dict, list))
        except Exception as e:
            TimeUtils.LOGGER.info(f"this is not yaml ({e})")
            return None

    @staticmethod
    def datetime_info(dt):
        """Return a compact human-readable description of ``dt``."""
        formatted_info = f"{dt.strftime('%A, %B %d, %Y %H:%M:%S')} - " f"Year: {dt.year}, Month: {dt.month}, Day: {dt.day}, " f"{dt.strftime('%A')}"
        return formatted_info

    @staticmethod
    @lru_cache(maxsize=512)  # Cache the most recent 512 calls
    def parse_to_aware(datetime_str, default_timezone=tzlocal()):
        """Parse ``datetime_str`` and return a timezone-aware datetime.

        A naive result gets ``default_timezone`` attached.
        Raises ValueError when the string cannot be parsed.
        """
        dt = dateparser.parse(datetime_str)
        if dt is None:
            # dateparser returns None for unparseable input; fail with a clear
            # message instead of an AttributeError on the next line.
            raise ValueError(f"Could not parse datetime string: {datetime_str!r}")
        if dt.tzinfo is None:
            dt = dt.replace(tzinfo=default_timezone)
        return dt

    @staticmethod
    def time_difference(time_str1, time_str2):
        """Return ``time_str1 - time_str2`` as a timedelta after parsing both strings."""
        dt1 = TimeUtils.parse_to_aware(time_str1)
        dt2 = TimeUtils.parse_to_aware(time_str2)
        return dt1 - dt2

    @staticmethod
    def calculate_time_difference_in_sec(dt1, dt2):
        """Return ``dt1 - dt2`` in seconds, comparing both values in UTC."""
        dt1_utc = dt1.astimezone(pytz.utc)
        dt2_utc = dt2.astimezone(pytz.utc)
        return (dt1_utc - dt2_utc).total_seconds()

    @staticmethod
    def time_diff(dt1, time_str2):
        """Return ``dt1 - time_str2`` in seconds; ``time_str2`` may be a datetime or a parseable string."""
        if isinstance(time_str2, datetime.datetime):
            dt2 = time_str2
        else:
            dt2 = TimeUtils.parse_to_aware(time_str2)
        return TimeUtils.calculate_time_difference_in_sec(dt1, dt2)

    @staticmethod
    def get_est_now():
        """Return the current time in America/New_York (EST/EDT handled by pytz)."""
        eastern = pytz.timezone("America/New_York")
        return datetime.datetime.now(pytz.utc).astimezone(eastern)

    @staticmethod
    def dry_run(time1, time2, time_step_in_sec, callback=None):
        """Invoke ``callback`` for every step from ``time1`` to ``time2`` inclusive."""
        current_time = TimeUtils.parse_to_aware(time1)
        end_time = TimeUtils.parse_to_aware(time2)
        while current_time <= end_time:
            if callback:
                callback(current_time)
            current_time += timedelta(seconds=time_step_in_sec)

    @staticmethod
    def days_of_week_substring(s):
        """Return the first lowercase day name (monday..sunday order) contained in ``s``, or None."""
        lowered = s.lower()
        for day in ("monday", "tuesday", "wednesday", "thursday", "friday", "saturday", "sunday"):
            if day in lowered:
                return day
        return None

    @staticmethod
    def is_weekday(s):
        """Return True if ``s`` (case-insensitive) is exactly a Monday-Friday day name."""
        return s.lower() in ["monday", "tuesday", "wednesday", "thursday", "friday"]

    @staticmethod
    def adjust_pattern_tine_to_current_time(current_time, pattern_tine):
        """Substitute the day placeholder of a schedule pattern using ``current_time``.

        "anyday" always becomes today; "weekday" becomes today on Monday-Friday
        and yields None on weekends. A day name equal to today's is then
        replaced by today's date (YYYY-MM-DD). Patterns with no day name, or
        naming another day, are returned without a date substitution.
        """
        current_day_of_week = calendar.day_name[current_time.weekday()]
        lowered = pattern_tine.lower()
        if "anyday" in lowered:
            pattern_tine = lowered.replace("anyday", current_day_of_week)
        elif "weekday" in lowered:
            if not TimeUtils.is_weekday(current_day_of_week.lower()):
                return None
            pattern_tine = lowered.replace("weekday", current_day_of_week)

        # weekday_substring is one of monday..sunday, or None.
        weekday_substring = TimeUtils.days_of_week_substring(pattern_tine)
        if not weekday_substring:
            return pattern_tine
        if weekday_substring.lower() == current_day_of_week.lower():
            pattern_tine = pattern_tine.lower().replace(weekday_substring.lower(), current_time.strftime("%Y-%m-%d"))
        return pattern_tine

    @staticmethod
    def update_list_by_id(original_list, update_list):
        """Merge two lists by ``id``; items from ``update_list`` replace same-id originals.

        :param original_list: The original list.
        :param update_list: The list to update the original list with.
        :return: A new list; replaced items keep the original position, new ids are appended.
        """
        items_dict = {item.id: item for item in original_list}
        for item in update_list:
            items_dict[item.id] = item
        return list(items_dict.values())

    @staticmethod
    def apply_list_by_id(original_list, update_list):
        """Merge two lists by ``id``; items from ``update_list`` are added only for new ids.

        :param original_list: The original list.
        :param update_list: The list to apply on top of the original list.
        :return: ``original_list`` itself when ``update_list`` is empty, otherwise a new list.
        """
        if not update_list:
            return original_list
        items_dict = {item.id: item for item in original_list}
        for item in update_list:
            # Same-id items already in the original list win.
            items_dict.setdefault(item.id, item)
        return list(items_dict.values())

    @staticmethod
    def convert_prefix(prefix):
        """Expand 'dailyprefix_', 'hourlyprefix_' or 'weeklyprefix_' into a date-based prefix.

        Uses the current Eastern time. Any other value is returned unchanged.
        """
        today = TimeUtils.get_est_now()
        kind = prefix.lower()
        if kind == "dailyprefix_":
            return today.strftime("%a_%b%d_")
        if kind == "hourlyprefix_":
            return today.strftime("%a_%b%d_%Hh_")
        if kind == "weeklyprefix_":
            # Week runs Sunday..Saturday, matching beginning_of_week_from_time.
            # The modulo keeps a Sunday inside its own week instead of mapping
            # it to the week that ended the day before.
            start_of_week = today - timedelta(days=(today.weekday() + 1) % 7)
            end_of_week = start_of_week + timedelta(days=6)
            return start_of_week.strftime("%a_%b%d_") + end_of_week.strftime("%a_%b%d_")
        return prefix
def reset_to_full_hour(dt):
    """Return ``dt`` moved back to the beginning of its hour.

    Parameters:
    dt (datetime): The datetime object to adjust.
    Returns:
    datetime: A copy of ``dt`` with minute, second and microsecond set to 0.
    """
    start_of_hour = {"minute": 0, "second": 0, "microsecond": 0}
    return dt.replace(**start_of_hour)
def reset_to_half_hour(dt):
    """Return ``dt`` moved back to the most recent half-hour mark.

    Examples:
    - Minutes 00-29 are adjusted back to 00.
    - Minutes 30-59 are adjusted back to 30.
    Parameters:
    dt (datetime): The datetime object to adjust.
    Returns:
    datetime: A copy of ``dt`` at minute 0 or 30 with seconds and microseconds cleared.
    """
    # Integer division maps 0-29 -> 0 and 30-59 -> 30.
    half_hour_mark = (dt.minute // 30) * 30
    return dt.replace(minute=half_hour_mark, second=0, microsecond=0)
def reset_to_previous_middle_of_hour(dt):
    """Return ``dt`` moved back to the most recent HH:30 mark.

    Examples:
    - Minutes 00-29 are adjusted back to 30 minutes of the previous hour.
    - Minutes 30-59 are adjusted back to 30 minutes of the current hour.
    Parameters:
    dt (datetime): The datetime object to adjust.
    Returns:
    datetime: A datetime at minute 30 of the current or previous hour, with
    seconds and microseconds cleared.
    """
    if dt.minute < 30:
        # Subtracting an hour handles the midnight/day rollover directly, so no
        # exception has to be raised and caught for hour 0.
        dt = dt - timedelta(hours=1)
    return dt.replace(minute=30, second=0, microsecond=0)
def round_down_datetime1(dt, interval_minutes):
    """Round ``dt`` down to a multiple of ``interval_minutes`` within its hour.

    Parameters:
    dt (datetime): The datetime object to adjust.
    interval_minutes: Interval length in minutes.
    Returns:
    datetime: ``dt`` moved back to the interval boundary, with seconds and
    microseconds cleared.
    """
    # Everything is expressed in minutes: seconds / 60, microseconds / 60e6.
    # (Dividing microseconds by 1e6 would yield seconds and shift the result.)
    minutes_since_hour_start = dt.minute + dt.second / 60 + dt.microsecond / 60e6
    # Distance back to the previous interval boundary.
    minutes_to_subtract = minutes_since_hour_start % interval_minutes
    # Sitting exactly on a boundary while carrying a seconds/microseconds part:
    # step back one full interval.
    if minutes_to_subtract == 0 and (dt.second > 0 or dt.microsecond > 0):
        minutes_to_subtract += interval_minutes
    rounded_down_time = dt - timedelta(minutes=minutes_to_subtract)
    return rounded_down_time.replace(second=0, microsecond=0)
def round_down_datetime2(dt, interval_minutes):
    """Round ``dt`` down to a multiple of ``interval_minutes`` counted from midnight.

    Returns a new naive datetime on the same date; seconds and microseconds
    are dropped.
    """
    # Minutes elapsed since the start of the day.
    minutes_into_day = dt.hour * 60 + dt.minute
    # Largest multiple of the interval that does not exceed minutes_into_day.
    floored = minutes_into_day - minutes_into_day % interval_minutes
    new_hour, new_minute = divmod(floored, 60)
    return datetime.datetime(dt.year, dt.month, dt.day, new_hour, new_minute)
def round_down_datetime(dt, interval_minutes):
    """Return the latest interval boundary strictly before ``dt``'s minute.

    The minute is rounded down to a multiple of ``interval_minutes``; when it
    is already on a boundary, one more interval is subtracted. Hour, day,
    month and year rollover are handled by timedelta arithmetic.

    Returns a new naive datetime; seconds and microseconds are dropped.
    """
    rounded_minute = (dt.minute // interval_minutes) * interval_minutes
    # Exactly on a boundary: step back one interval so the result is earlier.
    if dt.minute == rounded_minute:
        rounded_minute -= interval_minutes
    hour_start = datetime.datetime(dt.year, dt.month, dt.day, dt.hour)
    # rounded_minute may be negative; adding it as a timedelta rolls back
    # across hour/day/month/year boundaries correctly.
    return hour_start + timedelta(minutes=rounded_minute)
#
#
def get_latest_datetime_with_specified_minutes(original_datetime, specified_minutes):
    """Return the latest time at minute ``specified_minutes`` that is not after ``original_datetime``.

    The candidate is built in the same hour with seconds and microseconds
    cleared; if it lies after the original, it is moved back one hour.
    """
    candidate = original_datetime.replace(minute=specified_minutes, second=0, microsecond=0)
    if candidate > original_datetime:
        return candidate - timedelta(hours=1)
    return candidate
def extract_minute_from_string(s):
    """Extract the minute number from a string formatted as "<number>_min".

    Returns the integer before the first "_min", or None (after printing a
    message) when that part is not an integer.
    """
    try:
        return int(s.split("_min")[0])
    except ValueError as e:
        print(f"Error converting {s} to an integer: {e}")
        return None
def extract_minute_from_string_w_every(s):
    """Extract the minute number from a string formatted as "every_<number>_min".

    Returns the integer between "every_" and "_min", or None (after printing a
    message) when the string lacks the "every_" marker or the value is not an
    integer.
    """
    try:
        left = s.split("_min")[0]
        middle = left.split("every_")[1]
        return int(middle)
    except (ValueError, IndexError) as e:
        # IndexError: "every_" is missing; ValueError: the middle part is not a number.
        print(f"Error converting {s} to an integer: {e}")
        return None
`
`import json import logging
import yaml
class StringUtils:
    """Helpers for id-keyed list merging and JSON/YAML detection."""

    LOGGER = logging.getLogger(__name__)

    def __init__(self):
        pass

    def apply_list_by_id(self, original_list_, update_list_):
        """Apply one list on top of another, keyed by ``id``.

        If an object with the same id exists in both lists, the one from the
        update list is skipped. Objects whose id is not in the original list
        are appended.
        :param original_list_: The original list.
        :param update_list_: The list to apply on top of the original list.
        :return: ``original_list_`` itself when ``update_list_`` is empty,
            otherwise a new merged list; None if an exception was logged.
        """
        try:
            if not update_list_:
                return original_list_
            items_dict = {item.id: item for item in original_list_}
            for item in update_list_:
                # Same-id items already in the original list win.
                items_dict.setdefault(item.id, item)
            return list(items_dict.values())
        except Exception as e:
            StringUtils.LOGGER.error(f"Exception: {e}")
            return None

    def is_json(self, myjson):
        """Return True if ``myjson`` parses as JSON, otherwise None."""
        try:
            json.loads(myjson)
        except Exception:
            return None
        return True

    def is_yaml(self, myyaml) -> bool:
        """Return True only if ``myyaml`` loads to a dict or list.

        Scalars, empty documents and load errors (which are logged) give False.
        """
        try:
            loaded_yaml = yaml.safe_load(myyaml)
            return isinstance(loaded_yaml, (dict, list))
        except Exception as e:
            StringUtils.LOGGER.info(f"this is not yaml ({e})")
            return False

    # Regular instance method like its siblings: it was decorated with
    # @staticmethod while still declaring ``self``, so calling it on an
    # instance with two lists raised TypeError.
    def update_list_by_id(self, original_list, update_list):
        """Update one list with another, keyed by ``id``.

        If an object with the same id exists in both lists, it is replaced by
        the one from the update list (keeping its original position). Objects
        whose id is not in the original list are appended.
        :param original_list: The original list.
        :param update_list: The list to update the original list with.
        :return: The merged list; None if an exception was logged.
        """
        try:
            items_dict = {item.id: item for item in original_list}
            for item in update_list:
                items_dict[item.id] = item
            return list(items_dict.values())
        except Exception as e:
            StringUtils.LOGGER.error(f"Exception: {e}")
            return None
`
'import datetime import logging from typing import Optional
import pytz import yaml
from eks_hk.nsmanager.models.deployments.datum import Datum from eks_hk.nsmanager.models.deployments.nsData import NSData from eks_hk.nsmanager.utils.string_utils import StringUtils
class NSDeployment: """ Manages Kubernetes deployment configurations. This class is designed to handle the lifecycle of Kubernetes deployment configurations, mainly watermarking, swcaling down and scaling up.
Main API:
- __init__(self, name, type, id, version, url, env, sealId, operations, operation_data, skip_kubernetes_scale, skip_parameterstore_write, active): Initializes a new deployment instance.
- initialize(self, string_utils_: StringUtils, kubernetes_service_: KubernetesService, tick_engine_): Prepares the deployment for operation.
- apply_operations(self, housekeeper_another_instance_): Updates the operations of the deployment with those of another instance.
- to_dict(self): Converts the deployment to a dictionary format.
- to_yaml(self, skip_operations_, skip_data_): Serializes the deployment to YAML format.
- get_datum(self, data_id_): Retrieves a specific datum from the deployment.
- set_datum(self, data_id_, value): Sets or updates a specific datum within the deployment.
- get_operation(self, id_): Retrieves a specific operation based on its ID.
- set_operation(self, id_, type, execution_time, target_percentage): Sets or updates an operation in the deployment.
- tick(self, current_time_): Updates the state of the deployment based on the current time.
- get_transaction_id_prefix(self): Retrieves the transaction ID prefix.
- get_nsdata(self): Retrieves the operation data encapsulated within the deployment.
"""
LOGGER = logging.getLogger(__name__)
def __init__(self, name, type, id, version, url, env, sealId, operations, operation_data, skip_kubernetes_scale=False, skip_parameterstore_write=False, active=True, time_created_=datetime.datetime.now(pytz.utc).astimezone(pytz.timezone("America/New_York"))):
self.name = name # which is rather eks_deployment_name, for now we're not touching serialized fileds at Deployment level
self.type = type
self.id = id
self.version = version
self.url = url
self.env = env
self.sealId = sealId
self.operations = operations
# self.operation_data = operation_data
self.skip_kubernetes_scale = skip_kubernetes_scale
self.skip_parameterstore_write = skip_parameterstore_write
self.active = active
self.source = None
self.parameter_store_key = None
self._transaction_id = None
self._transaction_id_prefix = "default"
self._nsdata = NSData(name, type, id, version, url, env, sealId, operations, operation_data)
self.time_created = time_created_
def initialize(self, string_utils_: StringUtils, kubernetes_service_):
try:
self._string_utils = string_utils_
self._kubernetes_service = kubernetes_service_
except Exception as e:
NSDeployment.LOGGER.error(f"Exception: {e}")
return None
def __str__(self):
return f"{self.id}\n{self.parameter_store_key}\n {self.to_yaml(False, False)}"
def apply_operations(self, housekeeper_another_instance_):
try:
self.operations = self._string_utils.apply_list_by_id(self.operations, housekeeper_another_instance_.operations)
# self.operation_data = self._string_utils.apply_list_by_id(self.operation_data, housekeeper_another_instance_.operation_data)
# self._nsdata.operation_data = self._string_utils.apply_list_by_id(self._nsdata.operation_data, housekeeper_another_instance_.operation_data)
except Exception as e:
NSDeployment.LOGGER.error(f"Exception: {e}")
return None
# Converts the deployment object to a dictionary.
def to_dict(self):
try:
operations_dicts = []
for operation in self.operations:
operations_dicts.append(operation.to_dict())
operation_data_dicts = []
for data in self._nsdata.operation_data:
operation_data_dicts.append(data.to_dict())
# self.skip_kubernetes_scale = skip_kubernetes_scale
# self.skip_parameterstore_write = skip_parameterstore_write
# self.active = active
return {"name": self.name, "type": self.type, "id": self.id, "version": self.version, "url": self.url, "env": self.env, "sealId": self.sealId, "skip_kubernetes_scale": self.skip_kubernetes_scale, "skip_parameterstore_write": self.skip_parameterstore_write, "active": self.active, "operations": operations_dicts, "operation_data": operation_data_dicts}
except Exception as e:
NSDeployment.LOGGER.error(f"Exception: {e}")
return None
def to_yaml(self, skip_operations_: bool, skip_data_: bool) -> str:
try:
dict_dump = self.to_dict()
if skip_operations_ is True:
dict_dump.pop("operations")
if skip_data_ is True:
dict_dump.pop("operation_data")
return yaml.dump(dict_dump, sort_keys=False, default_flow_style=False)
except Exception as e:
NSDeployment.LOGGER.error(f"Exception: {e}")
return ""
# Converts the deployment object to a string representation.
def to_str(self, skip_operations_: bool, skip_data_: bool) -> str:
try:
return self.to_yaml(skip_operations_, skip_data_)
except Exception as e:
NSDeployment.LOGGER.error(f"Exception: {e}")
return ""
# Retrieves a specific datum from the deployment.
def get_datum(self, data_id_: str) -> Optional[Datum]:
try:
return self._nsdata.get_datum(data_id_=data_id_)
except Exception as e:
NSDeployment.LOGGER.error(f"Exception: {e}")
return None
def remove_datum(self, data_id_: str) -> bool:
try:
return self._nsdata.remove_datum(data_id_=data_id_)
except Exception as e:
NSDeployment.LOGGER.error(f"Exception: {e}")
return False
# Sets a specific datum for the deployment.
# returns the datume or None if the datum is not found
def set_datum(self, data_id_: str, value=None) -> Optional[Datum]:
try:
return self._nsdata.set_datum(data_id_=data_id_, value=value)
except Exception as e:
NSDeployment.LOGGER.error(f"Exception: {e}")
return None
def get_operation(self, id_: str):
try:
return next((item for item in self.operations if item.id == id_), None)
except Exception as e:
NSDeployment.LOGGER.error(f"Exception: {e}")
return None
# # Sets or updates an operation in the deployment.
# def set_operation(self, id_: str, type=None, execution_time=None, target_percentage=None) -> Optional[Operation]:
# try:
# operation = self.get_operation(id_)
# if operation is not None:
# if type is not None:
# operation.type = type
# if execution_time is not None:
# operation.execution_time = execution_time
# if target_percentage is not None:
# operation.target_percentage = target_percentage
# return operation
# else:
# new_operation = Operation(id=id_, type=type or "", execution_time=execution_time or 0,
# target_percentage=target_percentage or 0)
# self.operations.append(new_operation)
#
# return new_operation
# except Exception as e:
# NSDeployment.LOGGER.error(f"Exception: {e}")
# return None
def set_transaction_id_prefix(self, transaction_id_prefix_: str) -> str:
try:
self._transaction_id_prefix = transaction_id_prefix_
return self._transaction_id_prefix
except Exception as e:
NSDeployment.LOGGER.error(f"Exception: {e}")
return ""
def normalize_transaction_id(self, tid_str_: str) -> str:
    """Shorten a raw transaction id.

    A doubled ``p1<d>_p1<d>`` token (d in 0..8) is collapsed to a single ``p1<d>``,
    then the words "realtime", "transcribe-engine" and "synthetic" are abbreviated
    to "rt", "tr-en" and "syn". Returns "" if the input cannot be processed
    (the exception is logged).
    """
    abbreviations = (
        ("realtime", "rt"),
        ("transcribe-engine", "tr-en"),
        ("synthetic", "syn"),
    )
    try:
        shortened = tid_str_
        # NOTE(review): range(9) covers p10..p18 only; confirm p19 is meant to be left alone.
        for digit in range(9):
            token = f"p1{digit}"
            shortened = shortened.replace(f"{token}_{token}", token)
        for long_form, short_form in abbreviations:
            shortened = shortened.replace(long_form, short_form)
        return shortened
    except Exception as e:
        NSDeployment.LOGGER.error(f"Exception: {e}")
        return ""
def create_transaction_id(self, transaction_id_prefix_: str) -> str:
    """Build, store and return a transaction id starting with ``transaction_id_prefix_``.

    The raw id is ``<prefix><region>_<cluster>_<namespace>_<name>``, with region,
    cluster and namespace taken from the Kubernetes service, and it is passed
    through ``normalize_transaction_id``. The prefix is also stored on the
    instance. Returns "" if anything raises (the exception is logged).
    """
    try:
        kubernetes = self._kubernetes_service
        region = kubernetes.get_region()
        cluster = kubernetes.get_cluster_name()
        namespace = kubernetes.get_namespace()
        # The hostname is not part of the id, so it is no longer looked up here.
        self._transaction_id_prefix = transaction_id_prefix_
        self._transaction_id = self.normalize_transaction_id(f"{transaction_id_prefix_}{region}_{cluster}_{namespace}_{self.name}")
        return self._transaction_id
    except Exception as e:
        NSDeployment.LOGGER.error(f"Exception: {e}")
        return ""
def create_default_transaction_id(self) -> str:
    """Build, store and return a transaction id using the currently stored prefix.

    The raw id is ``<prefix><region>_<cluster>_<namespace>_<name>`` and is passed
    through ``normalize_transaction_id``. Returns "" if anything raises
    (the exception is logged).
    """
    try:
        k8s = self._kubernetes_service
        location = (k8s.get_region(), k8s.get_cluster_name(), k8s.get_namespace())
        prefix = self.get_transaction_id_prefix()
        raw_id = "{}{}_{}_{}_{}".format(prefix, *location, self.name)
        self._transaction_id = self.normalize_transaction_id(raw_id)
        return self._transaction_id
    except Exception as e:
        NSDeployment.LOGGER.error(f"Exception: {e}")
        return ""
def get_transaction_id(self) -> str:
    """Return the stored transaction id, creating a default one first if none is set."""
    if self._transaction_id:
        return self._transaction_id
    # Nothing stored yet (None or empty): build the default id, then hand back
    # whatever is stored afterwards.
    self.create_default_transaction_id()
    return self._transaction_id
def get_transaction_id_prefix(self) -> str:
    """Return the transaction-id prefix currently stored on this deployment."""
    prefix = self._transaction_id_prefix
    return prefix
def get_nsdata(self) -> NSData:
    """Return the NSData object wrapped by this deployment."""
    nsdata = self._nsdata
    return nsdata
'
`import inspect import logging from typing import Optional, Union
import yaml from botocore.exceptions import ClientError from eks_hk.nsmanager.models.deployments.datum import Datum
from eks_hk.nsmanager.engine.operations.defineTransactionIdPrefixOperation import DefineTransactionIdPrefixOperation from eks_hk.nsmanager.engine.operations.immediateScaleOperation import ImmediateScaleOperation from eks_hk.nsmanager.engine.operations.operation import Operation from eks_hk.nsmanager.engine.operations.scaleOperation import ScaleOperation from eks_hk.nsmanager.engine.operations.transactionBeginOperation import TransactionBeginOperation from eks_hk.nsmanager.engine.operations.transactionEndOperation import TransactionEndOperation from eks_hk.nsmanager.engine.operations.watcherOperation import WatcherOperation from eks_hk.nsmanager.engine.operations.watermarkOperation import WatermarkOperation from eks_hk.nsmanager.models.deployments.nsDeployment import NSDeployment from eks_hk.nsmanager.services.platform.aws_service import AWSService from eks_hk.nsmanager.services.platform.kubernetes_service import KubernetesService from eks_hk.nsmanager.utils.string_utils import StringUtils
class NSDeploymentFactory:
    """
    Creates NSDeployment instances from configuration stored in AWS SSM Parameter Store.

    A deployment is read from the parameter named by its key; its operation data is
    replaced by that of the companion parameter ``<key>_data`` when that one can be
    loaded. Lines of the raw parameter text that start with ``#apply <key>`` name
    further deployments whose operations are applied on top of the main one.
    Only YAML payloads are parsed. Failures are logged and reported to the caller
    as ``None`` rather than raised.
    """

    LOGGER = logging.getLogger(__name__)

    @staticmethod
    def make_new() -> "NSDeploymentFactory":
        """Create a factory; returns None (after logging) if construction fails."""
        try:
            return NSDeploymentFactory()
        except Exception as e:
            NSDeploymentFactory.LOGGER.error(f"Exception when creating NSDeploymentFactory instance: {e}")
            return None

    def __init__(self):
        pass

    def make_nsdeployment(self, key_: str, ssm_client_, string_utils_: StringUtils, aws_service_: AWSService, kubernetes_service_: KubernetesService) -> Optional[NSDeployment]:
        """
        Load the deployment stored under ``key_``, initialize it and apply the
        operations of every deployment it references through ``#apply`` lines.

        Returns the assembled deployment, or None if it could not be loaded or
        any step raised (the error is logged).
        """
        try:
            deployment = self._load_from_parameterstore(parameter_store_key_=key_, ssm_client_=ssm_client_, string_utils_=string_utils_, aws_service_=aws_service_, kubernetes_service_=kubernetes_service_)
            if deployment is None:
                # Say what went wrong instead of failing on a None attribute access below.
                NSDeploymentFactory.LOGGER.error(f"Could not load deployment {key_}")
                return None
            deployment.initialize(string_utils_=string_utils_, kubernetes_service_=kubernetes_service_)
            deployments_to_apply = []
            self.collect_applies(deployment, deployments_to_apply, ssm_client_=ssm_client_, string_utils_=string_utils_, aws_service_=aws_service_, kubernetes_service_=kubernetes_service_)
            for other in deployments_to_apply:
                # The list starts with the deployment itself; only the others are applied.
                if other is not deployment:
                    deployment.apply_operations(other)
            return deployment
        except Exception as e:
            NSDeploymentFactory.LOGGER.error(f"Exception: {e}")
            return None

    @staticmethod
    def _load_from_parameterstore(parameter_store_key_: str, ssm_client_, string_utils_: StringUtils, aws_service_: AWSService, kubernetes_service_: KubernetesService):
        """
        Load the deployment under ``parameter_store_key_`` and, when the parameter
        ``<key>_data`` can be loaded too, take the operation data from it.

        Returns the deployment, or None if it cannot be loaded.
        """
        try:
            housekeeper_deployment = NSDeploymentFactory._load_from_ps(parameter_store_key_, ssm_client_=ssm_client_, string_utils_=string_utils_, aws_service_=aws_service_, kubernetes_service_=kubernetes_service_)
            if housekeeper_deployment is None:
                return None
            data_deployment = NSDeploymentFactory._load_from_ps(parameter_store_key_ + "_data", ssm_client_=ssm_client_, string_utils_=string_utils_, aws_service_=aws_service_, kubernetes_service_=kubernetes_service_)
            if data_deployment is not None:
                housekeeper_deployment.operation_data = data_deployment.get_nsdata().operation_data
            return housekeeper_deployment
        except Exception as e:
            NSDeploymentFactory.LOGGER.error(f"Exception: {e}")
            return None

    @staticmethod
    def _load_from_ps(parameter_store_key, ssm_client_, string_utils_: StringUtils, aws_service_: AWSService, kubernetes_service_: KubernetesService) -> Optional[NSDeployment]:
        """
        Load deployment configuration from Parameter Store using the injected SSM client.

        A key starting with ``text:`` is a special case for testing: the remainder of
        the key is used as the payload and SSM is not contacted. On success the raw
        payload and the key are recorded on the deployment as ``source`` and
        ``parameter_store_key``. Returns None when the parameter is missing, cannot
        be parsed, or any error occurs.
        """
        try:
            if parameter_store_key.startswith("text:"):
                parameter_value = parameter_store_key[len("text:"):]
            else:
                response = ssm_client_.get_parameter(Name=parameter_store_key, WithDecryption=True)
                parameter_value = response["Parameter"]["Value"]
            ret = NSDeploymentFactory.load_from_string(parameter_value, ssm_client_=ssm_client_, string_utils_=string_utils_, aws_service_=aws_service_, kubernetes_service_=kubernetes_service_)
            if not ret:
                NSDeploymentFactory.LOGGER.info(f"Could not load or parse {parameter_store_key}")
                return None
            NSDeploymentFactory.LOGGER.info(f"{parameter_store_key} read.")
            ret.source = parameter_value
            ret.parameter_store_key = parameter_store_key
            return ret
        except ClientError as e:
            if e.response["Error"]["Code"] == "ParameterNotFound":
                NSDeploymentFactory.LOGGER.info(f"Parameter {parameter_store_key} not found.")
            else:
                NSDeploymentFactory.LOGGER.error(f"Error when retrieving {parameter_store_key}: {e}")
            return None
        except Exception as e:
            NSDeploymentFactory.LOGGER.error(f"General exception: {e}")
            return None

    @staticmethod
    def collect_applies(current_deployment_, deployments_to_apply_, ssm_client_, string_utils_: StringUtils, aws_service_: AWSService, kubernetes_service_: KubernetesService, visiting_=None):
        """
        Append ``current_deployment_`` and, depth first, every deployment it pulls in
        through ``#apply <key>`` lines of its ``source`` to ``deployments_to_apply_``.

        ``visiting_`` is internal bookkeeping: the parameter-store keys on the current
        recursion path. A key that is already on that path is skipped, so a cyclic
        ``#apply`` chain cannot recurse without bound. Errors are logged, not raised.
        """
        try:
            if not current_deployment_:
                return
            source_lines = current_deployment_.source.splitlines()
            deployments_to_apply_.append(current_deployment_)
            # A fresh immutable set per level, so sibling branches never see each other's keys.
            path = frozenset(visiting_ or ()) | {getattr(current_deployment_, "parameter_store_key", None)}
            for line in source_lines:
                if not line.startswith("#apply "):
                    continue
                # Surrounding whitespace is never part of a parameter name.
                import_key = line[len("#apply "):].strip()
                if import_key in path:
                    NSDeploymentFactory.LOGGER.error(f"Cyclic #apply of {import_key} ignored")
                    continue
                NSDeploymentFactory.LOGGER.info(f"******** applying {import_key}")
                applied = NSDeploymentFactory._load_from_parameterstore(import_key, ssm_client_=ssm_client_, string_utils_=string_utils_, aws_service_=aws_service_, kubernetes_service_=kubernetes_service_)
                NSDeploymentFactory.collect_applies(applied, deployments_to_apply_, ssm_client_=ssm_client_, string_utils_=string_utils_, aws_service_=aws_service_, kubernetes_service_=kubernetes_service_, visiting_=path)
        except Exception as e:
            NSDeploymentFactory.LOGGER.error(f"Exception: {e}")

    @staticmethod
    def load_from_string(input_str_, ssm_client_, string_utils_: StringUtils, aws_service_: AWSService, kubernetes_service_: KubernetesService) -> Optional[NSDeployment]:
        """
        Parse ``input_str_`` into a deployment when ``string_utils_.is_yaml`` accepts it.

        A successfully parsed deployment gets ``_is_yaml = True``. Returns None when
        the text is not recognised as YAML or parsing fails.
        """
        if string_utils_.is_yaml(input_str_) is not True:
            NSDeploymentFactory.LOGGER.error("Input string is neither valid JSON nor YAML.")
            return None
        depl = NSDeploymentFactory.load_from_yaml(input_str_, ssm_client_=ssm_client_, string_utils_=string_utils_, aws_service_=aws_service_, kubernetes_service_=kubernetes_service_)
        if depl is not None:
            depl._is_yaml = True
        return depl

    @staticmethod
    def _load_from_yaml(yaml_str_, ssm_client_, string_utils_: StringUtils, aws_service_: AWSService, kubernetes_service_: KubernetesService):
        """Private alias of ``load_from_yaml``, kept so existing callers keep working."""
        return NSDeploymentFactory.load_from_yaml(yaml_str_, ssm_client_=ssm_client_, string_utils_=string_utils_, aws_service_=aws_service_, kubernetes_service_=kubernetes_service_)

    @staticmethod
    def load_from_yaml(yaml_str_, ssm_client_, string_utils_: StringUtils, aws_service_: AWSService, kubernetes_service_: KubernetesService) -> Optional[NSDeployment]:
        """Loads a deployment from a YAML string; returns None (after logging) on any error."""
        try:
            data = yaml.safe_load(yaml_str_)
            return NSDeploymentFactory._load(data, ssm_client_=ssm_client_, string_utils_=string_utils_, aws_service_=aws_service_, kubernetes_service_=kubernetes_service_)
        except Exception as e:
            NSDeploymentFactory.LOGGER.error(f"Exception in load_from_yaml: {e}")
            return None

    @staticmethod
    def _make_operation(item_):
        """Instantiate the operation class named by ``item_["type"]`` (plain Operation for unknown types)."""
        base_fields = ("execution_time", "target_percentage")
        # operation type -> (class, names of the fields passed positionally after id and type)
        known_operations = {
            "ScaleOperation": (ScaleOperation, base_fields + ("watermark_id",)),
            "WatcherOperation": (WatcherOperation, ("execution_time", "last_target_scale_id")),
            "ImmediateScaleOperation": (ImmediateScaleOperation, ("scale_to",)),
            "WatermarkOperation": (WatermarkOperation, base_fields + ("watermark_id",)),
            "TransactionBeginOperation": (TransactionBeginOperation, base_fields),
            "DefineTransactionIdPrefixOperation": (DefineTransactionIdPrefixOperation, base_fields + ("transaction_id_prefix",)),
            "TransactionEndOperation": (TransactionEndOperation, base_fields),
        }
        operation_class, extra_fields = known_operations.get(item_["type"], (Operation, base_fields))
        return operation_class(item_.get("id"), item_.get("type"), *(item_.get(field) for field in extra_fields))

    @staticmethod
    def _load(data_, ssm_client_, string_utils_: StringUtils, aws_service_: AWSService, kubernetes_service_: KubernetesService) -> Optional[NSDeployment]:
        """
        Internal method to reconstruct an NSDeployment from parsed data.

        Every entry of ``data_["operations"]`` becomes an initialized operation object
        and every entry of ``data_["operation_data"]`` a Datum. Operation data that
        cannot be parsed is logged and replaced by an empty list. Returns None if
        anything else fails.
        """
        try:
            operations = []
            for item in data_.get("operations") or []:
                operation = NSDeploymentFactory._make_operation(item)
                operation.initialize(ssm_client_=ssm_client_, string_utils_=string_utils_, aws_service_=aws_service_, kubernetes_service_=kubernetes_service_)
                operations.append(operation)
            operation_data = []
            if data_.get("operation_data") is not None:
                try:
                    operation_data = [Datum.from_dict(datum) for datum in data_["operation_data"]]
                except Exception as e:
                    NSDeploymentFactory.LOGGER.error(f"Could not parse operation_data, ignoring it: {e}")
                    operation_data = []
            return NSDeployment(name=data_.get("name"), type=data_.get("type"), id=data_.get("id"), version=data_.get("version"), url=data_.get("url"), env=data_.get("env"), sealId=data_.get("sealId"), operations=operations, operation_data=operation_data, skip_kubernetes_scale=data_.get("skip_kubernetes_scale"), skip_parameterstore_write=data_.get("skip_parameterstore_write"), active=data_.get("active"))
        except Exception as e:
            NSDeploymentFactory.LOGGER.error(f"Exception in _load: {e}")
            return None
`
import logging import time
from eks_hk.nsmanager.engine.tickEngine import TickEngine from eks_hk.nsmanager.services.deployments.nsDeploymentManager import NSDeploymentManager from eks_hk.nsmanager.services.platform.aws_service import AWSService from eks_hk.nsmanager.services.platform.kubernetes_service import KubernetesService from eks_hk.nsmanager.utils.time_utils import TimeUtils
class NSController:
    """
    Manages the lifecycle and operations of NSDeployment instances.

    The controller discovers the Kubernetes deployments of the current namespace,
    asks the deployment manager to load a matching NSDeployment for each of them and
    then, on every tick, lets the tick engine schedule the operations of each active
    deployment, saving the deployment to Parameter Store when its data changed.
    """

    LOGGER = logging.getLogger(__name__)

    @staticmethod
    def make_new() -> "NSController":
        """Create a controller with empty state; returns None (after logging) on failure."""
        try:
            return NSController(tick_counter_=0, deployment_names_=[], eks_deployments_list_=[], eks_deployment_names_list_=[], time_to_update_houskeeper_instance_sec_=60, is_initialized_=False)
        except Exception as e:
            NSController.LOGGER.error(f"Exception when creating NSController instance: {e}")
            return None

    def __init__(self, tick_counter_: int, deployment_names_: list, eks_deployments_list_: list, eks_deployment_names_list_: list, time_to_update_houskeeper_instance_sec_: int, is_initialized_: bool):
        self._tick_counter = tick_counter_
        self._deployment_names = deployment_names_
        self._eks_deployments_list = eks_deployments_list_
        self._eks_deployment_names_list = eks_deployment_names_list_
        # Age in seconds after which a loaded deployment is reloaded (see tick()).
        self._time_to_update_houskeeper_instance_sec = time_to_update_houskeeper_instance_sec_
        self._is_initialized = is_initialized_

    def initialize(self, ssm_client_, aws_service_: AWSService, kubernetes_service_: KubernetesService, nsdeployment_manager_: NSDeploymentManager, tickEngine_: TickEngine) -> bool:
        """
        Store the collaborators, list the deployments of the current namespace and
        load an NSDeployment for each of them.

        Returns True when the deployment list could be obtained (or the debug
        fallback for out-of-cluster runs was used) and loading did not raise.
        """
        self._ssm_client = ssm_client_
        self._aws_service = aws_service_
        self._kubernetes_service = kubernetes_service_
        self._ns_deployment_manager = nsdeployment_manager_
        self._tick_engine = tickEngine_
        ret_code = True
        try:
            self._is_initialized = False
            try:
                # Get the list of deployments in the current namespace
                self._eks_deployments_list = self._kubernetes_service.list_namespaced_deployment(namespace_=self._kubernetes_service.get_namespace())
                NSController.LOGGER.info(f"self._eks_deployments_list={self._eks_deployments_list}")
                self._eks_deployment_names_list = [deployment.metadata.name for deployment in self._eks_deployments_list.items]
            except Exception as e:
                NSController.LOGGER.error(f"Exception when calling AppsV1Api->list_namespaced_deployment: {e}")
                ret_code = False
            # --- Debug code begin: if not in cluster, set some values for debugging
            if self._kubernetes_service._incluster is False:
                self._kubernetes_service._namespace = "p11-realtime-1"
                self._eks_deployment_names_list = ["transcribe-engine"]
                self._kubernetes_service._cluster_name = "p11"
                self._kubernetes_service._region_name = "us-east-1"
                ret_code = True
            # --- Debug code end
            for eks_deployment_name in self._eks_deployment_names_list:
                self._ns_deployment_manager.load_deployment(eks_deployment_name)
            if ret_code is True:
                self._is_initialized = True
        except Exception as e:
            NSController.LOGGER.error(f"Exception when initializing NSController: {e}")
            ret_code = False
        return ret_code

    def time_to_refresh_housekeeper_deployment(self, housekeeper_deployment_):
        """Return True when the deployment is missing or older than the configured refresh age."""
        if housekeeper_deployment_ is None:
            return True
        age = TimeUtils.get_est_now() - housekeeper_deployment_.time_created
        return age.total_seconds() > self._time_to_update_houskeeper_instance_sec

    def set_updated_datum(self, housekeeper_deployment_):
        """Increment the deployment's "updated" datum by one, if the deployment has such a datum."""
        updated_datum = housekeeper_deployment_.get_datum(data_id_="updated")
        if updated_datum is not None:
            updated_datum.set_value(updated_datum.get_value() + 1)

    def tick(self, current_time_, refresh_deployments=True):
        """
        Update the state of all managed deployments based on ``current_time_``.

        Inactive deployments are skipped. When ``refresh_deployments`` is true, a
        deployment that is missing or older than the refresh age is reloaded first.
        The tick engine returns a tuple of booleans (succeeded, data were updated);
        when data were updated, the deployment that was just scheduled is marked as
        updated and saved to Parameter Store.

        Returns True when the whole pass completed, False if any step raised.
        """
        self._tick_counter += 1
        try:
            region = self._kubernetes_service.get_region()
            items = self._ns_deployment_manager.get_deployment_instances_list()  # to avoid 'dictionary changes size during iteration' error
            for deployment_name, nsDeployment in items:
                if nsDeployment and not nsDeployment.active:
                    continue
                curr_deployment = nsDeployment
                # Honour the flag: passing False now really disables the reload.
                if refresh_deployments and self.time_to_refresh_housekeeper_deployment(nsDeployment):
                    curr_deployment = self._ns_deployment_manager.load_deployment(deployment_name)
                if curr_deployment is None:
                    continue
                _succeeded, data_updated = self._tick_engine.do_schedule(curr_deployment, current_time_)
                if data_updated is True:
                    NSController.LOGGER.info("Data updated, saving to Parameter Store")
                    # Save the instance the engine worked on, not the possibly stale
                    # (or missing) one that was in the list before the reload.
                    self.set_updated_datum(curr_deployment)
                    self._aws_service.save_data_to_parameterstore(nsDeployment=curr_deployment, region_=region)
            return True
        except Exception as e:
            NSController.LOGGER.error(f"Exception: {e}")
            return False

    def main_loop(self, tick_interval_sec_: int = 30):
        """Run tick() forever, sleeping ``tick_interval_sec_`` seconds between passes."""
        separator = "- - " * 10
        while True:
            NSController.LOGGER.info(separator)
            NSController.LOGGER.info("Tick started\n")
            current_time = TimeUtils.get_est_now()
            self.tick(current_time)
            NSController.LOGGER.info("Tick ended\n")
            NSController.LOGGER.info(separator)
            time.sleep(tick_interval_sec_)
#######
`import logging import os import socket
from kubernetes import config, client
class KubernetesService:
    """
    Provides methods to interact with the Kubernetes API from within a cluster or externally.

    The service exposes cluster metadata (namespace, region, cluster name, AWS
    environment, hostname) and thin wrappers around the AppsV1 deployment calls.
    Region, cluster name and AWS environment are read from the ``AWS_REGION``,
    ``EKS_CLUSTER_NAME`` and ``AWS_ENVIRONMENT`` environment variables; the namespace
    is read from the service-account namespace file. The deployment wrappers log
    failures and return ``None`` instead of raising.
    """

    LOGGER = logging.getLogger(__name__)

    def __init__(self):
        # init_cube_config() is not called here; it must be called before the
        # deployment wrappers can reach the API (until then _apps_v1 is None).
        self._namespace = "default"
        self._apps_v1 = None
        self._cluster_name = None
        self._region_name = None
        self._aws_environment = None
        self._incluster = True

    def get_region_name(self):
        """Return the cached region name without consulting the environment."""
        return self._region_name

    def init_cube_config(self):
        """
        Resolve namespace, region and AWS environment, load the Kubernetes client
        configuration (in-cluster first, local kube config as fallback) and create
        the AppsV1 API client.
        """
        KubernetesService.LOGGER.info("init_cube_config")
        KubernetesService.LOGGER.info(f"namespace={self._namespace}")
        self.get_namespace()
        self.get_region()
        self.get_aws_environment()
        try:
            # In-cluster configuration
            config.load_incluster_config()
        except config.config_exception.ConfigException:
            # Outside-cluster configuration
            self._incluster = False
            config.load_kube_config()
        self._apps_v1 = client.AppsV1Api()
        KubernetesService.LOGGER.info(f"_incluster={self._incluster}")
        KubernetesService.LOGGER.info(f"_region_name={self._region_name}")

    def get_namespace(self):
        """
        Return the namespace, reading it from the service-account namespace file
        while it is still the initial "default"; stays "default" if the file
        cannot be read.
        """
        if self._namespace != "default":
            return self._namespace
        try:
            with open("/var/run/secrets/kubernetes.io/serviceaccount/namespace", "r") as f:
                # Strip so that a trailing newline never ends up in names built from it.
                self._namespace = f.read().strip()
        except IOError:
            KubernetesService.LOGGER.info("Fallback to the default namespace if outside the cluster")
            self._namespace = "default"
        return self._namespace

    def get_aws_environment(self):
        """Return the AWS environment from ``AWS_ENVIRONMENT`` (cached once set); "dev" on error."""
        try:
            if self._aws_environment is None:
                self._aws_environment = os.environ.get("AWS_ENVIRONMENT")
            return self._aws_environment
        except Exception as e:
            KubernetesService.LOGGER.error(f"Exception: {e}")
            return "dev"

    def get_region(self):
        """Return the region from ``AWS_REGION`` (cached once set); None on error."""
        try:
            if self._region_name is None:
                self._region_name = os.environ.get("AWS_REGION")
            return self._region_name
        except Exception as e:
            KubernetesService.LOGGER.error(f"Exception: {e}")
            return None

    def get_cluster_name(self):
        """Return the cluster name from ``EKS_CLUSTER_NAME`` (cached once set); None on error."""
        try:
            if self._cluster_name is None:
                self._cluster_name = os.environ.get("EKS_CLUSTER_NAME")
            return self._cluster_name
        except Exception as e:
            KubernetesService.LOGGER.error(f"Exception: {e}")
            return None

    def get_hostname(self):
        """Return the local hostname, or the placeholder "probably_local_" if it cannot be determined."""
        try:
            return socket.gethostname()
        except Exception as e:
            # Best effort: the hostname is only used to build identifiers.
            KubernetesService.LOGGER.info(f"Could not determine hostname: {e}")
            return "probably_local_"

    def get_housekeeper_id(self):
        """Return ``hkid_<region>_<cluster>_<namespace>_<hostname>``; None on error."""
        try:
            region = self.get_region()
            cluster = self.get_cluster_name()
            namespace = self.get_namespace()
            host_name = self.get_hostname()
            return f"hkid_{region}_{cluster}_{namespace}_{host_name}"
        except Exception as e:
            KubernetesService.LOGGER.error(f"Exception: {e}")
            return None

    def get_apps_v1(self):
        """Return the AppsV1 API client (None until init_cube_config() has run)."""
        return self._apps_v1

    def read_namespaced_deployment(self, deployment_name_: str, namespace_: str):
        """Read one deployment through the AppsV1 API; returns None (after logging) on any error."""
        try:
            return self.get_apps_v1().read_namespaced_deployment(deployment_name_, namespace_)
        except Exception as e:
            KubernetesService.LOGGER.error(f"Exception: {e}")
            return None

    def replace_namespaced_deployment(self, deployment_name_: str, namespace_: str, body_: str):
        """Replace one deployment through the AppsV1 API; returns None (after logging) on any error."""
        try:
            return self.get_apps_v1().replace_namespaced_deployment(name=deployment_name_, namespace=namespace_, body=body_)
        except Exception as e:
            KubernetesService.LOGGER.error(f"Exception: {e}")
            return None

    def list_namespaced_deployment(self, namespace_: str):
        """List the deployments of ``namespace_`` through the AppsV1 API; returns None (after logging) on any error."""
        try:
            KubernetesService.LOGGER.info(f"list_namespaced_deployment namespace_={namespace_}")
            KubernetesService.LOGGER.info(f"self.get_apps_v1()={self.get_apps_v1()}")
            return self.get_apps_v1().list_namespaced_deployment(namespace=namespace_)
        except Exception as e:
            KubernetesService.LOGGER.error(f"Exception: {e}")
            return None
`
`import datetime import logging from typing import Optional
import pytz import yaml
from eks_hk.nsmanager.models.deployments.datum import Datum from eks_hk.nsmanager.models.deployments.nsData import NSData from eks_hk.nsmanager.utils.string_utils import StringUtils
add class level description
# NOTE(review): this copy of NSDeployment appears to have been mangled when it was pasted:
# indentation and line breaks are gone, many underscores are missing (compare the parameter
# names with the callers, e.g. initialize(string_utils_=..., kubernetes_service_=...)) and
# several method bodies break off part-way. Confirm against the real source file before use.
class NSDeployment: """ Manages Kubernetes deployment configurations. This class is designed to handle the lifecycle of Kubernetes deployment configurations, mainly watermarking, scaling down and scaling up.
Main API:
get_operation_data(self): Retrieves the operation data encapsulated within the deployment. """
# NOTE(review): `name` is presumably `__name__` with the double underscores lost in the paste — confirm.
LOGGER = logging.getLogger(name)
!!!!!!!! search for self._nsdata = NSData(name, type, id, version, url, env, sealId, operations, operation_data)
in https://bitbucketdc-cluster08.jpmchase.net/projects/SPEECH2TEXT/repos/aws-transcribe-eks-housekeeper/pull-requests/1/overview?commentId=7284725
# NOTE(review): `init` is presumably `__init__`. The datetime default in the signature is evaluated
# once, when the def statement executes, not on every call. The visible assignments stop after
# operation_data; nothing shown here sets self._nsdata, which later methods read.
def init(self, name, type, id, version, url, env, sealId, operations, operation_data, skip_kubernetes_scale=False, skip_parameterstore_write=False, active=True, timecreated=datetime.datetime.now(pytz.utc).astimezone(pytz.timezone("America/New_York"))): self.name = name # which is rather eks_deployment_name, for now we're not touching serialized fileds at Deployment level self.type = type self.id = id self.version = version self.url = url self.env = env self.sealId = sealId self.operations = operations
self.operation_data = operation_data
# Stores the string utilities and the Kubernetes service on the instance; logs and returns None on error.
def initialize(self, stringutils: StringUtils, kubernetesservice): try: self._string_utils = stringutils self._kubernetes_service = kubernetesservice except Exception as e: NSDeployment.LOGGER.error(f"Exception: {e}") return None
# NOTE(review): `str` is presumably `__str__` — confirm.
def str(self): return f"{self.id}\n{self.parameter_store_key}\n {self.to_yaml(False, False)}"
# Merges the operations and operation data of another instance into this one via apply_list_by_id.
# NOTE(review): the body breaks off here — no except clause or return is visible.
def apply_operations(self, housekeeper_anotherinstance): try: self.operations = self._string_utils.apply_list_by_id(self.operations, housekeeper_anotherinstance.operations)
self.operation_data = self._string_utils.apply_list_by_id(self.operation_data, housekeeper_anotherinstance.operation_data)
Converts the deployment object to a dictionary.
# NOTE(review): the body breaks off after the operations loop — the returned dictionary is not visible.
def to_dict(self): try: operations_dicts = [] for operation in self.operations: operations_dicts.append(operation.to_dict())
# Dumps to_dict() as YAML, optionally dropping the "operations" and/or "operation_data" keys; returns "" on error.
def to_yaml(self, skipoperations: bool, skipdata: bool) -> str: try: dict_dump = self.to_dict() if skipoperations is True: dict_dump.pop("operations") if skipdata is True: dict_dump.pop("operation_data") return yaml.dump(dict_dump, sort_keys=False, default_flow_style=False) except Exception as e: NSDeployment.LOGGER.error(f"Exception: {e}") return ""
Converts the deployment object to a string representation.
def to_str(self, skipoperations: bool, skipdata: bool) -> str: try: return self.to_yaml(skipoperations, skipdata) except Exception as e: NSDeployment.LOGGER.error(f"Exception: {e}") return ""
Retrieves a specific datum from the deployment.
def get_datum(self, dataid: str) -> Optional[Datum]: try: return self._nsdata.get_datum(dataid=dataid) except Exception as e: NSDeployment.LOGGER.error(f"Exception: {e}") return None
def remove_datum(self, dataid: str) -> bool: try: return self._nsdata.remove_datum(dataid=dataid) except Exception as e: NSDeployment.LOGGER.error(f"Exception: {e}") return False
Sets a specific datum for the deployment.
returns the datume or None if the datum is not found
def set_datum(self, dataid: str, value=None) -> Optional[Datum]: try: return self._nsdata.set_datum(dataid=dataid, value=value) except Exception as e: NSDeployment.LOGGER.error(f"Exception: {e}") return None
# NOTE(review): the parameter is spelled `id` but the body compares against `id_`; presumably both were `id_` — confirm.
def getoperation(self, id: str): try: return next((item for item in self.operations if item.id == id_), None) except Exception as e: NSDeployment.LOGGER.error(f"Exception: {e}") return None
Sets or updates an operation in the deployment.
# NOTE(review): in the other copy of this class in this file the whole method below is commented out.
# As shown here, `Operation` is not among the visible imports and `newoperation` is assigned
# while `new_operation` is used.
def setoperation(self, id: str, type=None, execution_time=None, target_percentage=None) -> Optional[Operation]:
try:
operation = self.getoperation(id)
if operation is not None:
if type is not None:
operation.type = type
if execution_time is not None:
operation.execution_time = execution_time
if target_percentage is not None:
operation.target_percentage = target_percentage
return operation
else:
newoperation = Operation(id=id, type=type or "", execution_time=execution_time or 0,
target_percentage=target_percentage or 0)
self.operations.append(new_operation)
#
return new_operation
except Exception as e:
NSDeployment.LOGGER.error(f"Exception: {e}")
return None
def set_transaction_id_prefix(self, transaction_idprefix: str) -> str: try: self._transaction_id_prefix = transaction_idprefix return self._transaction_id_prefix except Exception as e: NSDeployment.LOGGER.error(f"Exception: {e}") return ''
# NOTE(review): the body breaks off after the replacements — no return or except clause is visible.
def normalize_transaction_id(self, tidstr: str) -> str: try: ret = tidstr for i in range(9): ret = ret.replace(f"p1{i}_p1{i}", f"p1{i}") ret = ret.replace(f"realtime", f"rt") ret = ret.replace(f"transcribe-engine", f"tr-en") ret = ret.replace(f"synthetic", f"syn")
# NOTE(review): the assignment on the line after this def is a commented-out line in the other copy
# of this class; it reads `hostname` (the local is `host_name`) and self.housekeeper_controller,
# which nothing visible sets. The rest of the body is not visible.
def create_transaction_id(self, transaction_idprefix: str) -> str: try: region = self._kubernetes_service.get_region() cluster = self._kubernetes_service.get_cluster_name() namespace = self._kubernetes_service.get_namespace() host_name = self._kubernetes_service.get_hostname() self._transaction_id_prefix = transaction_idprefix
self._transaction_id = f"{transaction_idprefix}{region}{cluster}{namespace}_{hostname}{self.name}_{self.housekeeper_controller._tick_counter}"
# NOTE(review): the body breaks off — the id is never built or returned in the visible text.
def create_default_transaction_id(self) -> str: try: region = self._kubernetes_service.get_region() cluster = self._kubernetes_service.get_cluster_name() namespace = self._kubernetes_service.get_namespace()
host_name = self._kubernetes_service.get_hostname()
# NOTE(review): no return statement is visible here, although the method is annotated `-> str`.
def get_transaction_id(self) -> str: if not self._transaction_id: self.create_default_transaction_id()
def get_transaction_id_prefix(self) -> str: return self._transaction_id_prefix
# Returns the wrapped NSData object (self._nsdata).
def get_operation_data(self) -> NSData: return self._nsdata `