Source code for pyniryo2.niryo_topic

import roslibpy
from threading import Event

from pyniryo2.exceptions import TopicException


[docs]class NiryoTopic(object): """ Represent a Ros Topic instance. It supports both the request of a single value and/or callbacks. This class is a wrapper of roslibpy Topic instance (https://roslibpy.readthedocs.io/en/latest/reference/index.html#topics) """ def __init__(self, client, topic_name, topic_type, conversion_function=None, timeout=3): """ :param client: Instance of the ROS connection. :type client: roslibpy.Ros :param topic_name: Topic name. :type topic_name: str :param topic_type: Topic type. :type topic_type: str :param conversion_function: convert the response of the topic in a specific type. :type conversion_function: function :param timeout: Timeout while waiting a message. :type timeout: float """ self.__topic_name = topic_name self.__topic_type = topic_type self.__topic = roslibpy.Topic(client, self.__topic_name, self.__topic_type) self.__user_callback = None self.__timeout = timeout self.__sync_topic_value = None self.__sync_event = Event() self.__sync_event.clear() self.__conversion_function = conversion_function def __del__(self): self.unsubscribe() def __call__(self): if self.__topic.is_subscribed: self.__sync_event.wait(timeout=self.__timeout) return self.__sync_topic_value self.__sync_event.clear() self.__topic.subscribe(self.__internal_callback) self.__sync_event.wait(timeout=self.__timeout) self.__topic.unsubscribe() return self.__sync_topic_value def __str__(self): return "Name: {}\nType: {}\nSubscribed: {}\nValue: {}".format(self.__topic_name, self.__topic_type, self.__topic.is_subscribed, self.__sync_topic_value) def __repr__(self): return self.__str__() @property def is_subscribed(self): """ Return the topic connection status. :return: True if already subscribed, False otherwise. :rtype: Bool """ return self.__topic.is_subscribed @property def value(self): """ Return the last value of the topic. :return: The last value of the topic. The value depends on the conversion function of the topic. By default, it will be a dict. :rtype: """ return self.__call__()
[docs] def subscribe(self, callback): """ Subscribe a callback to the topic. A TopicException will be thrown if the topic is already subscribed. :param callback: The callback function which is called at each incoming topic message. :type callback: function(dict, ) :return: None :rtype: None """ if not callback: raise TopicException("Empty callback on {} topic subscription".format(self.__topic_name)) if not self.__topic.is_subscribed: self.__user_callback = callback self.__sync_event.clear() self.__topic.subscribe(self.__internal_callback) else: raise TopicException("Topic {} already subscribed".format(self.__topic_name))
[docs] def unsubscribe(self): """ Unsubscribe to the topic. :rtype: None """ if self.__topic.is_subscribed: self.__topic.unsubscribe() self.__user_callback = None
[docs] def publish(self, msg): """ Publish a message on the topic :param msg: jsonified topic message content :type msg: dict of roslibpy.Message :rtype: None """ self.__topic.publish(roslibpy.Message(msg) if isinstance(msg, dict) else msg)
def __internal_callback(self, topic_value): """ This function is an internal callback that stores the last value of the subject and calls the user's callback if it is registered. :param topic_value: Message returned by the topic. :type topic_value: dict :return: None :rtype: None """ if self.__conversion_function is not None: self.__sync_topic_value = self.__conversion_function(topic_value) else: self.__sync_topic_value = topic_value self.__sync_event.set() if self.__user_callback: self.__user_callback(self.__sync_topic_value)