161 lines
4.9 KiB
Python
161 lines
4.9 KiB
Python
import threading
|
|
from typing import Dict, List
|
|
|
|
from .abstract_plugin import AbstractCommand, AbstractPlugin, AbstractCommandCompiler
|
|
import logging
|
|
import rtde_control
|
|
import rtde_receive
|
|
|
|
from config import Config
|
|
|
|
|
|
#
|
|
# I have to admit, the abort() solution is not the most robust thing I've ever designed
|
|
# But for this simple scenario, it's good enough
|
|
#
|
|
|
|
class CombinedRobotInterface:
|
|
USE_UPPER_RANGE_REGISTERS = False
|
|
|
|
def __init__(self, addr: str):
|
|
self._robot_c = rtde_control.RTDEControlInterface(addr)
|
|
self._robot_r = rtde_receive.RTDEReceiveInterface(
|
|
addr,
|
|
use_upper_range_registers=self.USE_UPPER_RANGE_REGISTERS
|
|
)
|
|
|
|
@property
|
|
def robot_c(self) -> rtde_control.RTDEControlInterface:
|
|
return self._robot_c
|
|
|
|
@property
|
|
def robot_r(self) -> rtde_receive.RTDEReceiveInterface:
|
|
return self._robot_r
|
|
|
|
def get_async_operation_progress(self) -> int:
|
|
# https://gitlab.com/sdurobotics/ur_rtde/-/blob/968b08e357f2e562e9ae156a3036283e23fa74f2/src/rtde_receive_interface.cpp#L42
|
|
register_offset = 24 if self.USE_UPPER_RANGE_REGISTERS else 0
|
|
|
|
# https://gitlab.com/sdurobotics/ur_rtde/-/blob/968b08e357f2e562e9ae156a3036283e23fa74f2/src/rtde_receive_interface.cpp#L632
|
|
return self.robot_r.getOutputIntRegister(register_offset + 2)
|
|
|
|
# This does not work.... lol
|
|
# The rtde_receive interface only allows you to use the user defined registers not the internal ones
|
|
|
|
def close(self):
|
|
self._robot_c.stopScript()
|
|
self._robot_c.disconnect()
|
|
self._robot_r.disconnect()
|
|
|
|
|
|
class URRTDEMoveJCommand(AbstractCommand):
|
|
|
|
def __init__(self,
|
|
logger: logging.Logger,
|
|
robot: CombinedRobotInterface,
|
|
q: List[float],
|
|
speed: float,
|
|
acceleration: float):
|
|
# TODO: validations
|
|
|
|
self._logger = logger
|
|
self._robot = robot
|
|
self._q = q
|
|
self._speed = speed
|
|
self._acceleration = acceleration
|
|
|
|
def execute(self):
|
|
self._logger.debug(f"Moving robot arm: MoveJ to {self._q} s: {self._speed} a: {self._acceleration}")
|
|
self._robot.robot_c.moveJ(self._q, self._speed, self._acceleration)
|
|
self._logger.debug("Done moving")
|
|
|
|
def abort(self):
|
|
# TODO
|
|
pass
|
|
|
|
def describe(self) -> dict:
|
|
return {
|
|
"command": "moveJ",
|
|
"params": {
|
|
"q": self._q,
|
|
"speed": self._speed,
|
|
"acceleration": self._acceleration
|
|
}
|
|
}
|
|
|
|
|
|
class URRTDEMoveLCommand(AbstractCommand):
|
|
|
|
def __init__(self,
|
|
logger: logging.Logger,
|
|
robot: CombinedRobotInterface,
|
|
pose: List[float],
|
|
speed: float,
|
|
acceleration: float):
|
|
# TODO: validations
|
|
|
|
self._logger = logger
|
|
self._robot = robot
|
|
self._pose = pose
|
|
self._speed = speed
|
|
self._acceleration = acceleration
|
|
|
|
def execute(self):
|
|
self._logger.debug(f"Moving robot arm: MoveL to {self._pose} s: {self._speed} a: {self._acceleration}")
|
|
self._robot.robot_c.moveL(self._pose, self._speed, self._acceleration)
|
|
self._logger.debug("Done moving")
|
|
|
|
def abort(self):
|
|
# TODO
|
|
pass
|
|
|
|
def describe(self) -> dict:
|
|
return {
|
|
"command": "moveL",
|
|
"params": {
|
|
"pose": self._pose,
|
|
"speed": self._speed,
|
|
"acceleration": self._acceleration
|
|
}
|
|
}
|
|
|
|
|
|
class URRTDEMoveJCompiler(AbstractCommandCompiler):
|
|
|
|
def __init__(self, logger: logging.Logger, robot: CombinedRobotInterface):
|
|
self._logger = logger
|
|
self._robot = robot
|
|
|
|
def compile(self, q: List[float], speed: float = 1.05, acceleration: float = 1.4) -> AbstractCommand:
|
|
return URRTDEMoveJCommand(self._logger, self._robot, q, speed, acceleration)
|
|
|
|
|
|
class URRTDEMoveLCompiler(AbstractCommandCompiler):
|
|
|
|
def __init__(self, logger: logging.Logger, robot: CombinedRobotInterface):
|
|
self._logger = logger
|
|
self._robot = robot
|
|
|
|
def compile(self, pose: List[float], speed: float = 0.25, acceleration: float = 1.2) -> AbstractCommand:
|
|
return URRTDEMoveLCommand(self._logger, self._robot, pose, speed, acceleration)
|
|
|
|
|
|
class URRTDEPlugin(AbstractPlugin):
|
|
plugin_name = "ur_rtde"
|
|
|
|
def __init__(self):
|
|
self._logger = logging.getLogger("plugin").getChild("ur_rtde")
|
|
|
|
self._robot = CombinedRobotInterface(Config.ROBOT_ADDRESS)
|
|
self._logger.info(f"Connected to robot at {Config.ROBOT_ADDRESS}")
|
|
|
|
def load_compilers(self) -> Dict[str, AbstractCommandCompiler]:
|
|
return {
|
|
"moveL": URRTDEMoveLCompiler(self._logger, self._robot),
|
|
"moveJ": URRTDEMoveJCompiler(self._logger, self._robot)
|
|
}
|
|
|
|
def close(self):
|
|
self._robot.close()
|
|
logging.info(f"Disconnected from robot")
|