# single_ursim_control/single_ursim_control/program_executor.py
#
# 89 lines
# 2.6 KiB
# Python

from typing import List
from threading import Thread
from plugins import AbstractCommand
from enum import Enum
import logging
class ProgramExecutorStates(Enum):
    """Lifecycle states reported by a ``ProgramExecutor``."""

    # Initial state, assigned when the executor is constructed.
    PREPARING = 0
    # The executor thread is stepping through the program.
    RUNNING = 1
    # The program ran to its end without looping again.
    DONE = 2
    # Execution was stopped by an external abort request.
    ABORTED = 3
    # A step raised an exception during execution.
    CRASHED = 4
class ProgramExecutor(Thread):
    """Thread that executes a list of commands in order, optionally looping.

    Each command in the program must provide ``describe()``, ``execute()``
    and ``abort()``. Progress and the outcome of the run are exposed through
    ``get_status()`` and the ``state`` property.
    """

    def __init__(
            self,
            program: List[AbstractCommand],
            loop: bool = False
    ):
        """Prepare the executor; nothing runs until the thread is started.

        Args:
            program: The commands to execute, in order.
            loop: When True, the program is restarted from its first step
                every time it reaches the end, until ``stop_looping()`` or
                ``abort()`` is called.
        """
        super().__init__()

        self._program = program
        self._loop = loop

        self._current_step_desc = {}
        self._pc = 0  # index of the step currently (or most recently) executed
        self._loop_counter = 0  # number of passes over the program started so far
        self._state = ProgramExecutorStates.PREPARING

        self._logger = logging.getLogger("program_executor")

    def abort(self) -> None:
        """Mark the run as ABORTED and forward the abort to the current step.

        Safe to call on an empty program: the state is still set to ABORTED,
        there is just no step to forward the request to.
        """
        self._logger.debug("Aborting due to external request...")
        self._state = ProgramExecutorStates.ABORTED

        try:
            current_step = self._program[self._pc]
        except IndexError:
            # Empty program: nothing is executing, so nothing to interrupt.
            self._logger.debug("No current step to abort")
            return

        current_step.abort()

    def get_status(self) -> dict:
        """Return a snapshot of the executor's progress as a plain dict.

        Keys: ``current_step`` (index of the current step), ``program_length``,
        ``step_description`` (result of the current step's ``describe()``),
        ``state`` (state name as a string), ``loop_count`` and ``config``
        (currently only the ``loop`` flag).
        """
        return {
            "current_step": self._pc,
            "program_length": len(self._program),
            "step_description": self._current_step_desc,
            "state": self._state.name,
            "loop_count": self._loop_counter,
            "config": {
                "loop": self._loop
            }
        }

    @property
    def state(self) -> "ProgramExecutorStates":
        """Current state; also used to check the successfulness of the run."""
        return self._state

    def stop_looping(self) -> None:
        """Disable looping so the run ends after the current pass completes."""
        if self._loop:
            self._logger.info("Looping disabled! Finishing current loop then exiting...")
            self._loop = False

    def run(self) -> None:
        """Execute the program step by step until it ends, aborts or crashes.

        The final outcome is left in ``state``: DONE when the program reached
        its end (and looping is off), ABORTED when ``abort()`` was requested,
        CRASHED when a step raised an exception.
        """
        if self._state == ProgramExecutorStates.ABORTED:
            # abort() was requested before the thread started; honor it
            # instead of overwriting the state with RUNNING.
            self._logger.info("Aborted before start. Program will not be executed!")
            return

        self._state = ProgramExecutorStates.RUNNING
        self._logger.info("Start running program")

        while self._state == ProgramExecutorStates.RUNNING:  # needed for loop
            self._loop_counter += 1

            for i, step in enumerate(self._program):
                self._pc = i
                self._logger.debug("Executing step %04d", self._pc)

                try:
                    # describe() is inside the try as well: if it raised
                    # outside, the thread would die with the state stuck at
                    # RUNNING.
                    self._current_step_desc = step.describe()
                    step.execute()
                except Exception as e:
                    self._logger.exception(e)
                    self._state = ProgramExecutorStates.CRASHED
                    return

                if self._state != ProgramExecutorStates.RUNNING:
                    self._logger.info(
                        "State changed to %s. Execution will now stop!",
                        self._state.name
                    )
                    return

            if self._loop and self._program:
                self._logger.debug("Looping program")
                continue

            if self._loop:
                # Looping over zero steps would spin forever without doing
                # any work, so finish the run instead.
                self._logger.warning("Program is empty, nothing to loop over. Exiting...")

            self._logger.info("Program ended")
            self._state = ProgramExecutorStates.DONE
            return