Last active
September 18, 2022 06:56
-
-
Save xssfox/3fc32fca37e91478b3a25610c84511aa to your computer and use it in GitHub Desktop.
Revisions
-
xssfox revised this gist
Sep 18, 2022 . 1 changed file with 1 addition and 1 deletion.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -1,5 +1,5 @@ """ Requires an RTSP stream (I used rtsp-simple-server) and rigctld running requirements pyaudio, pillow, numpy, pysstv, opencv-python -
xssfox created this gist
Sep 18, 2022 .There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,125 @@ """ Requires an RTSP stream (I used rtsp-simple-server) requirements pyaudio, pillow, numpy, pysstv, opencv-python Update output_device_index to be your audio sink (I use pulse) """ import cv2 from PIL import Image, ImageOps, ImageDraw, ImageFont import numpy as np from pysstv import color, grayscale import pyaudio import io import time import socket import logging import sys, traceback mode = color.Robot36 class Rigctld: """ rotctld (hamlib) communication class """ # Note: This is a massive hack. def __init__(self, hostname="localhost", port=4532, poll_rate=5, timeout=5): """ Open a connection to rotctld, and test it for validity """ self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.sock.settimeout(timeout) self.hostname = hostname self.port = port self.connect() logging.debug(f"Rigctl intialized") def get_model(self): """ Get the rotator model from rotctld """ model = self.send_command(b"_") return model def connect(self): """ Connect to rotctld instance """ self.sock.connect((self.hostname, self.port)) model = self.get_model() if model == None: # Timeout! self.close() raise Exception("Timeout!") else: return model def close(self): self.sock.close() def send_command(self, command): """Send a command to the connected rotctld instance, and return the return value. """ self.sock.sendall(command + b"\n") try: return self.sock.recv(1024) except: return None def ptt_enable(self): logging.debug(f"PTT enabled") self.send_command(b"T 1") def ptt_disable(self): logging.debug(f"PTT disabled") self.send_command(b"T 0") def get_image(): cap = cv2.VideoCapture('rtsp://streamip:8554/mystream') ret, frame = cap.read() img = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) return Image.fromarray(img) def generate_sstv(image): wav_bytes = io.BytesIO() mode(image,48000,16).write_wav(wav_bytes) wav_bytes.seek(0) audio = wav_bytes return audio rig = Rigctld() p = pyaudio.PyAudio() stream = p.open(format = pyaudio.paInt16, channels = 1, rate = 48000, output = True, output_device_index=13 ) try: while(1): image = get_image() # resize image = image.resize((image.width, image.height)) image = ImageOps.contain(image, (mode.WIDTH, mode.HEIGHT) ) image = ImageOps.pad(image, (mode.WIDTH, mode.HEIGHT)) audio = generate_sstv(image) rig.ptt_enable() while(audio): data = audio.read(24) if data: stream.write(data) else: break #stream.write(audio) # play_obj = sa.play_buffer(audio, 1, 2, 48000) # play_obj.wait_done() rig.ptt_disable() time.sleep(25) except: traceback.print_exc(file=sys.stderr) rig.ptt_disable()