Added HTTP control stuff
All checks were successful
continuous-integration/drone/push Build is passing

This commit is contained in:
Pünkösd Marcell 2021-04-15 00:31:53 +02:00
parent fffa8b2fef
commit 2b3d994cc2
5 changed files with 97 additions and 11 deletions

View File

@ -12,3 +12,6 @@ class Config:
PROGRAM_URL = os.environ["PROGRAM_URL"] PROGRAM_URL = os.environ["PROGRAM_URL"]
DRY_RUN = ('--dry-run' in sys.argv) or bool(os.environ.get("DRY_RUN", False)) DRY_RUN = ('--dry-run' in sys.argv) or bool(os.environ.get("DRY_RUN", False))
DEBUG = ('--debug' in sys.argv) or bool(os.environ.get("DEBUG", False)) DEBUG = ('--debug' in sys.argv) or bool(os.environ.get("DEBUG", False))
HTTP_ENABLE = ('--no-http' not in sys.argv) and \
bool(os.environ.get("HTTP_ENABLE", "").upper() not in ['NO', 'FALSE', '0'])
HTTP_PORT = int(os.environ.get("HTTP_PORT", 8080))

View File

@ -4,7 +4,8 @@ from config import Config
from plugins import SleepPlugin, SyncPlugin, WaitPlugin, URRTDEPlugin, LogPlugin from plugins import SleepPlugin, SyncPlugin, WaitPlugin, URRTDEPlugin, LogPlugin
from plugin_repository import PluginRepository from plugin_repository import PluginRepository
from program_executor import ProgramExecutor, ProgramExecutorStates from program_executor import ProgramExecutor, ProgramExecutorStates
from http_server import ControllerHTTPServer import weakref
from tiny_http_server import TinyHTTPServer
import logging import logging
import signal import signal
@ -14,14 +15,66 @@ from program_loader import load_program
class HTTPControl: class HTTPControl:
def GET_status(self, path, data): def __init__(self, executor: ProgramExecutor, wait_plugin: WaitPlugin):
return 200, "Very good!" self._executor_ref = weakref.ref(executor)
def POST_terminate(self, path, data): if wait_plugin:
return 200, "Will do sir!" self._wait_plugin_ref = weakref.ref(wait_plugin)
else:
self._wait_plugin_ref = None
def GET_status(self, path, data):
executor: ProgramExecutor = self._executor_ref()
if not executor:
return 503, "Service unavailable"
try:
return 200, executor.get_status()
except Exception as e:
return 500, str(e)
def POST_abort(self, path, data):
# TODO: announce to redis message bus
executor: ProgramExecutor = self._executor_ref()
if not executor:
return 503, "Service unavailable"
try:
executor.abort()
except Exception as e:
return 500, str(e)
return 200, "Aborting"
def POST_continue(self, path, data): def POST_continue(self, path, data):
return 201, "Will do sir!"
if not self._wait_plugin_ref:
return 422, "Plugin not loaded"
wait_plugin: WaitPlugin = self._wait_plugin_ref()
if not wait_plugin:
return 503, "Service unavailable"
try:
wait_plugin.cont()
except Exception as e:
return 500, str(e)
return 200, "Continuing"
def POST_unloop(self, path, data):
executor: ProgramExecutor = self._executor_ref()
if not executor:
return 503, "Service unavailable"
try:
executor.stop_looping()
except Exception as e:
return 500, str(e)
return 200, "Looping off"
def main() -> int: def main() -> int:
@ -72,6 +125,20 @@ def main() -> int:
logging.info("Preparing for execution...") logging.info("Preparing for execution...")
executor = ProgramExecutor(program, loop=False) executor = ProgramExecutor(program, loop=False)
# Prepare the HTTP Server
if Config.HTTP_ENABLE:
try:
loaded_wait_plugin = plugin_repo.get_plugin_instance('wait')
except KeyError:
loaded_wait_plugin = None
# The HTTP server stores weak reference to the executor and the wait plugin (if loaded)
httpd = TinyHTTPServer(HTTPControl(executor, loaded_wait_plugin), port=Config.HTTP_PORT)
httpd.start()
else:
httpd = None
# Setup signal handler # Setup signal handler
def handle_stop_signal(signum, frame): def handle_stop_signal(signum, frame):
logging.warning(f"Signal {signum} received. Aborting execution!") logging.warning(f"Signal {signum} received. Aborting execution!")
@ -103,7 +170,11 @@ def main() -> int:
# Close all resources # Close all resources
logging.info("Cleaning up...") logging.info("Cleaning up...")
if httpd:
httpd.shutdown()
plugin_repo.close() plugin_repo.close()
# Set exit code
return 0 if execution_success else 5 return 0 if execution_success else 5

View File

@ -84,3 +84,5 @@ class PluginRepository:
for plugin_name, plugin_instance in self._loaded_plugins.items(): for plugin_name, plugin_instance in self._loaded_plugins.items():
plugin_instance.close() plugin_instance.close()
self._logger.info(f"Unloaded plugin: {plugin_name}") self._logger.info(f"Unloaded plugin: {plugin_name}")
self._loaded_plugins = {}

View File

@ -14,6 +14,10 @@ class ProgramExecutorStates(Enum):
CRASHED = 4 CRASHED = 4
#
# TODO: Put locks where they needed to be
#
class ProgramExecutor(Thread): class ProgramExecutor(Thread):
def __init__( def __init__(
@ -32,11 +36,13 @@ class ProgramExecutor(Thread):
self._logger = logging.getLogger("executor") self._logger = logging.getLogger("executor")
def abort(self): def abort(self):
# TODO: Na ide kellene locking
self._logger.debug("Aborting due to external request...") self._logger.debug("Aborting due to external request...")
self._state = ProgramExecutorStates.ABORTED self._state = ProgramExecutorStates.ABORTED
self._program[self._pc].abort() self._program[self._pc].abort()
def get_status(self) -> dict: def get_status(self) -> dict:
# TODO: evaluate data consistency and necessity of locking.
return { return {
"current_step": self._pc, "current_step": self._pc,
"program_length": len(self._program), "program_length": len(self._program),
@ -51,12 +57,16 @@ class ProgramExecutor(Thread):
@property @property
def state(self): def state(self):
# Used to check the successfulness of the run as well # Used to check the successfulness of the run as well
# Simple atomic read access no need to be locked
return self._state return self._state
def stop_looping(self): def stop_looping(self):
# This only changes a boolean
# Unprotected access won't cause problems
if self._loop: if self._loop:
self._logger.info("Looping disabled! Finishing current loop then exiting...")
self._loop = False self._loop = False
self._logger.info("Looping disabled! Finishing current loop then exiting...")
def run(self) -> None: def run(self) -> None:
self._state = ProgramExecutorStates.RUNNING self._state = ProgramExecutorStates.RUNNING

View File

@ -9,7 +9,7 @@ from threading import Thread
# Although the process might become unresponsive # Although the process might become unresponsive
# #
class ControlRequestHandler(BaseHTTPRequestHandler): class TinyRequestHandler(BaseHTTPRequestHandler):
def log_message(self, format, *args): def log_message(self, format, *args):
self.server._logger.debug(format % args) self.server._logger.debug(format % args)
@ -53,17 +53,17 @@ class ControlRequestHandler(BaseHTTPRequestHandler):
self._handle_request('POST', post_data) self._handle_request('POST', post_data)
class ControllerHTTPServer(Thread): class TinyHTTPServer(Thread):
def __init__(self, methods_instance, port: int = 8000): def __init__(self, methods_instance, port: int = 8000):
super().__init__() super().__init__()
self._logger = logging.getLogger("http") self._logger = logging.getLogger("http")
self._http_server = ThreadingHTTPServer(('', port), ControlRequestHandler) self._http_server = ThreadingHTTPServer(('', port), TinyRequestHandler)
self._http_server._methods_instance = methods_instance self._http_server._methods_instance = methods_instance
self._http_server._logger = self._logger self._http_server._logger = self._logger
def run(self): def run(self):
self._logger.info("Starting HTTP Server...") self._logger.info(f"Starting HTTP Server (bind: {self._http_server.server_address})...")
self._http_server.serve_forever() self._http_server.serve_forever()
self._logger.info("Stopped HTTP Server...") self._logger.info("Stopped HTTP Server...")