# This code is based on tutorial by slicktechies modified as needed to use oauth token from Twitch.
# You can read more details at: https://www.junian.net/2017/01/how-to-record-twitch-streams.html
# original code is from https://slicktechies.com/how-to-watchrecord-twitch-streams-using-livestreamer/

import datetime
import getopt
import json
import os
import subprocess
import sys
import time

import requests


class TwitchRecorder:
    """Poll a Twitch channel and record it with streamlink while it is live.

    Raw recordings are written to ``<root_path>/recorded/<username>`` and then
    remuxed with ffmpeg into ``<root_path>/processed/<username>``.
    """

    # Status codes returned by check_user().
    STATUS_ONLINE = 0
    STATUS_OFFLINE = 1
    STATUS_NOT_FOUND = 2
    STATUS_ERROR = 3

    def __init__(self):
        # global configuration
        self.client_id = "jzkbprff40iqj646a697cyrvl0zt2m6"  # don't change this
        # get oauth token value by typing `streamlink --twitch-oauth-authenticate` in terminal
        self.oauth_token = "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
        self.ffmpeg_path = 'ffmpeg'
        self.refresh = 30.0  # seconds between availability checks
        self.root_path = "/Users/junian/Documents/twitch"

        # user configuration
        self.username = "juniantr"
        self.quality = "best"

    def run(self):
        """Prepare the output directories, fix leftover recordings, then poll forever.

        This method does not return: it ends by calling loopcheck().
        """
        # path to recorded stream
        self.recorded_path = os.path.join(self.root_path, "recorded", self.username)

        # path to finished video, errors removed
        self.processed_path = os.path.join(self.root_path, "processed", self.username)

        # create directory for recorded_path and processed_path if they do not exist
        os.makedirs(self.recorded_path, exist_ok=True)
        os.makedirs(self.processed_path, exist_ok=True)

        # make sure the interval to check user availability is not less than 15 seconds
        if self.refresh < 15:
            print("Check interval should not be lower than 15 seconds.")
            self.refresh = 15
            print("System set check interval to 15 seconds.")

        # fix videos from previous recording session
        try:
            video_list = [
                f for f in os.listdir(self.recorded_path)
                if os.path.isfile(os.path.join(self.recorded_path, f))
            ]
        except OSError as e:
            # Best effort: a listing failure must not prevent recording.
            print(e)
            video_list = []

        if video_list:
            print('Fixing previously recorded files.')
        for f in video_list:
            print('Fixing ' + os.path.join(self.recorded_path, f) + '.')
            self._fix_video(f)

        print("Checking for", self.username, "every", self.refresh,
              "seconds. Record with", self.quality, "quality.")
        self.loopcheck()

    def _fix_video(self, filename):
        """Remux ``filename`` from the recorded directory into the processed one.

        The raw recording is deleted only when ffmpeg exits with code 0, so a
        failed conversion never destroys the only copy of the stream.

        Returns True when the file was converted and the raw copy removed,
        False otherwise. Errors are printed, not raised.
        """
        source = os.path.join(self.recorded_path, filename)
        target = os.path.join(self.processed_path, filename)
        try:
            # '-y' overwrites a stale/partial output from an earlier failed
            # attempt instead of blocking on ffmpeg's interactive prompt.
            exit_code = subprocess.call([
                self.ffmpeg_path, '-y', '-err_detect', 'ignore_err',
                '-i', source, '-c', 'copy', target,
            ])
            if exit_code != 0:
                print("ffmpeg exited with code", exit_code, "- keeping", source)
                return False
            os.remove(source)
        except (OSError, ValueError) as e:
            # OSError: ffmpeg missing or file removal failed.
            # ValueError: invalid argument passed to subprocess.
            print(e)
            return False
        return True

    def check_user(self):
        """Query the Twitch streams endpoint for the configured user.

        Returns a ``(status, info)`` tuple where ``status`` is
        0: online, 1: offline, 2: not found, 3: error,
        and ``info`` is the decoded JSON response (``None`` on failure).
        """
        # NOTE(review): this is the legacy Kraken endpoint; confirm it is still
        # served by Twitch, otherwise every check will end up as status 3.
        url = 'https://api.twitch.tv/kraken/streams/' + self.username
        info = None
        status = self.STATUS_ERROR
        try:
            r = requests.get(url, headers={"Client-ID": self.client_id}, timeout=15)
            r.raise_for_status()
            info = r.json()
            status = self.STATUS_OFFLINE if info['stream'] is None else self.STATUS_ONLINE
        except requests.exceptions.RequestException as e:
            # A Response object is falsy for any 4xx/5xx status, so it must be
            # compared against None explicitly rather than truth-tested.
            if e.response is not None and e.response.status_code in (404, 422):
                status = self.STATUS_NOT_FOUND
        except (ValueError, KeyError, TypeError):
            # Body was not valid JSON or did not have the expected shape.
            info = None
            status = self.STATUS_ERROR
        return status, info

    def _build_filename(self, info):
        """Build a filesystem-safe ``.mp4`` name from the user, time and stream title.

        A missing channel or title is treated as an empty title.
        """
        channel = (info.get('stream') or {}).get('channel') or {}
        title = channel.get('status') or ""
        timestamp = datetime.datetime.now().strftime("%Y-%m-%d %Hh%Mm%Ss")
        filename = self.username + " - " + timestamp + " - " + str(title) + ".mp4"

        # clean filename from unnecessary characters
        return "".join(x for x in filename if x.isalnum() or x in " -_.")

    def loopcheck(self):
        """Poll check_user() forever, recording and fixing each live stream."""
        while True:
            status, info = self.check_user()
            if status == self.STATUS_NOT_FOUND:
                print("Username not found. Invalid username or typo.")
                time.sleep(self.refresh)
            elif status == self.STATUS_ERROR:
                print(datetime.datetime.now().strftime("%Hh%Mm%Ss"), " ",
                      "unexpected error. will try again in 5 minutes.")
                time.sleep(300)
            elif status == self.STATUS_OFFLINE:
                print(self.username, "currently offline, checking again in",
                      self.refresh, "seconds.")
                time.sleep(self.refresh)
            elif status == self.STATUS_ONLINE:
                print(self.username, "online. Stream recording in session.")

                filename = self._build_filename(info)
                recorded_filename = os.path.join(self.recorded_path, filename)

                # start streamlink process; this blocks until the recording ends
                subprocess.call([
                    "streamlink", "--twitch-oauth-token", self.oauth_token,
                    "twitch.tv/" + self.username, self.quality,
                    "-o", recorded_filename,
                ])

                print("Recording stream is done. Fixing video file.")
                if os.path.exists(recorded_filename):
                    self._fix_video(filename)
                else:
                    print("Skip fixing. File not found.")

                print("Fixing is done. Going back to checking..")
                time.sleep(self.refresh)


def main(argv):
    """Parse command-line options and start the recorder.

    Supported options: ``-h``, ``-u/--username <name>``, ``-q/--quality <quality>``.
    Exits with status 2 on an invalid option.
    """
    twitch_recorder = TwitchRecorder()
    usage_message = 'twitch-recorder.py -u <username> -q <quality>'

    try:
        opts, _args = getopt.getopt(argv, "hu:q:", ["username=", "quality="])
    except getopt.GetoptError:
        print(usage_message)
        sys.exit(2)

    for opt, arg in opts:
        if opt == '-h':
            print(usage_message)
            sys.exit()
        elif opt in ("-u", "--username"):
            twitch_recorder.username = arg
        elif opt in ("-q", "--quality"):
            twitch_recorder.quality = arg

    twitch_recorder.run()


if __name__ == "__main__":
    main(sys.argv[1:])