ALBasicAwareness - Getting Started
NAOqi Interaction engines - Overview | API | Getting Started
A simple example
Here we just start and stop BasicAwareness.
#! /usr/bin/env python
# -*- encoding: UTF-8 -*-
"""Example: Use the setEnabled method to start and stop BasicAwareness"""
import qi
import argparse
import sys
import time
def main(session, play_time=30):
    """
    Start BasicAwareness, leave it running for a while, then stop it.

    This example uses the setEnabled method.

    :param session: connected qi session; it is used to fetch the
                    ALBasicAwareness and ALMotion services.
    :param play_time: number of seconds BasicAwareness is left enabled
                      before it is stopped again (defaults to 30).
    """
    # Get the services ALBasicAwareness and ALMotion.
    ba_service = session.service("ALBasicAwareness")
    motion_service = session.service("ALMotion")

    print("Waiting for the robot to be in wake up position")
    motion_service.wakeUp()

    print("Starting BasicAwareness")
    ba_service.setEnabled(True)
    try:
        print("Take some time to play with the robot")
        time.sleep(play_time)
    finally:
        # Always undo what was started above, even when the wait is cut
        # short (e.g. Ctrl-C), so the robot is not left awake with
        # BasicAwareness still enabled.
        print("Stopping BasicAwareness")
        ba_service.setEnabled(False)
        print("Waiting for the robot to be in rest position")
        motion_service.rest()
if __name__ == "__main__":
    # Command-line options: where to reach NAOqi.
    arg_parser = argparse.ArgumentParser()
    arg_parser.add_argument(
        "--ip", type=str, default="127.0.0.1",
        help="Robot IP address. On robot or Local Naoqi: "
             "use '127.0.0.1'.")
    arg_parser.add_argument(
        "--port", type=int, default=9559,
        help="Naoqi port number")
    args = arg_parser.parse_args()

    # Open a session on the requested NAOqi instance; give up with exit
    # status 1 when the connection cannot be established.
    naoqi_url = "tcp://" + args.ip + ":" + str(args.port)
    session = qi.Session()
    try:
        session.connect(naoqi_url)
    except RuntimeError:
        print("Can't connect to Naoqi at ip \"" + args.ip + "\" on port " +
              str(args.port) + ".\n"
              "Please check your script arguments. Run with -h option for "
              "help.")
        sys.exit(1)

    main(session)
Detect someone, track and get information about them
In some cases we want to trigger an action when a person is found, and then stay focused on this person. The FullyEngaged mode suits this need, as it can detect someone and stay engaged with them. We know that the robot has found (or lost) someone by catching the events ALBasicAwareness/HumanTracked and ALBasicAwareness/HumanLost (see the event handling tutorial). It is then possible to trigger an action: in our example we start or stop speech recognition, then we get the position of the tracked person from PeoplePerception and print it.
albasicawareness_human_found.py
#! /usr/bin/env python
# -*- encoding: UTF-8 -*-
"""Example: A Simple class to Find Human with BasicAwareness"""
import qi
import argparse
import sys
import time
class HumanTrackedEventWatcher(object):
    """
    React to the ALBasicAwareness/HumanTracked and
    ALBasicAwareness/HumanLost events.

    Speech recognition is started when a person is tracked and stopped
    when the person is lost; the tracked person's position is read from
    ALMemory and printed.
    """

    def __init__(self, app):
        """
        Initialisation of qi framework and event detection.

        :param app: qi application; it is started here and its session is
                    used to reach ALMemory, ALSpeechRecognition,
                    ALBasicAwareness and ALMotion.

        Exits the process with status 1 when the application cannot be
        started.
        """
        super(HumanTrackedEventWatcher, self).__init__()
        try:
            app.start()
        except RuntimeError as error:
            # Build the message from the exception only: relying on
            # script-level globals here would raise NameError whenever the
            # class is used from another module.
            print("Can't connect to Naoqi: " + str(error) + "\n")
            sys.exit(1)

        session = app.session
        # Subscribers are kept in a list so they stay referenced for the
        # lifetime of the watcher.
        self.subscribers_list = []
        self.is_speech_reco_started = False

        self.memory = session.service("ALMemory")
        self.speech_reco = session.service("ALSpeechRecognition")
        self.basic_awareness = session.service("ALBasicAwareness")
        self.motion = session.service("ALMotion")

        self.connect_callback("ALBasicAwareness/HumanTracked",
                              self.on_human_tracked)
        self.connect_callback("ALBasicAwareness/HumanLost",
                              self.on_people_left)

    def connect_callback(self, event_name, callback_func):
        """
        Connect a callback to a given ALMemory event.

        :param event_name: name of the ALMemory event to subscribe to.
        :param callback_func: callable connected to the event's signal.
        """
        subscriber = self.memory.subscriber(event_name)
        subscriber.signal.connect(callback_func)
        self.subscribers_list.append(subscriber)

    def on_human_tracked(self, value):
        """
        Callback for the ALBasicAwareness/HumanTracked event.

        :param value: ID carried by the event. Non-negative IDs start
                      speech recognition and print the person's position;
                      other values are only logged.
        """
        print("Got HumanTracked: detected person with ID: " + str(value))
        if value >= 0:  # found a new person
            self.start_speech_reco()
            position_human = self.get_people_perception_data(value)
            # The stored position is expected to hold exactly three values.
            [x, y, z] = position_human
            print("The tracked person with ID %s is at the position: "
                  "x= %s / y= %s / z= %s" % (value, x, y, z))

    def on_people_left(self, value):
        """
        Callback for the ALBasicAwareness/HumanLost event.

        :param value: value carried by the event (printed as the lost
                      person).
        """
        print("Got PeopleLeft: lost person " + str(value))
        self.stop_speech_reco()

    def start_speech_reco(self):
        """ Start ASR, unless this watcher has already started it. """
        if not self.is_speech_reco_started:
            try:
                self.speech_reco.setVocabulary(["yes", "no"], False)
            except RuntimeError:
                # Best effort: the subscription below is attempted anyway.
                # NOTE(review): presumably raised when the engine is
                # already running - confirm against the ASR API.
                print("ASR already started")
            print("Starting speech recognition")
            self.speech_reco.subscribe("BasicAwareness_Test")
            self.is_speech_reco_started = True

    def stop_speech_reco(self):
        """ Stop ASR if this watcher started it; otherwise do nothing. """
        if self.is_speech_reco_started:
            print("Stopping speech recognition")
            self.speech_reco.unsubscribe("BasicAwareness_Test")
            self.is_speech_reco_started = False

    def get_people_perception_data(self, id_person_tracked):
        """
        Return the data stored in ALMemory under
        "PeoplePerception/Person/<id>/PositionInWorldFrame".

        :param id_person_tracked: ID of the person to look up.
        :return: whatever ALMemory holds for that key; the caller unpacks
                 it as [x, y, z].
        """
        memory_key = "PeoplePerception/Person/" + str(id_person_tracked) + \
                     "/PositionInWorldFrame"
        return self.memory.getData(memory_key)

    def run(self):
        """
        Wake the robot up, run BasicAwareness in "FullyEngaged" mode until
        the user interrupts, then clean up and exit the process.

        This example uses the setEngagementMode and setEnabled methods.
        """
        # start
        print("Waiting for the robot to be in wake up position")
        self.motion.wakeUp()

        print("Starting BasicAwareness with the fully engaged mode")
        self.basic_awareness.setEngagementMode("FullyEngaged")
        self.basic_awareness.setEnabled(True)

        # loop on, wait for events until manual interruption
        try:
            while True:
                time.sleep(1)
        except KeyboardInterrupt:
            print("Interrupted by user, shutting down")

        # stop
        print("Stopping BasicAwareness")
        self.basic_awareness.setEnabled(False)
        self.stop_speech_reco()

        print("Waiting for the robot to be in rest position")
        self.motion.rest()
        sys.exit(0)
if __name__ == "__main__":
    # Command-line options: where to reach NAOqi.
    arg_parser = argparse.ArgumentParser()
    arg_parser.add_argument(
        "--ip", type=str, default="127.0.0.1",
        help="Robot IP address. On robot or Local Naoqi: use "
             "'127.0.0.1'.")
    arg_parser.add_argument(
        "--port", type=int, default=9559,
        help="Naoqi port number")
    args = arg_parser.parse_args()

    # Initialize qi framework.
    connection_url = "tcp://" + args.ip + ":" + str(args.port)
    qi_arguments = ["HumanTrackedEventWatcher", "--qi-url=" + connection_url]
    app = qi.Application(qi_arguments)

    # The watcher starts the application itself; run() blocks until the
    # user interrupts and then exits the process.
    human_tracked_event_watcher = HumanTrackedEventWatcher(app)
    human_tracked_event_watcher.run()