Added sync plugin
All checks were successful
continuous-integration/drone/push Build is passing

This commit is contained in:
Pünkösd Marcell 2021-04-11 13:53:21 +02:00
parent 7e2b4aa371
commit 7d30328e21
5 changed files with 85 additions and 3 deletions

View File

@ -1 +1,3 @@
ur-rtde~=1.4.1 ur-rtde~=1.4.1
redis~=3.5.3
pyprocsync>=0.1.0

View File

@ -3,4 +3,7 @@ import os
# Config is loaded statically at import time # Config is loaded statically at import time
class Config: class Config:
pass RUN_ID = os.environ.get("RUN_ID", "")
SYNC_DELAY = float(os.environ.get("SYNC_DELAY", 1.0))
REDIS_URL = os.environ["REDIS_URL"]
SYNC_TIMEOUT = os.environ.get("SYNC_TIMEOUT", None) # Wait infinity by default

View File

@ -2,7 +2,7 @@
import os import os
import sys import sys
from config import Config from config import Config
from plugins import WaitPlugin from plugins import WaitPlugin, SyncPlugin
from plugin_repository import PluginRepository from plugin_repository import PluginRepository
from program_executor import ProgramExecutor from program_executor import ProgramExecutor
from http_server import ControllerHTTPServer from http_server import ControllerHTTPServer
@ -34,12 +34,15 @@ def main():
compiler_repo = PluginRepository() compiler_repo = PluginRepository()
compiler_repo.register_plugin(WaitPlugin) compiler_repo.register_plugin(WaitPlugin)
compiler_repo.register_plugin(SyncPlugin)
# Example code: # Example code:
compiler_repo.load_plugin("wait") compiler_repo.load_plugin("wait")
compiler_repo.load_plugin("sync")
program = [] program = []
program.append(compiler_repo.get_compiler("wait").compile(secs=2)) program.append(compiler_repo.get_compiler("wait").compile(secs=2))
program.append(compiler_repo.get_compiler("wait").compile(secs=3)) program.append(compiler_repo.get_compiler("wait").compile(secs=3))
program.append(compiler_repo.get_compiler("sync").compile(nodes=2, name="test"))
program.append(compiler_repo.get_compiler("wait").compile(secs=10)) program.append(compiler_repo.get_compiler("wait").compile(secs=10))
program.append(compiler_repo.get_compiler("wait").compile(secs=10)) program.append(compiler_repo.get_compiler("wait").compile(secs=10))

View File

@ -1,2 +1,3 @@
from .abstract_plugin import AbstractCommand, AbstractCommandCompiler, AbstractPlugin from .abstract_plugin import AbstractCommand, AbstractCommandCompiler, AbstractPlugin
from .wait_plugin import WaitPlugin from .wait_plugin import WaitPlugin
from .sync_plugin import SyncPlugin

View File

@ -0,0 +1,73 @@
from typing import Dict
from .abstract_plugin import AbstractCommand, AbstractPlugin, AbstractCommandCompiler
import logging
import time
from config import Config
import redis
from pyprocsync import ProcSync
class SyncCommand(AbstractCommand):
def __init__(self, logger: logging.Logger, procsync_instance: ProcSync, name: str, nodes: int):
if type(nodes) not in [int]:
raise ValueError("Nodes must be int")
if nodes <= 1:
raise ValueError("Nodes must be greater than 1")
self._logger = logger
self._procsync_instance = procsync_instance
self._name = name
self._nodes = nodes
def execute(self):
self._logger.debug(f"Synchronizing on {self._name} with {self._nodes} nodes...")
self._procsync_instance.sync(self._name, self._nodes, Config.SYNC_TIMEOUT)
self._logger.debug(f"Event {self._name} synchronized!")
def describe(self) -> dict:
return {
"command": "sync",
"params": {
"name": self._name,
"nodes": self._nodes,
},
"config": {
"SYNC_TIMEOUT": Config.SYNC_TIMEOUT,
"RUN_ID": Config.RUN_ID
}
}
class SyncCompiler(AbstractCommandCompiler):
def __init__(self, logger: logging.Logger, procsync_instance: ProcSync):
self._logger = logger
self._procsync_instance = procsync_instance
def compile(self, name: str, nodes: int) -> AbstractCommand:
return SyncCommand(self._logger, self._procsync_instance, name, nodes)
class SyncPlugin(AbstractPlugin):
plugin_name = "sync"
def __init__(self):
self._logger = logging.getLogger("plugin").getChild("sync")
self._procsync_instance = ProcSync(
redis_client=redis.from_url(Config.REDIS_URL),
run_id=Config.RUN_ID,
delay=Config.SYNC_DELAY
)
def load_compilers(self) -> Dict[str, AbstractCommandCompiler]:
return {
"sync": SyncCompiler(self._logger, self._procsync_instance)
}
def close(self):
self._procsync_instance.close()