ALBasicAwareness - Getting Started¶
NAOqi Interaction engines - Overview | API | Getting Started
A simple example¶
Here we just start and stop BasicAwareness.
#! /usr/bin/env python
# -*- encoding: UTF-8 -*-
"""Example: Use startAwareness and stopAwareness Methods"""
import qi
import argparse
import sys
import time
def main(session):
"""
This example uses the startAwareness and stopAwareness methods.
"""
# Get the services ALBasicAwareness and ALMotion.
ba_service = session.service("ALBasicAwareness")
motion_service = session.service("ALMotion")
motion_service.wakeUp()
print "Starting BasicAwareness"
ba_service.startAwareness()
print "Take some time to play with the robot"
time.sleep(30)
print "Stopping BasicAwareness"
ba_service.stopAwareness()
motion_service.rest()
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument("--ip", type=str, default="127.0.0.1",
help="Robot IP address. On robot or Local Naoqi: use '127.0.0.1'.")
parser.add_argument("--port", type=int, default=9559,
help="Naoqi port number")
args = parser.parse_args()
session = qi.Session()
try:
session.connect("tcp://" + args.ip + ":" + str(args.port))
except RuntimeError:
print ("Can't connect to Naoqi at ip \"" + args.ip + "\" on port " + str(args.port) +".\n"
"Please check your script arguments. Run with -h option for help.")
sys.exit(1)
main(session)
Detect someone, track and get information about them¶
In some cases we want to trigger an action when a person is found, and then keep focused on this person. The FullyEngaged mode suits this need, as it can detect someone and keep engaged with them. We know that the robot has found (or lost) someone by catching the events ALBasicAwareness/HumanTracked and ALBasicAwareness/PeopleLeft (see the event handling tutorial). It is then possible to trigger an action: in our example we start or stop speech recognition, then we get some information related to the tracked person from PeoplePerception and print them.
albasicawareness_human_found.py
#! /usr/bin/env python
# -*- encoding: UTF-8 -*-
"""Example: A Simple class to Find Human with BasicAwareness"""
import qi
import argparse
import sys
import time
class HumanTrackedEventWatcher(object):
""" A class to react to HumanTracked and PeopleLeft events """
def __init__(self, app):
"""
Initialisation of qi framework and event detection.
"""
super(HumanTrackedEventWatcher, self).__init__()
app.start()
session = app.session
self.subscribers_list = []
self.is_speech_reco_started = False
# Get the services ALMemory, ALSpeechRecognition, ALBasicAwareness and ALMotion.
self.memory = session.service("ALMemory")
self.speech_reco = session.service("ALSpeechRecognition")
self.basic_awareness = session.service("ALBasicAwareness")
self.motion = session.service("ALMotion")
self.connect_callback("ALBasicAwareness/HumanTracked",
self.on_human_tracked)
self.connect_callback("ALBasicAwareness/HumanLost",
self.on_people_left)
def connect_callback(self, event_name, callback_func):
""" connect a callback for a given event """
subscriber = self.memory.subscriber(event_name)
subscriber.signal.connect(callback_func)
self.subscribers_list.append(subscriber)
def on_human_tracked(self, value):
""" callback for event HumanTracked """
print "got HumanTracked: detected person with ID:", str(value)
if value >= 0: # found a new person
self.start_speech_reco()
position_human = self.get_people_perception_data(value)
[x, y, z] = position_human
print "The tracked person with ID", value, "is at the position:", \
"x=", x, "/ y=", y, "/ z=", z
def on_people_left(self, value):
""" callback for event PeopleLeft """
print "got PeopleLeft: lost person", str(value)
self.stop_speech_reco()
def start_speech_reco(self):
""" start ASR when someone's detected in event handler class """
if not self.is_speech_reco_started:
try:
self.speech_reco.setVocabulary(["yes", "no"], False)
except RuntimeError:
print "ASR already started"
self.speech_reco.subscribe("BasicAwareness_Test")
self.is_speech_reco_started = True
print "started ASR"
def stop_speech_reco(self):
""" stop ASR when someone's detected in event handler class """
if self.is_speech_reco_started:
self.speech_reco.unsubscribe("BasicAwareness_Test")
self.is_speech_reco_started = False
print "stopped ASR"
def get_people_perception_data(self, id_person_tracked):
memory_key = "PeoplePerception/Person/" + str(id_person_tracked) + \
"/PositionInWorldFrame"
return self.memory.getData(memory_key)
def run(self):
#start
self.motion.wakeUp()
self.basic_awareness.setEngagementMode("FullyEngaged")
self.basic_awareness.startAwareness()
#loop on, wait for events until manual interruption
try:
while True:
time.sleep(1)
except KeyboardInterrupt:
print "Interrupted by user, shutting down"
#stop
self.basic_awareness.stopAwareness()
self.stop_speech_reco()
self.motion.rest()
sys.exit(0)
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument("--ip", type=str, default="127.0.0.1",
help="Robot IP address. On robot or Local Naoqi: use '127.0.0.1'.")
parser.add_argument("--port", type=int, default=9559,
help="Naoqi port number")
args = parser.parse_args()
try:
# Initialize qi framework.
connection_url = "tcp://" + args.ip + ":" + str(args.port)
app = qi.Application(["HumanTrackedEventWatcher", "--qi-url=" + connection_url])
except RuntimeError:
print ("Can't connect to Naoqi at ip \"" + args.ip + "\" on port " + str(args.port) +".\n"
"Please check your script arguments. Run with -h option for help.")
sys.exit(1)
human_tracked_event_watcher = HumanTrackedEventWatcher(app)
human_tracked_event_watcher.run()