Source code for pyniryo2.niryo_ros

# Python libraries
import roslibpy
import socket
import time
from threading import Thread

from pyniryo2.niryo_topic import NiryoTopic


[docs]class NiryoRosTimeoutException(Exception): pass
[docs]class NiryoRos(roslibpy.Ros): def __init__(self, ip_address="127.0.0.1", port=9090): """ Connect to your computer to ros: :: ros_instance = NiryoRos("127.0.0.1") # Simulation ros_instance = NiryoRos("10.10.10.10") # Hotspot ros_instance = NiryoRos("169.254.200.201") # Ethernet Based on the roslibpy ROS client: https://roslibpy.readthedocs.io/en/latest/reference/index.html#roslibpy.Ros :param ip_address: ip of the ros master :type ip_address: str :param port: usually 9090 :type port: int """ self.port = port self.ip_address = ip_address super(NiryoRos, self).__init__(host=ip_address, port=port) self.should_run = False self.__pyniryo2_ip = socket.gethostbyname(socket.gethostname()) self.__ping_loop_thread = Thread(target=self.__ping_loop) self.__pyniryo_ping_publisher = NiryoTopic(self, '/ping_pyniryo', 'std_msgs/Bool') self.run(timeout=5) self.__ping_loop_thread.start() self._hardware_version = self.get_param("/niryo_robot/hardware_version") time.sleep(0.5)
[docs] def close(self): self.should_run = False super(NiryoRos, self).close()
[docs] def terminate(self): self.should_run = False super(NiryoRos, self).terminate()
def __ping_loop(self): self.should_run = True while self.is_connected and self.should_run: self.__pyniryo_ping_publisher.publish(roslibpy.Message({'data': True})) time.sleep(0.1) @property def hardware_version(self): """ Get the hardware version of the robot (one, ned, ned2) :return: The hardware version of the robot (one, ned, ned2) :rtype: str """ return self._hardware_version