Source code for pyniryo2.conveyor.conveyor

# Communication imports
from pyniryo2.robot_commander import RobotCommander

from .enums import ConveyorID, ConveyorCan, ConveyorTTL, ConveyorDirection, ConveyorStatus
from .services import ConveyorServices
from .topics import ConveyorTopics, conveyor_id_to_conveyor_type


[docs]class Conveyor(RobotCommander): # --- Public functions --- # def __init__(self, client): """ Conveyor robot functions Example: :: ros_instance = NiryoRos("10.10.10.10") # Hotspot conveyor_interface = Conveyor(ros_instance) :param client: Niryo ROS client :type client: NiryoRos """ super(Conveyor, self).__init__(client) self._services = ConveyorServices(self._client) self._topics = ConveyorTopics(self._client)
[docs] def set_conveyor(self): """ Scan if a conveyor is plugged or not on a can bus. If a new conveyor is detected, activate it and return its conveyor ID. If a conveyor is already set, return its ID Example: :: # Get the id of the conveyor plugged conveyor_id = conveyor.set_conveyor() # Scan and set the conveyor plugged conveyor.set_conveyor() :return : New conveyor ID :rtype: ConveyorID """ req = self._services.get_ping_and_set_conveyor_request() resp = self._services.ping_and_set_conveyor_service.call(req) conveyor_id = conveyor_id_to_conveyor_type(resp["id"]) # If new conveyor has been found if conveyor_id != ConveyorID.NONE: # print("New conveyor detected and set with id :", conveyor_id) return conveyor_id else: last_conveyor_id = self.get_conveyors_feedback()[0].conveyor_id if last_conveyor_id != ConveyorID.NONE: # print("No new conveyor detected, actual conveyor id :", last_conveyor_id) return last_conveyor_id else: # print("No conveyor detected") return last_conveyor_id
[docs] def unset_conveyor(self, conveyor_id): """ Remove and unset a conveyor previously plugged and set Example: :: conveyor_id = conveyor.set_conveyor() conveyor.unset_conveyor(conveyor_id) conveyor.unset_conveyor(ConveyorID.ID_1) conveyor.unset_conveyor(ConveyorID.ID_2) :param conveyor_id: Basically, ConveyorID.ID_1 or ConveyorID.ID_2 :type conveyor_id: ConveyorID :rtype: None """ self._check_instance(conveyor_id, (ConveyorID, ConveyorTTL, ConveyorCan)) real_conv_id = self.__conveyor_number_to_conveyor_id(conveyor_id) req = self._services.unset_conveyor_request(real_conv_id) resp = self._services.ping_and_set_conveyor_service.call(req) self._check_result_status(resp)
[docs] def run_conveyor(self, conveyor_id, speed=100, direction=ConveyorDirection.FORWARD): """ Run conveyor at id 'conveyor_id' Example: :: # Set the conveyor and get its id and un it. # By default, the conveyor will go forward at a speed of 50 # You can't choose the parameters with this method conveyor_id = conveyor.set_conveyor() conveyor.run_conveyor(conveyor_id) :param conveyor_id: The conveyor id :type conveyor_id: ConveyorID :param speed: speed percentage between 0% and 100% :type speed: int :param direction: direction = ConveyorDirection.FORWARD :type direction: ConveyorDirection :rtype: None """ self.control_conveyor(conveyor_id, control_on=True, speed=speed, direction=direction)
[docs] def stop_conveyor(self, conveyor_id): """ Stop conveyor at id 'conveyor_id' Example: :: # Set the conveyor and get its id, run it and then stop it after 3 seconds # By default, the conveyor will go forward at a speed of 50 # When the conveyor is stopped, its control_on parameter is False and its speed is 0 import time conveyor_id = conveyor.set_conveyor() conveyor.run_conveyor(conveyor_id) time.sleep(3) conveyor.stop_conveyor(conveyor_id) :param conveyor_id: :type conveyor_id: ConveyorID :rtype: None """ self.control_conveyor(conveyor_id, control_on=False, speed=50, direction=ConveyorDirection.FORWARD)
[docs] def control_conveyor(self, conveyor_id, control_on, speed, direction): """ Control conveyor associated to conveyor_id. Then stops it if bool_control_on is False, else refreshes it speed and direction Example: :: # Example 1 # Set the conveyor and get its id, control it and then stop it after 3 seconds # It this first example, we control the conveyor at a speed of 100% and in the forward direction import time conveyor_id = conveyor.set_conveyor() conveyor.control_conveyo(conveyor_id, True, 100, ConveyorDirection.FORWARD.value) time.sleep(3) conveyor.stop_conveyor(conveyor_id) # Example 2 # Set the conveyor and get its id, control it and then stop it after 3 seconds # It this second example, we control the conveyor at a speed of 30% and in the backward direction import time conveyor_id = conveyor.set_conveyor() conveyor.control_conveyor(conveyor_id, True, 30, ConveyorDirection.BACKWARD.value) time.sleep(3) conveyor.stop_conveyor(conveyor_id) :param conveyor_id: :type conveyor_id: ConveyorID :param control_on: True for activate, False for deactivate :type control_on: bool :param speed: New speed which is a percentage of maximum speed (0% to 100%) :type speed: int :param direction: ConveyorDirection.FORWARD.value, ConveyorDirection.BACKWARD.value :type direction: ConveyorDirection :rtype: None """ self._check_instance(conveyor_id, (ConveyorID, ConveyorTTL, ConveyorCan)) self._check_type(control_on, bool) self._transform_to_type(speed, int) self._check_range_belonging(speed, 0, 100) self._check_enum_belonging(direction, ConveyorDirection) real_conv_id = self.__conveyor_number_to_conveyor_id(conveyor_id) req = self._services.control_conveyor_request(real_conv_id, control_on, speed, direction) resp = self._services.control_conveyor_service.call(req) self._check_result_status(resp)
@property def get_conveyors_feedback(self): """ Returns the conveyors feedback client which can be used synchronously or asynchronously to obtain the conveyors feedback: (conveyor_id, connection_state, running, speed, direction) Examples: :: # Get last value arm.get_conveyors_feedback() arm.get_conveyors_feedback.value # Subscribe a callback def conveyor_callback(conveyor_feedback): print conveyor_feedback arm.hardware_status.subscribe(conveyor_callback) # wait arm.hardware_status.unsubscribe() :return: namedtuple[conveyor_id, running, speed, direction] :rtype: namedtuple(ConveyorID, bool, int, ConveyorDirection) """ return self._topics.conveyor_feedback_topic @property def conveyors(self): """ Return list of registered conveyors :return: namedtuple[conveyor_id, running, speed, direction] :rtype: namedtuple(ConveyorID, bool, int, ConveyorDirection) """ return self._topics.conveyor_feedback_topic() def __conveyor_number_to_conveyor_id(self, conveyor_number): if conveyor_number == ConveyorID.ID_1: return ConveyorTTL.ID_1 if self._client.hardware_version in ['ned2'] else ConveyorCan.ID_1 elif conveyor_number == ConveyorID.ID_2: return ConveyorTTL.ID_2 if self._client.hardware_version in ['ned2'] else ConveyorCan.ID_2 else: return conveyor_number @staticmethod def __conveyor_id_to_conveyor_number(conveyor_id): if conveyor_id in [ConveyorTTL.ID_1, ConveyorCan.ID_1]: return ConveyorID.ID_1 elif conveyor_id in [ConveyorTTL.ID_2, ConveyorCan.ID_2]: return ConveyorID.ID_2 elif conveyor_id in [ConveyorTTL.NONE, ConveyorCan.NONE]: return ConveyorID.NONE else: return conveyor_id