import logging
from enum import Enum
from threading import Event, Lock, Thread
from typing import List

from plugins import AbstractCommand


class ProgramExecutorStates(Enum):
    """Lifecycle states reported by :class:`ProgramExecutor`."""

    PREPARING = 0  # constructed; run() has not switched to RUNNING yet
    RUNNING = 1    # run() is stepping through the program
    DONE = 2       # run() reached the end of a non-looping program
    ABORTED = 3    # abort() was called before the program finished
    CRASHED = 4    # a step raised an exception


class ProgramExecutor(Thread):
    """Run a list of commands sequentially on a background thread.

    Each command is asked for ``describe()`` (stored for status reporting)
    and then ``execute()``-d in order. With ``loop=True`` the whole program
    is repeated until :meth:`abort` is called or a step raises.
    """

    # States that are never overwritten by a later abort() call.
    _FINISHED_STATES = (
        ProgramExecutorStates.DONE,
        ProgramExecutorStates.CRASHED,
    )

    def __init__(
        self,
        program: List[AbstractCommand],
        loop: bool = False
    ):
        """Prepare an executor for *program*.

        Args:
            program: Commands to run, in order. The list is used as-is
                (not copied).
            loop: When true, restart the program from the first step after
                the last step finished.
        """
        super().__init__()
        self._program = program
        self._loop = loop
        # Result of the current step's describe(); starts as an empty dict.
        self._current_step_desc = {}
        self._pc = 0  # index of the step currently (or last) executed
        self._loop_counter = 0  # number of passes started so far
        self._state = ProgramExecutorStates.PREPARING
        self._logger = logging.getLogger("program_executor")
        # abort() is called from another thread than run(): the event carries
        # the stop request, the lock makes state transitions atomic so that
        # run() can never overwrite ABORTED with RUNNING/DONE/CRASHED.
        self._abort_event = Event()
        self._state_lock = Lock()

    def abort(self) -> None:
        """Request the program to stop and mark it as ABORTED.

        The state switches to ABORTED immediately. A step that is already
        executing is not interrupted; the worker stops before starting the
        next step. Calling this on a program that already finished (DONE or
        CRASHED) does nothing, so the final outcome is preserved.
        """
        with self._state_lock:
            if self._state in self._FINISHED_STATES:
                return
            self._abort_event.set()
            self._state = ProgramExecutorStates.ABORTED

    def get_status(self) -> dict:
        """Return a snapshot of the executor's progress as a plain dict."""
        return {
            "current_step": self._pc,
            "program_length": len(self._program),
            "step_description": self._current_step_desc,
            "state": self._state.name,
            "loop_count": self._loop_counter,
            "config": {
                "loop": self._loop
            }
        }

    @property
    def state(self) -> ProgramExecutorStates:
        """Current lifecycle state."""
        return self._state

    def _advance_state(self, new_state: ProgramExecutorStates) -> bool:
        """Set the state from the worker thread unless an abort was requested.

        Returns:
            True if the state was changed, False if an abort request takes
            precedence (the state is then already ABORTED).
        """
        with self._state_lock:
            if self._abort_event.is_set():
                return False
            self._state = new_state
            return True

    def run(self) -> None:
        """Thread body: execute the program, honouring loop and abort.

        Ends in DONE when a non-looping program completes, CRASHED when a
        step raises, and leaves ABORTED untouched when abort() was called.
        """
        if not self._advance_state(ProgramExecutorStates.RUNNING):
            self._logger.info("Program aborted before it started")
            return
        self._logger.info("Start running program")

        while not self._abort_event.is_set():  # one iteration per pass
            self._loop_counter += 1
            for i, step in enumerate(self._program):
                if self._abort_event.is_set():
                    self._logger.info("Program aborted before step %s", i)
                    return
                self._pc = i
                try:
                    # describe() is inside the try as well: a failure there
                    # would otherwise kill the thread with the state stuck
                    # at RUNNING.
                    self._current_step_desc = step.describe()
                    self._logger.debug("Executing step %s", self._pc)
                    step.execute()
                except Exception as e:
                    self._logger.exception(e)
                    self._advance_state(ProgramExecutorStates.CRASHED)
                    return
                # TODO: jogging wait

            if not self._loop:
                self._logger.debug("Program ended")
                break
            if not self._program:
                # Looping over nothing would spin forever at full CPU load.
                self._logger.warning("Empty program cannot loop; stopping")
                break
            self._logger.debug("Looping program")

        # No-op when an abort arrived in the meantime (state stays ABORTED).
        if not self._advance_state(ProgramExecutorStates.DONE):
            self._logger.info("Program aborted")