Files
job_orchestrator_service/job_orchestrator_service/utils/kubernetes_adapter.py
2021-04-18 15:13:50 +02:00

39 lines
1.1 KiB
Python

from flask import current_app, _app_ctx_stack
import kubernetes
class FlaskKubernetes:
def __init__(self, app=None):
self.app = app
if app:
self.init_app(app)
def init_app(self, app):
# app.config.setdefault('SOMETHING', None)
app.extensions = getattr(app, 'extensions', {})
app.extensions['kubernetes'] = self
def connect(self) -> kubernetes.client.ApiClient:
# This may be slow, but at least safe
config = kubernetes.client.configuration.Configuration()
kubernetes.config.load_incluster_config(config)
return kubernetes.client.ApiClient(configuration=config)
@property
def connection(self) -> kubernetes.client.ApiClient:
ctx = _app_ctx_stack.top
if ctx is not None:
if not hasattr(ctx, 'kubernetes'): # first time called during this request
ctx.kubernetes = self.connect()
return ctx.kubernetes
@property
def corev1api(self) -> kubernetes.client.CoreV1Api:
return kubernetes.client.CoreV1Api(self.connection)
k8s = FlaskKubernetes()