This commit is contained in:
		@@ -41,3 +41,5 @@ class Config:
 | 
			
		||||
    AUTO_CLEANUP = bool(os.environ.get("AUTO_CLEANUP", "").upper() in ['YES', 'TRUE', '1'])
 | 
			
		||||
    URSIM_CONTROL_IMAGE = os.environ["URSIM_CONTROL_IMAGE"]
 | 
			
		||||
    URSIM_CONTROL_CONFIGMAP = os.environ["URSIM_CONTROL_CONFIGMAP"]
 | 
			
		||||
 | 
			
		||||
    LINK_QUALITY_REPORT_URL = os.environ["LINK_QUALITY_REPORT_URL"]
 | 
			
		||||
 
 | 
			
		||||
@@ -6,4 +6,6 @@ from . import db
 | 
			
		||||
 | 
			
		||||
class Job(db.Model):
 | 
			
		||||
    id = db.Column(db.Integer, primary_key=True, autoincrement=True)
 | 
			
		||||
    created_at = db.Column(db.TIMESTAMP, nullable=False, server_default=func.now())
 | 
			
		||||
    created_at = db.Column(db.TIMESTAMP, nullable=False, server_default=func.now())
 | 
			
		||||
 | 
			
		||||
    site = db.Column(db.String, nullable=False)
 | 
			
		||||
 
 | 
			
		||||
@@ -9,5 +9,7 @@ class JobSchema(Schema):
 | 
			
		||||
 | 
			
		||||
    controllers = fields.Nested(ControllerSchema, many=True, required=True, validate=Length(min=1))
 | 
			
		||||
 | 
			
		||||
    site = fields.Str(required=False, dump_only=True)
 | 
			
		||||
 | 
			
		||||
    class Meta:
 | 
			
		||||
        unknown = RAISE
 | 
			
		||||
 
 | 
			
		||||
@@ -35,5 +35,9 @@ class FlaskKubernetes:
 | 
			
		||||
    def corev1api(self) -> kubernetes.client.CoreV1Api:
 | 
			
		||||
        return kubernetes.client.CoreV1Api(self.connection)
 | 
			
		||||
 | 
			
		||||
    @property
 | 
			
		||||
    def api(self) -> kubernetes.client.ApiClient:
 | 
			
		||||
        return kubernetes.client.ApiClient(self.connection)
 | 
			
		||||
 | 
			
		||||
k8s = FlaskKubernetes()
 | 
			
		||||
 | 
			
		||||
k8s = FlaskKubernetes()
 | 
			
		||||
 
 | 
			
		||||
@@ -62,7 +62,8 @@ class JobView(FlaskView):
 | 
			
		||||
        return f"ursim-controller-{controller.id}"
 | 
			
		||||
 | 
			
		||||
    @staticmethod
 | 
			
		||||
    def _create_k8s_pod_api_object(controller: Controller, controller_desc: dict) -> dict:
 | 
			
		||||
    def _create_k8s_pod_api_object(controller: Controller, controller_desc: dict, site_lowest_latency: str) -> dict:
 | 
			
		||||
        # This is a job actually
 | 
			
		||||
        return {
 | 
			
		||||
            "metadata": {
 | 
			
		||||
                "name": JobView._controller_to_pod_name(controller),
 | 
			
		||||
@@ -72,39 +73,52 @@ class JobView(FlaskView):
 | 
			
		||||
                }
 | 
			
		||||
            },
 | 
			
		||||
            "spec": {
 | 
			
		||||
                "restartPolicy": "Never",
 | 
			
		||||
                "containers": [
 | 
			
		||||
                    {
 | 
			
		||||
                        "name": f"ursim-controller-{controller.id}-cont",
 | 
			
		||||
                        "image": current_app.config["URSIM_CONTROL_IMAGE"],
 | 
			
		||||
                        "env": [
 | 
			
		||||
                            {
 | 
			
		||||
                                "name": "ROBOT_ADDRESS",
 | 
			
		||||
                                "value": controller_desc['configuration']['robot_address']
 | 
			
		||||
                            },
 | 
			
		||||
                            {
 | 
			
		||||
                                "name": "PROGRAM_URL",
 | 
			
		||||
                                "value": controller_desc['configuration']['program_url']
 | 
			
		||||
                            },
 | 
			
		||||
                            {
 | 
			
		||||
                                "name": "RUN_ID",
 | 
			
		||||
                                "value": f"run{controller.job.id}"
 | 
			
		||||
                            },
 | 
			
		||||
                            {
 | 
			
		||||
                                "name": "HTTP_PORT",
 | 
			
		||||
                                "value": str(JobView.CONTROLLER_HTTP_PORT)
 | 
			
		||||
                "template": {
 | 
			
		||||
                    "spec": {
 | 
			
		||||
                        "parallelism": 1,
 | 
			
		||||
                        "backoffLimit": 1,
 | 
			
		||||
                        "template": {
 | 
			
		||||
                            "spec": {
 | 
			
		||||
                                "restartPolicy": "Never",
 | 
			
		||||
                                "containers": [
 | 
			
		||||
                                    {
 | 
			
		||||
                                        "name": f"ursim-controller-{controller.id}-cont",
 | 
			
		||||
                                        "image": current_app.config["URSIM_CONTROL_IMAGE"],
 | 
			
		||||
                                        "env": [
 | 
			
		||||
                                            {
 | 
			
		||||
                                                "name": "ROBOT_ADDRESS",
 | 
			
		||||
                                                "value": controller_desc['configuration']['robot_address']
 | 
			
		||||
                                            },
 | 
			
		||||
                                            {
 | 
			
		||||
                                                "name": "PROGRAM_URL",
 | 
			
		||||
                                                "value": controller_desc['configuration']['program_url']
 | 
			
		||||
                                            },
 | 
			
		||||
                                            {
 | 
			
		||||
                                                "name": "RUN_ID",
 | 
			
		||||
                                                "value": f"run{controller.job.id}"
 | 
			
		||||
                                            },
 | 
			
		||||
                                            {
 | 
			
		||||
                                                "name": "HTTP_PORT",
 | 
			
		||||
                                                "value": str(JobView.CONTROLLER_HTTP_PORT)
 | 
			
		||||
                                            }
 | 
			
		||||
                                        ],
 | 
			
		||||
                                        "envFrom": [
 | 
			
		||||
                                            {"configMapRef": {"name": current_app.config["URSIM_CONTROL_CONFIGMAP"]}}
 | 
			
		||||
                                        ]
 | 
			
		||||
                                    }
 | 
			
		||||
                                ],
 | 
			
		||||
                                "imagePullSecrets": [
 | 
			
		||||
                                    {
 | 
			
		||||
                                        "name": "regcred"
 | 
			
		||||
                                    }
 | 
			
		||||
                                ]
 | 
			
		||||
                            }
 | 
			
		||||
                        ],
 | 
			
		||||
                        "envFrom": [
 | 
			
		||||
                            {"configMapRef": {"name": current_app.config["URSIM_CONTROL_CONFIGMAP"]}}
 | 
			
		||||
                        ]
 | 
			
		||||
                        }
 | 
			
		||||
                    }
 | 
			
		||||
                ],
 | 
			
		||||
                "imagePullSecrets": [
 | 
			
		||||
                    {
 | 
			
		||||
                        "name": "regcred"
 | 
			
		||||
                    }
 | 
			
		||||
                ]
 | 
			
		||||
                },
 | 
			
		||||
                "placement": {
 | 
			
		||||
                    "clusters": [site_lowest_latency]
 | 
			
		||||
                }
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
@@ -151,36 +165,47 @@ class JobView(FlaskView):
 | 
			
		||||
        except ValidationError as e:
 | 
			
		||||
            return abort(422, str(e))
 | 
			
		||||
 | 
			
		||||
        r = requests.get(current_app.config['LINK_QUALITY_REPORT_URL'])
 | 
			
		||||
        r.raise_for_status()
 | 
			
		||||
        weather_report = r.json()
 | 
			
		||||
 | 
			
		||||
        if not weather_report:
 | 
			
		||||
            return abort(500, "Could not fetch weather report...")
 | 
			
		||||
 | 
			
		||||
        link_lowest_latency = min(weather_report.items(), key=lambda x: x[1]['latency']['mean'])[0]
 | 
			
		||||
        site_lowest_latency = link_lowest_latency.split(':', 1)[1]
 | 
			
		||||
 | 
			
		||||
        # Check if something is already running
 | 
			
		||||
        last_job = Job.query.order_by(Job.id.desc()).first()
 | 
			
		||||
        # last_job = Job.query.order_by(Job.id.desc()).first()
 | 
			
		||||
 | 
			
		||||
        if last_job:
 | 
			
		||||
            # Check if any controller is running
 | 
			
		||||
            pod_names = [JobView._controller_to_pod_name(controller) for controller in last_job.controllers]
 | 
			
		||||
            missing_pod_names = []
 | 
			
		||||
 | 
			
		||||
            for pod_name in pod_names:
 | 
			
		||||
                try:
 | 
			
		||||
                    r = k8s.corev1api.read_namespaced_pod(pod_name, current_app.config['WORKING_NAMESPACE'])
 | 
			
		||||
                except kubernetes.client.exceptions.ApiException as e:
 | 
			
		||||
                    if e.status == 404:
 | 
			
		||||
                        missing_pod_names.append(pod_name)
 | 
			
		||||
                        continue
 | 
			
		||||
                    else:
 | 
			
		||||
                        raise
 | 
			
		||||
 | 
			
		||||
                # Check if running
 | 
			
		||||
                if r.status.phase not in ['Succeeded', 'Failed']:  # Unknown, Running and Pending are the others
 | 
			
		||||
                    return abort(409, "One of the controllers are still running. Terminate it first!")
 | 
			
		||||
 | 
			
		||||
            # Do some cleanup if needed
 | 
			
		||||
            if current_app.config['AUTO_CLEANUP']:
 | 
			
		||||
                for pod_name in pod_names:
 | 
			
		||||
                    if pod_name not in missing_pod_names:
 | 
			
		||||
                        k8s.corev1api.delete_namespaced_pod(pod_name, current_app.config['WORKING_NAMESPACE'])
 | 
			
		||||
        # if last_job:
 | 
			
		||||
        #     # Check if any controller is running
 | 
			
		||||
        #     pod_names = [JobView._controller_to_pod_name(controller) for controller in last_job.controllers]
 | 
			
		||||
        #     missing_pod_names = []
 | 
			
		||||
        #
 | 
			
		||||
        #     for pod_name in pod_names:
 | 
			
		||||
        #         try:
 | 
			
		||||
        #             r = k8s.corev1api.read_namespaced_pod(pod_name, current_app.config['WORKING_NAMESPACE'])
 | 
			
		||||
        #         except kubernetes.client.exceptions.ApiException as e:
 | 
			
		||||
        #             if e.status == 404:
 | 
			
		||||
        #                 missing_pod_names.append(pod_name)
 | 
			
		||||
        #                 continue
 | 
			
		||||
        #             else:
 | 
			
		||||
        #                 raise
 | 
			
		||||
        #
 | 
			
		||||
        #         # Check if running
 | 
			
		||||
        #         if r.status.phase not in ['Succeeded', 'Failed']:  # Unknown, Running and Pending are the others
 | 
			
		||||
        #             return abort(409, "One of the controllers are still running. Terminate it first!")
 | 
			
		||||
        #
 | 
			
		||||
        #     # Do some cleanup if needed
 | 
			
		||||
        #     if current_app.config['AUTO_CLEANUP']:
 | 
			
		||||
        #         for pod_name in pod_names:
 | 
			
		||||
        #             if pod_name not in missing_pod_names:
 | 
			
		||||
        #                 k8s.corev1api.delete_namespaced_pod(pod_name, current_app.config['WORKING_NAMESPACE'])
 | 
			
		||||
 | 
			
		||||
        # Perform starting job
 | 
			
		||||
        job = Job()
 | 
			
		||||
        job.site = site_lowest_latency
 | 
			
		||||
        db.session.add(job)
 | 
			
		||||
        db.session.flush()
 | 
			
		||||
 | 
			
		||||
@@ -194,8 +219,16 @@ class JobView(FlaskView):
 | 
			
		||||
            db.session.add(controller)
 | 
			
		||||
            db.session.flush()
 | 
			
		||||
 | 
			
		||||
            pod_object = self._create_k8s_pod_api_object(controller, controller_desc)
 | 
			
		||||
            r = k8s.corev1api.create_namespaced_pod(current_app.config['WORKING_NAMESPACE'], pod_object)
 | 
			
		||||
            pod_object = self._create_k8s_pod_api_object(controller, controller_desc, site_lowest_latency)
 | 
			
		||||
            # r = k8s.corev1api.create_namespaced_pod(, pod_object)
 | 
			
		||||
 | 
			
		||||
            group = 'types.kubefed.io'
 | 
			
		||||
            version = 'v1beta1'
 | 
			
		||||
            plural = 'federatedjobs'
 | 
			
		||||
 | 
			
		||||
            r = k8s.api.create_namespaced_custom_object(
 | 
			
		||||
                group, version, current_app.config['WORKING_NAMESPACE'], plural, pod_object
 | 
			
		||||
            )
 | 
			
		||||
 | 
			
		||||
            job_desc['controllers'][i]['pod_name'] = r.metadata.name
 | 
			
		||||
            controllers.append((controller, r.metadata.name, i))
 | 
			
		||||
@@ -205,18 +238,18 @@ class JobView(FlaskView):
 | 
			
		||||
        # Ez idő alatt, ha jön még egy post kérés, akkor az a db-ben nem látná hogy van már task indulóban
 | 
			
		||||
        # Szóval elkezdene mégegyet indítani és az nem lenne jó
 | 
			
		||||
 | 
			
		||||
        for controller, pod_name, i in controllers:
 | 
			
		||||
            while True:
 | 
			
		||||
                # Wait until the pod gains ip address
 | 
			
		||||
                r = k8s.corev1api.read_namespaced_pod(pod_name, current_app.config['WORKING_NAMESPACE'])
 | 
			
		||||
                if r.status.pod_ip:
 | 
			
		||||
                    status = {
 | 
			
		||||
                        "cluster_ip": r.status.pod_ip,
 | 
			
		||||
                        "phase": r.status.phase
 | 
			
		||||
                    }
 | 
			
		||||
                    job_desc['controllers'][i]['status'] = status
 | 
			
		||||
                    break
 | 
			
		||||
                time.sleep(0.2)
 | 
			
		||||
        # for controller, pod_name, i in controllers:
 | 
			
		||||
        #     while True:
 | 
			
		||||
        #         # Wait until the pod gains ip address
 | 
			
		||||
        #         r = k8s.corev1api.read_namespaced_pod(pod_name, current_app.config['WORKING_NAMESPACE'])
 | 
			
		||||
        #         if r.status.pod_ip:
 | 
			
		||||
        #             status = {
 | 
			
		||||
        #                 "cluster_ip": r.status.pod_ip,
 | 
			
		||||
        #                 "phase": r.status.phase
 | 
			
		||||
        #             }
 | 
			
		||||
        #             job_desc['controllers'][i]['status'] = status
 | 
			
		||||
        #             break
 | 
			
		||||
        #         time.sleep(0.2)
 | 
			
		||||
 | 
			
		||||
        return jsonify(self.job_schema.dump(job_desc))
 | 
			
		||||
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user