Skip to content

Instantly share code, notes, and snippets.

@MattMoony
Created March 14, 2021 11:43
Show Gist options
  • Select an option

  • Save MattMoony/a1d1084e55ede9c61dd0d2bff23a8f82 to your computer and use it in GitHub Desktop.

Select an option

Save MattMoony/a1d1084e55ede9c61dd0d2bff23a8f82 to your computer and use it in GitHub Desktop.

Revisions

  1. MattMoony created this gist Mar 14, 2021.
    309 changes: 309 additions & 0 deletions ibm-arnold.py
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,309 @@
    #!/usr/bin/env python3

    """
    Title: IBM-Arnold
    Date: March 2021
    Author: mattmoony (https://github.com/MattMoony)
    Quick implementation of a semi-smart assistant called 'Arnold'
    (even though the name can be changed) utilizing IBM Watson services.
    I'm well aware that the code isn't very clean - this is mostly
    due to the fact that I wrote this PoC in a rather short amount of time :P
    If you want to try it yourself, simply create an IBM Watson account,
    make a `.env.json` file of the following format and paste your apikeys
    and endpoint URLs in there:
    `.env.json`:
    {
    "visual-recognition": {
    "key": "[APIKEY]",
    "url": "[ENDPOINT-URL]"
    },
    "text-to-speech": {
    "key": "[APIKEY]",
    "url": "[ENDPOINT-URL]"
    },
    "speech-to-text": {
    "key": "[APIKEY]",
    "url": "[ENDPOINT-URL]"
    }
    }
    """

    import os
    import io
    import cv2
    import time
    import json
    import random
    import shutil
    import pyaudio
    import traceback
    import requests as req
    from rich import print
    from rich.table import Table
    from threading import Thread
    from queue import Queue, Full
    from ibm_watson import SpeechToTextV1, TextToSpeechV1
    from ibm_watson.websocket import RecognizeCallback, AudioSource, SynthesizeCallback
    from ibm_cloud_sdk_core.authenticators import IAMAuthenticator
    from ibm_watson import VisualRecognitionV3
    from typing import *

    # CONSTANTS ====================================================== #

    CHUNK: int = 1024
    BUF_MAX_SIZE: int = 4 * CHUNK
    FORMAT: int = pyaudio.paInt16
    CHANNELS: int = 1
    RATE: int = 44100
    CAMERA_DEVICE: int = 1
    ARNOLD_QUOTES: List[str] = [ 'I\'ll be back!', 'Hasta la vista, baby!', 'Get to the chopper!', 'I\'m Hercules, son of Zeus!',
    'Knock-knock', 'I lied.', 'Come with me if you want to live!', ]
    ENV: Dict[str, Any] = json.load(open(os.path.join(os.path.dirname(__file__), '.env.json'), 'r'))

    # HELPERS ======================================================== #

    def random_joke() -> Dict[str, Any]:
    return json.loads(req.get('https://official-joke-api.appspot.com/random_joke').text)

    def random_fun_fact() -> Dict[str, Any]:
    return json.loads(req.get('https://uselessfacts.jsph.pl/random.json?language=en').text)

    # COMMANDS ======================================================= #

    def introduction(cmd: str, transcriber: "AudioTranscriber", *args, **kwargs) -> Callable:
    print(f'[green][!][/green] Hello, I\'m {transcriber.wakeword}!')
    transcriber.say(f'Hello, I\'m {transcriber.wakeword}!')
    return introduction

    def who_am_i(cmd: str, transcriber: "AudioTranscriber", *args, **kwargs) -> Callable:
    authenticator = IAMAuthenticator(ENV['visual-recognition']['key'])
    service = VisualRecognitionV3(
    version='2020-02-26',
    authenticator=authenticator
    )
    service.set_service_url(ENV['visual-recognition']['url'])
    cap = cv2.VideoCapture(CAMERA_DEVICE)
    ret, frame = cap.read()
    cv2.imwrite(os.path.join(os.path.dirname(__file__), 'tmp.jpg'), frame)
    with open('tmp.jpg', 'rb') as f:
    classes: Dict[str, Any] = service.classify(images_file=f, threshold='0.6').get_result()
    labels: List[Dict[str, Any]] = classes['images'][0]['classifiers'][0]['classes']
    if not labels:
    print('[red][-][/red] Nothing detected ... ')
    transcriber.say('I didn\'t see anything!')
    return
    table: Table = Table(show_header=True, header_style='bold magenta')
    table.add_column('Class')
    table.add_column('Score')
    for lb in labels:
    table.add_row(lb['class'], str(lb['score']))
    print(table)
    transcriber.say('Here you go! Now, get to the chopper!')
    return who_am_i

    def please_stop(cmd: str, transcriber: "AudioTranscriber", *args, **kwargs) -> Callable:
    print('[yellow][!][/yellow] Sorry, at the moment you have to press `Ctrl + C` to stop ... ')
    transcriber.say(random.choice(ARNOLD_QUOTES))
    return please_stop

    def tell_a_joke(cmd: str, transcriber: "AudioTranscriber", *args, **kwargs) -> Callable:
    jk: Dict[str, Any] = random_joke()
    print(f'[green][!][/green] {jk["setup"]}')
    transcriber.say(jk['setup'])
    time.sleep(3)
    print(f'[green][!][/green] {jk["punchline"]}')
    transcriber.say(jk['punchline'])
    return tell_a_joke

    def again(cmd: str, transcriber: "AudioTranscriber", *args, last_cmd: Callable[[str, "AudioTranscriber"], None] = None, **kwargs) -> Callable:
    if not last_cmd:
    return
    last_cmd(cmd, transcriber)
    return last_cmd

    def fun_fact(cmd: str, transcriber: "AudioTranscriber", *args, **kwargs) -> Callable:
    ff: Dict[str, Any] = random_fun_fact()
    print(f'[green][!][/green] {ff["text"]} (source: {ff["source"]})')
    transcriber.say(f'From {ff["source"]}: {ff["text"]}')
    return fun_fact

    def change_name(cmd: str, transcriber: "AudioTranscriber", *args, trigger: str = None, **kwargs) -> Callable:
    if not trigger:
    return
    transcriber.wakeword = cmd.replace(trigger.replace('%wakeword%', transcriber.wakeword), '').strip()
    return introduction(cmd, transcriber, *args, trigger, **kwargs)

    # CLASSES ======================================================== #

    class AudioPlayer(object):
    def __init__(self):
    self.format: int = pyaudio.paInt16
    self.channels: int = 1
    self.rate: int = 22050
    self.chunk: int = 1024
    self.pyaudio: pyaudio.PyAudio = None
    self.stream: pyaudio.Stream = None

    def start(self) -> None:
    self.pyaudio = pyaudio.PyAudio()
    self.stream = self.__new_stream()
    self.stream.start_stream()

    def __new_stream(self) -> pyaudio.Stream:
    return self.pyaudio.open(
    format=self.format,
    channels=self.channels,
    rate=self.rate,
    output=True,
    frames_per_buffer=self.chunk,
    start=False
    )

    def write(self, frames: bytes) -> None:
    self.stream.write(frames)

    def finish(self) -> None:
    self.stream.stop_stream()
    self.stream.close()
    self.pyaudio.terminate()

class AudioSynthesizer(SynthesizeCallback):
    """Watson Text-to-Speech websocket callback that streams the synthesized
    audio straight to the speakers via an `AudioPlayer`."""

    def __init__(self):
        super().__init__()
        # Output-device wrapper; opened on connect, torn down on close.
        self.player: AudioPlayer = AudioPlayer()

    def on_connected(self) -> None:
        # Synthesis is about to begin: open and start the output stream.
        self.player.start()

    def on_error(self, error: Any) -> None:
        print(f'[red][-][/red] Error received: {error}')

    def on_audio_stream(self, frames: bytes) -> None:
        # Play each chunk of synthesized audio as soon as it arrives.
        self.player.write(frames)

    def on_close(self) -> None:
        # Synthesis finished: stop the stream and terminate PyAudio.
        self.player.finish()

class AudioTranscriber(RecognizeCallback):
    """Watson Speech-to-Text websocket callback.

    Receives transcription events, matches each final transcript against the
    `cmds` table below and dispatches the corresponding command handler.
    """

    # Spoken trigger phrase -> handler function. '%wakeword%' is substituted
    # with the assistant's *current* wake word at match time, so triggers keep
    # working after the name is changed via `change_name`.
    cmds: Dict[str, Any] = {
        'who are you': introduction,
        'what\'s this': introduction,
        'what is this': introduction,
        '%wakeword% who am i': who_am_i,
        '%wakeword% please stop': please_stop,
        '%wakeword% tell me a joke': tell_a_joke,
        '%wakeword% say something funny': tell_a_joke,
        '%wakeword% again': again,
        'again': again,
        '%wakeword% tell me a fun fact': fun_fact,
        '%wakeword% give me a fun fact': fun_fact,
        '%wakeword% i\'m bored': fun_fact,
        '%wakeword% i am bored': fun_fact,
        '%wakeword% you are now': change_name,
        '%wakeword% you\'re now': change_name,
        '%wakeword% i will call you': change_name,
        '%wakeword% i\'ll call you': change_name,
        '%wakeword% i shall call you': change_name,
    }

    def __init__(self):
        super().__init__()
        # Current wake word; rewritten by the `change_name` handler.
        self.wakeword: str = 'arnold'
        self.tts: TextToSpeechV1 = TextToSpeechV1(authenticator=IAMAuthenticator(ENV['text-to-speech']['key']))
        self.tts.set_service_url(ENV['text-to-speech']['url'])
        self.synthesizer: AudioSynthesizer = AudioSynthesizer()
        # Last successfully dispatched handler, replayed by the `again` command.
        self.last_cmd: Callable[[str, "AudioTranscriber"], Callable] = None

    def on_transcription(self, transcript: List[Dict[str, Any]]) -> None:
        """Handle a final transcription result: normalize it, find exactly one
        matching command in `cmds` and run it."""
        print()
        res: Dict[str, Any] = transcript[0]
        # Ignore low-confidence transcripts instead of guessing at a command.
        if res['confidence'] < .2:
            print('[red][-][/red] I didn\'t quite catch that, please repeat what you said!')
            self.say('I didn\'t quite catch that, please repeat what you said!')
            return
        # Strip Watson's hesitation markers and normalize case/whitespace.
        # NOTE(review): `.replace(' ', ' ')` replaces a space with an identical
        # space -- a no-op as written; possibly intended to collapse doubled
        # spaces. Confirm against the original source before changing.
        cmd: str = res['transcript'].replace('%HESITATION', '').replace(' ', ' ').lower().strip()
        print(f'[green][+][/green] You said: [bold magenta]{cmd}[/bold magenta]')
        # A command matches when the transcript starts with its trigger phrase
        # (with '%wakeword%' substituted); require exactly one match.
        matches: List[str] = list(filter(lambda s: cmd.startswith(s.replace('%wakeword%', self.wakeword)), AudioTranscriber.cmds.keys()))
        if not matches or len(matches) > 1:
            print(f'[red][-][/red] Be more specific! :wink:')
            self.say('Be more specific!')
            return
        try:
            # Handlers return the callable that `again` should replay.
            self.last_cmd = AudioTranscriber.cmds[matches[0]](cmd, self, last_cmd=self.last_cmd, trigger=matches[0])
        except Exception as e:
            # Best effort: log the traceback and keep the assistant running.
            with open('errors.log', 'a') as f:
                traceback.print_exc(file=f)
            print('[red][-][/red] Oops, something went wrong ... ')
            self.say('Oops, something went wrong.')

    def say(self, text: str) -> None:
        """Speak `text` aloud via Watson TTS over a websocket (SSML-wrapped)."""
        self.tts.synthesize_using_websocket(f'<speak>{text}</speak>', self.synthesizer, accept='audio/wav', voice='en-US_KevinV3Voice')

    def on_error(self, error: Any) -> None:
        print(f'[red][-][/red] Error received: {error}')

    def on_listening(self) -> None:
        print('[green][+][/green] Now listening for your lovely voice!')

    def on_hypothesis(self, hypothesis: str) -> None:
        # Interim hypothesis: overwrite the current terminal line in place
        # (padded to the terminal width, carriage return instead of newline).
        print(f'[yellow][*][/yellow] You\'re saying: [bold]{hypothesis}[/bold]'.ljust(shutil.get_terminal_size().columns-1), end='\r')

    def on_data(self, data: Dict[str, Any]) -> None:
        # Raw response payloads are not needed; transcripts are handled above.
        pass

    def on_close(self) -> None:
        print('[yellow][!][/yellow] Stopped listening ... Shutting down now ... ')

    # MAIN =========================================================== #

    def main() -> None:
    q: Queue = Queue(maxsize=BUF_MAX_SIZE//CHUNK)
    src: AudioSource = AudioSource(q, True, True)
    auth: IAMAuthenticator = IAMAuthenticator(ENV['speech-to-text']['key'])
    stt: SpeechToTextV1 = SpeechToTextV1(authenticator=auth)
    stt.set_service_url(ENV['speech-to-text']['url'])
    audio: pyaudio.PyAudio = pyaudio.PyAudio()

    def audio_cb(in_data: bytes, frame_count: int, time_info: Dict[str, float], status_flags: int) -> Tuple[Optional[bytes], int]:
    try:
    q.put(in_data)
    except:
    pass
    return (None, pyaudio.paContinue)

    def start_stt(*args):
    cb: AudioTranscriber = AudioTranscriber()
    stt.recognize_using_websocket(audio=src,
    content_type=f'audio/l16; rate={RATE}',
    recognize_callback=cb,
    interim_results=True,
    )

    stream: pyaudio.Stream = audio.open(format=FORMAT,
    channels=CHANNELS,
    rate=RATE,
    input=True,
    frames_per_buffer=CHUNK,
    stream_callback=audio_cb,
    start=False)
    stream.start_stream()

    try:
    Thread(target=start_stt).start()
    while True:
    pass
    except KeyboardInterrupt:
    pass
    finally:
    stream.stop_stream()
    stream.close()
    audio.terminate()
    src.completed_recording()

    if __name__ == '__main__':
    main()