#!/usr/bin/env python3

"""
Title: IBM-Arnold
Date: March 2021
Author: mattmoony (https://github.com/MattMoony)

Quick implementation of a semi-smart assistant called 'Arnold'
(even though the name can be changed) utilizing IBM Watson services.
I'm well aware that the code isn't very clean - this is mostly due to
the fact that I wrote this PoC in a rather short amount of time :P

If you want to try it yourself, simply create an IBM Watson account,
make a `.env.json` file of the following format and paste your API keys
and endpoint URLs in there:

`.env.json`:
{
    "visual-recognition": {
        "key": "[APIKEY]",
        "url": "[ENDPOINT-URL]"
    },
    "text-to-speech": {
        "key": "[APIKEY]",
        "url": "[ENDPOINT-URL]"
    },
    "speech-to-text": {
        "key": "[APIKEY]",
        "url": "[ENDPOINT-URL]"
    }
}
"""

import os
import io
import cv2
import time
import json
import random
import shutil
import pyaudio
import traceback
import requests as req
from rich import print
from rich.table import Table
from threading import Thread
from queue import Queue, Full
from ibm_watson import SpeechToTextV1, TextToSpeechV1
from ibm_watson.websocket import RecognizeCallback, AudioSource, SynthesizeCallback
from ibm_cloud_sdk_core.authenticators import IAMAuthenticator
from ibm_watson import VisualRecognitionV3
from typing import *

# CONSTANTS ====================================================== #

CHUNK: int = 1024
BUF_MAX_SIZE: int = 4 * CHUNK
FORMAT: int = pyaudio.paInt16
CHANNELS: int = 1
RATE: int = 44100
CAMERA_DEVICE: int = 1

ARNOLD_QUOTES: List[str] = [
    'I\'ll be back!',
    'Hasta la vista, baby!',
    'Get to the chopper!',
    'I\'m Hercules, son of Zeus!',
    'Knock-knock',
    'I lied.',
    'Come with me if you want to live!',
]

# API keys and endpoint URLs - see the `.env.json` format in the docstring above
ENV: Dict[str, Any] = json.load(open(os.path.join(os.path.dirname(__file__), '.env.json'), 'r'))

# HELPERS ======================================================== #

def random_joke() -> Dict[str, Any]:
    return json.loads(req.get('https://official-joke-api.appspot.com/random_joke').text)

def random_fun_fact() -> Dict[str, Any]:
    return json.loads(req.get('https://uselessfacts.jsph.pl/random.json?language=en').text)

# COMMANDS ======================================================= #
# Every command handler receives the recognized transcript and the transcriber,
# and returns the callable to be remembered as the "last command".

def introduction(cmd: str, transcriber: "AudioTranscriber", *args, **kwargs) -> Callable:
    print(f'[green][!][/green] Hello, I\'m {transcriber.wakeword}!')
    transcriber.say(f'Hello, I\'m {transcriber.wakeword}!')
    return introduction

def who_am_i(cmd: str, transcriber: "AudioTranscriber", *args, **kwargs) -> Callable:
    authenticator = IAMAuthenticator(ENV['visual-recognition']['key'])
    service = VisualRecognitionV3(
        version='2020-02-26',
        authenticator=authenticator
    )
    service.set_service_url(ENV['visual-recognition']['url'])
    # grab a single frame from the webcam and classify it with Watson Visual Recognition
    cap = cv2.VideoCapture(CAMERA_DEVICE)
    ret, frame = cap.read()
    cap.release()
    tmp_path: str = os.path.join(os.path.dirname(__file__), 'tmp.jpg')
    cv2.imwrite(tmp_path, frame)
    with open(tmp_path, 'rb') as f:
        classes: Dict[str, Any] = service.classify(images_file=f, threshold='0.6').get_result()
    labels: List[Dict[str, Any]] = classes['images'][0]['classifiers'][0]['classes']
    if not labels:
        print('[red][-][/red] Nothing detected ... ')
        transcriber.say('I didn\'t see anything!')
        return
    table: Table = Table(show_header=True, header_style='bold magenta')
    table.add_column('Class')
    table.add_column('Score')
    for lb in labels:
        table.add_row(lb['class'], str(lb['score']))
    print(table)
    transcriber.say('Here you go! Now, get to the chopper!')
    return who_am_i

def please_stop(cmd: str, transcriber: "AudioTranscriber", *args, **kwargs) -> Callable:
    print('[yellow][!][/yellow] Sorry, at the moment you have to press `Ctrl + C` to stop ... ')
    transcriber.say(random.choice(ARNOLD_QUOTES))
    return please_stop

def tell_a_joke(cmd: str, transcriber: "AudioTranscriber", *args, **kwargs) -> Callable:
    jk: Dict[str, Any] = random_joke()
    print(f'[green][!][/green] {jk["setup"]}')
    transcriber.say(jk['setup'])
    time.sleep(3)
    print(f'[green][!][/green] {jk["punchline"]}')
    transcriber.say(jk['punchline'])
    return tell_a_joke

def again(cmd: str, transcriber: "AudioTranscriber", *args, last_cmd: Callable[[str, "AudioTranscriber"], None] = None, **kwargs) -> Callable:
    # repeat the previously executed command, if there is one
    if not last_cmd:
        return
    last_cmd(cmd, transcriber)
    return last_cmd

def fun_fact(cmd: str, transcriber: "AudioTranscriber", *args, **kwargs) -> Callable:
    ff: Dict[str, Any] = random_fun_fact()
    print(f'[green][!][/green] {ff["text"]} (source: {ff["source"]})')
    transcriber.say(f'From {ff["source"]}: {ff["text"]}')
    return fun_fact

def change_name(cmd: str, transcriber: "AudioTranscriber", *args, trigger: str = None, **kwargs) -> Callable:
    if not trigger:
        return
    # whatever follows the trigger phrase becomes the new wakeword
    transcriber.wakeword = cmd.replace(trigger.replace('%wakeword%', transcriber.wakeword), '').strip()
    return introduction(cmd, transcriber, *args, trigger, **kwargs)

# CLASSES ======================================================== #

class AudioPlayer(object):
    def __init__(self):
        self.format: int = pyaudio.paInt16
        self.channels: int = 1
        self.rate: int = 22050
        self.chunk: int = 1024
        self.pyaudio: pyaudio.PyAudio = None
        self.stream: pyaudio.Stream = None

    def start(self) -> None:
        self.pyaudio = pyaudio.PyAudio()
        self.stream = self.__new_stream()
        self.stream.start_stream()

    def __new_stream(self) -> pyaudio.Stream:
        return self.pyaudio.open(
            format=self.format,
            channels=self.channels,
            rate=self.rate,
            output=True,
            frames_per_buffer=self.chunk,
            start=False
        )

    def write(self, frames: bytes) -> None:
        self.stream.write(frames)

    def finish(self) -> None:
        self.stream.stop_stream()
        self.stream.close()
        self.pyaudio.terminate()

class AudioSynthesizer(SynthesizeCallback):
    # receives synthesized audio frames from the text-to-speech websocket
    # and plays them back through the AudioPlayer
    def __init__(self):
        super().__init__()
        self.player: AudioPlayer = AudioPlayer()

    def on_connected(self) -> None:
        self.player.start()

    def on_error(self, error: Any) -> None:
        print(f'[red][-][/red] Error received: {error}')

    def on_audio_stream(self, frames: bytes) -> None:
        self.player.write(frames)

    def on_close(self) -> None:
        self.player.finish()

class AudioTranscriber(RecognizeCallback):
    # trigger phrases mapped to their command handlers;
    # '%wakeword%' is replaced with the current wakeword at match time
    cmds: Dict[str, Any] = {
        'who are you': introduction,
        'what\'s this': introduction,
        'what is this': introduction,
        '%wakeword% who am i': who_am_i,
        '%wakeword% please stop': please_stop,
        '%wakeword% tell me a joke': tell_a_joke,
        '%wakeword% say something funny': tell_a_joke,
        '%wakeword% again': again,
        'again': again,
        '%wakeword% tell me a fun fact': fun_fact,
        '%wakeword% give me a fun fact': fun_fact,
        '%wakeword% i\'m bored': fun_fact,
        '%wakeword% i am bored': fun_fact,
        '%wakeword% you are now': change_name,
        '%wakeword% you\'re now': change_name,
        '%wakeword% i will call you': change_name,
        '%wakeword% i\'ll call you': change_name,
        '%wakeword% i shall call you': change_name,
    }

    def __init__(self):
        super().__init__()
        self.wakeword: str = 'arnold'
        self.tts: TextToSpeechV1 = TextToSpeechV1(authenticator=IAMAuthenticator(ENV['text-to-speech']['key']))
        self.tts.set_service_url(ENV['text-to-speech']['url'])
        self.synthesizer: AudioSynthesizer = AudioSynthesizer()
        self.last_cmd: Callable[[str, "AudioTranscriber"], Callable] = None

    def on_transcription(self, transcript: List[Dict[str, Any]]) -> None:
        print()
        res: Dict[str, Any] = transcript[0]
        if res['confidence'] < .2:
            print('[red][-][/red] I didn\'t quite catch that, please repeat what you said!')
            self.say('I didn\'t quite catch that, please repeat what you said!')
            return
        cmd: str = res['transcript'].replace('%HESITATION', '').replace('  ', ' ').lower().strip()
        print(f'[green][+][/green] You said: [bold magenta]{cmd}[/bold magenta]')
        # find all trigger phrases (with the wakeword filled in) that the transcript starts with
        matches: List[str] = list(filter(lambda s: cmd.startswith(s.replace('%wakeword%', self.wakeword)), AudioTranscriber.cmds.keys()))
        if not matches or len(matches) > 1:
            print(f'[red][-][/red] Be more specific! :wink:')
            self.say('Be more specific!')
            return
        try:
            self.last_cmd = AudioTranscriber.cmds[matches[0]](cmd, self, last_cmd=self.last_cmd, trigger=matches[0])
        except Exception as e:
            with open('errors.log', 'a') as f:
                traceback.print_exc(file=f)
            print('[red][-][/red] Oops, something went wrong ... ')
            self.say('Oops, something went wrong.')

    def say(self, text: str) -> None:
        self.tts.synthesize_using_websocket(f'<speak>{text}</speak>', self.synthesizer, accept='audio/wav', voice='en-US_KevinV3Voice')

    def on_error(self, error: Any) -> None:
        print(f'[red][-][/red] Error received: {error}')

    def on_listening(self) -> None:
        print('[green][+][/green] Now listening for your lovely voice!')

    def on_hypothesis(self, hypothesis: str) -> None:
        print(f'[yellow][*][/yellow] You\'re saying: [bold]{hypothesis}[/bold]'.ljust(shutil.get_terminal_size().columns-1), end='\r')

    def on_data(self, data: Dict[str, Any]) -> None:
        pass

    def on_close(self) -> None:
        print('[yellow][!][/yellow] Stopped listening ... Shutting down now ... ')
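# Adding a new voice command boils down to writing a handler with the same
# signature as the ones above and registering it in `AudioTranscriber.cmds`.
# The sketch below is a hypothetical example (handler name and trigger phrase
# are not part of the original gist):
#
#     def flip_a_coin(cmd: str, transcriber: "AudioTranscriber", *args, **kwargs) -> Callable:
#         result: str = random.choice(['Heads!', 'Tails!'])
#         print(f'[green][!][/green] {result}')
#         transcriber.say(result)
#         return flip_a_coin
#
#     AudioTranscriber.cmds['%wakeword% flip a coin'] = flip_a_coin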
# MAIN =========================================================== #

def main() -> None:
    # microphone chunks are buffered in a queue and streamed to the
    # Watson speech-to-text websocket as an AudioSource
    q: Queue = Queue(maxsize=BUF_MAX_SIZE // CHUNK)
    src: AudioSource = AudioSource(q, True, True)

    auth: IAMAuthenticator = IAMAuthenticator(ENV['speech-to-text']['key'])
    stt: SpeechToTextV1 = SpeechToTextV1(authenticator=auth)
    stt.set_service_url(ENV['speech-to-text']['url'])

    audio: pyaudio.PyAudio = pyaudio.PyAudio()

    def audio_cb(in_data: bytes, frame_count: int, time_info: Dict[str, float], status_flags: int) -> Tuple[Optional[bytes], int]:
        # PyAudio callback: push the raw microphone chunk into the queue,
        # dropping it if the buffer is full
        try:
            q.put(in_data)
        except Full:
            pass
        return (None, pyaudio.paContinue)

    def start_stt(*args):
        # transcribe the incoming audio, dispatching results to the AudioTranscriber
        cb: AudioTranscriber = AudioTranscriber()
        stt.recognize_using_websocket(audio=src,
                                      content_type=f'audio/l16; rate={RATE}',
                                      recognize_callback=cb,
                                      interim_results=True,
                                      )

    stream: pyaudio.Stream = audio.open(format=FORMAT,
                                        channels=CHANNELS,
                                        rate=RATE,
                                        input=True,
                                        frames_per_buffer=CHUNK,
                                        stream_callback=audio_cb,
                                        start=False)
    stream.start_stream()

    try:
        Thread(target=start_stt).start()
        while True:
            time.sleep(.1)  # keep the main thread alive without busy-waiting
    except KeyboardInterrupt:
        pass
    finally:
        stream.stop_stream()
        stream.close()
        audio.terminate()
        src.completed_recording()

if __name__ == '__main__':
    main()