Source code for pyniryo2.sound.sound

# Communication imports
import functools
import base64
from pyniryo2.robot_commander import RobotCommander, RobotCommandException

from .services import SoundServices
from .topics import SoundTopics
from .enums import Language


def check_ned2_version(func):
    """
    Decorator that check the robot version
    """

    @functools.wraps(func)
    def wrap(*args, **kwargs):
        robot_instance = args[0]
        if robot_instance.client.hardware_version != 'ned2':
            raise Exception("Error Code : BAD_HARDWARE_VERSION\n"
                            "Message : Wrong robot hardware version, feature only available on Ned2")

        return func(*args, **kwargs)

    return wrap


[docs]class Sound(RobotCommander): # --- Public functions --- # def __init__(self, client): """ Sound robot functions Example: :: ros_instance = NiryoRos("127.0.0.1") # Hotspot sound_interface = Sound(ros_instance) :param client: Niryo ROS client :type client: NiryoRos """ super(Sound, self).__init__(client) if self._client.hardware_version == 'ned2': self._services = SoundServices(self._client) self._topics = SoundTopics(self._client) else: self._services = None self._topics = None def __call__(self, *args, **kwargs): self.play(*args, **kwargs) @property def sounds(self): """ Returns the list of available sounds in the robot Examples: :: sounds_list = sound.sounds :return: Returns the list of available sounds in the robot :rtype: list[str] """ return list(self._topics.sound_database_topic().keys())
[docs] @check_ned2_version def get_sounds(self): """ Returns the list of available sounds in the robot Examples: :: sounds_list = sound.get_sounds() :return: Returns the list of available sounds in the robot :rtype: list[str] """ return self.sounds
[docs] @check_ned2_version def get_sound_duration(self, sound_name): """ Get the duration of a sound in seconds Examples: :: sound_name = sound.get_sounds()[0] sound_duration = sound.get_sound_duration(sound_name) sound_duration = sound.get_sound_duration('test_sound.mp3') :return: Returns the duration of a sound in seconds :rtype: float """ sounds = self._topics.sound_database_topic() self._check_dict_belonging(sound_name, sounds) return sounds[sound_name]
[docs] @check_ned2_version def play(self, sound_name, wait_end=False, start_time_sec=0, end_time_sec=0): """ Play a sound that as already been imported by the user on the robot. Example: :: # If you know that the sound test_sound.wav is already imported on the robot sound.play_sound_user("test_sound.wav") # If you want to play the first sound of the ones that are already on the robot without knowing its name sound_name = sound.get_sounds()[0] sound_duration = sound.play(sound_name) # Waits until the sound has been fully played sound_duration = sound.play(sound_name, wait_end=True) # Doesn't wait until the sound has been fully played sound_duration = sound.play(sound_name, wait_end=False) # Plays sound from 1.1 seconds from start to 4.3 seconds from start sound_duration = sound.play(sound_name, start_time_sec=1.1, end_time_sec=4.3) :param: sound_name: Name of the sound that will be played :type sound_name: str :param wait_end: wait for the end of the sound before exiting the function :type wait_end: bool :param start_time_sec: start the sound from this value in seconds :type start_time_sec: float :param end_time_sec: end the sound at this value in seconds :type end_time_sec: float :rtype: None """ self._check_instance(sound_name, str) req = self._services.play_sound_request(sound_name, start_time_sec, end_time_sec, wait_end) resp = self._services.play_sound_service.call(req) self._check_result_status(resp)
[docs] @check_ned2_version def stop(self): """ Stop a sound being played. It will get automatically the name of the sound being played and stop it. Example: :: self.sound.stop() :rtype: None """ req = self._services.stop_sound_request() resp = self._services.stop_sound_service.call(req) self._check_result_status(resp)
@property def state(self): """ Returns the sound state client which can be used synchronously or asynchronously to obtain the current played sound. Examples: :: # Get last value sound.state() sound.state.value # Subscribe a callback def sound_callback(sound_name): print sound_name sound.state.subscribe(sound_callback) sound.state.unsubscribe() :return: sound state topic instance :rtype: NiryoTopic """ return self._topics.current_sound_topic @property def volume(self): """ Returns the volume state client which can be used synchronously or asynchronously to obtain the current volume. Examples: :: # Get last value sound.volume() sound.volume.value # Subscribe a callback def volume_callback(value): print value sound.volume.subscribe(volume_callback) sound.volume.unsubscribe() :return: volume topic instance :rtype: NiryoTopic """ return self._topics.volume_topic
[docs] @check_ned2_version def get_volume(self): """ Returns the volume of the robot. The sound can be set between 0 (sound off) and 100 (sound max) Examples: :: # Get the volume of the sound sound.get_volume() :return: int8 corresponding to the volume (0: sound off, 100: sound max) :rtype: int8 """ return self._topics.volume_topic()
[docs] @check_ned2_version def set_volume(self, sound_volume): """ Set the volume of the robot. You can set it between 0 and 100 (0: sound off and 100: sound max). If you put less than 0, the volume will be set to 0. If you put more than 100, the volume will be set to 100. Example: :: # Set the volume to 25 self.sound.set_volume(25) self.sound.play_sound_user("test_sound.wav") :param sound_volume: Between O and 100 (0 sound off and 100 sound maximum) :type sound_volume: int8 :rtype: None """ self._check_range_belonging(sound_volume, 0, 200) req = self._services.set_sound_volume_request(sound_volume) resp = self._services.set_sound_volume_service.call(req) self._check_result_status(resp)
[docs] @check_ned2_version def delete(self, sound_name): """ Delete a sound imported on the robot Example: :: self.sound.delete("test_sound.wav") :param sound_name: For example, test.wav :type sound_name: str :rtype: None """ req = self._services.delete_sound_request(sound_name) resp = self._services.manage_sound_service.call(req) self._check_result_status(resp)
[docs] @check_ned2_version def save(self, sound_name, sound_path): """ Import a sound on the RaspberryPi of the robot. To do that, you will need the encoded data from a wav or mp3 sound. It is preferable to put the encoded data from the sound on a text file and directly read it from this file. You also need to give the name of the sound you want to import. Example: :: sound_name = "test_import_sound.wav" sound_path = "/home/niryo/test_sound.wav" ros_instance = pyniryo2.NiryoRos("10.10.10.10") sound = pyniryo2.Sound(ros_instance) sound.save(sound_name, sound_path) sound.play(sound_name) :param sound_name: For example, test.wav. Il will be the name of the sound in the robot :type sound_name: str :param sound_path: absolute path to the sound file :type sound_path: str :rtype: None """ if not sound_name.endswith('.mp3') and not sound_name.endswith('.wav'): raise RobotCommandException("The sound name {} must end with the suffix .mp3 or .wav.".format(sound_name)) with open(sound_path, 'rb') as f: sound_data = f.read() base64_bytes = base64.b64encode(sound_data) base64_message = base64_bytes.decode('ascii') req = self._services.import_sound_request(sound_name, base64_message) resp = self._services.manage_sound_service.call(req) self._check_result_status(resp)
[docs] @check_ned2_version def say(self, text, language=Language.ENGLISH): """ Use gtts (Google Text To Speech) to interpret a string as sound Languages available are: :: - English: Language. ENGLISH - French: Language.FRENCH - Spanish: Language.SPANISH - Mandarin: Language.MANDARIN - Portuguese: Language.PORTUGUESE Example :: robot.say("Hello", Language.ENGLISH) robot.say("Bonjour", Language.FRENCH) robot.say("Hola", Language.SPANISH) :param text: Text that needs to be spoken < 100 char :type text: str :param language: language of the text :type language: Language :rtype: None """ self._check_enum_belonging(language, Language) self._check_type(text, str) req = self._services.tts_request(text, language) resp = self._services.tts_service.call(req) self._check_result_status(resp)