184 lines
5.3 KiB
Python
184 lines
5.3 KiB
Python
#!/usr/bin/env python3
|
|
import sys
|
|
from typing import Optional
|
|
from config import Config
|
|
from plugins import SleepPlugin, SyncPlugin, WaitPlugin, URRTDEPlugin, LogPlugin
|
|
from plugin_repository import PluginRepository
|
|
from program_executor import ProgramExecutor, ProgramExecutorStates
|
|
import weakref
|
|
from tiny_http_server import TinyHTTPServer
|
|
import logging
|
|
import signal
|
|
|
|
from compiler import compile_program
|
|
from program_loader import load_program
|
|
|
|
|
|
class HTTPControl:
    """Request handlers that expose executor controls over HTTP.

    Only weak references to the executor and to the optional wait plugin are
    stored, so this object never keeps either of them alive. Every handler
    takes ``(path, data)`` (both unused here) and returns a
    ``(status_code, body)`` tuple.
    """

    # The project types are quoted so that defining this class does not
    # evaluate them; they are only needed by type checkers.
    def __init__(self, executor: "ProgramExecutor", wait_plugin: Optional["WaitPlugin"]):
        """Store weak references to the controlled objects.

        Args:
            executor: Executor targeted by the status/abort/unloop handlers.
            wait_plugin: Wait plugin targeted by the continue handler, or
                ``None`` when that plugin is not loaded.
        """
        self._executor_ref = weakref.ref(executor)
        # Compare with None explicitly: a plugin object that happens to be
        # falsy must still be treated as loaded.
        self._wait_plugin_ref = None if wait_plugin is None else weakref.ref(wait_plugin)

    @staticmethod
    def _call(target_ref, action, success_body=None):
        """Dereference ``target_ref`` and run ``action`` on the live target.

        Args:
            target_ref: Weak reference to the object to act on.
            action: Callable invoked with the dereferenced target.
            success_body: Body returned with status 200. When ``None``, the
                return value of ``action`` is used as the body instead.

        Returns:
            ``(503, "Service unavailable")`` if the referent is gone,
            ``(500, str(exc))`` if ``action`` raised, otherwise ``(200, body)``.
        """
        target = target_ref()
        # A dead weak reference yields None; truthiness would also reject
        # live-but-falsy targets, so test identity instead.
        if target is None:
            return 503, "Service unavailable"

        try:
            result = action(target)
        except Exception as e:
            return 500, str(e)

        return 200, (result if success_body is None else success_body)

    def GET_status(self, path, data):
        """Return the executor's status as the response body."""
        return self._call(self._executor_ref, lambda executor: executor.get_status())

    def POST_abort(self, path, data):
        """Ask the executor to abort."""
        # TODO: announce to redis message bus
        return self._call(self._executor_ref, lambda executor: executor.abort(), "Aborting")

    def POST_continue(self, path, data):
        """Tell the wait plugin to continue; 422 if no wait plugin was given."""
        if self._wait_plugin_ref is None:
            return 422, "Plugin not loaded"

        return self._call(self._wait_plugin_ref, lambda plugin: plugin.cont(), "Continuing")

    def POST_unloop(self, path, data):
        """Ask the executor to stop looping."""
        return self._call(self._executor_ref, lambda executor: executor.stop_looping(), "Looping off")
|
|
|
|
|
|
def main() -> int:
    """Download, compile and run the configured program.

    Once plugin loading has been attempted, the HTTP server (if it was
    started) and the plugin repository are always released, including on the
    early-exit and exception paths.

    Returns:
        Process exit code: 0 on success or after a dry run, 1 if the program
        could not be downloaded, 2 if plugin loading failed, 3 if compilation
        failed, 5 if the executor did not end in the DONE state.
    """
    # init instance
    logging.basicConfig(
        stream=sys.stdout,
        format="%(asctime)s [%(levelname)s]: %(name)s: %(message)s",
        level=logging.DEBUG if Config.DEBUG else logging.INFO
    )

    logging.info("Registering available plugins...")
    # Register all available plugins
    plugin_repo = PluginRepository()
    for plugin_class in (SleepPlugin, SyncPlugin, WaitPlugin, URRTDEPlugin, LogPlugin):
        plugin_repo.register_plugin(plugin_class)

    # Download the program
    logging.info("Downloading program...")
    try:
        program_source = load_program(Config.PROGRAM_URL)
    except Exception as e:
        logging.error(f"Failed to download program: {e}! Exiting...")
        logging.exception(e)
        # load_plugins() has not been called yet, so this path returns
        # without cleanup exactly as before.
        return 1

    # Only set once the server has actually been started, so the cleanup
    # below never calls shutdown() on a server that is not running.
    httpd = None
    try:
        # Load required plugins
        logging.info("Loading required plugins...")
        try:
            plugin_repo.load_plugins(program_source['load_plugins'])
        except Exception as e:
            logging.error(f"Error during plugin loading: {e}! Exiting...")
            logging.exception(e)
            return 2

        # Compile the program
        logging.info("Compiling program...")
        try:
            program = compile_program(plugin_repo, program_source['program'])
        except Exception as e:
            logging.error(f"Error during compilation: {e}! Exiting...")
            logging.exception(e)
            return 3

        # prepare the executor
        logging.info("Preparing for execution...")
        executor = ProgramExecutor(program, loop=False)

        # Prepare the HTTP Server
        if Config.HTTP_ENABLE:
            try:
                loaded_wait_plugin = plugin_repo.get_plugin_instance('wait')
            except KeyError:
                loaded_wait_plugin = None

            # The HTTP server stores weak reference to the executor and the wait plugin (if loaded)
            server = TinyHTTPServer(HTTPControl(executor, loaded_wait_plugin), port=Config.HTTP_PORT)
            server.start()
            httpd = server

        # Setup signal handler
        def handle_stop_signal(signum, frame):
            logging.warning(f"Signal {signum} received. Aborting execution!")
            executor.abort()

        # Should be possible to call only once
        # NOTE(review): these SIG_IGN registrations are replaced immediately
        # below; kept unchanged pending confirmation that they can be dropped.
        signal.signal(signal.SIGINT, signal.SIG_IGN)
        signal.signal(signal.SIGTERM, signal.SIG_IGN)

        signal.signal(signal.SIGINT, handle_stop_signal)
        signal.signal(signal.SIGTERM, handle_stop_signal)

        # Actually execute
        if Config.DRY_RUN:
            logging.info("DRY_RUN enabled. Dumping command descriptions and exiting!")
            for i, command in enumerate(program):
                logging.info(f"{i:04d}: {command.describe()}")
            return 0

        logging.info("Starting execution...")
        executor.start()
        executor.join()

        if executor.state == ProgramExecutorStates.DONE:
            logging.info("Program executed successfully!")
            return 0

        logging.error(f"Could not finish execution! Executor state: {executor.state.name}")
        return 5
    finally:
        # Close all resources; runs before any of the returns above take
        # effect and also when an exception propagates.
        logging.info("Cleaning up...")
        if httpd is not None:
            httpd.shutdown()
        plugin_repo.close()
|
|
|
|
|
|
# Run main() only when executed as a script and use its return value as the
# process exit status.
if __name__ == "__main__":
    raise SystemExit(main())
|