diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000000000000000000000000000000000000..ce62d98e8b7ebbd97da37a1bb626581041d2e14e --- /dev/null +++ b/.gitignore @@ -0,0 +1,165 @@ +# Byte-compiled / optimized / DLL files +__pycache__/ +*.py[cod] +*$py.class + +# C extensions +*.so + +# Distribution / packaging +.Python +build/ +develop-eggs/ +dist/ +downloads/ +eggs/ +.eggs/ +lib/ +lib64/ +parts/ +sdist/ +var/ +wheels/ +share/python-wheels/ +*.egg-info/ +.installed.cfg +*.egg +MANIFEST +venv/ + +# PyInstaller +# Usually these files are written by a python script from a template +# before PyInstaller builds the exe, so as to inject date/other infos into it. +*.manifest +*.spec + +# Installer logs +pip-log.txt +pip-delete-this-directory.txt + +# Unit test / coverage reports +htmlcov/ +.tox/ +.nox/ +.coverage +.coverage.* +.cache +nosetests.xml +coverage.xml +*.cover +*.py,cover +.hypothesis/ +.pytest_cache/ +cover/ + +# Translations +*.mo +*.pot + +# Django stuff: +*.log +local_settings.py +db.sqlite3 +db.sqlite3-journal + +# Flask stuff: +instance/ +.webassets-cache + +# Scrapy stuff: +.scrapy + +# Sphinx documentation +docs/_build/ + +# PyBuilder +.pybuilder/ +target/ + +# Jupyter Notebook +.ipynb_checkpoints + +# IPython +profile_default/ +ipython_config.py + +# pyenv +# For a library or package, you might want to ignore these files since the code is +# intended to run in multiple environments; otherwise, check them in: +# .python-version + +# pipenv +# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control. +# However, in case of collaboration, if having platform-specific dependencies or dependencies +# having no cross-platform support, pipenv may install dependencies that don't work, or not +# install all needed dependencies. +#Pipfile.lock + +# poetry +# Similar to Pipfile.lock, it is generally recommended to include poetry.lock in version control. +# This is especially recommended for binary packages to ensure reproducibility, and is more +# commonly ignored for libraries. +# https://python-poetry.org/docs/basic-usage/#commit-your-poetrylock-file-to-version-control +#poetry.lock + +# pdm +# Similar to Pipfile.lock, it is generally recommended to include pdm.lock in version control. +#pdm.lock +# pdm stores project-wide configurations in .pdm.toml, but it is recommended to not include it +# in version control. +# https://pdm.fming.dev/#use-with-ide +.pdm.toml + +# PEP 582; used by e.g. github.com/David-OConnor/pyflow and github.com/pdm-project/pdm +__pypackages__/ + +# Celery stuff +celerybeat-schedule +celerybeat.pid + +# SageMath parsed files +*.sage.py + +# Environments +.env +.venv +env/ +venv/ +ENV/ +env.bak/ +venv.bak/ + +# Spyder project settings +.spyderproject +.spyproject + +# Rope project settings +.ropeproject + +# mkdocs documentation +/site + +# mypy +.mypy_cache/ +.dmypy.json +dmypy.json + +# Pyre type checker +.pyre/ + +# pytype static type analyzer +.pytype/ + +# Cython debug symbols +cython_debug/ + +# PyCharm +# JetBrains specific template is maintained in a separate JetBrains.gitignore that can +# be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore +# and can be added to the global gitignore or merged into this file. For a more nuclear +# option (not recommended) you can uncomment the following to ignore the entire idea folder. +#.idea/ + + +# VS Code +.vscode/ \ No newline at end of file diff --git a/AUTHORS.rst b/AUTHORS.rst new file mode 100644 index 0000000000000000000000000000000000000000..1ddb05c9e037859b63a22b271536e95c9827e403 --- /dev/null +++ b/AUTHORS.rst @@ -0,0 +1,19 @@ +======= +Credits +======= + +Development Lead +---------------- + +* Erik Trygg + +Contributors +------------ + +None yet. Why not be the first? + + +Other mentions +-------------- + +* Novo Nordisk A/S, Denmark, for open sourcing the initial version of this piece software (February 2025). diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000000000000000000000000000000000000..56367140157cd4e7ec02d9e09b2275f8c44b107f --- /dev/null +++ b/LICENSE @@ -0,0 +1,21 @@ +MIT License + +Copyright (c) 2025, Erik Trygg + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. diff --git a/examples/run_server.py b/examples/run_server.py new file mode 100644 index 0000000000000000000000000000000000000000..716888e9aeb407c9976a899691afb0d415f87a95 --- /dev/null +++ b/examples/run_server.py @@ -0,0 +1,26 @@ +import asyncio +import logging +from os import environ + +from urx.sila2 import create_app_compatible + +if __name__ == "__main__": + logging.basicConfig( + level=logging.DEBUG, + format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", + ) + + # Simulation active on start + environ["APP_SIMULATION_MODE"] = "True" + + # UR Arm config + environ["APP_UR_CONFIG"] = "examples/ur_config.yaml" + environ["APP_UR_CONFIG_ERROR"] = "examples/ur_config_error.yaml" + environ["APP_UR_URL"] = "localhost" + + async def main(): + app = await create_app_compatible() + print("Starting server") + await app.start() + + asyncio.get_event_loop().run_until_complete(main()) diff --git a/examples/ur_config.yaml b/examples/ur_config.yaml new file mode 100644 index 0000000000000000000000000000000000000000..35a1bfcbd390e1d8bacb97089186b26b6f4011f1 --- /dev/null +++ b/examples/ur_config.yaml @@ -0,0 +1,4 @@ +program_1: + program_name: MyProgram.urp + description: This program does things + required_parameters: [para1, para2] diff --git a/examples/ur_config_error.yaml b/examples/ur_config_error.yaml new file mode 100644 index 0000000000000000000000000000000000000000..6f2640f1e322cd3aae171c0754b5b7e64c1ce7cf --- /dev/null +++ b/examples/ur_config_error.yaml @@ -0,0 +1,5 @@ +- variable_name: "error_id" + no_error: [0] + raise_on_unknown: true + errors: + 1: "Unknown error. An unknown error occurred." diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000000000000000000000000000000000000..7bffae8392a5b45da9aef7901bb4197a1e4d90ff --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,59 @@ +[project] +name = "sila2-drv-universal_robots-ur" +version = "0.2.1" +authors = [ + { name="Erik Trygg", email="etrg@novonordisk.com" }, +] +description = "SiLA2 driver for controlling a Universal Robots robot arm." +readme = "README.md" +requires-python = ">=3.9" +license = {file = "LICENSE"} +classifiers = [ + "Programming Language :: Python :: 3", + "Operating System :: OS Independent", +] + +dependencies = [ + "httpx == 0.27.0", +] + +[project.optional-dependencies] +sila2 = [ + "PyYAML >= 6.0.1", + "sila2-feature-lib == v2024.49a0", + "unitelabs-cdk == 0.2.10", +] +dev = [ + "black == 24.4.2", + "bumpver", + "flake8 == 7.1.0", + "isort == 5.13.2", +] + + +[build-system] +requires = ["setuptools>=61.0.0"] +build-backend = "setuptools.build_meta" + +[tool.setuptools.packages.find] +where = ["src"] +include = ["urx*"] + +[tool.isort] +profile = "black" + +[tool.bumpver] +current_version = "0.2.1" +version_pattern = "MAJOR.MINOR.PATCH" +commit_message = "bump version {old_version} -> {new_version}" +tag_message = "v{new_version}" +tag_scope = "default" +commit = true +tag = true +push = false + +[tool.bumpver.file_patterns] +"pyproject.toml" = [ + 'current_version = "{version}"', + 'version = "{version}"', +] diff --git a/src/urx/__init__.py b/src/urx/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..44be9d27b8630e78b2bdf1374206df86c7f4deca --- /dev/null +++ b/src/urx/__init__.py @@ -0,0 +1,5 @@ +from .urx import URConnection + +__all__ = [ + "URConnection", +] diff --git a/src/urx/sila2/__init__.py b/src/urx/sila2/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..ab3397faf68bcb89e160af916bb1d956bdd59eb0 --- /dev/null +++ b/src/urx/sila2/__init__.py @@ -0,0 +1,6 @@ +from .main import create_app, create_app_compatible + +__all__ = [ + "create_app", + "create_app_compatible", +] diff --git a/src/urx/sila2/features/__init__.py b/src/urx/sila2/features/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..4d2aa8feb40e4d70020fbb88cc8e0922f95e9265 --- /dev/null +++ b/src/urx/sila2/features/__init__.py @@ -0,0 +1,5 @@ +from .robot_arm import generate_robot_arm_feature + +__all__ = [ + "generate_robot_arm_feature", +] diff --git a/src/urx/sila2/features/annotations/__init__.py b/src/urx/sila2/features/annotations/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..21d7fb5aa2535a295d661a97f274553ccd8f3836 --- /dev/null +++ b/src/urx/sila2/features/annotations/__init__.py @@ -0,0 +1,5 @@ +from .annotations import better_str_set_constraint + +__all__ = [ + "better_str_set_constraint", +] diff --git a/src/urx/sila2/features/annotations/annotations.py b/src/urx/sila2/features/annotations/annotations.py new file mode 100644 index 0000000000000000000000000000000000000000..653f75e5f9dc9daed55759360f4c6d017c6cd49b --- /dev/null +++ b/src/urx/sila2/features/annotations/annotations.py @@ -0,0 +1,13 @@ +import typing + +from unitelabs.cdk import sila + + +def better_str_set_constraint(valid_values: typing.Iterable[str]): + """Returns a set constraint if the valid_values list is not empty, otherwise returns a minimal string length constraint.""" + L = list(valid_values) + return ( + sila.constraints.Set(L) + if L and len(L) > 0 + else sila.constraints.MinimalLength(1) + ) diff --git a/src/urx/sila2/features/config.py b/src/urx/sila2/features/config.py new file mode 100644 index 0000000000000000000000000000000000000000..1e59b965ad88299b6c30cad48cefe1c52a149bad --- /dev/null +++ b/src/urx/sila2/features/config.py @@ -0,0 +1,94 @@ +from __future__ import annotations + +from dataclasses import dataclass, field +from pathlib import Path +from typing import Any, Optional, Union + +import yaml + + +@dataclass +class ProgramConfig: + """ + SiLA feature configuration item for program execution. + + Example (yaml): + ```yaml + program1: # Program alias + program_name: "program1.urp" + required_parameters: ["var1", "var2"] + description: "My first program" + extra: + parameter_rename: + var1: variable_name_passed_to_robot_1 + var2: variable_name_passed_to_robot_2 + ``` + """ + + program_name: str # "My_program.urp" + required_parameters: list[str] = field(default_factory=list) + optional_parameters: list[str] = field(default_factory=list) + pre_program: Optional[str] = None # Optional - program to auto run before this + post_program: Optional[str] = None # Optional - program to auto run after this + description: Optional[str] = None # Optional + extra: dict[str, Any] = field(default_factory=dict) + + @staticmethod + def build(data: dict) -> dict[str, ProgramConfig]: + return {k: ProgramConfig(**v) for k, v in data.items()} + + @staticmethod + def safe_load(config: Union[dict[str, ProgramConfig], Path, str]): + """If a path is provided, load the configuration from the file. If a dict is provided, return it.""" + if isinstance(config, Path) or isinstance(config, str): + with open(config, "r") as f: + raw = yaml.safe_load(f) + return ProgramConfig.build(raw) + elif isinstance(config, dict): + return config + else: + raise TypeError("Invalid configuration type") + + +@dataclass +class ErrorConfig: + """ + SiLA feature configuration item for error handling. + + Example (yaml): + ```yaml + - variable_name: "" # Variable to read error codes from + no_error: [0] # Values that indicate no error + raise_on_unknown: true # Rasie on unknown error code (default: true) + errors: + 1 : "Error number one" + 2 : "Error number two" + ``` + """ + + variable_name: str + no_error: list[int] = field(default_factory=lambda: [0]) + raise_on_unknown: bool = True + errors: dict[int, str] = field(default_factory=dict) + + @staticmethod + def build(data: Optional[Union[dict, list]]): + if data is None: + return [] # No configuration is ok + return [ErrorConfig(**e) for e in data] + + @staticmethod + def safe_load( + config: Union[list[ErrorConfig], Path, str, None], + ) -> list[ErrorConfig]: + """If a path is provided, load the configuration from the file. If a list is provided, return it.""" + if config is None: + return [] # No configuration is ok + elif isinstance(config, Path) or isinstance(config, str): + with open(config, "r") as f: + raw = yaml.safe_load(f) + return ErrorConfig.build(raw) + elif isinstance(config, list): + return config + else: + raise TypeError("Invalid configuration type") diff --git a/src/urx/sila2/features/robot_arm.py b/src/urx/sila2/features/robot_arm.py new file mode 100644 index 0000000000000000000000000000000000000000..befaa57ea59205ae2cb44d138d5e482c15dcf723 --- /dev/null +++ b/src/urx/sila2/features/robot_arm.py @@ -0,0 +1,239 @@ +import asyncio +import json +import logging +from dataclasses import asdict +from pathlib import Path +from typing import Annotated, Union + +from unitelabs.cdk import sila + +from ...urx import URConnection, URRobot +from .annotations import better_str_set_constraint +from .config import ErrorConfig, ProgramConfig +from .types import ConfigError, IOError, ParaWrite, TimeoutError + +try: + from sila2_feature_lib.simulation.v001_0.feature_ul import ( + SimulationModeGlobal as SimMode, + ) +except ImportError: + SimMode = None + +logger = logging.getLogger(__name__) + + +def is_simulation_active(): + """Check simulation mode.""" + return SimMode is not None and SimMode.is_simulation_active() + + +# TODO: Refactor this function to prevent C901 +# flake8: noqa: C901 +def generate_robot_arm_feature( + prg_config: Union[dict[str, ProgramConfig], Path, str], + err_config: Union[list[ErrorConfig], Path, str, None] = None, + *, + robot_ip: str, + prg_timeout: float = 90, + connection_timeout: float = 2, +): + """Create a feature for controlling a robot arm. + + Args: + prg_config (dict[str, ProgramConfig] | Path | str): Program configuration. + err_config (list[ErrorConfig] | Path | str | None): Error configuration. Defaults to None. + robot_ip (str): Robot IP address. + prg_timeout (float | None): Timeout for running a program. Defaults to 90. + connection_timeout (float | None): Timeout for connecting to the robot. Defaults to 2. + + """ + # Handle config loading + prg = ProgramConfig.safe_load(prg_config) + + # Handle error config loading + err = ErrorConfig.safe_load(err_config) + + class RobotArmService(sila.Feature): + def __init__( + self, + robot_ip: str, + prg_timeout: float, + connection_timeout: float, + *args, + **kwargs, + ): + super().__init__( + description="Service for controlling a robot arm.", *args, **kwargs + ) + self.prg_config = prg + self.err_config = err + self.robot_ip = robot_ip + self.prg_timeout = prg_timeout + self.timeout = connection_timeout + + async def _read_error_codes(self, robot: URRobot): + """ + Read error codes and raise if an error is found. + """ + for err in self.err_config: + try: + # Read error code + code = await robot.read_variable(err.variable_name) + except IOError as e: + # Variable not found, no alarm information can be retrieved + Warning(f"Error reading variable {err.variable_name}: {e}") + logger.warning(f"Error reading variable {err.variable_name}: {e}") + continue # Skip to next error config + + if code in err.no_error: # No error + continue + if code in err.errors: # Known error + raise IOError(description=f"{err.errors[code]} (code: {code})") + if err.raise_on_unknown: # Unknown error + raise IOError(description=f"Unknown error code: {code}") + + def _rename_parameters(self, parameters: list[ParaWrite], cfg: ProgramConfig): + renaming_dict: dict[str, str] = cfg.extra.get("parameter_rename", {}) + + for p in parameters: + # Replace name with new name if it exists else keep the same + p.name = renaming_dict.get(p.name, p.name) + + def _validate_parameters(self, parameters: list[ParaWrite], cfg: ProgramConfig): + """ + Check that the provided parameters are valid for the given program. + """ + for p in parameters: + if p.name not in cfg.required_parameters + cfg.optional_parameters: + logger.error("Variable %s not supported", p.name) + raise ConfigError(description=f"Variable {p.name} not supported") + for req_para in cfg.required_parameters: + if req_para not in [var.name for var in parameters]: + logger.error("Variable %s is required", req_para) + raise ConfigError(description=f"Variable {req_para} is required") + + @sila.ObservableCommand( + description="Run Program", + errors=[IOError, TimeoutError], + ) + async def run_program( + self, + name: Annotated[str, better_str_set_constraint(prg.keys())], + parameters: list[ParaWrite], + *, + status: sila.Status, + ): + """ + Run program with the given name. Any provided parameters will be written to \ + the robot before running the program. + + .. parameter:: Program name + .. parameter:: Parameter list + + """ + + # Read config + try: + cfg = self.prg_config[name] + except KeyError: + logger.error("Program %s not found", name) + raise ConfigError(description=f"Program {name} not found") + + # Validate parameters + self._validate_parameters(parameters, cfg) + # Rename parameters if specified in config + self._rename_parameters(parameters, cfg) + + # Validate pre and post programs (that they exist) + pre_program = self.prg_config[cfg.pre_program] if cfg.pre_program else None + post_program = ( + self.prg_config[cfg.post_program] if cfg.post_program else None + ) + + if is_simulation_active(): + if pre_program is not None: + logger.info("Simulating pre-program %s", pre_program.program_name) + await self.run_program(cfg.pre_program, [], status=status) + logger.info( + "Simulating program '%s' << %s", + cfg.program_name, + ParaWrite.pretty(parameters), + ) + await asyncio.sleep(0.01) # Simulate a delay + if post_program is not None: + logger.info("Simulating post-program %s", post_program.program_name) + await self.run_program(cfg.post_program, [], status=status) + logger.info("Simulation finished") + return + + try: + if pre_program is not None: + await self.run_program(cfg.pre_program, [], status=status) + + logger.info( + "Running program '%s' << %s", + cfg.program_name, + ParaWrite.pretty(parameters), + ) + async with URConnection( + url=self.robot_ip, timeout=self.timeout + ) as robot: + await robot.load_program(cfg.program_name) + for p in parameters: + await robot.write_variable( + p.name, float(p.value) if "." in p.value else int(p.value) + ) + await robot.play_program(timeout=self.prg_timeout) + + # Check for error codes + await self._read_error_codes(robot) + # await asyncio.sleep(2) # Wait a little bit extra for the program to finish + + if post_program is not None: + await self.run_program(cfg.post_program, [], status=status) + except asyncio.TimeoutError as e: + logger.exception(e) + raise TimeoutError(description="Timeout while running program") + except Exception as e: + logger.exception(e) + raise IOError(description=str(e)) + + @sila.UnobservableProperty( + errors=[IOError], + ) + async def status(self) -> str: + """ + Get the robot status. Use to check if the robot is ready to receive commands. + """ + + if is_simulation_active(): + logger.info("Simulating robot status") + return "Status OK - Simulation Active" + + try: + async with URConnection( + url=self.robot_ip, timeout=self.timeout + ) as robot: + return robot.robot_mode.name + except Exception as e: + logger.exception(e) + raise IOError(description="Error while getting robot status") + + @sila.UnobservableProperty() + async def settings(self) -> str: + """ + Get the robot settings. + """ + logger.info("Getting robot settings") + return json.dumps( + { + "robot-ip": self.robot_ip, + "config": {k: asdict(v) for k, v in self.prg_config.items()}, + }, + ) + + return RobotArmService( + robot_ip=robot_ip, + prg_timeout=prg_timeout, + connection_timeout=connection_timeout, + ) diff --git a/src/urx/sila2/features/types.py b/src/urx/sila2/features/types.py new file mode 100644 index 0000000000000000000000000000000000000000..97c1951fc969b9c2de0a236aa5fc28e32830cd0e --- /dev/null +++ b/src/urx/sila2/features/types.py @@ -0,0 +1,47 @@ +from dataclasses import dataclass +from typing import Annotated + +from unitelabs.cdk import sila + + +class ConfigError(sila.DefinedExecutionError): + """Generic configuration error.""" + + +class IOError(sila.DefinedExecutionError): + """Generic IO error.""" + + +class TimeoutError(sila.DefinedExecutionError): + """Generic timeout error.""" + + +@dataclass +class ParaWrite(sila.CustomDataType): + """ + Custom data type for writing variables to the robot in conjunction with the run_program command. + + .. parameter:: Parameter name + .. parameter:: Parameter value + """ + + name: Annotated[str, sila.constraints.MinimalLength(1)] + value: Annotated[str, sila.constraints.MinimalLength(1)] + + @classmethod + def pretty(cls, L: "list[ParaWrite]") -> str: + """ + Helper function for pretty printing a list of ParaWrite objects. + + + Example: + ``` + >> stringify([ParaWrite("a", "1"), ParaWrite("b", "2")]) + "a=1, b=2" + ``` + + """ + if len(L) == 0: + return "n/a" + + return ", ".join([(f"{p.name}={p.value}") for p in L]) diff --git a/src/urx/sila2/main.py b/src/urx/sila2/main.py new file mode 100644 index 0000000000000000000000000000000000000000..dfbcff9641c396703e014207d780c853302eca1e --- /dev/null +++ b/src/urx/sila2/main.py @@ -0,0 +1,54 @@ +import logging +import logging.config +import os + +from sila2_feature_lib import dynamic_import_config +from sila2_feature_lib.simulation.v001_0.feature_ul import SimulatorController +from unitelabs.cdk import Connector + +from .features import generate_robot_arm_feature + +logger = logging.getLogger(__name__) + +# Example (how to run the server) +# python -m unitelabs.cdk.cli.connector start --app urx.sila2:create_app +# Any configuration can be passed through a ".env" file + + +async def create_app_compatible(): + app = await create_app() + # Patch sila version - this allows it to run on older schedulers + [setattr(f, "sila2_version", "1.0") for f in app.sila_server.features.values()] + return app + + +async def create_app(): + app = Connector( + { + "sila_server": { + # "name": "", -- Removed to allow for .env override + "type": "RobotArmServer", + "description": "Server for controlling a Universal Robots robot arm.", + "version": "1.0.0", + "vendor_url": "https://www.novonordisk.com/", + } + } + ) + + # Simulator feature + app.register(SimulatorController()) + + # Robot arm feature + robot = generate_robot_arm_feature( + os.environ.get("APP_UR_CONFIG", "ur_config.yaml"), + os.environ.get("APP_UR_CONFIG_ERROR", None), + robot_ip=os.environ["APP_UR_URL"], + ) + app.register(robot) + + # Dynamically load any other features + ## Typical use-case: Labware-manipulation/site, etc. + for feat in dynamic_import_config(os.environ.get("APP_FEATURE_CONFIG", None)): + app.register(feat) + + return app diff --git a/src/urx/simulator/urx_sim.py b/src/urx/simulator/urx_sim.py new file mode 100644 index 0000000000000000000000000000000000000000..978ce74103d57f55dde0fc12a3223c99eccb7bc4 --- /dev/null +++ b/src/urx/simulator/urx_sim.py @@ -0,0 +1,153 @@ +import asyncio +import logging +import struct +import time +from contextlib import asynccontextmanager +from socket import socket +from typing import Literal, Optional + +logger = logging.getLogger(__name__) + + +@asynccontextmanager +async def URSimulation( + urc_socket: Optional[socket] = None, + poly_socket: Optional[socket] = None, +): + """ + Context manager for the UR simulator. + + ```python + # Create sockets somehow + poly = socketpair(AF_INET, SOCK_STREAM) + urc = socketpair(AF_INET, SOCK_STREAM) + # Create simulator + async with URSimulation(urc[0], poly[0]) as sim: + # Here: Simulator is ready to be connected to + sim.push_message("poly", b"bad_reply") # Next command sent to poly will cause an error + ``` + """ + sim = URSim() + await sim.open(urc_socket, poly_socket) + try: + yield sim + finally: + await sim.close() + + +class URSim: + """Universal Robot simulator.""" + + def __init__(self): + self.urc_buffer = [] + self.poly_buffer = [] + + self._program_started_at = None + + self.run_time = 2 + + @property + def is_running(self): + """Returns True if a program is running.""" + return ( + self._program_started_at is not None + and self._program_started_at + self.run_time > time.time() + ) + + async def close(self): + """Close sockets and cleanup resources.""" + + async def _close(sock: asyncio.StreamWriter): + if sock: + sock.close() + await sock.wait_closed() + + t1 = asyncio.gather( + _close(self.urc_write), + _close(self.poly_write), + ) + self.urc_write, self.urc_read = None, None + self.poly_write, self.poly_read = None, None + + await t1 + + async def open(self, urc: Optional[socket] = None, poly: Optional[socket] = None): + """Open sockets and start the simulator.""" + try: + self.urc_read, self.urc_write = await asyncio.open_connection(sock=urc) + self.poly_read, self.poly_write = await asyncio.open_connection(sock=poly) + asyncio.create_task(self._urc_stream()) + asyncio.create_task(self._poly_stream()) + except Exception: + await self.close() + raise + + def push_message(self, stream: Literal["poly", "urc"], msg: bytes): + """Define what the next reply message should be.""" + if stream == "urc": + self.urc_buffer.append(msg) + elif stream == "poly": + self.poly_buffer.append(msg) + else: + raise ValueError(f"Unknown stream: {stream}") + + async def _poly_stream(self): + try: + self.poly_write.write(b"Connected\n") + await self.poly_write.drain() + + while True: + msg: str = (await self.poly_read.read(1024)).decode().strip() + if len(msg) == 0: + return # Stream closed + if self.poly_buffer: + self.poly_write.write(self.poly_buffer.pop(0)) + else: + if msg.startswith("load "): + self.poly_write.write( + b"Loading program: " + msg[5:].encode() + b"\n" + ) + elif msg == "play": + self.poly_write.write(b"Starting program\n") + self._program_started_at = time.time() + elif msg == "stop": + self.poly_write.write(b"Stopped\n") + self._program_started_at = None + elif msg == "pause": + self.poly_write.write(b"Pausing program\n") + await self.poly_write.drain() + except Exception as e: + logger.error(f"Error in poly stream: {e}") + + async def _urc_stream(self): + try: + while True: + if self.urc_buffer: + self.urc_write.write(self.urc_buffer.pop(0)) + else: + self.urc_write.write(self.build_urc_msg()) + await self.urc_write.drain() + await asyncio.sleep(0.2) + except Exception as e: + logger.exception(e) + + def build_urc_msg(self) -> bytes: + return b"".join( + [ + struct.pack(">IB", 52, 16), + struct.pack(">IB", 47, 0), + struct.pack( + ">Q???????BBdddB", + 0, + *(5 * [False]), + self.is_running, + False, + 5, + 0, + 1.0, + 1.0, + 1.0, + 0, + ), + ] + ) diff --git a/src/urx/stream_parser.py b/src/urx/stream_parser.py new file mode 100644 index 0000000000000000000000000000000000000000..7286a8722a80c616f37e3cc77629189fdc736b6f --- /dev/null +++ b/src/urx/stream_parser.py @@ -0,0 +1,108 @@ +import logging +import struct +from dataclasses import dataclass +from enum import Enum +from typing import Callable, Coroutine + +logger = logging.getLogger(__name__) + + +class UrcMsgParser: + """Helper class to parse UR control messages.""" + + def __init__(self, read_cb: Callable[[int], Coroutine[None, None, bytes]]): + self.read = read_cb + self.buffer = b"" + + async def next(self): + """Get the next message from the stream or buffer. Returns `None` if no message is available.""" + self.buffer = b"" + last_mode = None + + def read_buffer(num_bytes: int) -> bytes: + if len(self.buffer) < num_bytes: + raise ValueError("Not enough data in buffer to read.") + req = self.buffer[:num_bytes] + self.buffer = self.buffer[num_bytes:] + return req + + header = MsgHeader.from_bytes(await self.read(MsgHeader.DATA_SIZE)) + self.buffer = await self.read(header.size - MsgHeader.DATA_SIZE) + + while len(self.buffer) > 0: + subheader = MsgHeader.from_bytes(read_buffer(MsgHeader.DATA_SIZE)) + + if subheader.size > len(self.buffer) + MsgHeader.DATA_SIZE: + break # Cannot deal with this... + + content = read_buffer(subheader.size - MsgHeader.DATA_SIZE) + + if header._type == MsgHeader.MESSAGE_TYPE_ROBOT_STATE: + if subheader._type == MsgHeader.PACKAGE_TYPE_ROBOT_MODE_DATA: + last_mode = RobotModeData.from_bytes(content) + else: + pass # Unknown sub-package type + else: + pass # Unknown package type + return last_mode + + +@dataclass +class MsgHeader: + """Header of a UR control message.""" + + size: int + _type: int + + DATA_SIZE = 5 + + PACKAGE_TYPE_ROBOT_MODE_DATA = 0 + MESSAGE_TYPE_ROBOT_STATE = 16 + + @classmethod + def from_bytes(cls, data: bytes): + return cls(*struct.unpack(">IB", data)) + + +@dataclass +class RobotModeData: + """Data structure for robot mode information.""" + + class Mode(Enum): + UNKNOWN = -2 + NO_CONTROLLER = -1 + DISCONNECTED = 0 + CONFIRM_SAFETY = 1 + BOOTING = 2 + POWER_OFF = 3 + POWER_ON = 4 + IDLE = 5 + BACKDRIVE = 6 + RUNNING = 7 + UPDATING_FIRMWARE = 8 + + timestamp: int + isRealRobotConnected: bool + isRealRobotEnabled: bool + isRobotPowerOn: bool + isEmergencyStopped: bool + isProtectiveStopped: bool + isProgramRunning: bool + isProgramPaused: bool + robotMode: int + controlMode: int + targetSpeedFraction: float + speedScaling: float + targetSpeedFractionLimit: float + reserved: int + + def get_robot_mode_enum(self) -> Mode: + try: + return self.Mode(self.robotMode) + except ValueError: + Warning(f"Unknown robot mode: {self.robotMode}") + return self.Mode.UNKNOWN + + @classmethod + def from_bytes(cls, data: bytes): + return cls(*struct.unpack(">Q???????BBdddB", data)) diff --git a/src/urx/urx.py b/src/urx/urx.py new file mode 100644 index 0000000000000000000000000000000000000000..9546d251401fbabb37acf724c633effa64080dfd --- /dev/null +++ b/src/urx/urx.py @@ -0,0 +1,389 @@ +from __future__ import annotations + +import asyncio +import json +import logging +import math +import time +from asyncio import open_connection +from collections import deque +from contextlib import asynccontextmanager +from socket import AF_INET, SOCK_STREAM, socket +from typing import Any, Iterable, Optional, Union + +from .stream_parser import RobotModeData, UrcMsgParser + +logger = logging.getLogger(__name__) + + +@asynccontextmanager +async def URConnection( + *, + url=None, + port_urc=30002, + port_poly=29999, + timeout: float = 2, + sock_urc: Optional[socket] = None, + sock_poly: Optional[socket] = None, +): + """Context manager for a UR robot connection. + + ```python + # Connect to robot + async with URConnection(url="localhost") as robot: + # Load program + await robot.load_program("my_program.urp") + # Start and wait for program to finish + await robot.play_program() + ``` + """ + + robot = URRobot( + url=url, + port_urc=port_urc, + port_poly=port_poly, + timeout=timeout, + sock_poly=sock_poly, + sock_urc=sock_urc, + ) + await robot.open() + try: + yield robot + finally: + await robot.close() + + +class StreamHandle: + """Helper class for managing a pair of StreamReader and StreamWriter object.""" + + def __init__(self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter): + self._reader = reader + self._writer = writer + self.is_closed = False + + async def close(self): + if not self.is_closed: + self.is_closed = True + self._writer.close() + await self._writer.wait_closed() + + @classmethod + async def open(cls, host: str, port: int, sock: Optional[socket] = None): + if sock is None: + sock = socket(AF_INET, SOCK_STREAM) + sock.connect((host, port)) + return cls(*await open_connection(sock=sock)) + + async def wait_for_read(self, size: int, timeout: float) -> bytes: + """Read from stream and handle timeout and disconnects""" + rpl = await asyncio.wait_for(self._reader.read(size), timeout=timeout) + if size > 0 and len(rpl) == 0: + raise RuntimeError("Connection closed.") + return rpl + + async def write(self, data: bytes, drain=True): + """Write to stream buffer and optionally wait for it to be sent""" + self._writer.write(data) + if drain: + await self._writer.drain() + + +class URRobot: + """Class for interacting with a UR robot.""" + + def _get_status_obj(self): + if self._status_obj is None: + raise IOError("Status not available.") + return self._status_obj + + @property + def is_emergency_stopped(self) -> bool: + return self._get_status_obj().isEmergencyStopped + + @property + def is_program_paused(self) -> bool: + return self._get_status_obj().isProgramPaused + + @property + def is_program_running(self) -> bool: + return self._get_status_obj().isProgramRunning + + @property + def is_protective_stopped(self) -> bool: + return self._get_status_obj().isProtectiveStopped + + @property + def robot_mode(self) -> RobotModeData.Mode: + return self._get_status_obj().get_robot_mode_enum() + + def __init__( + self, + *, + url, + port_urc=30002, + port_poly=29999, + timeout: float = 2, + buffer_size=4096, + sock_urc: Optional[socket] = None, + sock_poly: Optional[socket] = None, + ): + self.ip = url + self.port_urc = port_urc + self.port_poly = port_poly + self.timeout = timeout + self.buffer_size = buffer_size + self.lock = asyncio.Lock() + self.sock_urc = sock_urc + self.sock_poly = sock_poly + self._status_obj = None + + async def _async_urc_stream_parser(self, init_done: asyncio.Event): + """Async stream parser for URC messages.""" + + async def read(num_bytes: int) -> bytes: + buff = b"" + while len(buff) < num_bytes: + buff += await self.urc.wait_for_read( + num_bytes - len(buff), self.timeout + ) + return buff + + parser = UrcMsgParser(read) + try: + while True: + try: + package = await parser.next() + if package is not None: + self._status_obj = package + init_done.set() # Signal status is available + except asyncio.TimeoutError: + # Sleep for a bit and try again + # Should do something about this here + logger.warning("URC Stream TimeoutError") + await asyncio.sleep(0.1) + except (asyncio.CancelledError, RuntimeError, AttributeError): + return # Bail + except Exception as e: + # Log error and continue + logger.warning( + f"Error in async_urc_stream_parser: {e}", exc_info=True + ) + await asyncio.sleep(0.1) + finally: + # Clear status object to indicate status is not available + self._status_obj = None + + @classmethod + def _validate_reply( + cls, + rpl: str, + succ_msgs: Union[str, Iterable[str]], + err_msgs: Union[str, Iterable[str], None] = None, + ): + """Validate responses.""" + if err_msgs is not None: + if isinstance(err_msgs, str): + err_msgs = [err_msgs] + for err in err_msgs: + if rpl.startswith(err): + raise IOError(f"Error: {rpl}") + if isinstance(succ_msgs, str): + succ_msgs = [succ_msgs] + for succ in succ_msgs: + if rpl.startswith(succ): + return # OK + + raise IOError(f"Unhandled response: {rpl}") + + async def close(self): + """Close connection to robot.""" + tasks = [] + if self.urc: + tasks.append(self.urc.close()) + if self.poly: + tasks.append(self.poly.close()) + + logger.info(f"Closing connection to {self.ip}") + await asyncio.gather(*filter(None.__ne__, tasks)) + logger.info(f"Connection to {self.ip} closed.") + + async def load_program(self, program_name: str): + """Load a program into the robot.""" + logger.info(f"Loading program: {program_name}") + async with self.lock: + if self.is_program_running: + raise RuntimeError("Program is running. Stop it first.") + if self.is_protective_stopped: + raise RuntimeError("Cannot start program due to protective stop.") + if self.is_emergency_stopped: + raise RuntimeError("Cannot start program due to emergency stop.") + await self.poly.write(f"load {program_name}\n".encode()) + rpl = await self.poly.wait_for_read(self.buffer_size, timeout=self.timeout) + self._validate_reply( + rpl.decode(), + "Loading program:", + ("File not found:", "Error while loading program:"), + ) + logger.info(f"Program successfully loaded: {program_name}") + + async def open(self): + """Open connection to robot.""" + logger.info(f"Connecting to {self.ip}") + self.urc = await StreamHandle.open(self.ip, self.port_urc, self.sock_urc) + + try: + self.poly = await StreamHandle.open(self.ip, self.port_poly, self.sock_poly) + await self.poly.wait_for_read(self.buffer_size, timeout=self.timeout) + init = asyncio.Event() + asyncio.create_task(self._async_urc_stream_parser(init)) + await asyncio.wait_for(init.wait(), 5) + logger.info(f"Connecting to {self.ip} established.") + except Exception: + await self.urc.close() + logger.warning(f"Failed to connect to {self.ip}.") + raise + + async def play_program(self, timeout: float = 30): + """Start the loaded program and wait for it to finish.""" + logger.info("Playing program") + async with self.lock: + for a in range(20): + if not self.is_program_running: + break + await asyncio.sleep(0.1) + if self.is_program_running: + raise RuntimeError("Program is already running.") + if self.is_protective_stopped: + raise RuntimeError("Cannot start program due to protective stop.") + if self.is_emergency_stopped: + raise RuntimeError("Cannot start program due to emergency stop.") + await self.poly.write("play\n".encode()) + rpl = await self.poly.wait_for_read(self.buffer_size, timeout=self.timeout) + + self._validate_reply( + rpl.decode(), "Starting program", "Failed to execute: play" + ) + logger.info("Program started. Waiting for program to finish.") + await asyncio.sleep(0.5) # Give it a second to start + await self._wait_for_program_finish_running(timeout) + logger.info("Program finished.") + + async def read_all_io(self, num_inputs: int = 8, num_outputs: int = 8): + """Read all digital inputs and outputs of the robot.""" + logger.debug("Reading all IOs") + DI, DO = "dig_in", "dig_out" + cmd = f"""def myProg(): + \n\tglobal _hidden_verificationVariable=0 + \n\tglobal {DI} = [{", ".join([f"get_digital_in({i})" for i in range(num_inputs)])}] + \n\tglobal {DO} = [{", ".join([f"get_digital_out({i})" for i in range(num_outputs)])}] + \nend\nmyProg()\n""" + async with self.lock: + await self.urc.write(cmd.encode()) + # await self.urc.wait_for_read(self.buffer_size, self.timeout) + await asyncio.sleep(1) + dig_in: list[int] = await self.read_variable(DI, no_lock=True) + dig_out: list[int] = await self.read_variable(DO, no_lock=True) + return (dig_in, dig_out) + + async def read_variable(self, name: str, no_lock=False) -> Any: + """Read a global variable from the robot.""" + logger.debug(f"Reading variable: {name}") + + async def _read(): + await self.poly.write(f"getVariable {name}\n".encode()) + rpl = await self.poly.wait_for_read(self.buffer_size, self.timeout) + if rpl.decode().strip() == "null": + raise IOError(f"Variable '{name}' not found.") + return json.loads(rpl.decode().strip()) + + if no_lock: + return await _read() + else: + async with self.lock: + return await _read() + + async def write_variable(self, name: str, value: Union[float, int]): + """Write a global variable on the robot.""" + value_str = f"{value:.5e}" if isinstance(value, float) else str(value) + logger.info(f"Writing variable: {name} = {value_str}") + + cmd = f"""def myProg(): + \n\tglobal _hidden_verificationVariable=0 + \n\tglobal {name} = {value_str} + \nend\nmyProg()\n""" + async with self.lock: + for a in range(30): + if not self.is_program_running: + break + await asyncio.sleep(0.1) + else: + raise RuntimeError("Program is running. Stop it first.") + await self.urc.write(cmd.encode()) + start = time.time() + while start + 3 > time.time(): # 3 seconds + await asyncio.sleep(0.1) + try: + v = await self.read_variable(name, no_lock=True) + # Handle exact match for strings and ints + if isinstance(value, int) and v == value: + break + elif isinstance(value, float) and math.isclose( + v, value, rel_tol=1e-5, abs_tol=1e-4 + ): + break + + except IOError: + pass + else: + raise IOError( + f"Failed to write variable '{name}' with value '{value}'." + ) + + async def stop(self): + """Stop the program running on the robot.""" + logger.info("Stopping program") + async with self.lock: + await self.poly.write("stop\n".encode()) + for i in range(10): + rpl = ( + await self.poly.wait_for_read( + self.buffer_size, + timeout=self.timeout, + ) + ).decode() + if "Stopped" in rpl: + break + if "please switch robot to Remote Control mode" in rpl: + raise RuntimeError("Please switch robot to Remote Control mode.") + + async def _wait_for_program_finish_running(self, timeout: float): + """Wait for the current program to finish running.""" + CYCLES_WITH_NOT_RUNNING = 2 + + start = time.time() + lossy_buffer = deque(maxlen=CYCLES_WITH_NOT_RUNNING) + was_paused = False + while time.time() - start < timeout: + await asyncio.sleep(0.1) + + if self.is_emergency_stopped: + raise RuntimeError("Emergency stop triggered.") + if self.is_protective_stopped: + raise RuntimeError("Protective stop triggered.") + + lossy_buffer.append( + not self.is_program_running and not self.is_program_paused + ) + + # Check that we've had not running for enough cycles + if len(lossy_buffer) == CYCLES_WITH_NOT_RUNNING and all(lossy_buffer): + break # All good - exit loop + + if self.is_program_paused and not was_paused: + logger.warning("Program paused.") + was_paused = True + elif not self.is_program_paused and was_paused: + logger.warning("Program unpaused.") + was_paused = False + else: + raise asyncio.TimeoutError("Timeout while running program") diff --git a/tests/test_urx.py b/tests/test_urx.py new file mode 100644 index 0000000000000000000000000000000000000000..98758ae55aa55218835dca66bfff4fbc73c613a1 --- /dev/null +++ b/tests/test_urx.py @@ -0,0 +1,34 @@ +import asyncio +import unittest +from socket import AF_INET, SOCK_STREAM, socketpair + +from urx import URConnection +from urx.simulator.urx_sim import URSimulation + + +class TestURX(unittest.IsolatedAsyncioTestCase): + def setUp(self) -> None: + poly = socketpair(AF_INET, SOCK_STREAM) + urc = socketpair(AF_INET, SOCK_STREAM) + self.sim = URSimulation(urc[0], poly[0]) + self.robot = URConnection(sock_urc=urc[1], sock_poly=poly[1]) + + async def test_Connect(self): + async with self.sim, self.robot as robot: + self.assertFalse(robot.is_program_running) + + async def test_Run(self): + async with self.sim as _, self.robot as robot: + await robot.load_program("test.urp") + t1 = asyncio.create_task(robot.play_program()) + + async def loop_while_not_running(): + while not robot.is_program_running: + await asyncio.sleep(0.1) + + await asyncio.wait_for(loop_while_not_running(), timeout=5) + await t1 + + async def test_Stop(self): + async with self.sim, self.robot as robot: + await robot.stop()