|
import logging |
|
import platform |
|
import sys |
|
import threading |
|
|
|
import aiy.assistant.auth_helpers |
|
from aiy.assistant.library import Assistant |
|
import aiy.voicehat |
|
from google.assistant.library.event import EventType |
|
|
|
# Configure the root logger once at import time: INFO and above, with each
# record prefixed by its timestamp, level name and logger name.
logging.basicConfig(
    format="[%(asctime)s] %(levelname)s:%(name)s:%(message)s",
    level=logging.INFO,
)
|
|
|
class MyAssistant(object):
    """Run the Assistant on a background thread with a button trigger.

    The Assistant event loop runs on a dedicated thread (``_run_task``).
    Every event it yields is handed to ``_process_event``, which updates the
    voicehat status UI and tracks whether a new conversation may be started.
    After startup has finished, pressing the voicehat button starts a
    conversation manually (``_on_button_pressed``).
    """

    def __init__(self):
        # Worker thread that owns the Assistant; launched by start().
        self._task = threading.Thread(target=self._run_task)
        # True only while the Assistant is ready to begin a new turn.
        self._can_start_conversation = False
        # Assigned by _run_task once the Assistant context has been entered.
        self._assistant = None

    def start(self):
        """Start the background thread that runs the Assistant event loop."""
        self._task.start()

    def _run_task(self):
        """Thread body: create the Assistant and dispatch each of its events."""
        credentials = aiy.assistant.auth_helpers.get_assistant_credentials()
        with Assistant(credentials) as assistant:
            # Publish the instance so the button callback can reach it.
            self._assistant = assistant
            for event in assistant.start():
                self._process_event(event)

    def _process_event(self, event):
        """Update the status UI and conversation gate for one Assistant event.

        Args:
            event: An Assistant event; only its ``type`` and ``args``
                attributes are read.

        Raises:
            SystemExit: On an ``ON_ASSISTANT_ERROR`` event whose ``args``
                mark it as fatal.
        """
        status_ui = aiy.voicehat.get_status_ui()

        if event.type == EventType.ON_START_FINISHED:
            status_ui.status('ready')
            self._can_start_conversation = True
            # Start the voicehat button trigger.
            aiy.voicehat.get_button().on_press(self._on_button_pressed)
            if sys.stdout.isatty():
                print('Say "OK, Google" or press the button, then speak. '
                      'Press Ctrl+C to quit...')

        elif event.type == EventType.ON_CONVERSATION_TURN_STARTED:
            # A turn is in progress; block manual triggers until it ends.
            self._can_start_conversation = False
            status_ui.status('listening')

        elif event.type == EventType.ON_END_OF_UTTERANCE:
            status_ui.status('thinking')

        elif event.type in (EventType.ON_CONVERSATION_TURN_FINISHED,
                            EventType.ON_CONVERSATION_TURN_TIMEOUT,
                            EventType.ON_NO_RESPONSE):
            # Any way a turn can end puts us back in the idle state.
            status_ui.status('ready')
            self._can_start_conversation = True

        elif (event.type == EventType.ON_ASSISTANT_ERROR
              and event.args and event.args['is_fatal']):
            # When this runs on the worker thread, sys.exit() only raises
            # SystemExit in that thread, and threading discards it silently.
            # Log the error payload first so the shutdown is diagnosable.
            logging.getLogger(__name__).error(
                'Fatal assistant error, stopping: %s', event.args)
            sys.exit(1)

    def _on_button_pressed(self):
        """Button callback: start a conversation if the Assistant is idle."""
        if self._can_start_conversation:
            self._assistant.start_conversation()
|
|
|
def main():
    """Start the assistant demo, refusing to run on an 'armv6l' machine.

    If ``platform.machine()`` reports ``'armv6l'`` (reported to the user as
    a Pi Zero), a message is printed and the process exits with status -1.
    Otherwise a ``MyAssistant`` is created and started.

    Raises:
        SystemExit: When running on an 'armv6l' machine.
    """
    if platform.machine() == 'armv6l':
        print('Cannot run hotword demo on Pi Zero!')
        # Use sys.exit() rather than the exit() builtin: the latter is
        # injected by the site module and is not guaranteed to exist
        # (e.g. under ``python -S``).
        sys.exit(-1)
    MyAssistant().start()
|
|
|
# Run the demo only when executed as a script, not when imported.
if __name__ == '__main__':
    main()