Aldebaran documentation What's new in NAOqi 2.4.3?

ALBasicAwareness - Getting Started

NAOqi Interaction engines - Overview | API | Getting Started


A simple example

Here we just start and stop BasicAwareness.

albasicawareness_example.py

#! /usr/bin/env python
# -*- encoding: UTF-8 -*-

"""Example: Use startAwareness and stopAwareness Methods"""

import qi
import argparse
import sys
import time


def main(session, duration=30):
    """
    Run BasicAwareness for a while using startAwareness and stopAwareness.

    The robot is woken up, BasicAwareness is started, the function waits
    for ``duration`` seconds, then BasicAwareness is stopped and the robot
    is sent back to rest.

    Args:
        session: qi session used to obtain the ALBasicAwareness and
            ALMotion services.
        duration: number of seconds to leave BasicAwareness running
            (defaults to 30, the value originally hard-coded here).
    """
    # Get the services ALBasicAwareness and ALMotion.
    ba_service = session.service("ALBasicAwareness")
    motion_service = session.service("ALMotion")

    motion_service.wakeUp()
    try:
        print("Starting BasicAwareness")
        ba_service.startAwareness()
        print("Take some time to play with the robot")
        time.sleep(duration)
    finally:
        # Always undo what was started, even if the wait is interrupted
        # (Ctrl-C) or a call fails, so the robot is not left awake with
        # awareness running.
        print("Stopping BasicAwareness")
        try:
            ba_service.stopAwareness()
        finally:
            motion_service.rest()


if __name__ == "__main__":
    # Command-line options: where to reach NAOqi.
    cli = argparse.ArgumentParser()
    cli.add_argument(
        "--ip", type=str, default="127.0.0.1",
        help="Robot IP address. On robot or Local Naoqi: use '127.0.0.1'.")
    cli.add_argument(
        "--port", type=int, default=9559,
        help="Naoqi port number")
    options = cli.parse_args()
    port_text = str(options.port)

    session = qi.Session()
    try:
        session.connect("tcp://" + options.ip + ":" + port_text)
    except RuntimeError:
        # Connection failed: tell the user how to fix the arguments and
        # exit with a non-zero status.
        message = ("Can't connect to Naoqi at ip \"" + options.ip +
                   "\" on port " + port_text + ".\n"
                   "Please check your script arguments. Run with -h option for help.")
        print(message)
        sys.exit(1)

    # Connected: run the example.
    main(session)

Detect someone, track and get information about them

In some cases we want to trigger an action when a person is found, and then keep focused on this person. The FullyEngaged mode suits this need, as it can detect someone and keep engaged with them. We know that the robot has found (or lost) someone by catching the events ALBasicAwareness/HumanTracked and ALBasicAwareness/HumanLost (see the event handling tutorial). It is then possible to trigger an action: in our example we start or stop speech recognition, then we get some information related to the tracked person from PeoplePerception and print it.

albasicawareness_human_found.py

#! /usr/bin/env python
# -*- encoding: UTF-8 -*-

"""Example: A Simple class to Find Human with BasicAwareness"""

import qi
import argparse
import sys
import time


class HumanTrackedEventWatcher(object):
    """ A class to react to HumanTracked and HumanLost events.

    Speech recognition is started when a person starts being tracked and
    stopped when the tracked person is lost.
    """

    # Name under which this example subscribes to ALSpeechRecognition.
    _ASR_SUBSCRIBER_NAME = "BasicAwareness_Test"

    def __init__(self, app):
        """
        Initialisation of qi framework and event detection.

        Args:
            app: application object; it is started here and its session
                is used to obtain the NAOqi services.
        """
        super(HumanTrackedEventWatcher, self).__init__()
        app.start()
        session = app.session
        # References to the event subscribers are kept on the instance
        # (presumably so the connections are not dropped).
        self.subscribers_list = []
        self.is_speech_reco_started = False

        # Get the services ALMemory, ALSpeechRecognition, ALBasicAwareness and ALMotion.
        self.memory = session.service("ALMemory")
        self.speech_reco = session.service("ALSpeechRecognition")
        self.basic_awareness = session.service("ALBasicAwareness")
        self.motion = session.service("ALMotion")
        self.connect_callback("ALBasicAwareness/HumanTracked",
                              self.on_human_tracked)
        self.connect_callback("ALBasicAwareness/HumanLost",
                              self.on_people_left)

    def connect_callback(self, event_name, callback_func):
        """ connect a callback for a given event

        Args:
            event_name: name of the ALMemory event to subscribe to.
            callback_func: callable connected to the subscriber's signal.
        """
        subscriber = self.memory.subscriber(event_name)
        subscriber.signal.connect(callback_func)
        self.subscribers_list.append(subscriber)

    def on_human_tracked(self, value):
        """ callback for event HumanTracked

        Args:
            value: ID of the tracked person; negative values are ignored.
        """
        print("got HumanTracked: detected person with ID: " + str(value))
        if value < 0:
            return

        # found a new person
        self.start_speech_reco()
        try:
            x, y, z = self.get_people_perception_data(value)
        except (RuntimeError, TypeError, ValueError):
            # The position may be unavailable (e.g. the memory key is
            # missing or does not hold three coordinates); report it
            # instead of raising out of the event callback.
            print("No position available for the tracked person with ID " +
                  str(value))
            return
        print("The tracked person with ID {0} is at the position: "
              "x= {1} / y= {2} / z= {3}".format(value, x, y, z))

    def on_people_left(self, value):
        """ callback for event HumanLost

        Args:
            value: value carried by the event (printed as the lost person).
        """
        print("got HumanLost: lost person " + str(value))
        self.stop_speech_reco()

    def start_speech_reco(self):
        """ start ASR when someone's detected in event handler class """
        if self.is_speech_reco_started:
            return
        try:
            self.speech_reco.setVocabulary(["yes", "no"], False)
        except RuntimeError:
            print("ASR already started")
        self.speech_reco.subscribe(self._ASR_SUBSCRIBER_NAME)
        self.is_speech_reco_started = True
        print("started ASR")

    def stop_speech_reco(self):
        """ stop ASR when the tracked person is lost or on shutdown """
        if not self.is_speech_reco_started:
            return
        self.speech_reco.unsubscribe(self._ASR_SUBSCRIBER_NAME)
        self.is_speech_reco_started = False
        print("stopped ASR")

    def get_people_perception_data(self, id_person_tracked):
        """ read the PositionInWorldFrame memory key of a person

        Args:
            id_person_tracked: ID used to build the PeoplePerception key.

        Returns:
            Whatever ALMemory holds under
            PeoplePerception/Person/<id>/PositionInWorldFrame; the caller
            unpacks it as three values (x, y, z).
        """
        memory_key = "PeoplePerception/Person/{0}/PositionInWorldFrame".format(
            id_person_tracked)
        return self.memory.getData(memory_key)

    def run(self):
        """ run BasicAwareness in FullyEngaged mode until interrupted

        Blocks until the user presses Ctrl-C, then stops everything and
        exits the process with status 0.
        """
        #start
        self.motion.wakeUp()
        self.basic_awareness.setEngagementMode("FullyEngaged")
        self.basic_awareness.startAwareness()

        #loop on, wait for events until manual interruption
        try:
            while True:
                time.sleep(1)
        except KeyboardInterrupt:
            print("Interrupted by user, shutting down")
        finally:
            #stop (also on unexpected errors, so the robot is not left
            # awake with awareness and ASR running)
            self.basic_awareness.stopAwareness()
            self.stop_speech_reco()
            self.motion.rest()
        # Only reached after a KeyboardInterrupt; other exceptions propagate.
        sys.exit(0)


if __name__ == "__main__":
    # Command-line options: where to reach NAOqi.
    cli = argparse.ArgumentParser()
    cli.add_argument(
        "--ip", type=str, default="127.0.0.1",
        help="Robot IP address. On robot or Local Naoqi: use '127.0.0.1'.")
    cli.add_argument(
        "--port", type=int, default=9559,
        help="Naoqi port number")
    options = cli.parse_args()
    port_text = str(options.port)
    qi_url = "tcp://" + options.ip + ":" + port_text

    try:
        # Initialize qi framework.
        app = qi.Application(["HumanTrackedEventWatcher", "--qi-url=" + qi_url])
    except RuntimeError:
        # Tell the user how to fix the arguments and exit with a
        # non-zero status.
        message = ("Can't connect to Naoqi at ip \"" + options.ip +
                   "\" on port " + port_text + ".\n"
                   "Please check your script arguments. Run with -h option for help.")
        print(message)
        sys.exit(1)

    # Build the watcher (this starts the application) and wait for events.
    watcher = HumanTrackedEventWatcher(app)
    watcher.run()