Created
March 14, 2021 11:43
-
-
Save MattMoony/a1d1084e55ede9c61dd0d2bff23a8f82 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python3 | |
| """ | |
| Title: IBM-Arnold | |
| Date: March 2021 | |
| Author: mattmoony (https://github.com/MattMoony) | |
| Quick implementation of a semi-smart assistant called 'Arnold' | |
| (even though the name can be changed) utilizing IBM Watson services. | |
| I'm well aware that the code isn't very clean - this is mostly | |
| due to the fact that I wrote this PoC in a rather short amount of time :P | |
| If you want to try it yourself, simply create an IBM Watson account, | |
| make a `.env.json` file of the following format and paste your apikeys | |
| and endpoint URLs in there: | |
| `.env.json`: | |
| { | |
| "visual-recognition": { | |
| "key": "[APIKEY]", | |
| "url": "[ENDPOINT-URL]" | |
| }, | |
| "text-to-speech": { | |
| "key": "[APIKEY]", | |
| "url": "[ENDPOINT-URL]" | |
| }, | |
| "speech-to-text": { | |
| "key": "[APIKEY]", | |
| "url": "[ENDPOINT-URL]" | |
| } | |
| } | |
| """ | |
| import os | |
| import io | |
| import cv2 | |
| import time | |
| import json | |
| import random | |
| import shutil | |
| import pyaudio | |
| import traceback | |
| import requests as req | |
| from rich import print | |
| from rich.table import Table | |
| from threading import Thread | |
| from queue import Queue, Full | |
| from ibm_watson import SpeechToTextV1, TextToSpeechV1 | |
| from ibm_watson.websocket import RecognizeCallback, AudioSource, SynthesizeCallback | |
| from ibm_cloud_sdk_core.authenticators import IAMAuthenticator | |
| from ibm_watson import VisualRecognitionV3 | |
| from typing import * | |
| # CONSTANTS ====================================================== # | |
# Audio capture parameters: 16-bit mono PCM at 44.1 kHz, read in 1 KiB chunks.
CHUNK: int = 1024
BUF_MAX_SIZE: int = 4 * CHUNK  # microphone queue holds at most 4 chunks
FORMAT: int = pyaudio.paInt16
CHANNELS: int = 1
RATE: int = 44100
# Index passed to cv2.VideoCapture — presumably the external webcam; confirm per machine.
CAMERA_DEVICE: int = 1
# Canned Schwarzenegger one-liners, spoken as responses by `please_stop`.
ARNOLD_QUOTES: List[str] = [ 'I\'ll be back!', 'Hasta la vista, baby!', 'Get to the chopper!', 'I\'m Hercules, son of Zeus!',
                             'Knock-knock', 'I lied.', 'Come with me if you want to live!', ]
# API keys / endpoint URLs (see the module docstring for the expected format).
# A context manager closes the handle deterministically — the original
# `json.load(open(...))` leaked it until garbage collection.
with open(os.path.join(os.path.dirname(__file__), '.env.json'), 'r') as _env_file:
    ENV: Dict[str, Any] = json.load(_env_file)
| # HELPERS ======================================================== # | |
def random_joke() -> Dict[str, Any]:
    """Fetch a random two-part joke (keys: 'setup', 'punchline') from the
    Official Joke API.

    A timeout is set so a hung request can't block the assistant forever
    (the original call had none, which means it could hang indefinitely).
    """
    return req.get('https://official-joke-api.appspot.com/random_joke', timeout=10).json()
def random_fun_fact() -> Dict[str, Any]:
    """Fetch a random fun fact (keys include 'text', 'source') from the
    uselessfacts API.

    A timeout is set so a hung request can't block the assistant forever
    (the original call had none, which means it could hang indefinitely).
    """
    return req.get('https://uselessfacts.jsph.pl/random.json?language=en', timeout=10).json()
| # COMMANDS ======================================================= # | |
def introduction(cmd: str, transcriber: "AudioTranscriber", *args, **kwargs) -> Callable:
    """Greet the user with the assistant's current wake word, both on the
    console and via text-to-speech. Returns itself so `again` can replay it."""
    greeting: str = f'Hello, I\'m {transcriber.wakeword}!'
    print(f'[green][!][/green] {greeting}')
    transcriber.say(greeting)
    return introduction
def who_am_i(cmd: str, transcriber: "AudioTranscriber", *args, **kwargs) -> Callable:
    """Capture one webcam frame, classify it with IBM Watson Visual
    Recognition and print the detected classes as a table.

    Fixes over the original: the camera handle is always released, a failed
    capture is reported instead of crashing on `imwrite(None)`, and the
    temporary image is read back from the same absolute path it was written
    to (the original wrote to the script's directory but read 'tmp.jpg'
    relative to the current working directory).
    """
    authenticator = IAMAuthenticator(ENV['visual-recognition']['key'])
    service = VisualRecognitionV3(
        version='2020-02-26',
        authenticator=authenticator
    )
    service.set_service_url(ENV['visual-recognition']['url'])
    cap = cv2.VideoCapture(CAMERA_DEVICE)
    try:
        ret, frame = cap.read()
    finally:
        cap.release()  # original leaked the capture device
    if not ret:
        # capture failed (no camera / busy device) — report instead of crashing
        print('[red][-][/red] Couldn\'t read a frame from the camera ... ')
        transcriber.say('I didn\'t see anything!')
        return
    tmp_path = os.path.join(os.path.dirname(__file__), 'tmp.jpg')
    cv2.imwrite(tmp_path, frame)
    with open(tmp_path, 'rb') as f:
        classes: Dict[str, Any] = service.classify(images_file=f, threshold='0.6').get_result()
    labels: List[Dict[str, Any]] = classes['images'][0]['classifiers'][0]['classes']
    if not labels:
        print('[red][-][/red] Nothing detected ... ')
        transcriber.say('I didn\'t see anything!')
        return
    table: Table = Table(show_header=True, header_style='bold magenta')
    table.add_column('Class')
    table.add_column('Score')
    for lb in labels:
        table.add_row(lb['class'], str(lb['score']))
    print(table)
    transcriber.say('Here you go! Now, get to the chopper!')
    return who_am_i
def please_stop(cmd: str, transcriber: "AudioTranscriber", *args, **kwargs) -> Callable:
    """Respond to a stop request. Shutdown isn't wired up yet, so this only
    points the user at Ctrl+C and answers with a random Arnold quote."""
    quote: str = random.choice(ARNOLD_QUOTES)
    print('[yellow][!][/yellow] Sorry, at the moment you have to press `Ctrl + C` to stop ... ')
    transcriber.say(quote)
    return please_stop
def tell_a_joke(cmd: str, transcriber: "AudioTranscriber", *args, **kwargs) -> Callable:
    """Fetch a random joke and deliver it: setup first, then — after a
    dramatic pause — the punchline."""
    joke: Dict[str, Any] = random_joke()
    setup, punchline = joke['setup'], joke['punchline']
    print(f'[green][!][/green] {setup}')
    transcriber.say(setup)
    time.sleep(3)  # comedic timing
    print(f'[green][!][/green] {punchline}')
    transcriber.say(punchline)
    return tell_a_joke
def again(cmd: str, transcriber: "AudioTranscriber", *args, last_cmd: Callable[[str, "AudioTranscriber"], None] = None, **kwargs) -> Callable:
    """Replay the previously dispatched command handler, if there is one.

    Returns the replayed handler so it stays the 'last command' for further
    repeats; returns None when nothing has run yet.
    """
    if last_cmd is None:
        return
    last_cmd(cmd, transcriber)
    return last_cmd
def fun_fact(cmd: str, transcriber: "AudioTranscriber", *args, **kwargs) -> Callable:
    """Fetch a random fun fact, print it with its source and speak it."""
    fact: Dict[str, Any] = random_fun_fact()
    text, source = fact['text'], fact['source']
    print(f'[green][!][/green] {text} (source: {source})')
    transcriber.say(f'From {source}: {text}')
    return fun_fact
def change_name(cmd: str, transcriber: "AudioTranscriber", *args, trigger: str = None, **kwargs) -> Callable:
    """Rename the assistant: whatever follows the trigger phrase in the
    spoken command becomes the new wake word, then the assistant introduces
    itself under the new name."""
    if not trigger:
        return
    # e.g. trigger '%wakeword% you are now' -> 'arnold you are now';
    # stripping that from the command leaves just the new name
    spoken_trigger: str = trigger.replace('%wakeword%', transcriber.wakeword)
    transcriber.wakeword = cmd.replace(spoken_trigger, '').strip()
    return introduction(cmd, transcriber, *args, trigger, **kwargs)
| # CLASSES ======================================================== # | |
class AudioPlayer(object):
    """Plays raw PCM frames through the default output device.

    Configured for 16-bit mono at 22.05 kHz, matching the audio that the
    synthesizer feeds into `write`.
    """

    def __init__(self):
        # output stream parameters
        self.format: int = pyaudio.paInt16
        self.channels: int = 1
        self.rate: int = 22050
        self.chunk: int = 1024
        # created lazily in start()
        self.pyaudio: pyaudio.PyAudio = None
        self.stream: pyaudio.Stream = None

    def _open_stream(self) -> pyaudio.Stream:
        """Open (but do not start) an output stream with the configured parameters."""
        return self.pyaudio.open(format=self.format,
                                 channels=self.channels,
                                 rate=self.rate,
                                 output=True,
                                 frames_per_buffer=self.chunk,
                                 start=False)

    def start(self) -> None:
        """Initialize PyAudio and begin streaming to the output device."""
        self.pyaudio = pyaudio.PyAudio()
        self.stream = self._open_stream()
        self.stream.start_stream()

    def write(self, frames: bytes) -> None:
        """Play raw frames (blocks until they are buffered)."""
        self.stream.write(frames)

    def finish(self) -> None:
        """Stop playback and release the stream and the PyAudio instance."""
        self.stream.stop_stream()
        self.stream.close()
        self.pyaudio.terminate()
class AudioSynthesizer(SynthesizeCallback):
    """Watson TTS websocket callback that pipes each synthesized audio chunk
    straight into an AudioPlayer for immediate playback."""
    def __init__(self):
        super().__init__()
        # owns the output stream used for playback
        self.player: AudioPlayer = AudioPlayer()
    def on_connected(self) -> None:
        # websocket opened -> start the audio output stream
        self.player.start()
    def on_error(self, error: Any) -> None:
        # synthesis failed; nothing is played
        print(f'[red][-][/red] Error received: {error}')
    def on_audio_stream(self, frames: bytes) -> None:
        # raw audio chunk received -> play it right away
        self.player.write(frames)
    def on_close(self) -> None:
        # synthesis finished -> tear the output stream down
        self.player.finish()
class AudioTranscriber(RecognizeCallback):
    """Watson STT websocket callback implementing the assistant's command
    loop: final transcripts are matched against trigger phrases and
    dispatched to the matching command handler."""
    # Trigger phrase -> handler. '%wakeword%' is substituted with the current
    # wake word (default 'arnold') at match time, so renaming the assistant
    # via change_name automatically updates every trigger.
    cmds: Dict[str, Any] = {
        'who are you': introduction,
        'what\'s this': introduction,
        'what is this': introduction,
        '%wakeword% who am i': who_am_i,
        '%wakeword% please stop': please_stop,
        '%wakeword% tell me a joke': tell_a_joke,
        '%wakeword% say something funny': tell_a_joke,
        '%wakeword% again': again,
        'again': again,
        '%wakeword% tell me a fun fact': fun_fact,
        '%wakeword% give me a fun fact': fun_fact,
        '%wakeword% i\'m bored': fun_fact,
        '%wakeword% i am bored': fun_fact,
        '%wakeword% you are now': change_name,
        '%wakeword% you\'re now': change_name,
        '%wakeword% i will call you': change_name,
        '%wakeword% i\'ll call you': change_name,
        '%wakeword% i shall call you': change_name,
    }
    def __init__(self):
        super().__init__()
        # mutable: change_name rewrites this at runtime
        self.wakeword: str = 'arnold'
        self.tts: TextToSpeechV1 = TextToSpeechV1(authenticator=IAMAuthenticator(ENV['text-to-speech']['key']))
        self.tts.set_service_url(ENV['text-to-speech']['url'])
        self.synthesizer: AudioSynthesizer = AudioSynthesizer()
        # handler returned by the last dispatched command; consumed by `again`
        self.last_cmd: Callable[[str, "AudioTranscriber"], Callable] = None
    def on_transcription(self, transcript: List[Dict[str, Any]]) -> None:
        """Handle a final transcription result: normalize it, find exactly one
        matching trigger phrase and run its handler."""
        print()
        res: Dict[str, Any] = transcript[0]
        # discard low-confidence results and ask the user to repeat
        if res['confidence'] < .2:
            print('[red][-][/red] I didn\'t quite catch that, please repeat what you said!')
            self.say('I didn\'t quite catch that, please repeat what you said!')
            return
        # strip Watson's hesitation marker and normalize case/whitespace.
        # NOTE(review): replace(' ', ' ') is a no-op as written — presumably
        # intended to collapse double spaces; confirm against the original gist.
        cmd: str = res['transcript'].replace('%HESITATION', '').replace(' ', ' ').lower().strip()
        print(f'[green][+][/green] You said: [bold magenta]{cmd}[/bold magenta]')
        # triggers (with wake word substituted) that prefix the utterance
        matches: List[str] = list(filter(lambda s: cmd.startswith(s.replace('%wakeword%', self.wakeword)), AudioTranscriber.cmds.keys()))
        # dispatch only on an unambiguous, single match
        if not matches or len(matches) > 1:
            print(f'[red][-][/red] Be more specific! :wink:')
            self.say('Be more specific!')
            return
        try:
            # handlers return the callable that `again` should replay
            self.last_cmd = AudioTranscriber.cmds[matches[0]](cmd, self, last_cmd=self.last_cmd, trigger=matches[0])
        except Exception as e:
            # log the full traceback for post-mortem debugging, keep running
            with open('errors.log', 'a') as f:
                traceback.print_exc(file=f)
            print('[red][-][/red] Oops, something went wrong ... ')
            self.say('Oops, something went wrong.')
    def say(self, text: str) -> None:
        """Speak `text` via Watson TTS (Kevin voice, streamed WAV -> AudioSynthesizer)."""
        self.tts.synthesize_using_websocket(f'<speak>{text}</speak>', self.synthesizer, accept='audio/wav', voice='en-US_KevinV3Voice')
    def on_error(self, error: Any) -> None:
        # recognition error reported by the websocket
        print(f'[red][-][/red] Error received: {error}')
    def on_listening(self) -> None:
        # connection established, audio is being consumed
        print('[green][+][/green] Now listening for your lovely voice!')
    def on_hypothesis(self, hypothesis: str) -> None:
        # interim result: overwrite the current terminal line in place
        # (padded to terminal width, carriage return instead of newline)
        print(f'[yellow][*][/yellow] You\'re saying: [bold]{hypothesis}[/bold]'.ljust(shutil.get_terminal_size().columns-1), end='\r')
    def on_data(self, data: Dict[str, Any]) -> None:
        # full response payload not needed; on_transcription handles results
        pass
    def on_close(self) -> None:
        print('[yellow][!][/yellow] Stopped listening ... Shutting down now ... ')
| # MAIN =========================================================== # | |
def main() -> None:
    """Wire the microphone to IBM Watson Speech-to-Text and run until Ctrl+C.

    A PyAudio callback pushes raw chunks into a bounded queue; a worker
    thread streams that queue to the STT websocket, which drives an
    AudioTranscriber.

    Fixes over the original: the main loop sleeps instead of busy-waiting
    (`while True: pass` spun at 100% CPU), and the audio callback uses a
    non-blocking put with a narrow `except Full` — the original blocking
    `q.put` inside a bare `except:` could never raise `Full` (so the except
    was dead code) while still swallowing any real error.
    """
    q: Queue = Queue(maxsize=BUF_MAX_SIZE//CHUNK)
    src: AudioSource = AudioSource(q, True, True)
    auth: IAMAuthenticator = IAMAuthenticator(ENV['speech-to-text']['key'])
    stt: SpeechToTextV1 = SpeechToTextV1(authenticator=auth)
    stt.set_service_url(ENV['speech-to-text']['url'])
    audio: pyaudio.PyAudio = pyaudio.PyAudio()
    def audio_cb(in_data: bytes, frame_count: int, time_info: Dict[str, float], status_flags: int) -> Tuple[Optional[bytes], int]:
        # Runs on the PortAudio callback thread: never block here. Drop the
        # chunk when the queue is full rather than stalling capture.
        try:
            q.put_nowait(in_data)
        except Full:
            pass
        return (None, pyaudio.paContinue)
    def start_stt(*args):
        # Blocks for the lifetime of the websocket session, hence the thread.
        cb: AudioTranscriber = AudioTranscriber()
        stt.recognize_using_websocket(audio=src,
                                      content_type=f'audio/l16; rate={RATE}',
                                      recognize_callback=cb,
                                      interim_results=True,
                                      )
    stream: pyaudio.Stream = audio.open(format=FORMAT,
                                        channels=CHANNELS,
                                        rate=RATE,
                                        input=True,
                                        frames_per_buffer=CHUNK,
                                        stream_callback=audio_cb,
                                        start=False)
    stream.start_stream()
    try:
        Thread(target=start_stt).start()
        while True:
            time.sleep(.1)  # idle; capture/recognition happen on other threads
    except KeyboardInterrupt:
        pass
    finally:
        # release audio resources and signal the STT stream that we're done
        stream.stop_stream()
        stream.close()
        audio.terminate()
        src.completed_recording()
# Run the assistant only when executed as a script, not on import.
if __name__ == '__main__':
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment