Files
job_orchestrator_service/job_orchestrator_service/views/job_view.py
marcsello 22a685425e
All checks were successful
continuous-integration/drone/push Build is passing
continuous-integration/drone Build is passing
Revert "Fixed domain naming"
This reverts commit cf77ff0f89.
2021-04-19 21:14:28 +02:00

252 lines
9.2 KiB
Python

#!/usr/bin/env python3
import requests
from typing import List
from flask import request, jsonify, current_app, abort, Response
from flask_classful import FlaskView, route
import kubernetes
from utils import json_required, k8s
from marshmallow.exceptions import ValidationError
from model import db, Job, Controller
from schemas import JobSchema
import time
class JobView(FlaskView):
    """Flask-Classful view that manages Jobs and their controller pods.

    A Job groups one or more Controllers; each Controller is backed by exactly
    one Kubernetes pod named ``ursim-controller-<controller.id>`` in the
    namespace given by ``WORKING_NAMESPACE``.
    """

    CONTROLLER_HTTP_PORT = 8080  # It's a mediocre idea to hard code this I think

    job_schema = JobSchema(many=False)
    jobs_schema = JobSchema(many=True, exclude=["controllers"])

    @staticmethod
    def _get_k8s_stuff_controller(controller: Controller) -> dict:
        """Return pod name, configuration and live status for a controller.

        If the backing pod no longer exists (404 from the Kubernetes API),
        every configuration/status field is returned as ``None`` instead of
        raising, so callers can still render the controller entry.
        """
        # We'll see how robust this turns out to be
        pod_name = JobView._controller_to_pod_name(controller)
        try:
            r = k8s.corev1api.read_namespaced_pod(pod_name, current_app.config['WORKING_NAMESPACE'])
        except kubernetes.client.exceptions.ApiException as e:
            if e.status == 404:
                # Pod is gone: report an empty configuration/status skeleton
                return {
                    "pod_name": pod_name,
                    "configuration": {
                        "robot_address": None,
                        "program_url": None,
                    },
                    "status": {
                        "cluster_ip": None,
                        "phase": None
                    }
                }
            else:
                raise

        # The pod template (see _create_k8s_pod_api_object) guarantees these
        # env vars exist on the first (only) container.
        env = {x.name: x.value for x in r.spec.containers[0].env}
        return {
            "pod_name": pod_name,
            "configuration": {
                "robot_address": env['ROBOT_ADDRESS'],
                "program_url": env['PROGRAM_URL'],
            },
            "status": {
                "cluster_ip": r.status.pod_ip,
                "phase": r.status.phase
            }
        }

    @staticmethod
    def _controller_to_pod_name(controller: Controller) -> str:
        """Derive the deterministic pod name for a controller."""
        return f"ursim-controller-{controller.id}"

    @staticmethod
    def _create_k8s_pod_api_object(controller: Controller, controller_desc: dict) -> dict:
        """Build the Kubernetes pod manifest (as a plain dict) for a controller.

        ``controller_desc`` must contain ``configuration.robot_address`` and
        ``configuration.program_url`` (validated earlier by JobSchema).
        """
        return {
            "metadata": {
                "name": JobView._controller_to_pod_name(controller),
                "labels": {
                    "ursim-job-id": str(controller.job.id),
                    "ursim-role": "controller"
                }
            },
            "spec": {
                "restartPolicy": "Never",
                "containers": [
                    {
                        "name": f"ursim-controller-{controller.id}-cont",
                        "image": current_app.config["URSIM_CONTROL_IMAGE"],
                        "env": [
                            {
                                "name": "ROBOT_ADDRESS",
                                "value": controller_desc['configuration']['robot_address']
                            },
                            {
                                "name": "PROGRAM_URL",
                                "value": controller_desc['configuration']['program_url']
                            },
                            {
                                "name": "RUN_ID",
                                "value": f"run{controller.job.id}"
                            },
                            {
                                "name": "HTTP_PORT",
                                "value": str(JobView.CONTROLLER_HTTP_PORT)
                            }
                        ],
                        "envFrom": [
                            {"configMapRef": {"name": current_app.config["URSIM_CONTROL_CONFIGMAP"]}}
                        ]
                    }
                ]
            }
        }

    @staticmethod
    def _k8s_wait_controller_phase(controller: Controller, desired_phases: List[str]) -> str:
        """Block until the controller's pod reaches one of ``desired_phases``."""
        pod_name = JobView._controller_to_pod_name(controller)
        return JobView._k8s_wait_pod_phase(pod_name, desired_phases)

    @staticmethod
    def _k8s_wait_pod_phase(pod_name: str, desired_phases: List[str], timeout: float = None) -> str:
        """Poll the pod every 0.2s until its phase is in ``desired_phases``.

        Returns the phase that was reached.

        FIX: added an optional ``timeout`` (seconds) — the original looped
        forever if the pod never reached a desired phase. The default of
        ``None`` preserves the original wait-forever behaviour for existing
        callers; when set, a TimeoutError is raised once it elapses.
        """
        deadline = None if timeout is None else time.monotonic() + timeout
        while True:
            r = k8s.corev1api.read_namespaced_pod(pod_name, current_app.config['WORKING_NAMESPACE'])
            if r.status.phase in desired_phases:
                return r.status.phase
            if deadline is not None and time.monotonic() > deadline:
                raise TimeoutError(f"Pod {pod_name} did not reach any of {desired_phases} within {timeout}s")
            time.sleep(0.2)

    def index(self):
        """GET /: list all jobs with their controller ids (no pod lookups)."""
        jobs = Job.query.all()
        result = []
        for job in jobs:
            result.append({
                "id": job.id,
                "created_at": job.created_at,
                "controllers": [{"id": x.id} for x in job.controllers]
            })
        return jsonify(self.jobs_schema.dump(result))

    def get(self, _id: str):
        """GET /<id>: one job, with live pod configuration/status per controller."""
        job = Job.query.get_or_404(_id)
        result = {
            "id": job.id,
            "created_at": job.created_at,
            "controllers": [dict(id=x.id, **self._get_k8s_stuff_controller(x)) for x in job.controllers]
        }
        return jsonify(self.job_schema.dump(result))

    @json_required
    def post(self):
        """POST /: validate the job description, then create one pod per controller.

        Returns 422 on schema errors and 409 while a previous job's pod is
        still running. Responds with the job description enriched with ids,
        pod names and initial pod status.
        """
        try:
            job_desc = self.job_schema.load(request.json)
        except ValidationError as e:
            return abort(422, str(e))

        # Check if something is already running
        last_job = Job.query.order_by(Job.id.desc()).first()
        if last_job:
            # Check if any controller is running
            pod_names = [JobView._controller_to_pod_name(controller) for controller in last_job.controllers]
            missing_pod_names = []
            for pod_name in pod_names:
                try:
                    r = k8s.corev1api.read_namespaced_pod(pod_name, current_app.config['WORKING_NAMESPACE'])
                except kubernetes.client.exceptions.ApiException as e:
                    if e.status == 404:
                        missing_pod_names.append(pod_name)
                        continue
                    else:
                        raise
                # Check if running
                if r.status.phase not in ['Succeeded', 'Failed']:  # Unknown, Running and Pending are the others
                    return abort(409, "One of the controllers are still running. Terminate it first!")

            # Do some cleanup if needed
            if current_app.config['AUTO_CLEANUP']:
                for pod_name in pod_names:
                    if pod_name not in missing_pod_names:
                        k8s.corev1api.delete_namespaced_pod(pod_name, current_app.config['WORKING_NAMESPACE'])

        # Perform starting job
        job = Job()
        db.session.add(job)
        db.session.flush()  # flush so job.id is assigned before building pods
        job_desc['id'] = job.id
        job_desc['created_at'] = job.created_at

        controllers = []
        for i, controller_desc in enumerate(job_desc['controllers']):
            controller = Controller()
            controller.job = job
            db.session.add(controller)
            db.session.flush()  # need controller.id for the pod name
            pod_object = self._create_k8s_pod_api_object(controller, controller_desc)
            r = k8s.corev1api.create_namespaced_pod(current_app.config['WORKING_NAMESPACE'], pod_object)
            job_desc['controllers'][i]['pod_name'] = r.metadata.name
            controllers.append((controller, r.metadata.name, i))

        db.session.commit()
        # The commit is here because the section below spends a lot of time
        # waiting. If another POST request arrived meanwhile, it would not see
        # in the DB that a job is already starting, so it would start another
        # one — and that would be wrong.
        for controller, pod_name, i in controllers:
            while True:
                # Wait until the pod gains ip address
                r = k8s.corev1api.read_namespaced_pod(pod_name, current_app.config['WORKING_NAMESPACE'])
                if r.status.pod_ip:
                    status = {
                        "cluster_ip": r.status.pod_ip,
                        "phase": r.status.phase
                    }
                    job_desc['controllers'][i]['status'] = status
                    break
                time.sleep(0.2)

        return jsonify(self.job_schema.dump(job_desc))

    def delete(self, _id: str):
        """DELETE /<id>: gracefully abort running controllers, delete pods and the job row."""
        job = Job.query.get_or_404(_id)
        pod_names = [JobView._controller_to_pod_name(controller) for controller in job.controllers]
        for pod_name in pod_names:
            try:
                r = k8s.corev1api.read_namespaced_pod(pod_name, current_app.config['WORKING_NAMESPACE'])
            except kubernetes.client.exceptions.ApiException as e:
                if e.status == 404:
                    continue
                else:
                    raise

            if r.status.phase == 'Running':
                try:
                    r = requests.post(f"http://{r.status.pod_ip}:{self.CONTROLLER_HTTP_PORT}/abort")
                    r.raise_for_status()
                    self._k8s_wait_pod_phase(pod_name, ['Succeeded', 'Failed'])
                except requests.exceptions.RequestException:
                    # FIX: was `except requests.exceptions.ConnectionError`,
                    # but raise_for_status() raises HTTPError, which is not a
                    # ConnectionError — a non-2xx /abort response would have
                    # escaped this handler and turned the DELETE into a 500.
                    # RequestException is the common base of both.
                    # Ignore failure to abort, k8s will kill the pod anyways
                    pass

            try:
                k8s.corev1api.delete_namespaced_pod(pod_name, current_app.config['WORKING_NAMESPACE'])
            except kubernetes.client.exceptions.ApiException as e:
                if e.status == 404:
                    continue
                else:
                    raise

        db.session.delete(job)
        db.session.commit()
        return Response(status=204)