I've been working on adding container metrics (CPU, memory). As part of this I refactored the k8s-stats.py script to use the `kubernetes` and `click` Python libraries. The CLI is exactly the same, but I think the inner workings are much cleaner and easier to extend.
I'm going to leave it here in case it is useful to yourself or anyone else. I may have time to do a PR if a maintainer can confirm it would be welcome.
#!/usr/bin/env python3
import json
import os
from enum import Enum
import urllib3
from kubernetes import client, config
import click
from kubernetes.client import ApiException
if __name__ == "__main__":
    # Configuration must happen before creating the clients.
    # There is another __main__ section at the bottom of this file
    api_server = os.environ.get("API_SERVER_URL", "__API_SERVER_URL__")
    token = os.environ.get("TOKEN", "__TOKEN__")
    # NOTE(review): any non-empty value (even "false") enables in-cluster mode.
    in_cluster = os.environ.get("IN_CLUSTER")

    # The "__NAME__" defaults look like install-time template placeholders
    # (presumably substituted when the script is deployed -- confirm). Left
    # as-is they are truthy, which made the in-cluster / kubeconfig branches
    # below unreachable, so an unsubstituted placeholder is treated as unset.
    # The check is on the "__...__" shape rather than the literal text so it
    # keeps working after the placeholders have been substituted in this file.
    if api_server.startswith("__") and api_server.endswith("__"):
        api_server = ""
    if token.startswith("__") and token.endswith("__"):
        token = ""

    if api_server and token:
        # Named `configuration` so the imported `kubernetes.config` module
        # (used by the other branches) is not shadowed.
        configuration = client.Configuration()
        configuration.host = api_server
        configuration.api_key = {"authorization": f"Bearer {token}"}
        # NOTE(security): TLS certificate verification is disabled here, so
        # the API server's identity is not checked.
        configuration.verify_ssl = False
        client.Configuration.set_default(configuration)
        # Hide warnings about unverified HTTPS requests
        urllib3.disable_warnings()
    elif in_cluster:
        config.load_incluster_config()
    else:
        config.load_kube_config()
# Module-level API clients shared by the commands below. They are created
# after the configuration step above so they pick up the default
# configuration set there.
# NOTE(review): k8s_custom_objects and k8s_api_custom are both
# CustomObjectsApi instances; only k8s_api_custom is referenced in this file.
k8s_custom_objects = client.CustomObjectsApi()
k8s_core_v1 = client.CoreV1Api()
k8s_apps_v1 = client.AppsV1Api()
k8s_api_registration_v1 = client.ApiregistrationV1Api()
k8s_api_custom = client.CustomObjectsApi()
class PodMetric(str, Enum):
    """Metric names accepted as the STATUS argument of ``stats pods``.

    The values are the strings typed on the command line. Mixing in ``str``
    lets a member compare equal to its plain string value.
    """

    # Pod-level metrics.
    status_phase = "statusPhase"
    status_reason = "statusReason"
    status_ready = "statusReady"
    # Per-container metrics; ``stats pods`` requires CONTAINER_NAME for these.
    container_ready = "containerReady"
    container_restarts = "containerRestarts"
    # CPU/memory usage, summed over all containers unless CONTAINER_NAME is given.
    resources = "resources"
class DeploymentMetric(str, Enum):
    """Metric names handled by ``stats deployments``.

    The values are the strings compared against the STATUS argument.
    """

    status_ready = "statusReady"
    # NOTE(review): capitalised unlike the other values -- presumably kept to
    # match the pre-existing CLI; confirm before normalising.
    replicas = "Replicas"
    updated_replicas = "updatedReplicas"
@click.group()
def cli():
    # Root command group; the "discovery" and "stats" groups are attached to it.
    # Run before every command (currently a no-op).
    # Documented with comments rather than a docstring, because click would
    # show a docstring as --help text.
    pass
@cli.group()
def discovery():
    # Group for the "discovery <kind>" commands, each of which prints a JSON
    # list of the objects of that kind.
    # Run before every discovery command (currently a no-op).
    pass
@discovery.command("apiservices")
@click.pass_context
def discover_apiservices(ctx):
    # Print a JSON list with one {#NAME} entry per registered APIService.
    # (A comment rather than a docstring, so the --help output is unchanged.)
    services = k8s_api_registration_v1.list_api_service().items
    entries = [{"{#NAME}": service.metadata.name} for service in services]
    print(_render_json(ctx, entries))
@discovery.command("componentstatuses")
@click.pass_context
def discover_componentstatuses(ctx):
    # Print a JSON list with one {#NAME} entry per ComponentStatus object.
    # (A comment rather than a docstring, so the --help output is unchanged.)
    listing = k8s_core_v1.list_component_status()
    names = (item.metadata.name for item in listing.items)
    print(_render_json(ctx, [{"{#NAME}": name} for name in names]))
@discovery.command("deployments")
@click.pass_context
def discover_deployments(ctx):
    # Print a JSON list with a {#NAMESPACE}/{#NAME} entry for every
    # deployment in every namespace.
    # (A comment rather than a docstring, so the --help output is unchanged.)
    deployments = k8s_apps_v1.list_deployment_for_all_namespaces().items
    entries = [
        {
            "{#NAMESPACE}": item.metadata.namespace,
            "{#NAME}": item.metadata.name,
        }
        for item in deployments
    ]
    print(_render_json(ctx, entries))
@discovery.command("nodes")
@click.pass_context
def discover_nodes(ctx):
    # Print a JSON list with one {#NAME} entry per cluster node.
    # (A comment rather than a docstring, so the --help output is unchanged.)
    nodes = k8s_core_v1.list_node().items
    payload = _render_json(ctx, [{"{#NAME}": n.metadata.name} for n in nodes])
    print(payload)
@discovery.command("pods")
@click.pass_context
def discover_pods(ctx):
    # Print a JSON list with a {#NAMESPACE}/{#NAME} entry for every pod in
    # every namespace.
    # (A comment rather than a docstring, so the --help output is unchanged.)
    pods = k8s_core_v1.list_pod_for_all_namespaces().items
    entries = [
        {
            "{#NAMESPACE}": item.metadata.namespace,
            "{#NAME}": item.metadata.name,
        }
        for item in pods
    ]
    print(_render_json(ctx, entries))
@discovery.command("containers")
@click.pass_context
def discover_containers(ctx):
    # Print a JSON list with a {#NAMESPACE}/{#NAME}/{#CONTAINER} entry for
    # every container declared in the spec of every pod in every namespace.
    # (A comment rather than a docstring, so the --help output is unchanged.)
    pods = k8s_core_v1.list_pod_for_all_namespaces().items
    entries = [
        {
            "{#NAMESPACE}": item.metadata.namespace,
            "{#NAME}": item.metadata.name,
            "{#CONTAINER}": spec_container.name,
        }
        for item in pods
        for spec_container in item.spec.containers
    ]
    print(_render_json(ctx, entries))
@cli.group()
def stats():
    # Group for the "stats <kind>" commands, each of which prints the
    # requested value(s) for a single named object.
    # Run before every stats command (currently a no-op).
    pass
@stats.command("pods")
@click.argument("namespace")
@click.argument("pod_name")
# Pass the enum *values* explicitly: depending on the click version, handing
# the Enum class itself to Choice matches on member names instead, which would
# change the accepted command-line strings.
@click.argument("status", type=click.Choice([metric.value for metric in PodMetric]))
@click.argument("container_name", default="")
@click.pass_context
def stats_for_pods(ctx, namespace, pod_name, status, container_name):
    """Print one metric for a pod, or for one of its containers.

    Nothing is printed when the requested value is not available.
    \f

    Args:
        ctx: click context (unused).
        namespace: Namespace the pod lives in.
        pod_name: Name of the pod.
        status: One of the ``PodMetric`` values.
        container_name: Container to report on. Required for
            ``containerRestarts`` and ``containerReady``; for ``resources``
            an empty value sums the usage of all containers.

    Raises:
        click.UsageError: If a per-container metric is requested without
            a container name.
        ApiException: On API errors, except a 404 from the metrics API,
            which is treated as "no data".
    """
    metric = PodMetric(status)

    # Validate with a real exception: `assert` disappears under `python -O`.
    needs_container = metric in (
        PodMetric.container_restarts,
        PodMetric.container_ready,
    )
    if needs_container and not container_name:
        raise click.UsageError(f"CONTAINER_NAME is required for {metric.value}")

    pod: client.V1Pod = k8s_core_v1.read_namespaced_pod(pod_name, namespace)
    pod_status: client.V1PodStatus = pod.status
    # These list fields are optional in the API model and may be None.
    conditions = pod_status.conditions or []
    container_statuses = pod_status.container_statuses or []

    if metric is PodMetric.status_phase:
        print(pod_status.phase)
    elif metric is PodMetric.status_reason:
        if pod_status.reason:
            print(pod_status.reason)
    elif metric is PodMetric.status_ready:
        for condition in conditions:
            if condition.type == "Ready":
                print(condition.status)
    elif metric is PodMetric.container_restarts:
        for container_status in container_statuses:
            if container_status.name == container_name:
                print(container_status.restart_count)
    elif metric is PodMetric.container_ready:
        is_ready = None
        for container_status in container_statuses:
            if container_status.name != container_name:
                continue
            state = container_status.state
            terminated = state.terminated if state else None
            # A container that ran to completion is reported as ready.
            if terminated and terminated.reason == "Completed":
                is_ready = True
            else:
                is_ready = container_status.ready
        if is_ready is not None:
            print(is_ready)
    elif metric is PodMetric.resources:
        try:
            pod_metrics: dict = k8s_api_custom.get_namespaced_custom_object(
                "metrics.k8s.io", "v1beta1", namespace, "pods", pod_name
            )
        except ApiException as e:
            # No metrics recorded for this pod: print nothing.
            if e.status == 404:
                return
            raise
        cpu = 0
        memory = 0
        for container in pod_metrics.get("containers", []):
            if container_name and container.get("name") != container_name:
                continue
            usage = container.get("usage", {})
            # Default to the string "0": _normalise_unit expects a quantity
            # string, not an int.
            cpu += _normalise_unit(usage.get("cpu", "0"))
            memory += _normalise_unit(usage.get("memory", "0"))
        print(f"{cpu},{memory}")
@stats.command("deployments")
@click.argument("namespace")
@click.argument("deployment_name")
# Validate against DeploymentMetric (the original used PodMetric, which
# rejected "Replicas" and "updatedReplicas"). Values are passed explicitly so
# the accepted strings do not depend on how click treats Enum classes.
@click.argument(
    "status", type=click.Choice([metric.value for metric in DeploymentMetric])
)
@click.pass_context
def stats_for_deployments(ctx, namespace, deployment_name, status):
    """Print one metric for a deployment.

    \f

    Args:
        ctx: click context (unused).
        namespace: Namespace the deployment lives in.
        deployment_name: Name of the deployment.
        status: One of the ``DeploymentMetric`` values.
    """
    metric = DeploymentMetric(status)
    deployment: client.V1Deployment = k8s_apps_v1.read_namespaced_deployment(
        deployment_name, namespace
    )
    deployment_status: client.V1DeploymentStatus = deployment.status

    if metric is DeploymentMetric.replicas:
        print(deployment_status.replicas)
    elif metric is DeploymentMetric.updated_replicas:
        print(deployment_status.updated_replicas)
    elif metric is DeploymentMetric.status_ready:
        # `conditions` is optional in the API model and may be None.
        for condition in deployment_status.conditions or []:
            if condition.type == "Available":
                print(condition.status)
@stats.command("nodes")
@click.argument("node_name")
@click.argument("condition_name")
@click.pass_context
def stats_for_nodes(ctx, node_name, condition_name):
    # Print the status of each condition on the node whose type equals
    # CONDITION_NAME.
    # (A comment rather than a docstring, so the --help output is unchanged.)
    node_status: client.V1NodeStatus = k8s_core_v1.read_node(node_name).status
    matching = (
        entry.status
        for entry in node_status.conditions
        if entry.type == condition_name
    )
    for value in matching:
        print(value)
@stats.command("apiservices")
@click.argument("api_service_name")
@click.argument("condition_name")
@click.pass_context
def stats_for_apiservices(ctx, api_service_name, condition_name):
    # Print the status of each condition on the APIService whose type equals
    # CONDITION_NAME.
    # (A comment rather than a docstring, so the --help output is unchanged.)
    service: client.V1APIService = k8s_api_registration_v1.read_api_service(
        api_service_name
    )
    service_status: client.V1APIServiceStatus = service.status
    for entry in service_status.conditions:
        if entry.type != condition_name:
            continue
        print(entry.status)
@stats.command("componentstatuses")
@click.argument("component_status_name")
@click.argument("condition_name")
@click.pass_context
def stats_for_componentstatuses(ctx, component_status_name, condition_name):
    # Print the status of each condition on the ComponentStatus whose type
    # equals CONDITION_NAME.
    # (A comment rather than a docstring, so the --help output is unchanged.)
    component: client.V1ComponentStatus = k8s_core_v1.read_component_status(
        component_status_name
    )
    matching = (
        entry.status
        for entry in component.conditions
        if entry.type == condition_name
    )
    for value in matching:
        print(value)
def _render_json(ctx, data):
    """Serialise *data* to a JSON string.

    *ctx* is accepted so every caller can pass its click context, but it is
    not used.
    """
    rendered = json.dumps(data)
    return rendered
# Scale for each Kubernetes quantity suffix as (multiplier, divisor).
_QUANTITY_SCALES = {
    # Decimal fractions (typically CPU): nano, micro, milli.
    "n": (1, 1_000_000_000),
    "u": (1, 1_000_000),
    "m": (1, 1_000),
    # Decimal multiples.
    "k": (1_000, 1),
    "M": (1_000**2, 1),
    "G": (1_000**3, 1),
    "T": (1_000**4, 1),
    "P": (1_000**5, 1),
    "E": (1_000**6, 1),
    # Binary multiples (typically memory).
    "Ki": (1024, 1),
    "Mi": (1024**2, 1),
    "Gi": (1024**3, 1),
    "Ti": (1024**4, 1),
    "Pi": (1024**5, 1),
    "Ei": (1024**6, 1),
}


def _normalise_unit(value: "str | int | float"):
    """Convert a Kubernetes quantity to a plain number in base units.

    CPU quantities come back as (fractions of) whole cores and memory
    quantities as bytes, e.g. ``"500000000n"`` -> ``0.5``, ``"250m"`` ->
    ``0.25``, ``"2Ki"`` -> ``2048``, ``"3"`` -> ``3``.

    Args:
        value: Quantity string such as ``"12345n"`` or ``"64Mi"``. Bare
            numbers (as ``str``, ``int`` or ``float``) are accepted too.

    Returns:
        The scaled number, or ``0`` when *value* cannot be parsed, so a
        single odd reading does not abort the whole command.
    """
    text = str(value).strip()

    # Two-character binary suffixes first, then single-character ones.
    if text[-2:] in _QUANTITY_SCALES:
        suffix = text[-2:]
    elif text[-1:] in _QUANTITY_SCALES:
        suffix = text[-1:]
    else:
        suffix = ""
    number = text[: len(text) - len(suffix)]

    # Prefer int so whole-number quantities stay exact; fall back to float
    # for forms such as "1.5" or "1e3".
    try:
        magnitude = int(number)
    except ValueError:
        try:
            magnitude = float(number)
        except ValueError:
            return 0

    multiplier, divisor = _QUANTITY_SCALES.get(suffix, (1, 1))
    if divisor != 1:
        return magnitude / divisor
    return magnitude * multiplier
if __name__ == "__main__":
    # Hand control to the click command tree; click parses sys.argv and
    # dispatches to the matching discovery/stats command.
    cli()
I've been working on adding container metrics (CPU, memory). As part of this I refactored the k8s-stats.py script to use the `kubernetes` and `click` Python libraries. The CLI is exactly the same, but I think the inner workings are much cleaner and easier to extend.
I'm going to leave it here in case it is useful to yourself or anyone else. I may have time to do a PR if a maintainer can confirm it would be welcome.