SoftBank Robotics documentation What's new in NAOqi 2.8?

ALBasicAwareness - Getting Started

NAOqi Interaction engines - Overview | API | Getting Started


A simple example

Here we just start and stop BasicAwareness.

albasicawareness_example.py

#! /usr/bin/env python
# -*- encoding: UTF-8 -*-

"""Example: Use startAwareness and stopAwareness Methods"""

import qi
import argparse
import sys
import time


def main(session):
    """
    This example uses the setEnabled method.
    """
    # Get the services ALBasicAwareness and ALMotion.

    ba_service = session.service("ALBasicAwareness")
    motion_service = session.service("ALMotion")

    print "Waiting for the robot to be in wake up position"
    motion_service.wakeUp()

    print "Starting BasicAwareness"
    ba_service.setEnabled(True)

    print "Take some time to play with the robot"
    time.sleep(30)

    print "Stopping BasicAwareness"
    ba_service.setEnabled(False)

    print "Waiting for the robot to be in rest position"
    motion_service.rest()


if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("--ip", type=str, default="127.0.0.1",
                        help="Robot IP address. On robot or Local Naoqi: \
                        use '127.0.0.1'.")
    parser.add_argument("--port", type=int, default=9559,
                        help="Naoqi port number")

    args = parser.parse_args()
    session = qi.Session()
    try:
        session.connect("tcp://" + args.ip + ":" + str(args.port))
    except RuntimeError:
        print ("Can't connect to Naoqi at ip \"" + args.ip + "\" on port " +
               str(args.port) + ".\n"
               "Please check your script arguments. Run with -h option for \
help.")
        sys.exit(1)

    main(session)

Detect someone, track and get information about them

In some cases we want to trigger an action when a person is found, and then keep focused on this person. The FullyEngaged mode suits this need, as it can detect someone and keep engaged with them. We know that the robot has found (or lost) someone by catching the events ALBasicAwareness/HumanTracked and ALBasicAwareness/PeopleLeft (see the event handling tutorial). It is then possible to trigger an action: in our example we start or stop speech recognition, then we get some information related to the tracked person from PeoplePerception and print them.

albasicawareness_human_found.py

#! /usr/bin/env python
# -*- encoding: UTF-8 -*-

"""Example: A Simple class to Find Human with BasicAwareness"""

import qi
import argparse
import sys
import time


class HumanTrackedEventWatcher(object):
    """ A class to react to HumanTracked and PeopleLeft events """

    def __init__(self, app):
        """
        Initialisation of qi framework and event detection.
        """
        super(HumanTrackedEventWatcher, self).__init__()

        try:
            app.start()
        except RuntimeError:
            print ("Can't connect to Naoqi at ip \"" + args.ip + "\" on port " +
                   str(args.port) + ".\n")

            sys.exit(1)

        session = app.session
        self.subscribers_list = []
        self.is_speech_reco_started = False

        self.memory = session.service("ALMemory")
        self.speech_reco = session.service("ALSpeechRecognition")
        self.basic_awareness = session.service("ALBasicAwareness")
        self.motion = session.service("ALMotion")
        self.connect_callback("ALBasicAwareness/HumanTracked",
                              self.on_human_tracked)
        self.connect_callback("ALBasicAwareness/HumanLost",
                              self.on_people_left)

    def connect_callback(self, event_name, callback_func):
        """ connect a callback for a given event """
        subscriber = self.memory.subscriber(event_name)
        subscriber.signal.connect(callback_func)
        self.subscribers_list.append(subscriber)

    def on_human_tracked(self, value):
        """ callback for event HumanTracked """
        print "Got HumanTracked: detected person with ID:", str(value)
        if value >= 0:  # found a new person
            self.start_speech_reco()
            position_human = self.get_people_perception_data(value)
            [x, y, z] = position_human
            print "The tracked person with ID", value, "is at the position:", \
                "x=", x, "/ y=",  y, "/ z=", z

    def on_people_left(self, value):
        """ callback for event PeopleLeft """
        print "Got PeopleLeft: lost person", str(value)
        self.stop_speech_reco()

    def start_speech_reco(self):
        """ start ASR when someone's detected in event handler class """
        if not self.is_speech_reco_started:
            try:
                self.speech_reco.setVocabulary(["yes", "no"], False)
            except RuntimeError:
                print "ASR already started"

            print "Starting speech recognition"
            self.speech_reco.subscribe("BasicAwareness_Test")
            self.is_speech_reco_started = True

    def stop_speech_reco(self):
        """ stop ASR when someone's detected in event handler class """
        if self.is_speech_reco_started:
            print "Stopping speech recognition"
            self.speech_reco.unsubscribe("BasicAwareness_Test")
            self.is_speech_reco_started = False

    def get_people_perception_data(self, id_person_tracked):
        """
            return information related to the person who has the id
            "id_person_tracked" from People Perception
        """
        memory_key = "PeoplePerception/Person/" + str(id_person_tracked) + \
                     "/PositionInWorldFrame"
        return self.memory.getData(memory_key)

    def run(self):
        """
            this example uses the setEngagementMode, startAwareness and
            stopAwareness methods
        """
        # start
        print "Waiting for the robot to be in wake up position"
        self.motion.wakeUp()

        print "Starting BasicAwareness with the fully engaged mode"
        self.basic_awareness.setEngagementMode("FullyEngaged")
        self.basic_awareness.setEnabled(True)

        # loop on, wait for events until manual interruption
        try:
            while True:
                time.sleep(1)
        except KeyboardInterrupt:
            print "Interrupted by user, shutting down"
            # stop
            print "Stopping BasicAwareness"
            self.basic_awareness.setEnabled(False)

            self.stop_speech_reco()

            print "Waiting for the robot to be in rest position"
            self.motion.rest()

            sys.exit(0)


if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("--ip", type=str, default="127.0.0.1",
                        help="Robot IP address. On robot or Local Naoqi: use \
                        '127.0.0.1'.")
    parser.add_argument("--port", type=int, default=9559,
                        help="Naoqi port number")

    args = parser.parse_args()

    # Initialize qi framework.
    connection_url = "tcp://" + args.ip + ":" + str(args.port)
    app = qi.Application(["HumanTrackedEventWatcher",
                          "--qi-url=" + connection_url])

    human_tracked_event_watcher = HumanTrackedEventWatcher(app)
    human_tracked_event_watcher.run()