import threading from typing import Dict, List from .abstract_plugin import AbstractCommand, AbstractPlugin, AbstractCommandCompiler import logging import rtde_control import rtde_receive from config import Config # # I have to admit, the abort() solution is not the most robust thing I've ever designed # But for this simple scenario, it's good enough # class CombinedRobotInterface: USE_UPPER_RANGE_REGISTERS = False def __init__(self, addr: str): self._robot_c = rtde_control.RTDEControlInterface(addr) self._robot_r = rtde_receive.RTDEReceiveInterface( addr, use_upper_range_registers=self.USE_UPPER_RANGE_REGISTERS ) @property def robot_c(self) -> rtde_control.RTDEControlInterface: return self._robot_c @property def robot_r(self) -> rtde_receive.RTDEReceiveInterface: return self._robot_r def get_async_operation_progress(self) -> int: # https://gitlab.com/sdurobotics/ur_rtde/-/blob/968b08e357f2e562e9ae156a3036283e23fa74f2/src/rtde_receive_interface.cpp#L42 register_offset = 24 if self.USE_UPPER_RANGE_REGISTERS else 0 # https://gitlab.com/sdurobotics/ur_rtde/-/blob/968b08e357f2e562e9ae156a3036283e23fa74f2/src/rtde_receive_interface.cpp#L632 return self.robot_r.getOutputIntRegister(register_offset + 2) # This does not work.... lol # The rtde_receive interface only allows you to use the user defined registers not the internal ones def close(self): self._robot_c.stopScript() self._robot_c.disconnect() self._robot_r.disconnect() class URRTDEMoveJCommand(AbstractCommand): def __init__(self, logger: logging.Logger, robot: CombinedRobotInterface, pose: List[float], speed: float, acceleration: float): # TODO: validations self._logger = logger self._robot = robot self._pose = pose self._speed = speed self._acceleration = acceleration def execute(self): self._logger.debug(f"Moving robot arm: MoveJ to {self._pose} s: {self._speed} a: {self._acceleration}") self._robot.robot_c.moveJ(self._pose, self._speed, self._acceleration) self._logger.debug("Done moving") def abort(self): # TODO pass def describe(self) -> dict: return { "command": "moveJ", "params": { "pose": self._pose, "speed": self._speed, "acceleration": self._acceleration } } class URRTDEMoveLCommand(AbstractCommand): def __init__(self, logger: logging.Logger, robot: CombinedRobotInterface, q: List[float], speed: float, acceleration: float): # TODO: validations self._logger = logger self._robot = robot self._q = q self._speed = speed self._acceleration = acceleration def execute(self): self._logger.debug(f"Moving robot arm: MoveL to {self._q} s: {self._speed} a: {self._acceleration}") self._robot.robot_c.moveL(self._q, self._speed, self._acceleration) self._logger.debug("Done moving") def abort(self): # TODO pass def describe(self) -> dict: return { "command": "moveL", "params": { "q": self._q, "speed": self._speed, "acceleration": self._acceleration } } class URRTDEMoveJCompiler(AbstractCommandCompiler): def __init__(self, logger: logging.Logger, robot: CombinedRobotInterface): self._logger = logger self._robot = robot def compile(self, pose: List[float], speed: float = 1.05, acceleration: float = 1.4) -> AbstractCommand: return URRTDEMoveJCommand(self._logger, self._robot, pose, speed, acceleration) class URRTDEMoveLCompiler(AbstractCommandCompiler): def __init__(self, logger: logging.Logger, robot: CombinedRobotInterface): self._logger = logger self._robot = robot def compile(self, q: List[float], speed: float = 0.25, acceleration: float = 1.2) -> AbstractCommand: return URRTDEMoveLCommand(self._logger, self._robot, q, speed, acceleration) class URRTDEPlugin(AbstractPlugin): plugin_name = "ur_rtde" def __init__(self): self._logger = logging.getLogger("plugin").getChild("ur_rtde") self._robot = CombinedRobotInterface(Config.ROBOT_ADDRESS) logging.info(f"Connected to robot at {Config.ROBOT_ADDRESS}") def load_compilers(self) -> Dict[str, AbstractCommandCompiler]: return { "moveL": URRTDEMoveLCompiler(self._logger, self._robot), "moveJ": URRTDEMoveJCompiler(self._logger, self._robot) } def close(self): self._robot.close() logging.info(f"Disconnected from robot")