Files
single_ursim_control/single_ursim_control/plugins/sleep_plugin.py
marcsello 964a072e80
All checks were successful
continuous-integration/drone/push Build is passing
Added abort to command
2021-04-11 14:10:14 +02:00

65 lines
1.6 KiB
Python

import logging
import math
from threading import Event
from typing import Dict

from .abstract_plugin import AbstractCommand, AbstractPlugin, AbstractCommandCompiler
class SleepCommand(AbstractCommand):
    """Command that waits for a fixed number of seconds and can be aborted.

    The wait is done on a ``threading.Event`` instead of ``time.sleep`` so
    that ``abort()`` can wake it up before the timeout expires.
    """

    def __init__(self, logger: logging.Logger, secs: float):
        """Validate the duration and set up the abort event.

        Args:
            logger: Logger that receives the progress and abort messages.
            secs: Duration of the sleep in seconds; a finite, strictly
                positive ``int`` or ``float``.

        Raises:
            ValueError: If ``secs`` is not an int/float (booleans are
                rejected too), is NaN or infinite, or is not greater
                than zero.
        """
        # isinstance() accepts int/float subclasses; bool is excluded
        # explicitly because it is an int subclass but never a duration.
        if isinstance(secs, bool) or not isinstance(secs, (int, float)):
            raise ValueError("Secs must be float or int")

        # NaN and +inf slip through the `<= 0` comparison below and would only
        # blow up later inside Event.wait(), so reject them up front.
        if isinstance(secs, float) and not math.isfinite(secs):
            raise ValueError("Secs must be a finite number")

        if secs <= 0:
            raise ValueError("Secs must be a positive number")

        self._secs = secs
        self._logger = logger
        self._event = Event()

    def execute(self):
        """Block for the configured duration, or until ``abort()`` is called.

        The event is cleared first, so an abort issued before this method
        starts is discarded and the command can be executed again after a
        previous abort.
        """
        self._logger.debug(f"Sleeping for {self._secs} seconds")
        self._event.clear()

        # Event.wait() returns True when the event was set (i.e. aborted)
        # and False when the timeout elapsed normally.
        aborted = self._event.wait(timeout=self._secs)

        if aborted:
            self._logger.warning("Sleeping aborted externally!")
        else:
            self._logger.debug(f"Slept for {self._secs} seconds")

    def abort(self):
        """Interrupt a running ``execute()`` by setting the internal event."""
        self._event.set()  # <- force the event.wait to return

    def describe(self) -> dict:
        """Return a dict naming the command and the parameters it was built with."""
        return {
            "command": "sleep",
            "params": {
                "secs": self._secs
            }
        }
class SleepCompiler(AbstractCommandCompiler):
    """Compiler that turns a ``secs`` argument into a ``SleepCommand``."""

    def __init__(self, logger: logging.Logger):
        """Keep the logger that is handed to every command this compiler builds.

        Args:
            logger: Logger passed on to the created commands.
        """
        self._logger = logger

    def compile(self, secs: float) -> AbstractCommand:
        """Create a sleep command for the requested duration.

        Args:
            secs: Duration forwarded unchanged to ``SleepCommand``.

        Returns:
            The newly constructed ``SleepCommand``.

        Any exception raised by the ``SleepCommand`` constructor propagates
        to the caller unchanged.
        """
        command = SleepCommand(logger=self._logger, secs=secs)
        return command
class SleepPlugin(AbstractPlugin):
    """Plugin that exposes a single ``sleep`` command compiler."""

    plugin_name = "sleep"

    def __init__(self):
        """Create the plugin's logger as the ``sleep`` child of the ``plugin`` logger."""
        parent_logger = logging.getLogger("plugin")
        self._logger = parent_logger.getChild("sleep")

    def load_compilers(self) -> Dict[str, AbstractCommandCompiler]:
        """Return the compilers provided by this plugin, keyed by command name.

        A fresh ``SleepCompiler`` sharing this plugin's logger is created on
        every call.
        """
        compilers: Dict[str, AbstractCommandCompiler] = {}
        compilers["sleep"] = SleepCompiler(self._logger)
        return compilers

    def close(self):
        """Do nothing; this plugin creates nothing that needs to be released."""
        return None