diff --git a/requirements.txt b/requirements.txt index 71e51b5..0bdb51c 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1 +1,3 @@ -ur-rtde~=1.4.1 \ No newline at end of file +ur-rtde~=1.4.1 +redis~=3.5.3 +pyprocsync>=0.1.0 \ No newline at end of file diff --git a/single_ursim_control/config.py b/single_ursim_control/config.py index 92f72ab..84fe3d5 100644 --- a/single_ursim_control/config.py +++ b/single_ursim_control/config.py @@ -3,4 +3,7 @@ import os # Config is loaded statically at import time class Config: - pass + RUN_ID = os.environ.get("RUN_ID", "") + SYNC_DELAY = float(os.environ.get("SYNC_DELAY", 1.0)) + REDIS_URL = os.environ["REDIS_URL"] + SYNC_TIMEOUT = os.environ.get("SYNC_TIMEOUT", None) # Wait infinity by default diff --git a/single_ursim_control/main.py b/single_ursim_control/main.py index 74df636..274b62f 100644 --- a/single_ursim_control/main.py +++ b/single_ursim_control/main.py @@ -2,7 +2,7 @@ import os import sys from config import Config -from plugins import WaitPlugin +from plugins import WaitPlugin, SyncPlugin from plugin_repository import PluginRepository from program_executor import ProgramExecutor from http_server import ControllerHTTPServer @@ -34,12 +34,15 @@ def main(): compiler_repo = PluginRepository() compiler_repo.register_plugin(WaitPlugin) + compiler_repo.register_plugin(SyncPlugin) # Example code: compiler_repo.load_plugin("wait") + compiler_repo.load_plugin("sync") program = [] program.append(compiler_repo.get_compiler("wait").compile(secs=2)) program.append(compiler_repo.get_compiler("wait").compile(secs=3)) + program.append(compiler_repo.get_compiler("sync").compile(nodes=2, name="test")) program.append(compiler_repo.get_compiler("wait").compile(secs=10)) program.append(compiler_repo.get_compiler("wait").compile(secs=10)) diff --git a/single_ursim_control/plugins/__init__.py b/single_ursim_control/plugins/__init__.py index 83ef11d..51c8396 100644 --- a/single_ursim_control/plugins/__init__.py +++ b/single_ursim_control/plugins/__init__.py @@ -1,2 +1,3 @@ from .abstract_plugin import AbstractCommand, AbstractCommandCompiler, AbstractPlugin from .wait_plugin import WaitPlugin +from .sync_plugin import SyncPlugin \ No newline at end of file diff --git a/single_ursim_control/plugins/sync_plugin.py b/single_ursim_control/plugins/sync_plugin.py new file mode 100644 index 0000000..34c53ce --- /dev/null +++ b/single_ursim_control/plugins/sync_plugin.py @@ -0,0 +1,73 @@ +from typing import Dict + +from .abstract_plugin import AbstractCommand, AbstractPlugin, AbstractCommandCompiler +import logging +import time +from config import Config + +import redis +from pyprocsync import ProcSync + + +class SyncCommand(AbstractCommand): + + def __init__(self, logger: logging.Logger, procsync_instance: ProcSync, name: str, nodes: int): + + if type(nodes) not in [int]: + raise ValueError("Nodes must be int") + + if nodes <= 1: + raise ValueError("Nodes must be greater than 1") + + self._logger = logger + self._procsync_instance = procsync_instance + self._name = name + self._nodes = nodes + + def execute(self): + self._logger.debug(f"Synchronizing on {self._name} with {self._nodes} nodes...") + self._procsync_instance.sync(self._name, self._nodes, Config.SYNC_TIMEOUT) + self._logger.debug(f"Event {self._name} synchronized!") + + def describe(self) -> dict: + return { + "command": "sync", + "params": { + "name": self._name, + "nodes": self._nodes, + }, + "config": { + "SYNC_TIMEOUT": Config.SYNC_TIMEOUT, + "RUN_ID": Config.RUN_ID + } + } + + +class SyncCompiler(AbstractCommandCompiler): + + def __init__(self, logger: logging.Logger, procsync_instance: ProcSync): + self._logger = logger + self._procsync_instance = procsync_instance + + def compile(self, name: str, nodes: int) -> AbstractCommand: + return SyncCommand(self._logger, self._procsync_instance, name, nodes) + + +class SyncPlugin(AbstractPlugin): + plugin_name = "sync" + + def __init__(self): + self._logger = logging.getLogger("plugin").getChild("sync") + self._procsync_instance = ProcSync( + redis_client=redis.from_url(Config.REDIS_URL), + run_id=Config.RUN_ID, + delay=Config.SYNC_DELAY + ) + + def load_compilers(self) -> Dict[str, AbstractCommandCompiler]: + return { + "sync": SyncCompiler(self._logger, self._procsync_instance) + } + + def close(self): + self._procsync_instance.close()