Implemented job view

parent 998658c148
commit 800ed14449
@@ -24,15 +24,18 @@ if Config.SENTRY_DSN:

# create flask app
app = Flask(__name__)
app.config.from_object(Config)
db.init_app(app)

# init stuff
k8s.init_app(app)
register_all_error_handlers(app)


@app.before_first_request
def init_db():
    db.create_all()


# register views
for view in [JobView]:
    view.register(app, trailing_slash=False)
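For reference, flask_classful derives the URL prefix from the class name (the "View" suffix is dropped), so the registration above should yield roughly the following route map. A sketch, not output captured from this commit:

# Routes registered by JobView.register(app, trailing_slash=False):
#   GET    /job        -> JobView.index
#   POST   /job        -> JobView.post
#   GET    /job/<_id>  -> JobView.get
#   DELETE /job/<_id>  -> JobView.delete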
@@ -6,6 +6,26 @@ Configuration
"""


def get_namespace():
    namespace = os.environ.get("WORKING_NAMESPACE")

    if namespace:
        return namespace

    # Try to figure it out from the in-cluster service account
    try:
        # https://github.com/kubernetes-client/python/issues/363
        with open("/run/secrets/kubernetes.io/serviceaccount/namespace", "r") as f:
            namespace = f.read()
    except FileNotFoundError:
        pass

    if namespace:
        return namespace
    else:
        raise Exception("WORKING_NAMESPACE is not configured!")


class Config:
    SQLALCHEMY_DATABASE_URI = os.environ.get("SQLALCHEMY_DATABASE_URI", "sqlite://")
    SQLALCHEMY_TRACK_MODIFICATIONS = False
@@ -15,3 +35,9 @@ class Config:
    SENTRY_DSN = os.environ.get("SENTRY_DSN")
    RELEASE_ID = os.environ.get("RELEASE_ID", "test")
    RELEASEMODE = os.environ.get("RELEASEMODE", "dev")

    # Technically this could differ from the namespace this pod is running in, but no one tests that.
    WORKING_NAMESPACE = get_namespace()
    AUTO_CLEANUP = os.environ.get("AUTO_CLEANUP", "").upper() in ['YES', 'TRUE', '1']  # `in` already yields a bool
    URSIM_CONTROL_IMAGE = os.environ["URSIM_CONTROL_IMAGE"]
    URSIM_CONTROL_CONFIGMAP = os.environ["URSIM_CONTROL_CONFIGMAP"]
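A minimal sketch of exercising this configuration locally; the values are made up, and the URSIM_* variables must be set because os.environ[...] raises KeyError otherwise:

import os

os.environ["WORKING_NAMESPACE"] = "ursim-dev"                                  # hypothetical
os.environ["URSIM_CONTROL_IMAGE"] = "registry.example.com/ursim-control:dev"  # hypothetical
os.environ["URSIM_CONTROL_CONFIGMAP"] = "ursim-control-config"                 # hypothetical

from config import Config  # assuming the file above is importable as `config`

assert Config.WORKING_NAMESPACE == "ursim-dev"
assert Config.AUTO_CLEANUP is False  # unset AUTO_CLEANUP falls back to False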
@@ -1 +1,3 @@
 from .db import db
+from .job import Job
+from .controller import Controller
9  job_orchestrator_service/model/controller.py  Normal file
@@ -0,0 +1,9 @@
+#!/usr/bin/env python3
+from . import db
+
+
+class Controller(db.Model):
+    id = db.Column(db.Integer, primary_key=True, autoincrement=True)
+
+    job_id = db.Column(db.Integer, db.ForeignKey("job.id", ondelete="CASCADE"), nullable=False)
+    job = db.relationship("Job", backref=db.backref("controllers", lazy='joined'))
9  job_orchestrator_service/model/job.py  Normal file
@@ -0,0 +1,9 @@
+#!/usr/bin/env python3
+from sqlalchemy import func
+
+from . import db
+
+
+class Job(db.Model):
+    id = db.Column(db.Integer, primary_key=True, autoincrement=True)
+    created_at = db.Column(db.TIMESTAMP, nullable=False, server_default=func.now())
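A quick sketch of how the backref on Controller.job ties the two models together (to be run inside the Flask app context, against any configured database):

job = Job()
controller = Controller(job=job)       # populates controller.job_id via the relationship
db.session.add_all([job, controller])
db.session.flush()                     # assigns the autoincrement ids

assert controller in job.controllers   # "controllers" comes from the backref
assert controller.job_id == job.id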
27  job_orchestrator_service/schemas/controller_schema.py  Normal file
@@ -0,0 +1,27 @@
+#!/usr/bin/env python3
+from marshmallow import Schema, fields, RAISE
+
+
+class ControlConfigurationSchema(Schema):
+    robot_address = fields.Str(required=True)
+    program_url = fields.Str(required=True)
+
+    class Meta:
+        unknown = RAISE
+
+
+class ControlStatusConfigurationSchema(Schema):
+    cluster_ip = fields.IP()
+    phase = fields.Str()
+
+    class Meta:
+        unknown = RAISE
+
+
+class ControllerSchema(Schema):
+    pod_name = fields.Str(required=False, dump_only=True)
+    configuration = fields.Nested(ControlConfigurationSchema, many=False, required=True)
+    status = fields.Nested(ControlStatusConfigurationSchema, many=False, required=False, dump_only=True)
+
+    class Meta:
+        unknown = RAISE
@@ -1,41 +1,13 @@
-from marshmallow import Schema, fields
+from marshmallow import Schema, fields, RAISE
 from marshmallow.validate import Length
-from marshmallow import RAISE
-
-from datetime import datetime
-import uuid
-
-
-class ControlConfigurationSchema(Schema):
-    robot_address = fields.Str(required=True)
-    program_url = fields.Str(required=True)
-
-    class Meta:
-        unknown = RAISE
-
-
-class ControlStatusConfigurationSchema(Schema):
-    cluster_ip = fields.IP(required=False, dump_only=True)
-    running = fields.Boolean(required=False, dump_only=True)
-
-    class Meta:
-        unknown = RAISE
-
-
-class ControllerSchema(Schema):
-    pod_id = fields.UUID(required=False, missing=uuid.uuid4)
-    configuration = fields.Nested(ControlConfigurationSchema, many=False, required=True)
-    status = fields.Nested(ControlStatusConfigurationSchema, required=False, dump_only=True)
-
-    class Meta:
-        unknown = RAISE
+
+from .controller_schema import ControllerSchema


 class JobSchema(Schema):
-    id = fields.UUID(required=False, missing=uuid.uuid4)
-    created_at = fields.DateTime(required=False, missing=datetime.now)
+    id = fields.Int(required=False, dump_only=True)
+    created_at = fields.DateTime(required=False, dump_only=True)

-    controllers = fields.Nested(ControlConfigurationSchema, many=True, required=True, validate=Length(min=1))
+    controllers = fields.Nested(ControllerSchema, many=True, required=True, validate=Length(min=1))

     class Meta:
         unknown = RAISE
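Taken together, the schemas accept exactly one shape of payload; a minimal sketch of the validation behaviour (assuming marshmallow 3, with a made-up payload):

from marshmallow.exceptions import ValidationError
from schemas import JobSchema  # assuming the package layout from this commit

schema = JobSchema()
job = schema.load({
    "controllers": [
        {"configuration": {"robot_address": "10.0.0.5", "program_url": "http://files.local/prog.urp"}}
    ]
})  # passes

schema.load({"controllers": []})   # ValidationError: Length(min=1)
schema.load({"unexpected": True})  # ValidationError: unknown = RAISE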
@@ -1,31 +1,228 @@
 #!/usr/bin/env python3
+import requests
+from typing import List
 from flask import request, jsonify, current_app, abort, Response
 from flask_classful import FlaskView, route
+import kubernetes

-from utils import json_required
+from utils import json_required, k8s
+
+from marshmallow.exceptions import ValidationError
+
+from model import db, Job, Controller
 from schemas import JobSchema
+
+import time


 class JobView(FlaskView):
+    CONTROLLER_HTTP_PORT = 8080  # Hard-coding this is probably a mediocre idea
     job_schema = JobSchema(many=False)
-    jobs_schema = JobSchema(many=True, exclude=['controllers'])
+    jobs_schema = JobSchema(many=True)
+
+    @staticmethod
+    def _get_k8s_stuff_controller(controller: Controller) -> dict:
+        # We'll see how robust this turns out to be
+        pod_name = JobView._controller_to_pod_name(controller)
+
+        r = k8s.corev1api.read_namespaced_pod(pod_name, current_app.config['WORKING_NAMESPACE'])
+
+        env = {x.name: x.value for x in r.spec.containers[0].env}
+
+        return {
+            "pod_name": pod_name,
+            "config": {
+                "robot_address": env['ROBOT_ADDRESS'],
+                "program_url": env['PROGRAM_URL'],
+            },
+            "status": {
+                "cluster_ip": r.status.pod_ip,
+                "phase": r.status.phase
+            }
+        }
+
+    @staticmethod
+    def _controller_to_pod_name(controller: Controller) -> str:
+        return f"ursim-controller-{controller.id}"
+
+    @staticmethod
+    def _create_k8s_pod_api_object(controller: Controller, controller_desc: dict) -> dict:
+        return {
+            "metadata": {
+                "name": JobView._controller_to_pod_name(controller),
+                "labels": {
+                    "ursim-job-id": str(controller.job.id),
+                    "ursim-role": "controller"
+                }
+            },
+            "spec": {
+                "restartPolicy": "Never",
+                "containers": [
+                    {
+                        "name": f"ursim-controller-{controller.id}-cont",
+                        "image": current_app.config["URSIM_CONTROL_IMAGE"],
+                        "env": [
+                            {
+                                "name": "ROBOT_ADDRESS",
+                                "value": controller_desc['configuration']['robot_address']
+                            },
+                            {
+                                "name": "PROGRAM_URL",
+                                "value": controller_desc['configuration']['program_url']
+                            },
+                            {
+                                "name": "RUN_ID",
+                                "value": f"run{controller.job.id}"
+                            },
+                            {
+                                "name": "HTTP_PORT",
+                                "value": str(JobView.CONTROLLER_HTTP_PORT)
+                            }
+                        ],
+                        "envFrom": [
+                            {"configMapRef": {"name": current_app.config["URSIM_CONTROL_CONFIGMAP"]}}
+                        ]
+                    }
+                ]
+            }
+        }
+
+    @staticmethod
+    def _k8s_wait_controller_phase(controller: Controller, desired_phases: List[str]) -> str:
+        pod_name = JobView._controller_to_pod_name(controller)
+        return JobView._k8s_wait_pod_phase(pod_name, desired_phases)
+
+    @staticmethod
+    def _k8s_wait_pod_phase(pod_name: str, desired_phases: List[str]) -> str:
+        while True:
+            r = k8s.corev1api.read_namespaced_pod(pod_name, current_app.config['WORKING_NAMESPACE'])
+            if r.status.phase in desired_phases:
+                return r.status.phase
+            time.sleep(0.2)

     def index(self):
         # List all jobs
-        pass
+        jobs = Job.query.all()
+        result = []
+        for job in jobs:
+            result.append({
+                "id": job.id,
+                "created_at": job.created_at,
+                "controllers": [{"id": x.id} for x in job.controllers]
+            })
+
+        return jsonify(self.jobs_schema.dump(result))

     def get(self, _id: str):
         # Get info about a job
-        pass
+        job = Job.query.get_or_404(_id)
+
+        result = {
+            "id": job.id,
+            "created_at": job.created_at,
+            "controllers": [dict(id=x.id, **self._get_k8s_stuff_controller(x)) for x in job.controllers]
+        }
+
+        return jsonify(self.job_schema.dump(result))

     @json_required
     def post(self):
         # Start (schedule) a job
-        pass
+        try:
+            job_desc = self.job_schema.load(request.json)
+        except ValidationError as e:
+            return abort(422, str(e))
+
+        # Check if something is already running
+        last_job = Job.query.order_by(Job.id.desc()).first()
+
+        if last_job:
+            # Check if any controller is running
+            pod_names = [JobView._controller_to_pod_name(controller) for controller in last_job.controllers]
+            missing_pod_names = []
+
+            for pod_name in pod_names:
+                try:
+                    r = k8s.corev1api.read_namespaced_pod(pod_name, current_app.config['WORKING_NAMESPACE'])
+                except kubernetes.client.exceptions.ApiException as e:
+                    if e.status == 404:
+                        missing_pod_names.append(pod_name)
+                        continue
+                    else:
+                        raise
+
+                # Check if running
+                if r.status.phase not in ['Succeeded', 'Failed']:  # Unknown, Running and Pending are the others
+                    return abort(409, "One of the controllers is still running. Terminate it first!")
+
+            # Do some cleanup if needed
+            if current_app.config['AUTO_CLEANUP']:
+                for pod_name in pod_names:
+                    if pod_name not in missing_pod_names:
+                        k8s.corev1api.delete_namespaced_pod(pod_name, current_app.config['WORKING_NAMESPACE'])
+
+        # Perform starting the job
+        job = Job()
+        db.session.add(job)
+        db.session.flush()  # assign the autoincrement id before it is used below
+
+        job_desc['id'] = job.id
+        job_desc['created_at'] = job.created_at
+
+        controllers = []
+        for i, controller_desc in enumerate(job_desc['controllers']):
+            controller = Controller()
+            controller.job = job
+            db.session.add(controller)
+            db.session.flush()  # the pod name is derived from controller.id, so it must be assigned here
+
+            pod_object = self._create_k8s_pod_api_object(controller, controller_desc)
+            r = k8s.corev1api.create_namespaced_pod(current_app.config['WORKING_NAMESPACE'], pod_object)
+
+            job_desc['controllers'][i]['pod_name'] = r.metadata.name
+            controllers.append((controller, r.metadata.name, i))
+
+        db.session.commit()
+        # The commit is here because the part below takes a long time.
+        # If another POST request arrived in the meantime, it would not see in the DB
+        # that a job is already starting, so it would start another one, and that would be bad.
+
+        for controller, pod_name, i in controllers:
+            while True:
+                # Wait until the pod gains an ip address
+                r = k8s.corev1api.read_namespaced_pod(pod_name, current_app.config['WORKING_NAMESPACE'])
+                if r.status.pod_ip:
+                    status = {
+                        "cluster_ip": r.status.pod_ip,
+                        "phase": r.status.phase
+                    }
+                    job_desc['controllers'][i]['status'] = status
+                    break
+                time.sleep(0.2)
+
+        return jsonify(self.job_schema.dump(job_desc))

     def delete(self, _id: str):
         # stop a job
-        pass
+        job = Job.query.get_or_404(_id)
+
+        pod_names = [JobView._controller_to_pod_name(controller) for controller in job.controllers]
+
+        for pod_name in pod_names:
+            try:
+                r = k8s.corev1api.read_namespaced_pod(pod_name, current_app.config['WORKING_NAMESPACE'])
+            except kubernetes.client.exceptions.ApiException as e:
+                if e.status == 404:
+                    continue
+                else:
+                    raise
+
+            if r.status.phase not in ['Succeeded', 'Failed']:
+                requests.post(f"http://{r.status.pod_ip}:{self.CONTROLLER_HTTP_PORT}/abort")
+                self._k8s_wait_pod_phase(pod_name, ['Succeeded', 'Failed'])
+
+            try:
+                k8s.corev1api.delete_namespaced_pod(pod_name, current_app.config['WORKING_NAMESPACE'])
+            except kubernetes.client.exceptions.ApiException as e:
+                if e.status == 404:
+                    continue
+                else:
+                    raise
+
+        db.session.delete(job)
+        db.session.commit()
+        return Response(status=204)
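End to end, the view should then behave roughly like this against a running deployment; a hypothetical session (host and payload values are made up):

import requests

BASE = "http://localhost:5000/job"  # hypothetical host; the /job prefix comes from JobView

r = requests.post(BASE, json={
    "controllers": [
        {"configuration": {"robot_address": "10.0.0.5", "program_url": "http://files.local/prog.urp"}}
    ]
})
job = r.json()  # includes id, created_at, per-controller pod_name and status

requests.get(f"{BASE}/{job['id']}")     # poll controller phase / cluster_ip
requests.delete(f"{BASE}/{job['id']}")  # abort controllers, delete pods, drop the job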
@@ -1,5 +1,5 @@
pyyaml

requests
blinker
Flask
marshmallow