#!/usr/bin/env python3
"""
Kubernetes Client Tools
"""

__author__ = "@tormakris"
__copyright__ = "Copyright 2020, videON Team"
__module_name__ = "kuberclient"
__version__text__ = "1"

from kubernetes import client, config

from config import RELEASEMODE

# Namespace every resource managed by this module lives in.
_NAMESPACE = "videon"
# Port exposed by the streamer container and by its Service.
_STREAM_PORT = 1935


class Kubectl:
    """Manage the ConfigMap, Deployment and Service of one streamer instance.

    All resources share the same name (``name``) and live in the ``videon``
    namespace.  When ``RELEASEMODE`` is ``"dev"`` no cluster configuration is
    loaded and the public ``*_resource`` methods do nothing.
    """

    def __init__(self,
                 name: str,
                 resourcetype: str = "ingest",
                 stream_key: str = "test",
                 push_urls: str = "",
                 ffmpeg_args: str = "",
                 encode_push_urls: str = ""):
        """Store the stream settings and load the in-cluster configuration.

        :param name: name used for the ConfigMap, Deployment, Service and as
            the value of the ``app`` pod label
        :param resourcetype: stream type; stored lower-cased
        :param stream_key: value of the ``STREAM_KEY`` ConfigMap entry
        :param push_urls: value of the ``PUSH_URLS`` ConfigMap entry
        :param ffmpeg_args: value of the ``FFMPEG_ARGS`` ConfigMap entry
        :param encode_push_urls: value of the ``ENCODE_PUSH_URL`` ConfigMap
            entry
        """
        if RELEASEMODE != "dev":
            config.load_incluster_config()
        self.name = name
        # NOTE: these assignments used to end with a trailing comma, which
        # silently turned every value into a 1-tuple and produced a ConfigMap
        # whose data values were not strings.
        self.streamtype = resourcetype.lower()
        self.stream_key = stream_key
        self.push_urls = push_urls
        self.ffmpeg_args = ffmpeg_args
        self.encode_push_url = encode_push_urls

    def __build_configmap(self):
        """Return the ConfigMap object describing the current settings."""
        return client.V1ConfigMap(
            api_version="v1",
            kind="ConfigMap",
            # The object name travels in the metadata; the create call has no
            # separate ``name`` argument.
            metadata=client.V1ObjectMeta(name=self.name),
            data={"TYPE": self.streamtype,
                  "STREAM_KEY": self.stream_key,
                  "PUSH_URLS": self.push_urls,
                  "FFMPEG_ARGS": self.ffmpeg_args,
                  "ENCODE_PUSH_URL": self.encode_push_url})

    def __delete_pod(self) -> None:
        """Delete every pod labelled ``app=<name>``."""
        core_api = client.CoreV1Api()
        foundpods = core_api.list_namespaced_pod(
            _NAMESPACE, label_selector=f"app in ({self.name})")
        for pod in foundpods.items:
            core_api.delete_namespaced_pod(pod.metadata.name, _NAMESPACE)

    def __create_configmap(self) -> None:
        """Create the ConfigMap holding the stream settings."""
        client.CoreV1Api().create_namespaced_config_map(
            namespace=_NAMESPACE, body=self.__build_configmap())

    def __update_configmap(self) -> None:
        """Patch the existing ConfigMap with the current stream settings."""
        client.CoreV1Api().patch_namespaced_config_map(
            namespace=_NAMESPACE, name=self.name,
            body=self.__build_configmap())

    def __delete_configmap(self) -> None:
        """Delete the ConfigMap."""
        client.CoreV1Api().delete_namespaced_config_map(
            namespace=_NAMESPACE, name=self.name)

    def __delete_service(self) -> None:
        """Delete the Service."""
        client.CoreV1Api().delete_namespaced_service(
            namespace=_NAMESPACE, name=self.name)

    def __delete_deployment(self) -> None:
        """Delete the Deployment, cascading to its pods in the foreground."""
        # Use the same API group (apps/v1) the Deployment was created with.
        client.AppsV1Api().delete_namespaced_deployment(
            name=self.name,
            namespace=_NAMESPACE,
            body=client.V1DeleteOptions(
                propagation_policy="Foreground",
                grace_period_seconds=5))

    def __create_deployment(self) -> None:
        """Create a single-replica Deployment of the streamer image.

        The container receives its environment from the ConfigMap that shares
        this object's name.
        """
        pod_labels = {"app": self.name}
        envs = [client.V1EnvFromSource(
            config_map_ref=client.V1ConfigMapEnvSource(name=self.name))]
        container = client.V1Container(
            name=self.name,
            image="registry.kmlabz.com/videon/nginx-streamer",
            image_pull_policy="Always",
            ports=[client.V1ContainerPort(container_port=_STREAM_PORT)],
            env_from=envs,
        )
        # Template
        template = client.V1PodTemplateSpec(
            metadata=client.V1ObjectMeta(labels=pod_labels),
            spec=client.V1PodSpec(containers=[container]))
        # Spec -- apps/v1 requires a selector matching the template labels.
        spec = client.V1DeploymentSpec(
            replicas=1,
            selector=client.V1LabelSelector(match_labels=pod_labels),
            template=template)
        # Deployment
        deployment = client.V1Deployment(
            api_version="apps/v1",
            kind="Deployment",
            metadata=client.V1ObjectMeta(name=self.name),
            spec=spec)
        # Creation of the Deployment in specified namespace
        client.AppsV1Api().create_namespaced_deployment(
            namespace=_NAMESPACE,
            body=deployment
        )

    def __create_service(self) -> None:
        """Create a Service forwarding port 1935 to the ``app=<name>`` pods."""
        body = client.V1Service(
            api_version="v1",
            kind="Service",
            metadata=client.V1ObjectMeta(
                name=self.name
            ),
            spec=client.V1ServiceSpec(
                selector={"app": self.name},
                ports=[client.V1ServicePort(
                    port=_STREAM_PORT,
                    target_port=_STREAM_PORT
                )]
            )
        )
        # Creation of the Service in specified namespace; the name is taken
        # from the body metadata.
        client.CoreV1Api().create_namespaced_service(
            namespace=_NAMESPACE, body=body)

    def create_resource(self) -> None:
        """Create the ConfigMap, Deployment and Service (no-op in dev mode)."""
        if RELEASEMODE != "dev":
            self.__create_configmap()
            self.__create_deployment()
            self.__create_service()

    def update_resource(self) -> None:
        """Patch the ConfigMap, then delete the pods (no-op in dev mode)."""
        if RELEASEMODE != "dev":
            self.__update_configmap()
            self.__delete_pod()

    def delete_resource(self) -> None:
        """Delete the Deployment, ConfigMap and Service (no-op in dev mode)."""
        if RELEASEMODE != "dev":
            self.__delete_deployment()
            self.__delete_configmap()
            self.__delete_service()