Last active
January 14, 2023 09:40
-
-
Save RicterZ/ed3517b944a04e18c91aacfe3d9aff24 to your computer and use it in GitHub Desktop.
Revisions
-
RicterZ revised this gist
Jan 14, 2023 . 1 changed file with 32 additions and 12 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -1,5 +1,5 @@ # Bilibili TV Controller # # Requirements # - qrencode_ascii # - zio @@ -123,6 +123,14 @@ def bv_decode(code): return (r - add) ^ xor def restore(): pass def save(obj): pass class BilibiliTV(): io = None @@ -262,6 +270,7 @@ def dispatch(self): if command == 'Command.OnProgress': self.position = arguments[0]['position'] self.playing = True elif command == 'Command.SpeedChanged': self.speed = arguments[0]['currSpeed'] @@ -286,6 +295,7 @@ def dispatch(self): elif state == 4: logger.debug('Video playing resumed') elif state == 7: self.playing = False logger.debug('Video playing stopped') elif command == 'Command.OnQnSwitch': @@ -295,7 +305,7 @@ def dispatch(self): elif command == 'Command.OnEpisodeSwitch': self.info = arguments[0] logger.debug('Playing video {} (av{})'.format(self.info['title'], self.info['playItem']['aid'])) elif command == 'Command.Error': logger.warning('Error: {}'.format(arguments[0]['errorCode'])) @@ -323,7 +333,7 @@ def discover(self): resp = requests.get(link).text # bilibili if 'Bilibili Inc.' in resp: name = re.findall('<friendlyName>(.*)</friendlyName>', resp) name = 'Unknown' if not name else name[0] @@ -346,10 +356,6 @@ def setup(self, addr=None): return self.target = addr logger.info('Initialize connection ...') self.io = zio((self.target), print_write=False, print_read=False) self.io.write(SETUP_MSG) @@ -425,7 +431,7 @@ def play(self, aid=0, biz_id=0, room_id=0, epid=0, season_id=0, cid=0): count = 0 while True: if self.info is not None: logger.info('Playing video: {}(av{})'.format(self.info['title'], self.info['playItem']['aid'])) text = '' @@ -490,7 +496,7 @@ def set_speed(self, speed): def get_speed(self): return self.speed @check_settle(playing=False) def toggle_danmaku(self): self.danmaku = not self.danmaku logger.info('Toggle danmaku to {} ...'.format(self.danmaku)) @@ -523,7 +529,7 @@ def parse_command(self, command): self.dispatcher_thread.not_stop = False raise SystemExit elif command in ('pause', 'resume', 'result', 'danmaku', 'stop', 'discover'): if command == 'danmaku': command = 'toggle_danmaku' @@ -560,6 +566,12 @@ def parse_command(self, command): elif command == 'debug': logger.setLevel(logging.DEBUG) elif command == 'login': if len(args) > 0: self.login(args[0]) else: self.login() def interactive(self): while True: try: @@ -570,7 +582,11 @@ def interactive(self): self.parse_command(command) def login(self, access_key=None): if access_key is not None: self.access_key = access_key return def get_sign(data): s = '' @@ -608,17 +624,21 @@ def get_sign(data): if ret['code'] == 0: self.access_key = ret['data']['access_token'] logger.info('Login successfully with user {}'.format(ret['data']['mid'])) logger.info('Access key: {}'.format(self.access_key)) break time.sleep(1) if __name__ == '__main__': logger.setLevel(logging.INFO) handler = logging.StreamHandler() handler.setFormatter(ColorFormatter()) logger.addHandler(handler) b = BilibiliTV() # b.discover() b.setup(('192.168.31.198', 9958)) b.login(access_key='xxxxxxxx') b.interactive() -
RicterZ created this gist
Dec 17, 2022 .There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,624 @@ # Bilibili TV Controller # # Requirements # - qrencode_ascii # - zio import threading import json import time import logging import sys import requests import re import socket import struct import random import os import hashlib import qrencode_ascii from urllib.parse import urlparse from functools import wraps from enum import Enum from zio import zio, b8, b32 logger = logging.getLogger() SETUP_MSG = '''SETUP /projection NVA/1.0\r Session: deadbeef-1337-1337-1337-deadbeef1337\r Connection: Keep-Alive\r \r ''' class ColorFormatter(logging.Formatter): grey = "\x1b[38;20m" blue = "\x1b[34;20m" yellow = "\x1b[33;20m" red = "\x1b[31;20m" bold_red = "\x1b[31;1m" reset = "\x1b[0m" format = "[%(levelname)s]: %(message)s" FORMATS = { logging.DEBUG: grey + format + reset, logging.INFO: blue + format + reset, logging.WARNING: yellow + format + reset, logging.ERROR: red + format + reset, logging.CRITICAL: bold_red + format + reset } def format(self, record): log_fmt = self.FORMATS.get(record.levelno) formatter = logging.Formatter(log_fmt) return formatter.format(record) class Flag(Enum): # ping command from TV every 1 second PING = b'\xe4' # flag of commands sent from client COMMAND = b'\xe0' # flag of commands sent from TV RESPONSE = b'\xc0' def check_settle(playing=True): def _func(func): def _wrapper(func): @wraps(func) def wrapper(*args, **kwargs): if not args[0].settle: logger.info('TV not settle, please run setup first') return if playing and not args[0].playing: logger.info('Video not playing, cannot change states') return return func(*args, **kwargs) return wrapper return _wrapper(func) return _func def get_epid_and_ssid(aid): url = 'https://www.bilibili.com/video/av{}/'.format(aid) headers = requests.head(url, allow_redirects=False).headers if not 'location' in headers: return 0, 0 epid = headers['location'].split('/')[-1] if not epid.startswith('ep'): return 0, 0 resp = requests.get(headers['location']).text ssid = re.findall('https://www.bilibili.com/bangumi/play/ss(\d+)/', resp) if ssid: return epid[2:], ssid[0] return 0, 0 def bv_decode(code): table = 'fZodR9XQDSUm21yCkr6zBqiveYah8bt4xsWpHnJE7jL5VG3guMTKNPAwcF' tr = {} for i in range(58): tr[table[i]] = i s = [11, 10, 3, 8, 4, 6] xor = 177451812 add = 8728348608 r = 0 for i in range(6): r += tr[code[s[i]]] * 58 ** i return (r - add) ^ xor class BilibiliTV(): io = None target = None discovered = [] danmaku = False # is video playing playing = False # video position position = 0 # video info info = None # video speed speed = 1 supported_speeds = None # video quality quality = None supported_qualities = None # video volume volume = 0 # setup settle = False dispatcher_thread = None command_response = None access_key = None def __init__(self, addr=None): self.target = addr self.command_response = {} def ping(self): msg = Flag.RESPONSE.value msg += b'\x00' msg += b32(0x1337) self.io.write(msg) def make_command(self, command, args=None): # message flag while True: serial = random.randint(0, 0xffffffff) if serial not in self.command_response: break logger.debug('Make command {} with serial {} ...'.format(command, serial)) msg = Flag.COMMAND.value # message parts count if args is None: args = () else: # for now, only 1 argument been used args = [json.dumps(args)] msg += b8(len(args) + 2) # message serial number msg += b32(serial) msg += b'\x01' # command string msg += b'\x07Command' msg += b8(len(command)) msg += command.encode() # arguments if not isinstance(args, (list, tuple)): args = (args, ) for arg in args: if not isinstance(arg, bytes): arg = str(arg).encode() msg += b32(len(arg)) msg += arg return serial, msg def dispatch(self): while getattr(self.dispatcher_thread, 'not_stop', True): flag = self.io.read(1) if flag not in Flag._value2member_map_: logger.error('Unknown message flag: {}'.format(repr(flag))) raise Exception('Unknown message flag') parts = int.from_bytes(self.io.read(1), 'big') serial = int.from_bytes(self.io.read(4), 'big') logger.debug('Get {} message with serial {}'.format(Flag._value2member_map_[flag].name, serial)) if flag == Flag.PING.value: self.settle = True if parts == 0: self.command_response[serial] = None continue # one more byte in commands from client if flag == Flag.COMMAND.value: _ = self.io.read(1) result = [] for i in range(parts): # skip Command XXXX which length is only 1 byte c = 1 if i <= 1 and flag != Flag.RESPONSE.value else 4 length = int.from_bytes(self.io.read(c), 'big') data = self.io.read(length) result.append(data) self.command_response[serial] = result if len(result) == 1: ret = json.loads(result[0]) if 'volume' in ret: self.volume = ret['volume'] else: logger.info(ret) continue command = '{}.{}'.format(result[0].decode(), result[1].decode()) logger.debug('Received {}'.format(command)) arguments = [] if len(result) >= 2: for arg in result[2:]: arguments.append(json.loads(arg)) logger.debug('argument: {}'.format(arg.decode())) if command == 'Command.OnProgress': self.position = arguments[0]['position'] elif command == 'Command.SpeedChanged': self.speed = arguments[0]['currSpeed'] self.supported_speeds = arguments[0]['supportSpeedList'] logger.debug('Current speed: {}, supported speeds: {}'.format( arguments[0]['currSpeed'], ' / '.join(map(str, arguments[0]['supportSpeedList']))) ) elif command == 'Command.OnDanmakuSwitch': self.danmaku = arguments[0]['open'] logger.debug('Danmaku switch is {}'.format(arguments[0]['open'])) elif command == 'Command.PLAY_SUCCESS': self.playing = True logger.debug('Start playing video command successfully') elif command == 'Command.OnPlayState': state = arguments[0]['playState'] if state == 5: logger.debug('Video playing paused') elif state == 4: logger.debug('Video playing resumed') elif state == 7: logger.debug('Video playing stopped') elif command == 'Command.OnQnSwitch': text = [] self.quality = arguments[0]['curQn'] self.supported_qualities = arguments[0]['supportQnList'] elif command == 'Command.OnEpisodeSwitch': self.info = arguments[0] logger.debug('Playing video {} (av{})'.format(self.info['title'], self.info['playItem']['aid'])) elif command == 'Command.Error': logger.warning('Error: {}'.format(arguments[0]['errorCode'])) else: logger.debug('Unimplemented command {}'.format(command)) def discover(self): logger.info('Starting discovering Bilibili TV ...') sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP) sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) sock.bind(('239.255.255.250', 1900)) mreq = struct.pack("4sl", socket.inet_aton('239.255.255.250'), socket.INADDR_ANY) sock.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, mreq) count = 0 while True: data = sock.recv(1024).decode() for line in data.split('\n'): line = line.strip() if line.lower().startswith('location'): link = line.split(':', 1)[1].strip() resp = requests.get(link).text # bilibili if 'ZVT9255BGA' in resp: name = re.findall('<friendlyName>(.*)</friendlyName>', resp) name = 'Unknown' if not name else name[0] udn = re.findall('<UDN>uuid:(.*)</UDN>', resp)[0] if udn not in self.discovered: self.discovered.append(udn) addr = (urlparse(link).hostname, urlparse(link).port) logger.info('Found Bilibili TV: {}:{} ({})'.format(*addr, name)) break time.sleep(1) count += 1 if count == 5: logger.info('Discover finished') break def setup(self, addr=None): if self.target is None and addr is None: logger.error('Target address not be set') return self.target = addr if self.settle: logger.info('Already settle') return logger.info('Initialize connection ...') self.io = zio((self.target), print_write=False, print_read=False) self.io.write(SETUP_MSG) self.io.read_until('\r\n\r\n') self.dispatcher_thread = threading.Thread(target=self.dispatch) self.dispatcher_thread.start() c = 0 while not self.settle: self.ping() if c >= 3: logger.error('setup failed for no response') return c += 1 time.sleep(1) self.get_volume() logger.info('Setup TV connection successfully') @check_settle(playing=False) def set_volume(self, volume): logger.info('Set the volume to {}'.format(volume)) if not volume.isdigit(): return self.volume = int(volume) _, msg = self.make_command('SetVolume', {'volume': self.volume}) self.io.write(msg) @check_settle(playing=False) def get_volume(self): serial, msg = self.make_command('GetVolume') self.io.write(msg) while serial in self.command_response: return self.volume @check_settle(playing=False) def play(self, aid=0, biz_id=0, room_id=0, epid=0, season_id=0, cid=0): self.speed = 1 self.quality = None self.supported_speeds = None self.supported_qualities = None self.info = None if epid == 0 and season_id == 0: epid, season_id = get_epid_and_ssid(aid) content_type = 0 if not epid else 1 data = { 'biz_id': biz_id, 'roomId': room_id, 'oid': aid, 'aid': aid, 'epId': epid, 'cid': cid, 'seasonId': season_id, 'type': 0, 'contentType': content_type, 'autoNext': 'true', 'userDesireQn': '112', 'accessKey': self.access_key } if self.access_key is None: logger.warning('access_key is not be set, cannot play high quality videos, try login') logger.info('Sending play video command ...') serial, msg = self.make_command('Play', data) self.io.write(msg) count = 0 while True: if self.info is not None: logger.info('Playing video: {}(av{})'.format(self.info['title'], self.info['playItem']['aid'])) text = '' current = '' for k in self.info['qnDesc']['supportQnList']: if self.info['qnDesc']['curQn'] == k['quality']: current = k['description'] text += '{}({}) / '.format(k['description'], k['quality']) logger.info('Current quality: {}'.format(current)) logger.info('Supported quality: {}'.format(text[:-2])) break if count == 3: logger.error('Play video failed, please check the video ID') break time.sleep(1) count += 1 @check_settle() def stop(self): logger.info('Stop playing video ...') self.playing = False _, msg = self.make_command('Stop') self.io.write(msg) self.dispatcher_thread.not_stop = False @check_settle() def pause(self): logger.info('Pause video playing ...') _, msg = self.make_command('Pause') self.io.write(msg) @check_settle() def resume(self): logger.info('Resume video playing ...') _, msg = self.make_command('Resume') self.io.write(msg) @check_settle() def seek(self, t): logger.info('Seek to {} sec ...'.format(t)) _, msg = self.make_command('Seek', {'seekTs': t}) self.io.write(msg) def set_seek(self, position): self.seek(position) def get_seek(self): return self.position @check_settle() def set_speed(self, speed): logger.info('Set speed to {} ...'.format(speed)) if float(speed) not in self.supported_speeds: logger.warning('Unsupported speed specified') return _, msg = self.make_command('SwitchSpeed', {'speed': speed}) self.io.write(msg) @check_settle() def get_speed(self): return self.speed @check_settle() def toggle_danmaku(self): self.danmaku = not self.danmaku logger.info('Toggle danmaku to {} ...'.format(self.danmaku)) _, msg = self.make_command('SwitchDanmaku', {'open': self.danmaku}) self.io.write(msg) @check_settle() def set_quality(self, quality): if not self.playing: return if int(quality) not in map(lambda k: k['quality'], self.supported_qualities): logger.warning('Unsupported quality specified') return logger.info('Set quality to {} ...'.format(quality)) _, msg = self.make_command('SwitchQn', {'qn': quality}) self.io.write(msg) @check_settle() def get_quality(self): return self.quality def parse_command(self, command): args = command[1:] command = command[0].lower() if command in ('quit', 'exit'): if self.dispatcher_thread: self.dispatcher_thread.not_stop = False raise SystemExit elif command in ('pause', 'resume', 'result', 'danmaku', 'stop', 'discover', 'login'): if command == 'danmaku': command = 'toggle_danmaku' self.__getattribute__(command)() elif command in ('quality', 'speed', 'volume', 'seek'): if len(args) == 0: ret = self.__getattribute__('get_{}'.format(command))() if ret: print(ret) else: self.__getattribute__('set_{}'.format(command))(*args) elif command == 'play': if not args: print('Usage: play [avXXXXXXX]') return if args[0].lower().startswith('bv'): av = bv_decode(args[0]) elif args[0].startswith('av'): av = args[0][2:] else: av = args[0] self.play(aid=av) elif command == 'setup': if len(args) < 2: print('Usage: setup [host] [port]') return self.setup((args[0], int(args[1]))) elif command == 'debug': logger.setLevel(logging.DEBUG) def interactive(self): while True: try: command = input('📺 >>> ').split(' ') except (EOFError, KeyboardInterrupt): self.dispatcher_thread.not_stop = False break self.parse_command(command) def login(self): def get_sign(data): s = '' for k, v in data.items(): s += '{}={}&'.format(k, v) s = s[:-1] s += '59b43e04ad6965f34319062b478f83dd' return hashlib.md5(s.encode()).hexdigest() base = 'https://passport.bilibili.com/x/passport-tv-login/qrcode/' url = base + 'auth_code' data = { 'appkey': '4409e2ce8ffd12b8', 'local_id': 0, 'ts': 0, } data['sign'] = get_sign(data) ret = requests.post(url, data=data).json() qrcode = qrencode_ascii.encode(ret['data']['url']) logger.info('Please scan qrcode and scan via Bilibili app') print(qrcode) url = base + '/poll' data = { 'appkey': '4409e2ce8ffd12b8', 'auth_code': ret['data']['auth_code'], 'local_id': 0, 'ts': 0, } data['sign'] = get_sign(data) while True: ret = requests.post(url, data=data).json() if ret['code'] == 0: self.access_key = ret['data']['access_token'] logger.info('Login successfully with user {}'.format(ret['data']['mid'])) break time.sleep(1) if __name__ == '__main__': logger.setLevel(logging.INFO) handler = logging.StreamHandler() handler.setFormatter(ColorFormatter()) logger.addHandler(handler) b = BilibiliTV() b.setup(('192.168.1.3', 9958)) b.interactive()